Spark 2.2.0 - How to write/read DataFrame to DynamoDB


I was following that "Using Spark SQL for ETL" link too, and hit the same "illegal cyclic reference" exception. The solution for that exception is quite simple (though it cost me two days to figure out) and is shown below. The key point is to call map on the RDD of the DataFrame, not on the DataFrame itself.

import java.util.HashMap

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf

// Configure the DynamoDB connection
val ddbConf = new JobConf(spark.sparkContext.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "<myTableName>")
ddbConf.set("dynamodb.throughput.write.percent", "1.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

// Load the input and capture the column names/types so the conversion works for any schema
val df_ddb = spark.read.option("header", "true").parquet("<myInputFile>")
val schema_ddb = df_ddb.dtypes

// Map over the RDD of the DataFrame (not the DataFrame itself) to build
// the (Text, DynamoDBItemWritable) pairs that DynamoDBOutputFormat expects
val ddbInsertFormattedRDD = df_ddb.rdd.map(a => {
    val ddbMap = new HashMap[String, AttributeValue]()

    for (i <- 0 to schema_ddb.length - 1) {
        val value = a.get(i)
        if (value != null) {
            val att = new AttributeValue()
            att.setS(value.toString)
            ddbMap.put(schema_ddb(i)._1, att)
        }
    }

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    (new Text(""), item)
})

ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)
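
For completeness, the read direction goes through the same connector: set dynamodb.input.tableName on a JobConf and use hadoopRDD with DynamoDBInputFormat, as the question below does. A minimal sketch of that setup (readConf and ddbReadRDD are just illustrative names):

import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat

val readConf = new JobConf(spark.sparkContext.hadoopConfiguration)
readConf.set("dynamodb.input.tableName", "<myTableName>")
readConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

// RDD[(Text, DynamoDBItemWritable)]; each DynamoDBItemWritable wraps a map of attribute name to AttributeValue
val ddbReadRDD = spark.sparkContext.hadoopRDD(
  readConf,
  classOf[DynamoDBInputFormat],
  classOf[Text],
  classOf[DynamoDBItemWritable])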
Author: Béatrice Moissinac, Graduate Student in Machine Learning at Oregon State University

Updated on June 12, 2022

Comments

  • Béatrice Moissinac, almost 2 years

    I want my Spark application to read a table from DynamoDB, do stuff, then write the result in DynamoDB.

    Read the table into a DataFrame

    Right now, I can read the table from DynamoDB into Spark as a hadoopRDD and convert it to a DataFrame. However, I had to use a regular expression to extract the value from AttributeValue. Is there a better/more elegant way? Couldn't find anything in the AWS API.

    package main.scala.util

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.rdd.RDD
    import scala.util.matching.Regex
    import java.util.HashMap

    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.dynamodb.DynamoDBItemWritable
    /* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
    import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
    import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.io.LongWritable

    object Tester {

      // AttributeValue.toString looks like "{S: 298905396168806365,}";
      // pull out whatever sits between the space and the trailing comma
      def extractValue: (String => String) = (aws: String) => {
        val pat_value = "\\s(.*),".r

        val matcher = pat_value.findFirstMatchIn(aws)
        matcher match {
          case Some(number) => number.group(1).toString
          case None => ""
        }
      }

      def main(args: Array[String]) {
        val spark = SparkSession.builder().getOrCreate()
        val sparkContext = spark.sparkContext

        import spark.implicits._

        // UDF to extract the value from an AttributeValue's string form
        val col_extractValue = udf(extractValue)

        // Configure connection to DynamoDB
        val jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
        jobConf_add.set("dynamodb.input.tableName", "MyTable")
        jobConf_add.set("dynamodb.output.tableName", "MyTable")
        jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
        jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

        // org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
        val hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

        // Convert the HadoopRDD to an RDD of (PIN, Address) string pairs
        val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
          case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
        }

        // Convert the RDD to a DataFrame and extract the values from the AttributeValue strings
        val df_add = rdd_add.toDF()
          .withColumn("PIN", col_extractValue($"_1"))
          .withColumn("Address", col_extractValue($"_2"))
          .select("PIN", "Address")
      }
    }
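
    (Aside on the regex above: if getItem() returns a Map[String, AttributeValue], the typed getters, e.g. getS() for string attributes or getN() for numbers, may avoid the string parsing entirely. A rough, untested sketch reusing hadooprdd_add from the snippet above; rdd_alt is just an illustrative name:)

    // Untested alternative to extractValue: read the typed value straight from each AttributeValue
    val rdd_alt: RDD[(String, String)] = hadooprdd_add.map {
      case (text, dbwritable) =>
        val item = dbwritable.getItem()
        // getS returns the string value of the attribute, or null if it is absent or not a string
        (Option(item.get("PIN")).flatMap(v => Option(v.getS)).getOrElse(""),
         Option(item.get("Address")).flatMap(v => Option(v.getS)).getOrElse(""))
    }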
    

    Write the DataFrame to DynamoDB

    Many answers on Stack Overflow and elsewhere only point to the blog post and to the emr-dynamodb-hadoop GitHub repository. None of those resources actually shows how to write to DynamoDB.

    I tried converting my DataFrame to RDD[Row] unsuccessfully.

    df_add.rdd.saveAsHadoopDataset(jobConf_add)
    

    What are the steps to write this DataFrame to DynamoDB? (Bonus Points if you tell me how to control overwrite vs putItem ;)

    Note: df_add has the same schema as MyTable in DynamoDB.

    EDIT: I am following the recommendation from this answer which points to this post on Using Spark SQL for ETL:

      // Format table to DynamoDB format
      val output_rdd = df_add.as[(String, String)].rdd.map(a => {
        var ddbMap = new HashMap[String, AttributeValue]()
    
        // Field PIN
        var PINValue = new AttributeValue() // New AttributeValue
        PINValue.setS(a._1)                 // Set value of Attribute as String. First element of tuple
        ddbMap.put("PIN", PINValue)         // Add to HashMap
    
        // Field Address
        var AddValue = new AttributeValue() // New AttributeValue
        AddValue.setS(a._2)                 // Set value of Attribute as String
        ddbMap.put("Address", AddValue)     // Add to HashMap
    
        var item = new DynamoDBItemWritable()
        item.setItem(ddbMap)
    
        (new Text(""), item)
      })             
    
      output_rdd.saveAsHadoopDataset(jobConf_add) 
    

    However, now I am getting java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text despite following the documentation... Do you have any suggestions?

    EDIT 2: Reading this post on Using Spark SQL for ETL more carefully:

    After you have the DataFrame, perform a transformation to have an RDD that matches the types that the DynamoDB custom output format knows how to write. The custom output format expects a tuple containing the Text and DynamoDBItemWritable types.

    Taking this into account, the code below is exactly what the AWS blog post suggests, except that I convert output_df to an RDD, since otherwise saveAsHadoopDataset doesn't work. And now I am getting Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience. I am at the end of my rope!

      // Format table to DynamoDB format
      val output_df = df_add.map(a => {
        var ddbMap = new HashMap[String, AttributeValue]()
    
        // Field PIN
        var PINValue = new AttributeValue() // New AttributeValue
        PINValue.setS(a.get(0).toString())                 // Set value of Attribute as String
        ddbMap.put("PIN", PINValue)         // Add to HashMap
    
        // Field Address
        var AddValue = new AttributeValue() // New AttributeValue
        AddValue.setS(a.get(1).toString())                 // Set value of Attribute as String
        ddbMap.put("Address", AddValue)     // Add to HashMap
    
        var item = new DynamoDBItemWritable()
        item.setItem(ddbMap)
    
        (new Text(""), item)
      })             
    
      output_df.rdd.saveAsHadoopDataset(jobConf_add)   
    
  • sri hari kali charan Tummala, almost 5 years
  • Jayesh Lalwani, almost 4 years
    Your link is a 404