Can't pickle _thread.lock objects: PySpark sending requests to Elasticsearch

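Connection objects are, in general, not serializable, so they cannot be captured in a closure and shipped to the executors. You have to use the foreachPartition pattern and create the connection inside the function that runs on each partition: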

def sendPut(docs):
    es = ... # Initialize es object here, once per partition, on the executor
    for doc in docs:
        es.index(index="tweetrepository", doc_type='tweet', body=doc)

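myJson = (dataStream
    .map(decodeJson)
    .map(addSentiment)
    # Here you need an output operation.
    # `map` is lazy, and `pprint` doesn't guarantee complete execution.
    # A DStream has no foreachPartition of its own, so go through foreachRDD.
    .foreachRDD(lambda rdd: rdd.foreachPartition(sendPut)))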
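
To see why the client has to be created inside the function, here is a minimal sketch that needs neither Spark nor Elasticsearch. `FakeClient`, `can_pickle`, and `send_partition` are hypothetical names made up for this illustration; `FakeClient` only stands in for a connection object that holds a lock internally, as real clients typically do.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a connection object that holds a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.sent = []

    def index(self, body):
        # Guard the shared list the way a real client guards its connection.
        with self._lock:
            self.sent.append(body)
        return {"result": "created"}


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False if it refuses."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def send_partition(docs):
    # The client is created here, inside the function, so it never has
    # to be serialized; only the plain documents cross process boundaries.
    client = FakeClient()
    return [client.index(body=doc) for doc in docs]


lock_ok = can_pickle(threading.Lock())      # a bare lock is refused
client_ok = can_pickle(FakeClient())        # so is anything holding one
doc_ok = can_pickle({"Text": "hello"})      # plain records are fine
responses = send_partition([{"Text": "a"}, {"Text": "b"}])
```

Spark serializes the function together with everything it references, so a global client that holds a lock fails in the same way, while a client built inside the per-partition function is never pickled at all.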

If you want to return something, use mapPartitions:

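def sendPut(docs):
    es = ... # Initialize es object here, once per partition, on the executor
    for doc in docs:
        # Yield each response so it becomes an element of the resulting stream
        yield es.index(index="tweetrepository", doc_type='tweet', body=doc)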


myJson = (dataStream
   .map(decodeJson)
   .map(addSentiment)
   .mapPartitions(sendPut))

but you'll need an additional output operation (for example foreachRDD) to force execution, because mapPartitions is lazy as well.
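
As a rough analogy for that laziness, a plain Python generator behaves the same way: nothing in its body runs until something consumes it. The sketch below uses no Spark; `RecordingClient`, `send_put`, and `created_clients` are hypothetical names introduced only for this illustration.

```python
class RecordingClient:
    """Hypothetical client that just remembers what it was asked to index."""

    def __init__(self):
        self.indexed = []

    def index(self, body):
        self.indexed.append(body)
        return {"result": "created", "count": len(self.indexed)}


created_clients = []  # lets us observe when a client is actually built


def send_put(docs):
    client = RecordingClient()
    created_clients.append(client)
    for doc in docs:
        yield client.index(body=doc)


partition = iter([{"Text": "a"}, {"Text": "b"}])

lazy_results = send_put(partition)       # builds the generator, runs nothing
clients_before = len(created_clients)    # no client has been created yet

results = list(lazy_results)             # consuming it drives the requests
clients_after = len(created_clients)     # exactly one client for the partition
```

Spark's laziness operates on the whole job rather than on a single generator, but the consequence is the same: without something that consumes the results, no request is sent.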

Author by

Zhongwei WANG

Updated on June 19, 2022

Comments

  • Zhongwei WANG almost 2 years

    I am using PySpark Streaming to collect data from tweepy. After all the setup, I send the dict (JSON) to Elasticsearch via elasticsearch.index(), but I get a "can't pickle _thread.lock objects" error and 63 other errors. The traceback is too long to show in my console!

    The design is that I get a JSON/dict type file, convert it into a DStream, and add another feature named "sentiment" to it by calling TextBlob in a map() function. It all works fine, but when I add another map function to call elasticsearch.index(), I get the error.

    Below is part of the very long error log in my console.

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/streaming/util.py", line 105, in dumps
        func.func, func.rdd_wrap_func, func.deserializers)))
      File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/serializers.py", line 460, in dumps
        return cloudpickle.dumps(obj, 2)
      File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 704, in dumps
        cp.dump(obj)
      File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 162, in dump
        raise pickle.PicklingError(msg)
    _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects
        at org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        ... 63 more

    Part of my code looks like this:

    def sendPut(doc):
      res = es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)
      return doc
    myJson = dataStream.map(decodeJson).map(addSentiment).map(sendPut)
    myJson.pprint()
    

    Here is the decodeJson function:

    def decodeJson(str):
      return json.loads(str)
    

    Here is the addSentiment function:

    def addSentiment(dic):
      dic['Sentiment'] = get_tweet_sentiment(dic['Text'])
      return dic
    

    And here is the get_tweet_sentiment function:

    def get_tweet_sentiment(tweet):
      analysis = TextBlob(tweet)
      if analysis.sentiment.polarity > 0:
        return 'positive'
      elif analysis.sentiment.polarity == 0:
        return 'neutral'
      else:
        return 'negative'
    
  • Zhongwei WANG about 6 years
    Awesome! Problem solved. The problem was that I needed to initialize es inside the function, not as a global variable. Otherwise, I will need to broadcast the es.