Spark - load CSV file as DataFrame?


Solution 1

CSV support is part of core Spark functionality (since Spark 2.x) and doesn't require a separate library. So you could just do, for example:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

In Scala (this works for any delimited format: specify "," for CSV, "\t" for TSV, and so on):

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .load("csvfile.csv")
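For example, a quick sketch of reading a tab-separated file the same way (the file name data.tsv is just a placeholder):

val tsvDf = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", "\t")
    .option("header", "true")
    .load("data.tsv")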

Solution 2

Parse CSV and load as DataFrame/DataSet with Spark 2.x

First, initialize a SparkSession object. By default it is available in the shells as spark.

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") // Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate()

Use any one of the following ways to load CSV as DataFrame/DataSet

1. Do it in a programmatic way

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Update: Adding all options from here in case the link is broken in the future; a combined example follows this list.

  • path: location of files. As with other Spark file sources, it accepts standard Hadoop globbing expressions.
  • header: when set to true the first line of files will be used to name columns and will not be included in data. All types will be assumed string. The default value is false.
  • delimiter: by default columns are delimited using a comma (,), but the delimiter can be set to any character
  • quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
  • escape: by default, the escape character is a backslash (\), but it can be set to any character. Escaped quote characters are ignored
  • parserLib: by default, it is "commons" that can be set to "univocity" to use that library for CSV parsing.
  • mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
    • PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
    • DROPMALFORMED: drops lines that have fewer or more tokens than expected or tokens which do not match the schema
    • FAILFAST: aborts with a RuntimeException if it encounters any malformed line
  • charset: defaults to 'UTF-8' but can be set to other valid charset names
  • inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default.
  • comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
  • nullValue: specifies a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame
  • dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default, it is null which means trying to parse times and date by java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
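As a rough sketch of how several of these options combine on the Spark 2.x built-in reader (the nullValue and dateFormat choices here are assumptions for illustration, not from the original answer):

 val df = spark.read
         .format("csv")
         .option("header", "true")
         .option("sep", ",")                // field separator
         .option("inferSchema", "true")     // costs one extra pass over the data
         .option("nullValue", "NA")         // treat "NA" as null (assumed convention)
         .option("dateFormat", "yyyy-MM-dd")
         .option("mode", "PERMISSIVE")
         .load("hdfs:///csv/file/dir/file.csv")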

2. You can do it the SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
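Since the original question also wants to register the result as a table, a minimal follow-up sketch (the view name table_name is taken from the question):

 df.createOrReplaceTempView("table_name")  // Spark 2.x replacement for registerTempTable
 spark.sql("SELECT * FROM table_name LIMIT 10").show()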

Dependencies:

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark version < 2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dependencies:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

Solution 3

This is for those whose Hadoop is 2.6 and Spark is 1.6, without the "databricks" package.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Read the raw text file and split each line on commas
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))

// Drop the header row, then map each remaining row to a typed Row
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0), row(1).toInt))

// Define the schema explicitly and build the DataFrame
val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
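To sanity-check the result (and to cover the registerTempTable step asked about in the question), one might add something like:

df.printSchema()
df.show(5)
df.registerTempTable("table_name")  // pre-Spark-2.0 API, as used in the question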

Solution 4

With Spark 2.0, the following is how you can read a CSV:

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read
  .option("header", "true")
  .csv(path)
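If you prefer an explicit schema over inferred or all-string columns (as the comments below suggest for ETL), here is a hedged sketch with illustrative column names (not from the original answer):

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val schema = StructType(Seq(
  StructField("user", StringType, nullable = true),
  StructField("msg_count", IntegerType, nullable = true)
))

val typed_df = sparkSession.read
  .option("header", "true")
  .schema(schema)
  .csv(path)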

Solution 5

In Java 1.8, this code snippet works perfectly for reading CSV files.

pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.4.0</version>
</dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read()
        .format("com.databricks.spark.csv")
        .option("header", true)
        .option("inferSchema", true)
        .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();


Comments

  • Donbeo
    Donbeo over 2 years

    I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")

    I have tried:

    scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
    

    Error which I got:

    java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
        at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    What is the right command to load CSV file as DataFrame in Apache Spark?

  • OneCricketeer
    OneCricketeer over 7 years
    While this may be useful to someone, the question has a Scala tag.
  • Puneet
    Puneet over 7 years
    Does this session require Hive? I am getting Hive errors.
  • mrsrinivas
    mrsrinivas over 7 years
    No need. Only spark-core_2.11 and spark-sql_2.11 of version 2.0.1 are fine. If possible, add the error message.
  • Omkar Puttagunta
    Omkar Puttagunta about 7 years
    Can we convert a pipe-delimited file to a DataFrame?
  • mrsrinivas
    mrsrinivas about 7 years
    @OmkarPuttagunta: Yes, of course! Try something like spark.read.format("csv").option("delimiter", "|") ...
  • Davos
    Davos about 6 years
    I hadn't seen this csv method or passing a map to options. Agreed, you're always better off providing an explicit schema; inferSchema is fine for quick-and-dirty work (aka data science) but terrible for ETL.
  • Davos
    Davos about 6 years
    The other option for the programmatic way is to leave off the .format("csv") and replace .load(... with .csv(.... The option method belongs to the DataFrameReader class as returned by the read method, whereas the load and csv methods return a DataFrame, so options can't be tagged on after they are called. This answer is pretty thorough, but you should link to the documentation so people can see all the other CSV options available: spark.apache.org/docs/latest/api/scala/…
  • Davos
    Davos about 6 years
    @OmkarPuttagunta The documentation I linked in the above comment mentions sep (default ,): "sets a single character as a separator for each field and value", and says nothing about delimiter, so I would use sep even if delimiter is working.
  • Eric
    Eric almost 6 years
    Is there a difference between spark.read.csv(path) and spark.read.format("csv").load(path)?
  • mrsrinivas
    mrsrinivas over 4 years
    This is the same as existing answers.
  • Purushothaman Srikanth
    Purushothaman Srikanth over 2 years
    @mrsrinivas Will method 2 be faster than method 1?
  • mrsrinivas
    mrsrinivas over 2 years
    Both methods should be the same in terms of speed.