Spark Scala Get Data Back from rdd.foreachPartition
To create a transformation that uses resources local to the executor (such as a DB or network connection), use rdd.mapPartitions. It lets you initialize code once per partition on the executor and use those local resources to process all of the data in that partition.
The code should look like:
val lastRevs = distinctFileGidsRDD
  .mapPartitions { iter =>
    // One-time setup per partition, executed on the executor:
    SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.map { element =>
      DB.readOnly { implicit session =>
        sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
          .map { resultSet => resultSet.string(1) }.single.apply()
      }
    }
  }
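For intuition, the same per-partition pattern can be sketched without Spark or a real database. Here Connection and lookup are hypothetical stand-ins for the JDBC session and query, and the partitions are plain sequences; only the shape (setup once per partition, then map each element) mirrors the answer above:

```scala
object MapPartitionsSketch {
  // Hypothetical stand-in for a per-partition resource (e.g. a JDBC session).
  final case class Connection(id: Int) {
    def lookup(key: String): Option[String] = Some(s"json-for-$key")
  }

  // Same shape as rdd.mapPartitions { iter => setup; iter.map(...) }:
  def mapPartitions[A, B](partitions: Seq[Seq[A]])(f: Iterator[A] => Iterator[B]): Seq[B] =
    partitions.flatMap(p => f(p.iterator).toSeq)

  def run(): Seq[Option[String]] = {
    val partitions = Seq(Seq("gid1", "gid2"), Seq("gid3"))
    mapPartitions(partitions) { iter =>
      val conn = Connection(0) // one setup per partition, like SetupJDBC
      iter.map(gid => conn.lookup(gid)) // each element mapped through the resource
    }
  }
}
```

The point is that the result of the per-element mapping is returned, not discarded, which is exactly what foreachPartition cannot do.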
codeaperature
Updated on June 13, 2022

Comments
-
codeaperature almost 2 years
I have some code like this:

println("\nBEGIN Last Revs Class: " + distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.foreachPartition { iter =>
  SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
  while (iter.hasNext) {
    val item = iter.next()
    //println(item(0))
    println("String: " + item(0).toString())
    val jsonStr = DB.readOnly { implicit session =>
      sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar"
        .map { resultSet => resultSet.string(1) }.single.apply()
    }
    println("\nJSON: " + jsonStr)
  }
}
println("\nEND Last Revs Class: " + lastRevs.getClass)
The code outputs (with heavy edits) something like:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void
QUESTION 1: How can I get the lastRevs value back in a useful format, like the JSON string/null or an Option like Some/None?
QUESTION 2: My preference: Is there another way to get at the partition data in an RDD-like format (rather than the iterator format)?
dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
from http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
QUESTION 3: Is the method of getting data that I am using sane (given that I am following the link above)? (Put aside the fact that this is a scalikejdbc JDBC system right now; the final version will be a key-value store of some type other than this prototype.)
-
lisak almost 8 years
You mean that it differs from foreachPartition in that it uses the Executor's resources instead of the Driver's? I.e. foreachPartition is executed on the Driver whereas mapPartitions runs on the Executor ... right?
-
maasg almost 8 years
@lisak No. Both foreachPartition and mapPartitions let you run code on the executors. The difference is that foreachPartition only performs side effects (like writing to a DB), while mapPartitions returns a value. The key of this question is 'how to get data back', hence mapPartitions is the way to go.
-
BdEngineer over 5 years
@maasg I have some code like this:

val company_model_vals_df = enriched_company_model_vals_df
  .repartition(col("model_id"), col("fiscal_quarter"), col("fiscal_year"))
company_model_vals_df.foreachPartition(
  writeAsParquet(dataframe) // will write as parquet in hdfs
)

But how to use foreachPartition here?
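The foreachPartition-vs-mapPartitions distinction maasg describes in the comments can be sketched with plain collections standing in for an RDD's partitions. The names here (ForeachVsMap, demo) are illustrative and not part of the Spark API:

```scala
object ForeachVsMap {
  // foreachPartition analogue: runs a side-effecting function, returns Unit.
  def foreachPartition[A](parts: Seq[Seq[A]])(f: Iterator[A] => Unit): Unit =
    parts.foreach(p => f(p.iterator))

  // mapPartitions analogue: transforms each partition and returns the results.
  def mapPartitions[A, B](parts: Seq[Seq[A]])(f: Iterator[A] => Iterator[B]): Seq[B] =
    parts.flatMap(p => f(p.iterator).toSeq)

  def demo(): (List[Int], Seq[Int]) = {
    val parts = Seq(Seq(1, 2), Seq(3))
    val buf = scala.collection.mutable.ListBuffer.empty[Int]
    foreachPartition(parts)(it => it.foreach(buf += _)) // only side effects escape
    val mapped = mapPartitions(parts)(it => it.map(_ * 10)) // data comes back as a value
    (buf.toList, mapped)
  }
}
```

This mirrors why the question's foreachPartition version could only println its results ("END Last Revs Class: void"): its return type is Unit, so anything you want back has to come from mapPartitions.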