Convert RDD[String] to RDD[Row] to DataFrame in Spark Scala


Solution 1

First split/parse your strings into the fields.

rdd.map(line => parse(line)), where parse is some parsing function. It could be as simple as split, but you may want something more robust. This will get you an RDD[Array[String]] or similar.

You can then convert to an RDD[Row] with rdd.map(a => Row.fromSeq(a)).

From there you can convert to a DataFrame using sqlContext.createDataFrame(rdd, schema), where rdd is your RDD[Row] and schema is your StructType schema.
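Putting the three steps together, a minimal sketch could look like the following. It assumes a "¦"-delimited input like the sample in the question, and the column names (`col1`, `col2`, `col3`) and all-string types are placeholders; a real schema would enumerate all 800 fields with their actual types.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val lines: RDD[String] = sc.textFile("yourfilename")

// 1) split/parse each line into its fields ("¦" delimiter is an assumption)
val parsed: RDD[Array[String]] = lines.map(_.split("¦"))

// 2) convert each Array[String] into a Row
val rowRDD: RDD[Row] = parsed.map(a => Row.fromSeq(a))

// 3) build a StructType whose fields match the number and order of the columns
val schema = StructType(
  Seq("col1", "col2", "col3").map(name => StructField(name, StringType, nullable = true))
)

val df = sqlContext.createDataFrame(rowRDD, schema)
```

The number of StructFields must match the number of elements in each Row, or the query will fail at runtime when the rows are evaluated.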

Solution 2

In your case, a simple way:

val rowOfRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r.split("¦")))

(Note: split each line into its fields before calling Row.fromSeq; passing a raw String would produce a Row of individual characters. The "¦" delimiter here is taken from the sample input.)

How to solve the product arity issue if you are using Scala 2.10?

However, I am not sure how to get it into a dataframe. sc.textFile returns a RDD[String]. I tried the case class way but the issue is we have 800 field schema, case class cannot go beyond 22.

Yes, there are limitations such as product arity (the 22-field case class limit), but they can be overcome. For Scala versions below 2.11, you can do something like the example below:

Prepare a class which extends Product and overrides its methods, like so:

  • productArity(): Int — returns the number of attributes. In this example, it's 33.

  • productElement(n: Int): Any — given an index, returns the attribute at that position. As protection, a default case throws an IndexOutOfBoundsException for out-of-range indices.

  • canEqual(that: Any): Boolean — the last of the three functions; it serves as a boundary condition when an equality check is done against the class.
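The three overrides can be sketched as follows. This is a hedged illustration, not the answer's original code: only three fields are shown (the real class would enumerate all of them the same way), and the class name `Record` is hypothetical.

```scala
// Sketch: a class extending Product, the workaround for the 22-field
// case class limit in Scala < 2.11. Only 3 fields shown for brevity.
class Record(val f0: String, val f1: String, val f2: String)
    extends Product with Serializable {

  // Number of attributes (33 in the answer's example; 3 in this sketch)
  override def productArity: Int = 3

  // Return the attribute at the given index; the default case throws
  // IndexOutOfBoundsException as a guard against bad indices
  override def productElement(n: Int): Any = n match {
    case 0 => f0
    case 1 => f1
    case 2 => f2
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  // Boundary condition for equality checks against this class
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Record]
}
```

Instances of such a class can then be used where Spark expects a Product, sidestepping the case class arity limit.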


Author: Defcon

Updated on June 04, 2022

Comments

  • Defcon
    Defcon almost 2 years

    I am reading in a file that has many spaces and need to filter out the space. Afterwards we need to convert it to a dataframe. Example input below.

    2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE
    

    My solution to this was the following function which parses out all spaces and trims the file.

    def truncateRDD(fileName : String): RDD[String] = {
        val example = sc.textFile(fileName)
        example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
    }
    

    However, I am not sure how to get it into a dataframe. sc.textFile returns a RDD[String]. I tried the case class way but the issue is we have 800 field schema, case class cannot go beyond 22.

    I was thinking of somehow converting RDD[String] to RDD[Row] so I can use the createDataFrame function.

    val DF = spark.createDataFrame(rowRDD, schema)
    

    Any suggestions on how to do this?

  • Ram Ghadiyaram
    Ram Ghadiyaram over 7 years
From Scala 2.11 onwards, the arity issue is not there; for earlier Scala versions, the above approach is applicable.
  • Nirmal_stack
    Nirmal_stack almost 6 years
I have an array of nested JSON to parse; how can I convert it into a data frame?