Create a dataframe from a list in pyspark.sql

11,761

Solution 1

Your list contains numpy.float64 values, and Spark's schema inference does not support that type. When you hard-code the list, it contains plain Python floats instead, which is why that version works.
Here is a question with an answer that covers how to convert NumPy data types to Python's native ones.
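
A minimal sketch of that conversion, assuming the values are NumPy scalars: NumPy scalars expose an `.item()` method that returns the equivalent built-in Python value. The helper name `to_native` is hypothetical and not part of NumPy or PySpark; it is duck-typed, so it does not need to import NumPy itself.

```python
def to_native(value):
    """Return a built-in Python scalar for NumPy-style scalars.

    NumPy scalars (for example numpy.float64) expose .item(), which returns
    the equivalent built-in value. Anything without a callable .item()
    attribute is returned unchanged.
    """
    item = getattr(value, "item", None)
    return item() if callable(item) else value
```

With NumPy installed, `to_native(numpy.float64(2.8))` would give a plain Python `float`, while values that are already native (such as `2.8` or `"label"`) pass through untouched.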

Solution 2

I had the same problem. My solution uses float() to convert the type:

1. At the beginning, the element type is np.float64:

my_rdd.collect()   
output ==>  [2.8,3.9,1.2]   

2. Convert each value to a Python float (wrapped in a one-element tuple so each value becomes a row):

my_convert=my_rdd.map(lambda x: (float(x),)).collect()  
output ==> [(2.8,),(3.9,),(1.2,)]  

3. The error is no longer raised:

sqlContext.createDataFrame(my_convert).show()

4. For your sample, I suggest:

li = example_data.map(lambda x: get_labeled_prediction(w,x)).map(lambda y:(float(y[0]),float(y[1]))).collect()
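
If the list has already been collected, the same conversion can be applied on the driver side. The following is only a sketch under that assumption: `as_float_rows` is a hypothetical helper name, the sample data is a stand-in for the collected (prediction, label) pairs, and the `createDataFrame` call is left as a comment because it needs a live `sqlContext`.

```python
def as_float_rows(rows):
    """Return the rows as tuples of built-in Python floats.

    float() accepts NumPy floating-point scalars as well as plain numbers,
    so every field ends up as a native float.
    """
    return [tuple(float(field) for field in row) for row in rows]


# Stand-in for the collected (prediction, label) pairs from the question.
collected = [(0.0, 59.0), (0.0, 51.0)]
clean = as_float_rows(collected)

# With a live SQLContext (not created here), the cleaned rows could be passed on:
# df = sqlContext.createDataFrame(clean, ["prediction", "label"])
```

Because every field goes through `float()`, this variant only suits rows whose columns are all numeric.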

Q Yang
Author by

Q Yang

Updated on June 04, 2022

Comments

  • Q Yang
    Q Yang almost 2 years

    I am totally lost in a weird situation. I have a list li:

    li = example_data.map(lambda x: get_labeled_prediction(w,x)).collect()
    print li, type(li)
    

    the output is like,

    [(0.0, 59.0), (0.0, 51.0), (0.0, 81.0), (0.0, 8.0), (0.0, 86.0), (0.0, 86.0), (0.0, 60.0), (0.0, 54.0), (0.0, 54.0), (0.0, 84.0)] <type 'list'>
    

    When I try to create a dataframe from this list:

    m = sqlContext.createDataFrame(li, ["prediction", "label"])
    

    It threw the error message:

    TypeError                                 Traceback (most recent call last)
    <ipython-input-90-4a49f7f67700> in <module>()
     56 l = example_data.map(lambda x: get_labeled_prediction(w,x)).collect()
     57 print l, type(l)
    ---> 58 m = sqlContext.createDataFrame(l, ["prediction", "label"])
     59 '''
     60 g = example_data.map(lambda x:gradient_summand(w, x)).sum()
    
    /databricks/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
    423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
    --> 425             rdd, schema = self._createFromLocal(data, schema)
    426         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    427         jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
    
    /databricks/spark/python/pyspark/sql/context.py in _createFromLocal(self, data, schema)
    339 
    340         if schema is None or isinstance(schema, (list, tuple)):
    --> 341             struct = self._inferSchemaFromList(data)
    342             if isinstance(schema, (list, tuple)):
    343                 for i, name in enumerate(schema):
    
    /databricks/spark/python/pyspark/sql/context.py in _inferSchemaFromList(self, data)
    239             warnings.warn("inferring schema from dict is deprecated,"
    240                           "please use pyspark.sql.Row instead")
    --> 241         schema = reduce(_merge_type, map(_infer_schema, data))
    242         if _has_nulltype(schema):
    243             raise ValueError("Some of types cannot be determined after inferring")
    
    /databricks/spark/python/pyspark/sql/types.py in _infer_schema(row)
    831         raise TypeError("Can not infer schema for type: %s" % type(row))
    832 
    --> 833     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    834     return StructType(fields)
    835 
    
    /databricks/spark/python/pyspark/sql/types.py in _infer_type(obj)
    808             return _infer_schema(obj)
    809         except TypeError:
    --> 810             raise TypeError("not supported type: %s" % type(obj))
    811 
    812 
    
    TypeError: not supported type: <type 'numpy.float64'>
    

    But when I hard-code this list inline:

    tt = sqlContext.createDataFrame([(0.0, 59.0), (0.0, 51.0), (0.0, 81.0), (0.0, 8.0), (0.0, 86.0), (0.0, 86.0), (0.0, 60.0), (0.0, 54.0), (0.0, 54.0), (0.0, 84.0)], ["prediction", "label"])
    tt.collect()
    

    It works well.

    [Row(prediction=0.0, label=59.0),
     Row(prediction=0.0, label=51.0),
     Row(prediction=0.0, label=81.0),
     Row(prediction=0.0, label=8.0),
     Row(prediction=0.0, label=86.0),
     Row(prediction=0.0, label=86.0),
     Row(prediction=0.0, label=60.0),
     Row(prediction=0.0, label=54.0),
     Row(prediction=0.0, label=54.0),
     Row(prediction=0.0, label=84.0)]
    

    What caused this problem, and how can I fix it? Any hint will be appreciated.

  • Duong Trung Nghia
    Duong Trung Nghia almost 7 years
    I followed your proposed answer but it did not work for me. I get TypeError: not supported type: <class 'numpy.float32'>