How to write the resulting RDD to a CSV file in Spark (Python)


Solution 1

Just map the lines of the RDD (labelsAndPredictions) into strings (the lines of the CSV), then use rdd.saveAsTextFile():

def toCSVLine(data):
  return ','.join(str(d) for d in data)

lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
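
Note that saveAsTextFile() writes a directory of part files (one per partition), not a single file. If a single part file is needed and the data is small, a minimal variation (same lines RDD as above) is to coalesce to one partition first:

# Sketch: force a single partition so only one part file is written.
# The output is still a directory, containing a single part-00000.
lines.coalesce(1).saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')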

Solution 2

I know this is an old post, but to help someone searching for the same thing, here's how I write a two-column RDD to a single CSV file in PySpark 1.6.2.

The RDD:

>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]

Now the code:

# First, convert the RDD to a DataFrame
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)  # sc is the existing SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])

The DF:

>>> df.show()
+-----+-----------+
|count|       word|
+-----+-----------+
|73342|      cells|
|62861|       cell|
|61714|    studies|
|61377|        aim|
|60168|   clinical|
|59275|          2|
|59221|          1|
|58274|       data|
|58087|development|
|56579|     cancer|
|50243|    disease|
|49817|   provided|
|49216|   specific|
|48857|     health|
|48536|      study|
|47827|    project|
|45573|description|
|45455|  applicant|
|44739|    program|
|44522|   patients|
+-----+-----------+
only showing top 20 rows

Now write to CSV:

# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
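
On Spark 2.0 and later, CSV support is built into the DataFrame writer, so the external com.databricks.spark.csv package is no longer needed. A minimal equivalent sketch, assuming the same df and an illustrative output path:

# Spark 2.x+: the built-in CSV writer replaces the Databricks package call above
df.coalesce(1).write.csv('file:///home/username/csv_out', header=True)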

P.S.: I am just a beginner learning from posts here on Stack Overflow, so I don't know whether this is the best way, but it worked for me and I hope it helps someone!

Solution 3

It's not good to just join by commas because if fields contain commas, they won't be properly quoted, e.g. ','.join(['a', 'b', '1,2,3', 'c']) gives you a,b,1,2,3,c when you'd want a,b,"1,2,3",c. Instead, you should use Python's csv module to convert each list in the RDD to a properly-formatted csv string:

# python 3
import csv, io

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip() # remove extra newline

# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")

Since the csv module only writes to file objects, we have to create an empty "file" with io.StringIO("") and tell the csv.writer to write the csv-formatted string into it. Then, we use output.getvalue() to get the string we just wrote to the "file". To make this code work with Python 2, just replace io with the StringIO module.
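
For instance, with the function defined above, a field containing commas comes back properly quoted:

>>> list_to_csv_str(['a', 'b', '1,2,3', 'c'])
'a,b,"1,2,3",c'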

If you're using the Spark DataFrames API, you can also look into the DataBricks save function, which has a csv format.


Comments

  • Jason Donnald (almost 2 years ago)

    I have a resulting RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions). This has output in this format:

    [(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
    

    What I want is to create a CSV file with one column for the labels (the first part of each tuple in the output above) and one for the predictions (the second part of each tuple). But I don't know how to write to a CSV file in Spark using Python.

    How can I create a CSV file with the above output?

  • Jason Donnald (over 8 years ago)
    Just a doubt: where will this save the CSV file? In the same directory as the code? Can I save it to another directory, e.g. using saveAsTextFile('/home/files/labels-and-predictions.csv')?
  • Daniel Darabos (over 8 years ago)
    You can use any path the Hadoop client library understands. Typically it will be a path on a distributed file system. I've updated the example to illustrate this. But if you're just testing on one machine, you can use a local path as well.
  • Jason Donnald (over 8 years ago)
    I tried it, but when I execute it, it creates a directory named 'labels-and-predictions.csv', and inside that directory there are two files: _SUCCESS and part-00000.
  • Daniel Darabos (over 8 years ago)
    Yeah, that's the output. It will be split into multiple files if the RDD is made up of multiple partitions. Is this a problem for you? This is perfectly normal and desired in big data use cases.
  • Jason Donnald (over 8 years ago)
    So to read this CSV file again somewhere else in IPython, do I just pass 'labels-and-predictions.csv', and all the part files inside it will be read automatically?
  • Daniel Darabos (over 8 years ago)
    It depends on the tool. Hadoop tools will read all the part-xxx files. Spark will also read them when you use sc.textFile. For conventional tools you may need to merge the data into a single file first. If the output is small enough to be handled by conventional tools, though, there is no reason to save it via Spark. Just collect the RDD and write the data to a local file without Spark (see the sketch after these comments).
  • Moe Chughtai (over 7 years ago)
    I get a TypeError using this code: TypeError: can't write str to text stream.
  • Galen Long (over 7 years ago)
    @Moe Chughtai Which version of Spark/Python are you using? Which line gives you the type error, and on what input?
  • Quetzalcoatl (almost 7 years ago)
    It doesn't seem standard to assume that a .csv extension in the destination path guarantees that CSV will be written, as illustrated in the 5th comment above. It seems more common that protocols involved with HDFS and S3 may encrypt and compress the output, resulting in a non-CSV file.
  • Andrew (over 6 years ago)
    I had to encode some columns with JSON, but otherwise this works.
  • Jie (about 2 years ago)
    @lnsiloco, how do I write this to a Snowflake or a Redshift database? Thanks!
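
Following up on Daniel's last suggestion above, here is a minimal sketch (not from the original thread) of collecting a small RDD on the driver and writing it to a single local CSV file without Spark. It assumes the labelsAndPredictions RDD from the question and uses an illustrative output path:

import csv

# Sketch only: collect a *small* RDD on the driver and write one local CSV.
rows = labelsAndPredictions.collect()          # pulls all rows to the driver
with open('labels-and-predictions.csv', 'w', newline='') as f:
    writer = csv.writer(f)                     # handles quoting and escaping
    writer.writerow(['label', 'prediction'])   # optional header row
    writer.writerows(rows)                     # each (label, prediction) tuple becomes a row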