Load CSV file with Spark

Solution 1

Are you sure that all the lines have at least 2 columns? Can you try something like this, just to check:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatively, you could print the culprit (if any):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

Solution 2

Spark 2.0.0+

You can use the built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

or

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

without including any external dependencies.
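
For a quick sanity check on the result you can use plain DataFrame methods (a minimal sketch, reusing the placeholder file name and schema from above):

df = spark.read.csv(
    "some_input_file.csv",
    header=True,
    mode="DROPMALFORMED",
    schema=schema
)

df.printSchema()  # should match the schema that was passed in
df.show(5)        # preview the first rows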

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)

And load your data as follows:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

It can handle loading, schema inference, dropping malformed lines and doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns - integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

Solution 3

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|")

print(df.collect())
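
Other reader options are passed the same way; for example, a variation on the call above (not in the original snippet) that also asks Spark to infer the column types:

df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|", inferSchema=True)
df.printSchema()  # columns now get inferred types instead of all strings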

Solution 4

Yet another option is to read the CSV file using Pandas and then import the Pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
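
If the file is too large to read in one go, pandas can also read it in chunks and the pieces can be combined on the Spark side. A rough sketch (the chunk size is arbitrary and sql_sc is the SQLContext created above):

from functools import reduce
import pandas as pd

chunks = pd.read_csv('file.csv', chunksize=100000)                 # iterator of pandas DataFrames
spark_chunks = [sql_sc.createDataFrame(chunk) for chunk in chunks]
s_df = reduce(lambda df1, df2: df1.unionAll(df2), spark_chunks)    # single Spark DataFrame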

Solution 5

Simply splitting by comma will also split on commas that are within fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse CSVs in base Python with the csv module:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
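
To see why this matters, the csv module keeps quoted fields intact where a naive split does not; a small illustration with the example row from above:

import csv

row = 'a,b,"1,2,3",c'
print(row.split(","))           # ['a', 'b', '"1', '2', '3"', 'c'] -- quoted field is broken apart
print(next(csv.reader([row])))  # ['a', 'b', '1,2,3', 'c'] -- parsed correctly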

EDIT: As @muon mentioned in the comments, this will treat the header like any other row, so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates). But at this point, you're probably better off using a built-in csv parser.
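
A rough sketch of that header handling combined with the csv parsing above (same file and rdd names as in the snippet):

import csv

rdd = sc.textFile("file.csv")
header = rdd.first()                             # the raw header line, e.g. 'col1,col2,...'
rdd = rdd.filter(lambda line: line != header)    # drop the header (filter is lazy, so don't modify header)
rows = rdd.mapPartitions(lambda part: csv.reader(part))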


Comments

  • Kernael
    Kernael over 2 years

    I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:

    sc.textFile('file.csv') \
        .map(lambda line: (line.split(',')[0], line.split(',')[1])) \
        .collect()
    

    I would expect this call to give me a list of the first two columns of my file, but I'm getting this error:

    File "", line 1, in <module>
    IndexError: list index out of range

    although my CSV file has more than one column.

  • Kernael
    Kernael about 9 years
    That was it, one line with only one column, thank you.
  • WoodChopper
    WoodChopper over 8 years
    Why would OP want to do this in Spark if he is able to load the data in pandas?
  • zero323
    zero323 about 8 years
    You don't need Hive to use DataFrames. Regarding your solution: a) There is no need for StringIO, csv can use any iterable. b) __next__ shouldn't be used directly and will fail on an empty line. Take a look at flatMap. c) It would be much more efficient to use mapPartitions instead of initializing the reader on each line :)
  • Galen Long
    Galen Long about 8 years
    Thanks so much for the corrections! Before I edit my answer, I want to make sure I understand fully. 1) Why does rdd.mapPartitions(lambda x: csv.reader(x)) work while rdd.map(lambda x: csv.reader(x)) throws an error? I expected both to throw the same TypeError: can't pickle _csv.reader objects. It also seems like mapPartitions automatically calls some equivalent to "readlines" on the csv.reader object, where with map, I needed to call __next__ explicitly to get the lists out of the csv.reader. 2) Where does flatMap come in? Just calling mapPartitions alone worked for me.
  • Galen Long
    Galen Long about 8 years
    If you do this, don't forget to include the databricks csv package when you open the pyspark shell or use spark-submit. For example, pyspark --packages com.databricks:spark-csv_2.11:1.4.0 (make sure to change the databricks/spark versions to the ones you have installed).
  • zero323
    zero323 about 8 years
    rdd.mapPartitions(lambda x: csv.reader(x)) works because mapPartitions expects an Iterable object. If you want to be explicit you could use a comprehension or generator expression. map alone doesn't work because it doesn't iterate over the object. Hence my suggestion to use flatMap(lambda x: csv.reader([x])), which will iterate over the reader. But mapPartitions is much better here.
  • SummerEla
    SummerEla almost 8 years
    Not wanting to install or specify dependencies on every spark cluster....
  • abby sobh
    abby sobh over 7 years
    Pandas allows file chunking when reading, so there is still a use case here for having Pandas handle the initial file parsing. See my answer below for code.
  • muon
    muon about 7 years
    note that this will read the header as a row of data, not as a header
  • sudo
    sudo about 7 years
    It's better to parse using the built-in csv library to handle all the escaping because simply splitting by comma won't work if, say, there are commas in the values.
  • AntiPawn79
    AntiPawn79 over 6 years
    Caution: Pandas also handles column schemas very differently than Spark, especially when there are blanks involved. It's safer to just load the csv in as strings for each column.
  • Stephen
    Stephen over 6 years
    There are plenty of tools to parse csv, don't reinvent the wheel
  • Geoffrey Anderson
    Geoffrey Anderson almost 6 years
    Is it csvContext or sqlContext in pyspark? Because in scala you need csvContext
  • Alceu Costa
    Alceu Costa almost 6 years
    This code will break if there is a comma inside quotes. Parsing csv is more complicated than just splitting at ",".
  • Grant Shannon
    Grant Shannon over 5 years
    use 'sep' not 'separator', as follows: df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")
  • Jeril
    Jeril about 5 years
    @GalenLong if you don't mind, can you share the already existing answer?
  • Galen Long
    Galen Long almost 5 years
    Weird, I swear there was another answer with this solution. Maybe I got this confused with another question. My bad.
  • flow2k
    flow2k almost 5 years
    @WoodChopper You can use Pandas as a UDF in Spark, no?
  • Luk Aron
    Luk Aron over 4 years
    Where does the spark come from? Is it import pyspark as spark?
  • flow2k
    flow2k over 4 years
    @LukAron In a pyspark shell, spark is already initialized. In a script submitted by spark-submit, you can instantiate it as from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
  • Tom N Tech
    Tom N Tech about 4 years
    This breaks for commas. This is very bad.
  • user2441441
    user2441441 over 3 years
    @zero323 I spent four plus hours trying to get spark to read csv columns in numeric type but they would all be null. Until I tried your suggestion - .option("inferschema", "true"). Thank you! Not sure why spark is not able to read the explicit schema, even when it looks correct.
  • demongolem
    demongolem over 3 years
    Yes, escape is very important. There is another related option, quote='"', which makes things confusing, but don't forget escape.
  • Rookie007
    Rookie007 over 2 years
    @GalenLong I can't find a spark-csv package which supports Scala 2.12. We are upgrading our code to Scala 2.12.12 and Spark 3.0.1, and we are facing an issue with _corrupt_record not being there at the time of trying to get the count, but it's actually there; I can see that column in the DataFrame.