Load CSV file with Spark
Solution 1
Are you sure that all the lines have at least 2 columns? Can you try something like this, just to check?
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
Alternatively, you could print the culprit (if any):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
Solution 2
Spark 2.0.0+
You can use the built-in CSV data source directly:
spark.read.csv(
"some_input_file.csv",
header=True,
mode="DROPMALFORMED",
schema=schema
)
or
(
spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv")
)
without including any external dependencies.
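Pulling those pieces together, here is a minimal end-to-end sketch of the 2.0.0+ approach; the session setup and the two example columns are illustrative assumptions, not part of the snippets above:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# In the pyspark shell `spark` already exists; in a script, build it yourself.
spark = SparkSession.builder.appName("csv-example").getOrCreate()

# Illustrative schema - adjust the field names and types to your file.
schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", StringType())
])

df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)
df.show()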
Spark < 2.0.0:
Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv:
Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path).
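For example, when starting the shell (the version numbers are only illustrative; match the Scala and spark-csv versions to your installation):
pyspark --packages com.databricks:spark-csv_2.11:1.4.0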
And load your data as follows:
df = (
sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv")
)
It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.
Note:
If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(
sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv")
)
Solution 3
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")
print(df.collect())
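If you also want Spark to infer the column types rather than read everything as strings, the same reader accepts an inferSchema flag; a small variation on the snippet above (same illustrative path and separator):
df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|", inferSchema=True)
df.printSchema()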
Solution 4
And yet another option, which consists of reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.
For example:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
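As one of the comments below points out, Pandas can also read the file in chunks, which helps when it is too large to load at once. A rough sketch of that idea, reusing sql_sc from above (the chunk size and the unionAll-based combining are assumptions, not part of the original answer):
# read the CSV in chunks and append each chunk to a Spark DataFrame
s_df = None
for chunk in pd.read_csv('file.csv', chunksize=100000):  # chunk size is illustrative
    part = sql_sc.createDataFrame(chunk)
    s_df = part if s_df is None else s_df.unionAll(part)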
Solution 5
Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse CSVs in base Python with the csv module:
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
EDIT: As @muon mentioned in the comments, this will treat the header like any other row, so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
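Putting the pieces together, a minimal sketch of this base-Spark approach with the header stripped out (same file name as above):
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda lines: csv.reader(lines))  # parse each partition with the csv module
header = rdd.first()  # the first parsed row is the header
rdd = rdd.filter(lambda row: row != header)  # drop it; don't modify header before this evaluates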
Kernael
Updated on January 05, 2022
Comments
-
Kernael over 2 years
I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:
sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect()
I would expect this call to give me a list of the first two columns of my file, but I'm getting this error:
File "", line 1, in IndexError: list index out of range
although my CSV file has more than one column.
-
Kernael about 9 years
That was it, one line with only one column, thank you.
-
WoodChopper over 8 years
Why would the OP want to do this in Spark if he is able to load the data in pandas?
-
zero323 about 8 years
You don't need Hive to use DataFrames. Regarding your solution: a) There is no need for StringIO; csv can use any iterable. b) __next__ shouldn't be used directly and will fail on an empty line. Take a look at flatMap. c) It would be much more efficient to use mapPartitions instead of initializing the reader on each line :)
-
Galen Long about 8 years
Thanks so much for the corrections! Before I edit my answer, I want to make sure I understand fully. 1) Why does rdd.mapPartitions(lambda x: csv.reader(x)) work while rdd.map(lambda x: csv.reader(x)) throws an error? I expected both to throw the same TypeError: can't pickle _csv.reader objects. It also seems like mapPartitions automatically calls some equivalent to "readlines" on the csv.reader object, where with map, I needed to call __next__ explicitly to get the lists out of the csv.reader. 2) Where does flatMap come in? Just calling mapPartitions alone worked for me.
-
Galen Long about 8 years
If you do this, don't forget to include the databricks csv package when you open the pyspark shell or use spark-submit. For example, pyspark --packages com.databricks:spark-csv_2.11:1.4.0 (make sure to change the databricks/spark versions to the ones you have installed).
-
zero323 about 8 years
rdd.mapPartitions(lambda x: csv.reader(x)) works because mapPartitions expects an Iterable object. If you want to be explicit, you could use a comprehension or generator expression. map alone doesn't work because it doesn't iterate over the object. Hence my suggestion to use flatMap(lambda x: csv.reader([x])), which will iterate over the reader. But mapPartitions is much better here.
-
SummerEla almost 8 years
Not wanting to install or specify dependencies on every spark cluster....
-
abby sobh over 7 years
Pandas allows file chunking when reading, so there is still a use-case here for having Pandas handle initial file parsing. See my answer below for code.
-
muon about 7 years
Note that this will read the header as a row of data, not as a header.
-
sudo about 7 years
It's better to parse using the built-in csv library to handle all the escaping, because simply splitting by comma won't work if, say, there are commas in the values.
-
AntiPawn79 over 6 years
Caution: Pandas also handles column schema way differently than spark, especially when there are blanks involved. Safer to just load csv in as strings for each column.
-
Stephen over 6 years
There are plenty of tools to parse csv, don't reinvent the wheel.
-
Geoffrey Anderson almost 6 years
Is it csvContext or sqlContext in pyspark? Because in scala you need csvContext.
-
Alceu Costa almost 6 years
This code will break if there is a comma inside quotes. Parsing csv is more complicated than just splitting at ",".
-
Grant Shannon over 5 years
Use 'sep' not 'separator', as follows: df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")
-
Jeril about 5 years
@GalenLong if you don't mind, can you share the already existing answer?
-
Galen Long almost 5 years
Weird, I swear there was another answer with this solution. Maybe I got this confused with another question. My bad.
-
flow2k almost 5 years
@WoodChopper You can use Pandas as a UDF in Spark, no?
-
Luk Aron over 4 years
Where does the spark come from? Is it import pyspark as spark?
-
flow2k over 4 years
@LukAron In a pyspark shell, spark is already initialized. In a script submitted by spark-submit, you can instantiate it as from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
-
Tom N Tech about 4 years
This breaks for commas. This is very bad.
-
user2441441 over 3 years
@zero323 I spent four plus hours trying to get spark to read csv columns in numeric type but they would all be null. Until I tried your suggestion - .option("inferschema", "true"). Thank you! Not sure why spark is not able to read the explicit schema, even when it looks correct.
-
demongolem over 3 years
Yes, escape is very important. There is another related option, quote='"', which makes things confusing, but don't forget escape.
-
Rookie007 over 2 years
@GalenLong I can't find a spark-csv package which supports scala - 2.12. We are upgrading our code with Scala - 2.12.12, Spark - 3.0.1, and we are facing an issue with _corrupt_record not being there at the time of trying to get the count, but actually it's there; I can see that column in the DataFrame.