Spark Scala Get Data Back from rdd.foreachPartition

To create a transformation that uses resources local to the executor (such as a DB or network connection), you should use rdd.mapPartitions. It lets you initialize those resources once per partition, on the executor, and then use them to process the data in that partition.

The code should look like:

    val lastRevs = distinctFileGidsRDD.
      mapPartitions { iter =>
        // runs once per partition, on the executor: set up the JDBC connection pool
        SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
        iter.map { element =>
          DB.readOnly { implicit session =>
            sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
              .map { resultSet => resultSet.string(1) }.single.apply()
          }
        }
      }
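
Because mapPartitions returns a new RDD, the per-element results (here Option[String]: Some(jsonStr) or None) can be transformed further or brought back to the driver like any other RDD. A minimal sketch, assuming the lastRevs value built above:

    // lastRevs is an RDD[Option[String]], one entry per fileGId
    val revs: Array[Option[String]] = lastRevs.collect()   // materialize the results on the driver
    val jsonStrs: Array[String]     = revs.flatten         // keep only the rows that were found
    jsonStrs.foreach(println)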
Author: codeaperature

Updated on June 13, 2022

Comments

  • codeaperature
    codeaperature almost 2 years

    I have some code like this:

          println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
          val lastRevs = distinctFileGidsRDD.
            foreachPartition(iter => {
              SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
              while(iter.hasNext) {
                val item = iter.next()
                //println(item(0))
                println("String: "+item(0).toString())
                val jsonStr = DB.readOnly { implicit session =>
                  sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                    map { resultSet => resultSet.string(1) }.single.apply()
                }
                println("\nJSON: "+jsonStr)
              }
            })
          println("\nEND Last Revs Class: "+ lastRevs.getClass)
    

    The code outputs (with heavy edits) something like:

    BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
    String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
    JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
    String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
    JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
    ...
    JSON: None()
    END Last Revs Class: void
    

    QUESTION 1: How can I get the lastRevs value back in a useful format, such as the JSON string/null or an Option like Some / None?

    QUESTION 2 (my preference): Is there another way to get at the partition's data in an RDD-like format (rather than the iterator format)?

    dstream.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionIterator =>
        val partitionId = TaskContext.get.partitionId()
        val uniqueId = generateUniqueId(time.milliseconds, partitionId)
        // use this uniqueId to transactionally commit the data in partitionIterator
      } 
    }
    

    from http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

    QUESTION 3: Is the method of getting data that I am using sane (given that I am following the link above)? (Set aside the fact that this uses scalikejdbc/JDBC right now; beyond this prototype it is going to be a key-value store of some kind.)

  • lisak
    lisak almost 8 years
    You mean that it differs from foreachPartition in that it uses the Executor's resources instead of the Driver's resources? I.e. foreachPartition code is executed on the Driver, whereas mapPartitions code runs on the Executor ... right?
  • maasg
    maasg almost 8 years
    @lisak No, both foreachPartition and mapPartitions let you run code on the executors. The difference is that foreachPartition only performs side effects (like writing to a DB), while mapPartitions returns a value. The key to this question is 'how to get data back', hence mapPartitions is the way to go (see the contrast sketch after the comments).
  • BdEngineer
    BdEngineer over 5 years
    @maasg I have code like this: val company_model_vals_df = enriched_company_model_vals_df.repartition(col("model_id"), col("fiscal_quarter"), col("fiscal_year")); company_model_vals_df.foreachPartition( writeAsParquet(dataframe) /* will write as parquet in hdfs */ ). But how do I use foreachPartition here?
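
To make the foreachPartition vs. mapPartitions contrast above concrete, here is a minimal, self-contained sketch with toy data (the object name, app name, and numbers are illustrative, not taken from the question):

    import org.apache.spark.sql.SparkSession

    object PartitionDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("PartitionDemo").master("local[2]").getOrCreate()
        val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 2)

        // foreachPartition is an action that returns Unit: useful for side effects
        // (e.g. writing each partition to a DB), but nothing comes back to the driver.
        rdd.foreachPartition { iter =>
          iter.foreach(i => println(s"side effect on executor: $i"))
        }

        // mapPartitions is a transformation that returns a new RDD, so the
        // per-partition results can be collected back to the driver.
        val doubled = rdd.mapPartitions { iter => iter.map(_ * 2) }
        println(doubled.collect().mkString(", "))   // 2, 4, 6, ..., 20

        spark.stop()
      }
    }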