Spark - load CSV file as DataFrame?
Solution 1
In Spark 2.x, CSV support is part of core Spark functionality and doesn't require a separate library. So you can simply do, for example:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
In Scala (this works for any delimited format: specify "," for CSV, "\t" for TSV, and so on):
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("delimiter", ",")
.load("csvfile.csv")
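As a plain-Python illustration of what the delimiter option above controls (no Spark required), the standard csv module accepts the same kind of delimiter switch; this is a sketch, not Spark code:

```python
import csv
import io

def parse_delimited(text, delimiter=","):
    """Parse delimited text into a list of rows, mirroring the
    'delimiter' option in the Spark readers above (sketch only)."""
    return list(csv.reader(io.StringIO(text), delimiter=delimiter))

rows_csv = parse_delimited("id,name\n1,alice", delimiter=",")
rows_tsv = parse_delimited("id\tname\n1\talice", delimiter="\t")
print(rows_csv)  # [['id', 'name'], ['1', 'alice']]
print(rows_tsv)  # [['id', 'name'], ['1', 'alice']]
```

The same input parses identically under either delimiter, which is why the Spark reader only needs the delimiter string changed between CSV and TSV.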
Solution 2
Parse CSV and load as DataFrame/DataSet with Spark 2.x
First, initialize a SparkSession object; by default it is available in the shells as spark.
val spark = org.apache.spark.sql.SparkSession.builder
  .master("local") // change this as per your cluster
  .appName("Spark CSV Reader")
  .getOrCreate()
Use any one of the following ways to load CSV as a DataFrame/Dataset.
1. Do it in a programmatic way
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
Update: adding all the options from the documentation here, in case the link breaks in the future.
- path: location of files. As elsewhere in Spark, this can accept standard Hadoop globbing expressions.
- header: when set to true the first line of files will be used to name columns and will not be included in data. All types will be assumed string. The default value is false.
- delimiter: by default columns are delimited using ",", but the delimiter can be set to any character
- quote: by default the quote character is ", but can be set to any character. Delimiters inside quotes are ignored
- escape: by default, the escape character is "\", but it can be set to any character. Escaped quote characters are ignored
- parserLib: by default, it is "commons" that can be set to "univocity" to use that library for CSV parsing.
- mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
  - PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
  - DROPMALFORMED: drops lines that have fewer or more tokens than expected, or tokens which do not match the schema
  - FAILFAST: aborts with a RuntimeException if it encounters any malformed line
- charset: defaults to 'UTF-8' but can be set to other valid charset names
- inferSchema: automatically infers column types. It requires one extra pass over the data and is false by default
- comment: skip lines beginning with this character. Default is "#". Disable comments by setting this to null.
- nullValue: specifies a string that indicates a null value, any fields matching this string will be set as nulls in the DataFrame
- dateFormat: specifies a string that indicates the date format to use when reading dates or timestamps. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to both DateType and TimestampType. By default, it is null which means trying to parse times and date by java.sql.Timestamp.valueOf() and java.sql.Date.valueOf().
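To make the three parsing modes above concrete, here is a plain-Python sketch (not Spark code) of how PERMISSIVE, DROPMALFORMED, and FAILFAST treat a row with the wrong number of tokens, assuming a fixed expected column count:

```python
def parse_row(tokens, n_cols, mode="PERMISSIVE"):
    """Sketch of Spark's CSV parsing modes for one row of tokens.

    PERMISSIVE: pad missing tokens with None, ignore extras.
    DROPMALFORMED: return None (row is discarded) on a length mismatch.
    FAILFAST: raise on a length mismatch.
    """
    if len(tokens) == n_cols:
        return list(tokens)
    if mode == "PERMISSIVE":
        padded = list(tokens[:n_cols])          # drop extra tokens
        padded += [None] * (n_cols - len(padded))  # pad missing tokens
        return padded
    if mode == "DROPMALFORMED":
        return None
    raise RuntimeError(f"Malformed line in FAILFAST mode: {tokens}")

print(parse_row(["1"], 2))                        # ['1', None]
print(parse_row(["1", "a", "x"], 2))              # ['1', 'a']
print(parse_row(["1"], 2, mode="DROPMALFORMED"))  # None
```

This is only an illustration of the mode semantics; Spark's actual parser also checks tokens against the schema types, not just the column count.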
2. You can do this SQL way as well
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
Dependencies:
"org.apache.spark" % "spark-core_2.11" % "2.0.0",
"org.apache.spark" % "spark-sql_2.11" % "2.0.0",
Spark version < 2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
Dependencies:
"org.apache.spark" % "spark-sql_2.10" % "1.6.0",
"com.databricks" % "spark-csv_2.10" % "1.6.0",
"com.univocity" % "univocity-parsers" % LATEST,
Solution 3
This is for setups with Hadoop 2.6 and Spark 1.6, without the "databricks" package.
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0)) // drop the header row
val rdd = data.map(row => Row(row(0), row(1).toInt)) // convert the second column to Int
val schema = new StructType()
  .add(StructField("id", StringType, true))
  .add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
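The same header-dropping and type conversion can be sketched in plain Python (no Spark required) to show what the RDD pipeline above is doing; the (id, val) column layout follows the schema used in the answer:

```python
def csv_lines_to_typed_rows(lines):
    """Mimic the RDD steps above: split on commas, trim, drop the
    header row, and convert the second column to int."""
    rows = [[field.strip() for field in line.split(",")] for line in lines]
    header = rows[0]
    # Same trick as the Scala code: keep rows whose first field
    # differs from the header's first field.
    data = [r for r in rows if r[0] != header[0]]
    return [(r[0], int(r[1])) for r in data]

lines = ["id,val", "a, 1", "b, 2"]
print(csv_lines_to_typed_rows(lines))  # [('a', 1), ('b', 2)]
```

Note the caveat this shares with the Scala version: any data row whose first field happens to equal the header's first field would also be dropped.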
Solution 4
With Spark 2.0, the following is how you can read CSV:
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read
  .option("header", "true")
  .csv(path)
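As a plain-Python sketch of what the header option used above does (when true, the first line names the columns instead of being data; when false, Spark assigns positional names like _c0, _c1):

```python
def read_with_header(lines, header=True):
    """Sketch of the 'header' option: when True, the first line
    names the columns; when False, columns get Spark-style
    positional default names (_c0, _c1, ...)."""
    rows = [line.split(",") for line in lines]
    if header:
        cols, data = rows[0], rows[1:]
    else:
        cols = [f"_c{i}" for i in range(len(rows[0]))]
        data = rows
    return cols, data

cols, data = read_with_header(["name,age", "ann,41"])
print(cols, data)  # ['name', 'age'] [['ann', '41']]
```

This is only a model of the option's semantics; Spark additionally treats all header-derived columns as strings unless inferSchema or an explicit schema is supplied.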
Solution 5
In Java 1.8, this code snippet works for reading CSV files.
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.11</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read()
    .format("com.databricks.spark.csv")
    .option("header", true)
    .option("inferSchema", true)
    .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Donbeo
Updated on September 12, 2021

Comments
-
Donbeo over 2 years
I would like to read a CSV in spark and convert it as DataFrame and store it in HDFS with
df.registerTempTable("table_name")
I have tried:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Error which I got:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
What is the right command to load CSV file as DataFrame in Apache Spark?
-
mrsrinivas over 7 years: check this link for doing it in Spark 2.0
-
-
OneCricketeer over 7 years: While this may be useful to someone, the question has a Scala tag.
-
Puneet over 7 years: Does this session require Hive? I am getting Hive errors.
-
mrsrinivas over 7 years: No need. Only spark-core_2.11 and spark-sql_2.11 of version 2.0.1 are fine. If possible, add the error message.
-
Omkar Puttagunta about 7 years: Can we convert a pipe-delimited file to a DataFrame?
-
mrsrinivas about 7 years: @OmkarPuttagunta: Yes, of course! Try something like this: spark.read.format("csv").option("delimiter", "|") ...
-
Davos about 6 years: I hadn't seen this csv method or passing a map to options. Agreed, you're always better off providing an explicit schema; inferSchema is fine for quick-and-dirty work (aka data science) but terrible for ETL.
-
Davos about 6 years: The other option for the programmatic way is to leave off the .format("csv") and replace .load(... with .csv(.... The option method belongs to the DataFrameReader class as returned by the read method, where the load and csv methods return a DataFrame, so options can't be tagged on after they are called. This answer is pretty thorough, but you should link to the documentation so people can see all the other CSV options available: spark.apache.org/docs/latest/api/scala/…
-
Davos about 6 years: @OmkarPuttagunta The documentation I linked in the above comment mentions sep (default ,): sets a single character as a separator for each field and value, and says nothing about delimiter, so I would use sep even if delimiter is working.
-
Eric almost 6 years: Is there a difference between spark.read.csv(path) and spark.read.format("csv").load(path)?
-
mrsrinivas over 4 years: This is the same as existing answers
-
Purushothaman Srikanth over 2 years: @mrsrinivas Will method 2 be faster than method 1?
-
mrsrinivas over 2 years: Both methods should be the same in terms of speed.