Set schema in pyspark dataframe read.csv with null elements


Solution 1

You can set a custom null value in Spark's CSV loader with the nullValue option.

For a CSV file like this:

col_01,col_02,col_03
111,2007-11-18,3
112,2002-12-03,4
113,2007-02-14,5
114,2003-04-16,NA
115,2011-08-24,2
116,2003-05-03,3
117,2001-06-11,4
118,2004-05-06,NA
119,2012-03-25,5
120,2006-10-13,4

and an explicit schema (note that StructField must be imported alongside StructType):

from pyspark.sql.types import StructType, StructField, IntegerType, DateType

schema = StructType([
    StructField("col_01", IntegerType()),
    StructField("col_02", DateType()),
    StructField("col_03", IntegerType())
])

Reading with nullValue='NA' then gives:

df = spark.read.csv(filename, header=True, nullValue='NA', schema=schema)
df.show()
df.printSchema()

    +------+----------+------+
    |col_01|    col_02|col_03|
    +------+----------+------+
    |   111|2007-11-18|     3|
    |   112|2002-12-03|     4|
    |   113|2007-02-14|     5|
    |   114|2003-04-16|  null|
    |   115|2011-08-24|     2|
    |   116|2003-05-03|     3|
    |   117|2001-06-11|     4|
    |   118|2004-05-06|  null|
    |   119|2012-03-25|     5|
    |   120|2006-10-13|     4|
    +------+----------+------+

    root
     |-- col_01: integer (nullable = true)
     |-- col_02: date (nullable = true)
     |-- col_03: integer (nullable = true)

Solution 2

Alternatively, read the file with Python's csv module. (Note that this reads every column as a string; you can then cast types as needed.)

import csv
from pyspark.sql.types import IntegerType

# read the CSV into a list of dicts keyed by the header row
data = []
with open('filename', 'r') as doc:
    reader = csv.DictReader(doc)
    for line in reader:
        data.append(line)

# every column comes in as a string; cast col_03 to integer afterwards
df = sc.parallelize(data).toDF()
df = df.withColumn("col_03", df["col_03"].cast(IntegerType()))
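Before the rows ever reach Spark, the 'NA' sentinels can also be replaced with None in plain Python, so no post-cast is needed. A sketch of that pre-cleaning step (the clean_row helper is hypothetical, not part of the answer above):

```python
import csv
import io

def clean_row(row):
    """Replace 'NA' sentinels with None and cast col_03 to int."""
    row = {k: (None if v == "NA" else v) for k, v in row.items()}
    if row["col_03"] is not None:
        row["col_03"] = int(row["col_03"])
    return row

# a small in-memory sample in place of the real file
csv_text = "col_01,col_02,col_03\n114,2003-04-16,NA\n115,2011-08-24,2\n"
rows = [clean_row(r) for r in csv.DictReader(io.StringIO(csv_text))]
# rows could then be passed to spark.createDataFrame(rows, schema=schema)
```

Doing the substitution in Python keeps the Spark side simple, at the cost of reading the file on the driver first.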
Author: clumdee, updated on July 27, 2022

Comments

  • clumdee
    clumdee almost 2 years

    I have a data set (example) that when imported with

    df = spark.read.csv(filename, header=True, inferSchema=True)
    df.show()
    

will assign StringType() to the column containing 'NA', whereas I would like it to be IntegerType() (or ByteType()).


    I then tried to set

    schema = StructType([
        StructField("col_01", IntegerType()),
        StructField("col_02", DateType()),
        StructField("col_03", IntegerType())
    ])
    df = spark.read.csv(filename, header=True, schema=schema)
    df.show()
    

The output shows that any row where 'col_03' is 'NA' becomes entirely null.


    However col_01 and col_02 return appropriate data if they are called with

    df.select(['col_01','col_02']).show()
    


I can work around this by casting the data type of col_03 after import

    df = spark.read.csv(filename, header=True, inferSchema=True)
    df = df.withColumn('col_03', df['col_03'].cast(IntegerType()))
    df.show()
    


However, I think this is not ideal, and it would be much better if I could assign the data type of each column directly by setting a schema.

Would anyone be able to point out what I am doing incorrectly? Or is casting the data types after importing the only solution? Any comment on the performance of the two approaches (if assigning a schema can be made to work) is also welcome.

    Thank you,

  • clumdee
    clumdee about 6 years
Thank you for sharing. I think it is not different from my workaround, though, since both approaches read the data first and then cast the type. I am looking for a way to cast before reading the data that avoids the odd behavior.
  • clumdee
    clumdee about 6 years
    This is perfect. Thank you.