Pyspark: Serialized task exceeds max allowed. Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values


Solution 1

I ran into the same problem and solved it. The cause is spark.rpc.message.maxSize, which defaults to 128 MiB. You can change it when launching a Spark client; I work in PySpark and set the value to 1024, so I launch it like this:

pyspark --master yarn --conf spark.rpc.message.maxSize=1024

That solved it.
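
If you create the SparkSession yourself instead of launching the pyspark shell, the same setting can also be passed through the builder. A minimal sketch (the app name is just a placeholder; the value is an integer number of MiB):

    from pyspark.sql import SparkSession

    # spark.rpc.message.maxSize is read when the SparkContext is created,
    # so set it before getOrCreate() builds the session
    spark = (SparkSession
             .builder
             .appName("maxsize-example")                   # placeholder name
             .config("spark.rpc.message.maxSize", "1024")  # value in MiB
             .getOrCreate())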

Solution 2

I had the same issue and it wasted a day of my life that I am never getting back. I am not sure why this is happening, but here is how I made it work for me.

Step 1: Make sure that PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set. In my case it turned out that Python on the workers (2.6) was a different version than on the driver (3.6).

I fixed it by simply switching my kernel from Python 3 Spark 2.2.0 to Python Spark 2.3.1 in Jupyter. You may have to set it up manually. Here is how to make sure your PySpark is set up correctly https://mortada.net/3-easy-steps-to-set-up-pyspark.html
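
For example, both variables can point at the same interpreter before the session is created; the path below is only an illustration, so substitute whatever Python your cluster actually uses:

    import os

    # Hypothetical path - replace with the interpreter installed on your cluster
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"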

Step 2: If that doesn't work, try working around it. The kernel switch worked for DataFrames I hadn't added any columns to (spark_df -> pandas_df -> back to spark_df), but it didn't work on the DataFrames where I had added 5 extra columns. So what I tried, and what worked, was the following:

# 1. Select only the new columns: 

    df_write = df[['hotel_id','neg_prob','prob','ipw','auc','brier_score']]


# 2. Convert this DF into Spark DF:

    df_to_spark = spark.createDataFrame(df_write)
    df_to_spark = df_to_spark.repartition(100)
    df_to_spark.registerTempTable('df_to_spark')


# 3. Join it to the rest of your data:

    final = df_to_spark.join(data, 'hotel_id')


# 4. Then write the final DF:

    final.write.saveAsTable('schema_name.table_name', mode='overwrite')

Hope that helps!

Solution 3

I had the same problem, but using Watson Studio. My solution was:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# The new maxSize only takes effect on a fresh context, so stop the current one first
sc.stop()
configura = SparkConf().set('spark.rpc.message.maxSize', '256')
sc = SparkContext.getOrCreate(conf=configura)
spark = SparkSession.builder.getOrCreate()

I hope it helps someone.


Comments

  • Wendy De Wit, about 2 years ago

    I'm doing calculations on a cluster, and at the end, when I ask for summary statistics on my Spark DataFrame with df.describe().show(), I get an error:

    Serialized task 15:0 was 137500581 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values

    In my Spark configuration I already tried to increase the aforementioned parameter:

    spark = (SparkSession
             .builder
             .appName("TV segmentation - dataprep for scoring")
             .config("spark.executor.memory", "25G")
             .config("spark.driver.memory", "40G")
             .config("spark.dynamicAllocation.enabled", "true")
             .config("spark.dynamicAllocation.maxExecutors", "12")
             .config("spark.driver.maxResultSize", "3g")
             .config("spark.kryoserializer.buffer.max.mb", "2047mb")
             .config("spark.rpc.message.maxSize", "1000mb")
             .getOrCreate())
    

    I also tried to repartition my dataframe using:

    dfscoring=dfscoring.repartition(100)
    

    but I still keep getting the same error.

    My environment: Python 3.5, Anaconda 5.0, Spark 2

    How can I avoid this error?