How to write an RDD[String] to a parquet file with schema inference?
It is possible to do this on Spark 1.6.x, as the csv library from Databricks supports a method to transform an RDD[String] using the csv parser. In Spark >= 2.0 this support has been merged into the main project, and that method was removed from the public interface. Many of the relevant methods are now private, so it is harder to work around, but it may be worth exploring the underlying univocity parsing library.
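For Spark 2.2 and later, a rough equivalent is to wrap the RDD[String] in a Dataset[String] and pass it to the built-in csv reader, which still performs schema inference. This is a minimal sketch; the input path and session setup are illustrative:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-rdd-to-parquet").getOrCreate()
import spark.implicits._

// illustrative path; each line is one csv row
val rdd = spark.sparkContext.textFile("/tmp/sample-no-header.csv")

// DataFrameReader.csv(Dataset[String]) is available since Spark 2.2
val df = spark.read
  .option("inferSchema", "true")
  .csv(rdd.toDS())

df.printSchema() // columns are named _c0, _c1, ... when there is no header
df.write.parquet("/tmp/sample.parquet")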
Using Databricks' Spark CSV support on Spark 1.6.1, we can do something like this:
import com.databricks.spark.csv.CsvParser
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sparkContext)
val parser = new CsvParser().withInferSchema(true)

val rdd = sparkContext.textFile("/home/maasg/playground/data/sample-no-header.csv")
rdd.take(1) // show a data sample
// Array[String] = Array(2000,JOHN,KINGS,50)

val df = parser.csvRdd(sqlContext, rdd)
df.schema // inspect the inferred schema (schema is a parameterless method, so no parentheses)
// org.apache.spark.sql.types.StructType = StructType(StructField(C0,IntegerType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,IntegerType,true))

df.write.parquet("/tmp/sample.parquet") // write the DataFrame to parquet
It should be trivial to integrate such code in Spark Streaming, within a foreachRDD { rdd => ... } call, as sketched below.
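As a sketch of that integration on Spark 1.6.x (the socket source, batch interval, and output path here are placeholder assumptions):
import com.databricks.spark.csv.CsvParser
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkContext, Seconds(10)) // placeholder batch interval
val lines = ssc.socketTextStream("localhost", 9999)       // placeholder source

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val parser = new CsvParser().withInferSchema(true)
    val df = parser.csvRdd(sqlContext, rdd)
    // append each micro-batch to the same parquet directory
    df.write.mode("append").parquet("/tmp/stream.parquet")
  }
}

ssc.start()
ssc.awaitTermination()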
Andrea T. Bonanno
Updated on June 13, 2022

Comments
- Andrea T. Bonanno, almost 2 years ago: My Spark Streaming job needs to handle an RDD[String], where each String corresponds to a row of a csv file. I don't know the schema beforehand, so I need to infer it from the RDD and then write its content to a parquet file. If I were reading a csv file from disk, I could just load everything into a DataFrame with schema inference and write it to parquet straight away. In my scenario, though, my starting point is an RDD[String] that I get as the result of a stream.