TypeError when converting Pandas to Spark


Solution 1

You could use reflection to infer the schema from an RDD of Row objects, e.g.,

from pyspark.sql import Row

# Map each record to a Row with named fields so Spark can infer the schema.
# Note: .map() requires mdf to be an RDD, not a pandas DataFrame.
mdfRows = mdf.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)

Does that achieve the desired result?
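As the comments below note, mdf in the question is actually a pandas DataFrame rather than an RDD, so .map is not available on it directly. A minimal sketch of the same idea, assuming an active SparkContext sc, would parallelize it first:

from pyspark.sql import Row

# Turn the pandas rows into an RDD, then into Rows with named fields.
rdd = sc.parallelize(mdf[['dbn', 'boro', 'bus']].values.tolist())
mdfRows = rdd.map(lambda p: Row(dbn=p[0], boro=p[1], bus=p[2]))
dfOut = sqlContext.createDataFrame(mdfRows)
dfOut.show(5)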

Solution 2

I had the same issue and was able to track it down to a single entry with a value of length 0 (i.e., empty). Schema inference (_infer_schema) runs on each row of the DataFrame and determines the types: by default an empty value is assumed to be a Double, while the non-empty values in the same column come out as Strings. These two types cannot be merged by the _merge_type function. The issue has been filed as https://issues.apache.org/jira/browse/SPARK-18178, but the best way around it is probably to supply a schema to the createDataFrame call (see the sketch after the reproduction below).

The code below reproduces the problem in PySpark 2.0:

import pandas as pd
from io import StringIO

# The second data row has an empty 'Scan Options' value, which trips schema inference.
test_df = pd.read_csv(StringIO(',Scan Options\n15,SAT2\n16,\n'))
sqlContext.createDataFrame(test_df).registerTempTable('Test')
o_qry = sqlContext.sql("SELECT * FROM Test LIMIT 1")
o_qry.first()
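A minimal sketch of that workaround, assuming the same test_df as above: replace NaN with None and pass an explicit schema so no type inference is needed ('Unnamed: 0' is simply the name pandas gives the unnamed first column).

import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField('Unnamed: 0', LongType(), True),
    StructField('Scan Options', StringType(), True),
])

# NaN is a float, so it cannot live in a StringType column; swap it for None first.
clean_df = test_df.where(pd.notnull(test_df), None)
sqlContext.createDataFrame(clean_df, schema=schema).show()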

Solution 3

You can try this as well:

import numpy as np
import pandas as pd

def create_spark_dataframe(file_name):
    """
    Return a Spark DataFrame built from the CSV at file_name,
    filling NaN in non-numeric columns so schema inference does not fail.
    """
    # The 'PRODUCT' converter is specific to this answer's data; adjust as needed.
    pandas_data_frame = pd.read_csv(file_name, converters={"PRODUCT": str})
    for col in pandas_data_frame.columns:
        if ((pandas_data_frame[col].dtypes != np.int64) &
                (pandas_data_frame[col].dtypes != np.float64)):
            pandas_data_frame[col] = pandas_data_frame[col].fillna('')

    spark_data_frame = sqlContext.createDataFrame(pandas_data_frame)
    return spark_data_frame

This will solve your problem.
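For example (a hypothetical call; 'merged.csv' is the file produced in the question's edit):

spark_df = create_spark_dataframe('merged.csv')
spark_df.printSchema()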


Comments

  • gold_cy
    gold_cy almost 2 years

    So I have looked up this question on here, but previous solutions have not worked for me. I have a DataFrame in this format:

    mdf.head()
        dbn       boro       bus
    0   17K548  Brooklyn    B41, B43, B44-SBS, B45, B48, B49, B69
    1   09X543  Bronx       Bx13, Bx15, Bx17, Bx21, Bx35, Bx4, Bx41, Bx4A,...
    4   28Q680  Queens      Q25, Q46, Q65
    6   14K474  Brooklyn    B24, B43, B48, B60, Q54, Q59
    

    There are a couple more columns, but I have excluded them (subway lines and test scores). When I try to convert this DataFrame into a Spark DataFrame, I get the following error:

    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-30-1721be5c2987> in <module>()
    ----> 1 sparkdf = sqlc.createDataFrame(mdf)
    
    /usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
        423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
        424         else:
    --> 425             rdd, schema = self._createFromLocal(data, schema)
        426         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
        427         jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
    
    /usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _createFromLocal(self, data, schema)
        339 
        340         if schema is None or isinstance(schema, (list, tuple)):
    --> 341             struct = self._inferSchemaFromList(data)
        342             if isinstance(schema, (list, tuple)):
        343                 for i, name in enumerate(schema):
    
    /usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/context.pyc in _inferSchemaFromList(self, data)
        239             warnings.warn("inferring schema from dict is deprecated,"
        240                           "please use pyspark.sql.Row instead")
    --> 241         schema = reduce(_merge_type, map(_infer_schema, data))
        242         if _has_nulltype(schema):
        243             raise ValueError("Some of types cannot be determined after inferring")
    
    /usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
        860         nfs = dict((f.name, f.dataType) for f in b.fields)
        861         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType())))
    --> 862                   for f in a.fields]
        863         names = set([f.name for f in fields])
        864         for n in nfs:
    
    /usr/local/Cellar/apache-spark/1.6.2/libexec/python/pyspark/sql/types.pyc in _merge_type(a, b)
        854     elif type(a) is not type(b):
        855         # TODO: type cast (such as int -> long)
    --> 856         raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
        857 
        858     # same type
    
    TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
    

    From what I have read, this might be a problem with the headers being treated as data. It is my understanding that you can't remove the headers from a DataFrame, so how would I go about solving this error and converting this DataFrame into a Spark one?

    Edit: Here is the code showing how I created the pandas DataFrame and worked my way around the problem.

    sqlc = SQLContext(sc)
    df = pd.DataFrame(pd.read_csv('hsdir.csv', encoding = 'utf_8_sig'))
    df = df[['dbn', 'boro', 'bus', 'subway', 'total_students']]
    df1 = pd.DataFrame(pd.read_csv('sat_r.csv', encoding = 'utf_8_sig'))
    df1 = df1.rename(columns = {'Num of SAT Test Takers': 'num_test_takers', 'SAT Critical Reading Avg. Score': 'read_avg', 'SAT Math Avg. Score' : 'math_avg', 'SAT Writing Avg. Score' : 'write_avg'})
    mdf = pd.merge(df, df1, left_on = 'dbn', right_on = 'DBN', how = 'left')
    mdf = mdf[pd.notnull(mdf['DBN'])]
    mdf.to_csv('merged.csv', encoding = 'utf-8')
    ndf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("merged.csv")
    

    The last line of this code, loading the merged CSV from my local machine, ended up letting me convert it properly to a DataFrame; however, my question still remains: why did it not work in the first place?

  • gold_cy
    gold_cy over 7 years
    I'm getting an error AttributeError: 'DataFrame' object has no attribute 'map'
  • user4601931
    user4601931 over 7 years
    Oh. mdf is a pandas DataFrame? I wrongly assumed it was a Spark RDD. Do you need to use pandas, or can you create a Spark RDD and then convert it to a Spark DataFrame as above?
  • gold_cy
    gold_cy over 7 years
    So this is the issue I face: if I load it as a CSV using com.databricks.spark.csv, it completely disregards the dbn column and shifts everything one column to the left. I'm not sure how to avoid this, so I loaded it through pandas' read_csv, which preserved the formatting of the original CSV.
  • user4601931
    user4601931 over 7 years
    Are you saying that you tried spark.read.csv("/path/to/file.csv", header=True) and that it did not work?
  • user4601931
    user4601931 over 7 years
    I'm honestly not quite sure what the issue is... I have made a pandas DataFrame from the sample data you gave and executed sparkDF = spark.createDataFrame(df) without problem. I've also made a CSV file from the sample data and ran sparkDF = spark.read.csv("sample.csv", header=True), also without issue. Maybe you could include in your question a little about how you created the pandas DataFrame?
  • gold_cy
    gold_cy over 7 years
    I used the spark-csv package since I'm using Spark 1.6.2 (that's the latest version available on Homebrew). Should I update to 2.0, since I know they inlined read.csv directly into Spark? The problem, I'm assuming, is that the CSV has encoded characters and/or trailing/leading whitespace. I'll update the post with how I created the pandas frame. I was also able to get around the problem by saving it on my local machine with the proper encoding, which is probably not good practice for Apache Spark.
  • gold_cy
    gold_cy over 7 years
    I upgraded my Spark installation and spark.read.csv worked like a charm. I was able to bypass pandas and avoid this whole issue. Many thanks!
  • user4601931
    user4601931 over 7 years
    No problem, Dmitry!
  • Itachi
    Itachi over 6 years
    I would be glad if anyone could suggest a direct way to convert NaN to None.
  • LePuppy
    LePuppy almost 5 years
    I filled NaN with 0s and it didn't solve the error.
  • Itachi
    Itachi almost 5 years
    @LePuppy, what is the datatype of your column? Also, check my updated solution; it's independent of a column's datatype and should work.
  • LePuppy
    LePuppy almost 5 years
    I have string and double types. I figured out that converting everything to string enabled me to create the Spark dataframe. Then I can still use cast to convert the column types.
  • Itachi
    Itachi almost 5 years
    @LePuppy that's just too much work; doesn't this one take care of all of that in a more systematic manner?
  • LePuppy
    LePuppy almost 5 years
    I get a ValueError: 'The truth value of a Series is ambiguous.' Indeed, your lambda function applies to each Series, so x != x raises this error.
  • Itachi
    Itachi almost 5 years
    Am I correct in assuming you are running apply on a pd.DataFrame and not a pd.Series? Use apply or applymap appropriately.
  • LePuppy
    LePuppy almost 5 years
    Yes I am. But isn't that what your code does? I'm lost.
  • Itachi
    Itachi almost 5 years
    In my code, it is a pandas Series; you need to get your DataFrame down to Series level using new_df_1['column_name'].apply(func).
  • LePuppy
    LePuppy almost 5 years
    Of course, I just assumed new_df_1 was a DataFrame. You should consider changing its name to something closer to a Series.
  • LePuppy
    LePuppy almost 5 years
    And still, it didn't solve my issue. Maybe we're talking about two different things without knowing it.