How to match Dataframe column names to Scala case class attributes?

18,412

Basically, all the mapping you need to do can be achieved with DataFrame.select(...). (Here, I assume, that no type conversions need to be done.) Given the forward- and backward-mapping as maps, the essential part is

val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF your original dataframe  
val mappedDF = personsDF.select( mapping: _* )

where mapping is an array of Columns with alias.

Example code

object Example {   

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]) : Unit = {
    // init
    val conf = new SparkConf()
      .setAppName( "Example." )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet( personsDF, "persons.parquet", sc, sqlContext)

    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
  }

  def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.from

    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray

    val mappedDF = personsDF.select( mapping: _* )
    mappedDF.write.parquet("/output/path.parquet") // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b

    val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
    df.select( mapping: _* )
  }
}

Remark

If you need to convert a dataframe back to an RDD[Person], then

val rdd : RDD[Row] = personsDF.rdd
val personsRDD : Rdd[Person] = rdd.map { r: Row => 
  Person( r.getAs("person"), r.getAs("age") )
}

Alternatives

Have also a look at How to convert spark SchemaRDD into RDD of my case class?

Share:
18,412
BAR
Author by

BAR

No expertise, just good arguments.

Updated on June 17, 2022

Comments

  • BAR
    BAR almost 2 years

    The column names in this example from spark-sql come from the case class Person.

    case class Person(name: String, age: Int)
    
    val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
    
    // The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
    people.saveAsParquetFile("people.parquet")
    

    https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

    However in many cases the parameter names may be changed. This would cause columns to not be found if the file has not been updated to reflect the change.

    How can I specify an appropriate mapping?

    I am thinking something like:

      val schema = StructType(Seq(
        StructField("name", StringType, nullable = false),
        StructField("age", IntegerType, nullable = false)
      ))
    
    
      val ps: Seq[Person] = ???
    
      val personRDD = sc.parallelize(ps)
    
      // Apply the schema to the RDD.
      val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
    
  • BAR
    BAR over 8 years
    Nice approach. Do you think this would impact performance, or should it not a be factor since this is compiled and optimized once in the internal pipeline?
  • Martin Senne
    Martin Senne over 8 years
    I assume the latter. First, as there is Catalyst optimization / compilation. Second, selects (with alias) don't seem to be expensive operations. Though, would be interested to see some performance measurements ....
  • Asiri Liyana Arachchi
    Asiri Liyana Arachchi about 5 years
    @BAR can we use jave here ? for the example given? In java dataset select method doesn't have capability to take a map?