How do I skip a header from CSV files in Spark?
Solution 1
If there were just one header line in the first record, then the most efficient way to filter it out would be:
rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
This doesn't help, of course, if there are many files with header lines inside. In that case, you can make an RDD this way for each of the three files and union them.
You could also just write a filter that matches only a line that could be a header. This is quite simple, but less efficient.
Python equivalent:
from itertools import islice
rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
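Spark is needed to run the snippets above, but the partition-index logic itself can be sketched in plain Python (no Spark here; `partitions` is a hypothetical stand-in for an RDD's partitions, with the start of the file, and hence the header, landing in partition 0):

```python
from itertools import islice

# Simulated partitions of a text file: Spark reads the beginning of the
# file into partition 0, so a single header line lands there.
partitions = [
    ["name,age", "alice,30", "bob,25"],  # partition 0 (contains the header)
    ["carol,41", "dave,19"],             # partition 1
]

def drop_header(idx, it):
    # Skip the first element only in the first partition.
    return islice(it, 1, None) if idx == 0 else it

rows = [row for idx, part in enumerate(partitions)
        for row in drop_header(idx, iter(part))]
print(rows)  # ['alice,30', 'bob,25', 'carol,41', 'dave,19']
```

This is why the approach is fragile after a shuffle: it only works while the header is guaranteed to sit at the start of partition 0.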
Solution 2
val data = sc.textFile("path_to_data")
val header = data.first() // extract header
val dataFiltered = data.filter(row => row != header) // filter out header
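The same first()/filter idea can be sketched without Spark in plain Python (the rows below are made up for illustration). Note the caveat raised in the comments: any data row identical to the header is dropped too; on the other hand, this also removes repeated headers from unioned files:

```python
# Lines as Spark might see them after unioning two CSV files,
# each carrying its own header row.
lines = ["name,age", "alice,30", "bob,25", "name,age", "carol,41"]

header = lines[0]                                # data.first()
data = [row for row in lines if row != header]   # data.filter(row => row != header)
print(data)  # ['alice,30', 'bob,25', 'carol,41']
```

Unlike the partition-index approach, this scans every row, which is the efficiency trade-off discussed in the comments.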
Solution 3
In Spark 2.0 a CSV reader is built into Spark, so you can easily load a CSV file as follows:
spark.read.option("header","true").csv("filePath")
Solution 4
From Spark 2.0 onwards, you can use SparkSession to get this done as a one-liner:
val spark = SparkSession.builder.config(conf).getOrCreate()
and then as @SandeepPurohit said:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
I hope that solves your question!
P.S.: SparkSession is the new entry point introduced in Spark 2.0 and can be found in the org.apache.spark.sql package.
Solution 5
Working in 2018 (Spark 2.3)
Python
df = (spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv"))
Scala
val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")
P.S.: myManualSchema is a predefined schema written by me; you can skip that part of the code if you prefer.
UPDATE 2021: The same code works for Spark 3.x:
df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("csv")
    .load("mycsv.csv"))
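Without a Spark cluster at hand, the effect of option("header", "true") can be approximated with Python's standard-library csv module, which likewise promotes the first row to column names (the data below is made up for illustration):

```python
import csv
import io

raw = "name,age\nalice,30\nbob,25\n"  # stand-in for the contents of mycsv.csv

# DictReader treats the first row as the header, much like header=true in Spark.
rows = list(csv.DictReader(io.StringIO(raw)))
print(rows[0]["name"], rows[1]["age"])  # alice 25
```

Note that, as with Spark's reader without a schema or inferSchema, every value comes back as a string.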
Hafiz Mujadid
Updated on February 22, 2021

Comments
-
Hafiz Mujadid about 3 years
Suppose I give three file paths to a Spark context to read, and each file has a schema in its first row. How can we skip the schema lines from the headers?
val rdd=sc.textFile("file1,file2,file3")
Now, how can we skip header lines from this rdd?
-
maasg over 9 years The filter method would still be more efficient than the zipWithIndex approach proposed in the other answer.
-
Hafiz Mujadid over 9 years No, there is not only a single line; there may be a header line for each file.
-
Sean Owen over 9 years Yes, I mean that you could make an RDD for each file and strip its single header this way, then union.
-
Julio about 9 years Missing a drop(n) method here.
-
Jimmy over 8 years The question asks how to skip headers in a CSV file. If headers are present, they will be in the first row.
-
OneCricketeer over 7 yearsHow is this any different from this given answer? Your answer would require you to know the first column name ahead of time.
-
ciri over 7 yearsAre you sure this works from 2.0 onwards? I'm using v2.0.1 and getting "AttributeError: 'SparkContext' object has no attribute 'read'".
-
Sandeep Purohit over 7 years @ciri spark is not a SparkContext object, it's a SparkSession object, so if you want to use the CSV reader you need a SparkSession object.
-
Sal over 7 years This is not always true. If you write out a CSV with Spark there could be multiple files, each with its own header. Using this as input to another Spark program will give you multiple headers. Also, you can feed in multiple files at once with Spark.
-
Abdul Mannan almost 7 years @cricket_007 Because this will filter out multiple header lines, as pointed out by other users.
-
jack AKA karthik over 6 yearsintuitive approach
-
Hafiz Mujadid over 6 years Hi Karthik, I guess your solution deals with a single file, but the question was different.
-
Sahan Jayasumana about 6 yearsalternatively context = new org.apache.spark.sql.SQLContext(sc); var data = context.read.option("header","true").csv("<path>");
-
Alper t. Turker almost 6 yearsOnly the first part of your code is correct. Monotonically increasing id doesn't guarantee consecutive numbers. Please be so kind and revise.
-
Shubham Agrawal almost 6 years
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
How can you say that if the index value is 0 then it will be a header? It won't necessarily help: the first row of a partition could be a header or any other CSV value. -
Amit Sadafule almost 6 yearserror: recursive value data needs type. Change last line to dataFiltered = data.filter(row => row != header)
-
thebluephantom almost 6 yearsSo what is the -1 for here?
-
Enrique Benito Casado over 5 years @Antonio Cachuan You should give code that works, without your personal example; "schema(myManualSchema)" should not be in the solution at all.
-
mahmoud mehdi over 5 years I agree with @shubhamAgrawal .. you're not sure that index 0 refers to the first line. The data is already partitioned, and you have no idea whether the header is the first line of the partition or not.
-
Sean Owen over 5 yearsThe first partition will contain the first part of the file; that much definitely happens. It wouldn't necessarily if it had been repartitioned with a shuffle. This is definitely a fragile approach and depends heavily on what files have what header. In the 4 years since that answer, Spark has much much better ways of reading CSV that will handle headers, infer schema, etc.
-
iLikeKFC over 3 yearsDoes this solution scan the entire rdd and does a check on every row just to pop the header at the very top? Is this really the most efficient way?