Reading a CSV file as a Spark DataFrame

@Pooja Nayak, not sure if this was solved; answering it in the interest of the community.

sc: SparkContext
spark: SparkSession
sqlContext: SQLContext
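
These handles are predefined in spark-shell; outside the shell they can be created along these lines (a minimal sketch; the app name and master setting are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CsvToDataFrame") // placeholder app name
  .master("local[*]")        // placeholder master for a local run
  .getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext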

// Read the raw file from localFS as-is.
val rdd_raw = sc.textFile("file:///home/xxxx/sample.csv")

// Drop the first line of the first partition because it is the header.
val rdd = rdd_raw.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}

// A function to create the schema dynamically from the header line.
def schemaCreator(header: String): StructType = {
  StructType(header
              .split(",")
              .map(field => StructField(field.trim, StringType, true))
  )
}

// Create the schema for the csv that was read and store it.
val csvSchema: StructType = schemaCreator(rdd_raw.first)

// As the input is CSV, split it at "," and trim away the whitespace.
val rdd_curated = rdd.map(x => x.split(",").map(y => y.trim)).map(xy => Row(xy:_*))

// Create the DF from the RDD.
val df = sqlContext.createDataFrame(rdd_curated, csvSchema)

Imports that are necessary:

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark._
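
To sanity-check the result, the schema and a few rows can be inspected; note that a column name containing "." must be wrapped in backticks when referenced, because Spark otherwise reads the dot as a struct-field accessor (a small usage sketch for the df built above):

df.printSchema()
df.show(5)
df.select("`No. of items`").show()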
Author: Pooja Nayak, updated on June 04, 2022

Comments

  • Pooja Nayak, almost 2 years ago

    I have a CSV file with a header, which has to be read through Spark (2.0.0, Scala 2.11.8) as a DataFrame.

    Sample CSV data:

    Item,No. of items,Place
    abc,5,xxx
    def,6,yyy
    ghi,7,zzz
    .........
    

    I'm facing a problem when I try to read this CSV data in Spark as a DataFrame, because the header contains a column (No. of items) with the special character ".".

    The code with which I try to read the CSV data is:

    val spark = SparkSession.builder().appName("SparkExample").getOrCreate()
    import spark.implicits._
    val df = spark.read.option("header", "true").csv("file:///INPUT_FILENAME")
    

    Error I'm facing:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to resolve No. of items given [Item,No. of items,Place];
    

    If I remove the "." from the header, I won't get any error. I even tried escaping the character, but that escapes all the "." characters, even those in the data.

    Is there any way to escape the special character "." only in the CSV header using Spark code?
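
    One workaround (a hedged sketch, not from the original thread; it assumes a Spark version whose CSV reader accepts dotted headers, which 2.0.0 may not): wrap the dotted name in backticks when referencing it, or strip the dots by renaming the columns right after the read.

    val df = spark.read.option("header", "true").csv("file:///INPUT_FILENAME")

    // Backticks stop Spark from parsing "." as a struct-field accessor.
    df.select("`No. of items`").show()

    // Alternatively, rename every column to drop the "." characters.
    val cleaned = df.columns.foldLeft(df) { (d, c) =>
      d.withColumnRenamed(c, c.replace(".", ""))
    }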