How to resolve pickle error in pyspark?


The source of your problem is the following line:

null_cols[str(m)] = defaultdict(lambda: 0)

As you can read in the What can be pickled and unpickled? section of the pickle module documentation:

The following types can be pickled:

  • ...
  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module
  • ...

It should be clear that lambda: 0 doesn't meet the above criteria. To make it work you can, for example, replace the lambda expression with int (calling int() with no arguments returns 0, so the counting behavior is unchanged):

null_cols[str(m)] = defaultdict(int)

How is it possible, then, to pass lambda expressions to higher-order functions in PySpark? The devil is in the detail. PySpark uses different serializers depending on the context. To serialize closures, including lambda expressions, it uses a custom cloudpickle serializer which supports lambda expressions and nested functions. To handle data it uses the default Python pickling tools.
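
You can reproduce the distinction outside of Spark. Here is a minimal sketch using the standard library plus the standalone cloudpickle package (which is essentially what PySpark bundles for closures):

import pickle
from collections import defaultdict

pickle.dumps(defaultdict(int))        # fine: int is a top-level built-in

try:
    pickle.dumps(defaultdict(lambda: 0))
except Exception as e:                # the standard pickler rejects the lambda factory
    print(type(e).__name__, e)

import cloudpickle                    # pip install cloudpickle
cloudpickle.dumps(lambda: 0)          # fine: roughly what happens to the functions
                                      # you pass to map/filter/etc.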


A few side notes:

  • I wouldn't use Python file objects to read data. It is not portable and won't work beyond the local file system. You can use SparkContext.wholeTextFiles instead (see the sketch after this list).
  • If you do, make sure you close the connections. Using a with statement is usually the best approach.
  • You can safely strip newline characters before you split the line.
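
Putting those notes together, here is a rough sketch of the same counting logic on top of wholeTextFiles. The helper name count_short_values is mine, and sc is assumed to be the SparkContext from your code:

from collections import defaultdict

def count_short_values(content, separator=','):
    # Same per-column counting as search_nulls, but applied to a file's content.
    null_cols = {}
    for line in content.splitlines():          # newlines are already stripped
        for m, data in enumerate(line.split(separator)):
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(int)   # picklable default factory
            if len(data) <= 4:
                null_cols[str(m)][data] += 1
    return null_cols

# wholeTextFiles yields (path, content) pairs, so each file is parsed on a
# worker without opening local file handles on the driver.
results = (sc.wholeTextFiles('tempfile.csv')
             .mapValues(count_short_values)
             .collect())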

Comments

  • makansij, almost 2 years

    I am iterating through files to gather information about the values in their columns and rows into a dictionary. I have the following code, which works locally:

    from collections import defaultdict

    def search_nulls(file_name):
        separator = ','
        nulls_dict = {}
        fp = open(file_name,'r')
        null_cols = {}
        lines = fp.readlines()
    
        for n,line in enumerate(lines):
            line = line.split(separator)
            for m,data in enumerate(line):
                data = data.strip('\n').strip('\r')
                if str(m) not in null_cols:
                    null_cols[str(m)] = defaultdict(lambda: 0)
                if len(data) <= 4:
                    null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1
    
        return null_cols
    
    
    files_to_process = ['tempfile.csv']
    results = map(lambda file: search_nulls(file), files_to_process)
    

    The above code works fine without Spark. I comment out the last two lines above and try it with Spark, since this is a prototype of something that will eventually need to run distributed:

    import os
    from pyspark import SparkConf, SparkContext

    os.environ['SPARK_HOME'] = <path_to_spark_folder>
    conf = SparkConf().setAppName("search_files").setMaster('local')
    
    sc = SparkContext(conf=conf)
    
    objects = sc.parallelize(files_to_process)
    resulting_object = \
        objects.map(lambda file_object: search_nulls(file_object))
    
    result = resulting_object.collect()
    

    When using spark, though, this results in the following error:

    File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
        bytes = self.serializer.dumps(vs)
      File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
        return pickle.dumps(obj, protocol)
    TypeError: expected string or Unicode object, NoneType found
    

    I've been unable to find any obvious reason why this would fail, since it runs perfectly locally, and I am not sharing any files across worker nodes. In fact, I'm only running this on my local machine anyway.

    Does anyone know of a good reason why this might be failing?

  • makansij, over 8 years
    So, just to clarify, generally speaking, a lambda function that can be serialized locally should be able to be serialized by pyspark? It would be useful to know this for the purpose of testing things locally. Thanks for your persistence on this question.
  • zero323, over 8 years
    Most of the time, yes. You have to think about when and where things happen, and generally speaking I wouldn't overuse lambdas. Pretty much all common operations can be performed using built-in functions; without static typing, lambdas are error prone, inherently not testable, and surprisingly verbose.
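
    For instance, a small sketch of that point, assuming an active SparkContext sc:

    from operator import add

    rdd = sc.parallelize([1, 2, 3, 4])

    rdd.reduce(lambda a, b: a + b)   # works, but anonymous and harder to test
    rdd.reduce(add)                  # same result with a named, importable function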