Spark 2.2.0 - How to write/read DataFrame to DynamoDB
I was following that "Using Spark SQL for ETL" link and hit the same "illegal cyclic reference" exception. The fix is quite simple (though it cost me two days to figure out): call map on the RDD underlying the DataFrame, not on the DataFrame itself. Mapping the DataFrame directly forces Spark to derive an Encoder for (Text, DynamoDBItemWritable), and Scala's reflection trips over the Hadoop annotation classes while doing so, which is what surfaces as the cyclic-reference error; df.rdd.map sidesteps encoders entirely.
import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf

// Configure the DynamoDB connection
val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

// Load the source data and capture its schema (column name, type) pairs
val df_ddb = spark.read.option("header", "true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

// Map over the RDD of the DataFrame (df_ddb.rdd), NOT the DataFrame itself,
// to avoid the "illegal cyclic reference" encoder error
val ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
  val ddbMap = new HashMap[String, AttributeValue]()
  for (i <- 0 until schema_ddb.length) {
    val value = a.get(i)
    if (value != null) {
      val att = new AttributeValue()
      att.setS(value.toString)
      ddbMap.put(schema_ddb(i)._1, att)
    }
  }
  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
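For the read side, you don't actually need the regex workaround from the question: DynamoDBItemWritable.getItem returns a java.util.Map[String, AttributeValue], and AttributeValue has typed getters such as getS for string attributes. Here is a minimal sketch of reading the table back into a DataFrame, assuming string attributes named PIN and Address as in the question:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf

import spark.implicits._

// Configure the read side of the connector
val readConf = new JobConf(spark.sparkContext.hadoopConfiguration)
readConf.set("dynamodb.input.tableName", "<myTableName>")
readConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

val df_read = spark.sparkContext
  .hadoopRDD(readConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
  .map { case (_, item) =>
    val attrs = item.getItem // java.util.Map[String, AttributeValue]
    (attrs.get("PIN").getS, attrs.get("Address").getS) // read the String value directly, no regex
  }
  .toDF("PIN", "Address")

On the overwrite-vs-putItem bonus question: as far as I know the connector writes each item with put semantics (a written item fully replaces any existing item with the same key), and the Hadoop output format doesn't expose conditional puts, so overwrite is what you get by default.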
Béatrice Moissinac
Graduate Student in Machine Learning at Oregon State University
Updated on June 12, 2022

Comments
-
Béatrice Moissinac, almost 2 years ago
I want my Spark application to read a table from DynamoDB, do stuff, then write the result in DynamoDB.
Read the table into a DataFrame
Right now, I can read the table from DynamoDB into Spark as a hadoopRDD and convert it to a DataFrame. However, I had to use a regular expression to extract the value from AttributeValue. Is there a better/more elegant way? Couldn't find anything in the AWS API.

package main.scala.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable

object Tester {

  // {S: 298905396168806365,}
  def extractValue : (String => String) = (aws: String) => {
    val pat_value = "\\s(.*),".r
    val matcher = pat_value.findFirstMatchIn(aws)
    matcher match {
      case Some(number) => number.group(1).toString
      case None => ""
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val sparkContext = spark.sparkContext
    import spark.implicits._

    // UDF to extract Value from AttributeValue
    val col_extractValue = udf(extractValue)

    // Configure connection to DynamoDB
    var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
    jobConf_add.set("dynamodb.input.tableName", "MyTable")
    jobConf_add.set("dynamodb.output.tableName", "MyTable")
    jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

    // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
    var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

    // Convert HadoopRDD to RDD
    val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
    }

    // Convert RDD to DataFrame and extract Values from AttributeValue
    val df_add = rdd_add.toDF()
      .withColumn("PIN", col_extractValue($"_1"))
      .withColumn("Address", col_extractValue($"_2"))
      .select("PIN", "Address")
  }
}
Write the DataFrame to DynamoDB
Many answers on Stack Overflow and elsewhere only point to the blog post and the emr-dynamodb-hadoop GitHub repository. None of those resources actually demonstrate how to write to DynamoDB.
I tried converting my DataFrame to an RDD[Row], unsuccessfully:

df_add.rdd.saveAsHadoopDataset(jobConf_add)

What are the steps to write this DataFrame to DynamoDB? (Bonus points if you tell me how to control overwrite vs putItem ;)

Note: df_add has the same schema as MyTable in DynamoDB.

EDIT: I am following the recommendation from this answer, which points to this post on Using Spark SQL for ETL:
// Format table to DynamoDB format
val output_rdd = df_add.as[(String, String)].rdd.map(a => {
  var ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  var PINValue = new AttributeValue() // New AttributeValue
  PINValue.setS(a._1)                 // Set value of Attribute as String. First element of tuple
  ddbMap.put("PIN", PINValue)         // Add to HashMap

  // Field Address
  var AddValue = new AttributeValue() // New AttributeValue
  AddValue.setS(a._2)                 // Set value of Attribute as String
  ddbMap.put("Address", AddValue)     // Add to HashMap

  var item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})

output_rdd.saveAsHadoopDataset(jobConf_add)
However, now I am getting

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text

despite following the documentation... Do you have any suggestions?

EDIT 2: Reading this post on Using Spark SQL for ETL more carefully:
"After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types."

Taking this into account, the code below is exactly what the AWS blog post suggests, except that I cast output_df to an RDD, since saveAsHadoopDataset doesn't work otherwise. And now I am getting

Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience

I am at the end of my rope!

// Format table to DynamoDB format
val output_df = df_add.map(a => {
  var ddbMap = new HashMap[String, AttributeValue]()

  // Field PIN
  var PINValue = new AttributeValue() // New AttributeValue
  PINValue.setS(a.get(0).toString()) // Set value of Attribute as String
  ddbMap.put("PIN", PINValue)        // Add to HashMap

  // Field Address
  var AddValue = new AttributeValue() // New AttributeValue
  AddValue.setS(a.get(1).toString()) // Set value of Attribute as String
  ddbMap.put("Address", AddValue)    // Add to HashMap

  var item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
})

output_df.rdd.saveAsHadoopDataset(jobConf_add)
-
sri hari kali charan Tummala, almost 5 years ago: github.com/kali786516/Spark2StructuredStreaming/blob/master/src/… , a full example for anyone
-
Jayesh Lalwani, almost 4 years ago: Your link is a 404