You read the CSV file first and then tried to convert it to a Dataset with a different schema. Without an explicit schema every CSV column is read as a string, so price cannot be up-cast to Int. It is better to pass the schema when reading the CSV file, as below:

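import org.apache.spark.sql.{Encoders, SparkSession}

// add .master(...) / .appName(...) as your environment requires
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._ // needed for .as[Record]

// derive the schema from the Record case class
val schema = Encoders.product[Record].schema

val ds = spark.read
  .option("header", "true")
  .schema(schema) // passing schema
  .option("timestampFormat", "MM/dd/yyyy HH:mm") // passing timestamp format
  .csv(path) // csv path
  .as[Record] // convert to DS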

The default timestampFormat is yyyy-MM-dd'T'HH:mm:ss.SSSXXX, so you also need to pass your custom timestampFormat.
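To see why the custom format matters, here is a minimal sketch that checks a timestamp string against both patterns. It uses java.text.SimpleDateFormat as a stand-in for Spark's own parser, which may differ in details depending on the Spark version, and the sample value is hypothetical rather than taken from the original CSV.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object TimestampFormatSketch {
  // Hypothetical sample value, not taken from the original CSV.
  val sample = "01/02/2009 06:17"

  val defaultPattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
  val customPattern = "MM/dd/yyyy HH:mm"

  // Returns true when `value` can be parsed strictly with `pattern`.
  def parses(pattern: String, value: String): Boolean = {
    val fmt = new SimpleDateFormat(pattern)
    fmt.setLenient(false)
    Try(fmt.parse(value)).isSuccess
  }

  def main(args: Array[String]): Unit = {
    println(s"default pattern parses sample: ${parses(defaultPattern, sample)}")
    println(s"custom pattern parses sample:  ${parses(customPattern, sample)}")
  }
}
```

Under these assumptions the sample is rejected by the default pattern and accepted by the custom one, which is the situation the timestampFormat option addresses.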

Updated on June 04, 2022


  • shams
    shams almost 2 years

    I got this exception while playing with spark.

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot up cast price from string to int as it may truncate The type path of the target object is: - field (class: "scala.Int", name: "price") - root class: "org.spark.code.executable.Main.Record" You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

    How can this exception be solved? Here is the code:

    object Main {
      case class Record(transactionDate: Timestamp, product: String, price: Int, paymentType: String, name: String, city: String, state: String, country: String,
                        accountCreated: Timestamp, lastLogin: Timestamp, latitude: String, longitude: String)

      def main(args: Array[String]) {
        System.setProperty("hadoop.home.dir", "C:\\winutils\\");
        val schema = Encoders.product[Record].schema
        // read the CSV without a schema, so every column comes back as String
        val df = SparkConfig.sparkSession.read
          .option("header", "true")
          .csv(path) // CSV path is not shown in the original post
        import SparkConfig.sparkSession.implicits._
        val ds = df.as[Record] // the up-cast exception is raised here
        //ds.groupByKey(body => body.state).count().show()
        import org.apache.spark.sql.expressions.scalalang.typed.{
          count => typedCount,
          sum => typedSum
        }
        ds.groupByKey(body => body.state)
          // (aggregation step is missing from the original post)
          .withColumnRenamed("value", "group")
          .alias("Summary by state")
      }
    }
  • combinatorist
    combinatorist over 6 years
    Is there any reason this wouldn't work in spark-shell? When I add my schema as you suggested, my code compiles (:paste <file path>) and runs (main.main), but all fields in all the rows come back null.
  • koiralo
    koiralo over 6 years
    @combinatorist this should work on spark-shell as well.
  • combinatorist
    combinatorist over 6 years
    @Shankar, so what (in general) does it mean if it runs with the schema, but every field is null? Does that mean none of the rows in the file matched my schema, or could it be something else?
  • koiralo
    koiralo over 6 years
    Did it work with the normal class? One reason could be that some of the fields do not match the schema, which causes all the columns of that row to be loaded as null.
  • combinatorist
    combinatorist over 6 years
    I'm not sure what you mean by "normal class". If I don't apply my schema, the dataframe populates, but everything is a string, and if I cast (.as) my schema on that dataframe, I get the nulls again. FWIW, this is my case class: case class OpioidScript(prescription_key:String, member_key:String, morphine_equivalent_dose:String, prescription_fill_date:Date, days_supply:Int)
  • WestCoastProjects
    WestCoastProjects over 4 years
    Nice! Did not know about extracting using Encoders.product[MyCaseClass].schema
  • Omkar Neogi
    Omkar Neogi about 4 years
    @combinatorist Possibly the schema is incorrectly defined.
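
On the all-null rows discussed in the comments above: one way to check whether the data fails to match the schema is to read the file with the CSV reader's mode option set to FAILFAST, so that a malformed row raises an error instead of silently becoming null. The sketch below is only an illustration under assumptions: the Sale case class, the sample rows, and the helper name are hypothetical, and the exact null-handling of the default PERMISSIVE mode varies between Spark versions.

```scala
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{Encoders, SparkSession}
import scala.util.Try

object NullRowsSketch {
  // Hypothetical record type, for illustration only.
  case class Sale(product: String, price: Int)

  // Hypothetical sample data: the second row has a non-numeric price.
  def writeSample(): Path = {
    val file = Files.createTempFile("sales", ".csv")
    Files.write(file, "product,price\nwidget,10\ngadget,n/a\n".getBytes("UTF-8"))
    file
  }

  // Returns true when the whole file can be read under FAILFAST,
  // i.e. when no row conflicts with the schema.
  def readsCleanly(spark: SparkSession, path: String): Boolean = {
    val schema = Encoders.product[Sale].schema
    val df = spark.read
      .option("header", "true")
      .option("mode", "FAILFAST") // fail on malformed rows instead of nulling them
      .schema(schema)
      .csv(path)
    Try(df.collect()).isSuccess
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-rows-sketch").getOrCreate()
    val path = writeSample().toString
    println(s"file matches schema: ${readsCleanly(spark, path)}")
    spark.stop()
  }
}
```

If the read fails under FAILFAST, the error message usually points at the offending record, which helps narrow down which field or format (for example a date pattern) does not match the case class.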