Pyspark: Serialized task exceeds max allowed. Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values
Solution 1
I had the same problem and solved it.
The cause is spark.rpc.message.maxSize, which defaults to 128 (the value is given in MiB).
You can change it when you launch a Spark client. I work in pyspark and set the value to 1024, so I launch it like this:
pyspark --master yarn --conf spark.rpc.message.maxSize=1024
That solved it.
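To pick a value instead of guessing, the sizes in the error message can be converted to MiB. This is a minimal sketch, assuming the message has the same wording as the one quoted in the question; the helper name required_max_size_mib and the 1.5x headroom factor are made up for illustration and are not part of Spark.

```python
import math
import re

MIB = 1024 * 1024  # spark.rpc.message.maxSize is expressed in MiB


def required_max_size_mib(error_message, headroom=1.5):
    """Suggest a spark.rpc.message.maxSize value (MiB) from the error text.

    Hypothetical helper: it only parses the "was N bytes" part of the message.
    """
    match = re.search(r"was (\d+) bytes", error_message)
    if match is None:
        raise ValueError("no serialized task size found in message")
    task_bytes = int(match.group(1))
    # Round up so the serialized task fits, with some room to grow.
    return math.ceil(task_bytes * headroom / MIB)


msg = ("Serialized task 15:0 was 137500581 bytes, which exceeds max allowed: "
       "spark.rpc.message.maxSize (134217728 bytes).")

print(134217728 // MIB)                          # the limit currently in force, in MiB
print(required_max_size_mib(msg, headroom=1.0))  # smallest value that would fit this task
print("--conf spark.rpc.message.maxSize=%d" % required_max_size_mib(msg))
```

The last line prints a flag that can be passed to pyspark or spark-submit in the same way as the command above.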
Solution 2
I had the same issue and it wasted a day of my life that I am never getting back. I am not sure why this is happening, but here is how I made it work for me.
Step 1: Make sure that PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are set correctly. It turned out that the Python version on the workers (2.6) differed from the one on the driver (3.6).
I fixed it by simply switching my kernel from Python 3 Spark 2.2.0 to Python Spark 2.3.1 in Jupyter. You may have to set it up manually. Here is how to make sure your PySpark is set up correctly: https://mortada.net/3-easy-steps-to-set-up-pyspark.html
Step 2: If that doesn't work, try working around it. The kernel switch worked for DataFrames that I hadn't added any columns to (spark_df -> panda_df -> back_to_spark_df), but it didn't work on the DataFrames where I had added 5 extra columns. What I tried instead, and what worked, was the following:
# 1. Select only the new columns:
df_write = df[['hotel_id','neg_prob','prob','ipw','auc','brier_score']]
# 2. Convert this DF into Spark DF:
df_to_spark = spark.createDataFrame(df_write)
df_to_spark = df_to_spark.repartition(100)
df_to_spark.registerTempTable('df_to_spark')
# 3. Join it to the rest of your data:
final = df_to_spark.join(data,'hotel_id')
# 4. Then write the final DF.
final.write.saveAsTable('schema_name.table_name',mode='overwrite')
Hope that helps!
Solution 3
I had the same problem, but in Watson Studio. My solution was:
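from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Stop the running context so the new setting can take effect.
sc.stop()
configura = SparkConf().set('spark.rpc.message.maxSize', '256')
sc = SparkContext.getOrCreate(conf=configura)
spark = SparkSession.builder.getOrCreate()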
I hope it helps someone.
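Since getOrCreate() may return a session that is already running, it can be worth checking which value the active context really uses after a restart like this. The following is a rough sketch, not a definitive check: the function name effective_rpc_max_size_mib is hypothetical, and the "128" fallback assumes the default implied by the error message (134217728 bytes).

```python
RPC_KEY = "spark.rpc.message.maxSize"


def effective_rpc_max_size_mib(spark, default="128"):
    """Return the RPC message limit (MiB) seen by the running SparkContext.

    Hypothetical helper; `spark` is expected to be a SparkSession.
    """
    # SparkContext.getConf() returns the configuration of the active context.
    value = spark.sparkContext.getConf().get(RPC_KEY, default)
    # int() raises ValueError if the value is not a plain number, e.g. "1000mb".
    return int(value)


# Usage in a notebook where `spark` already exists:
# print(effective_rpc_max_size_mib(spark))
```

If the printed number is still 128 after changing the configuration, the new setting probably did not reach the running context.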
Wendy De Wit
Updated on June 05, 2022
I'm doing calculations on a cluster, and at the end, when I ask for summary statistics on my Spark DataFrame with df.describe().show(), I get an error:
Serialized task 15:0 was 137500581 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values
In my Spark configuration I already tried to increase the aforementioned parameter:
spark = (SparkSession
    .builder
    .appName("TV segmentation - dataprep for scoring")
    .config("spark.executor.memory", "25G")
    .config("spark.driver.memory", "40G")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", "12")
    .config("spark.driver.maxResultSize", "3g")
    .config("spark.kryoserializer.buffer.max.mb", "2047mb")
    .config("spark.rpc.message.maxSize", "1000mb")
    .getOrCreate())
I also tried to repartition my dataframe using:
dfscoring=dfscoring.repartition(100)
but I still keep getting the same error.
My environment: Python 3.5, Anaconda 5.0, Spark 2
How can I avoid this error?