Exceeding `spark.driver.maxResultSize` without bringing any data to the driver
Solution 1
Just because you don't collect anything explicitly, it doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark will collect the data to the driver first, and then broadcast it.
Depending on the configuration and pipeline:
- Make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize.
- Make sure you don't force a broadcast join on data of unknown size.
- While nothing indicates it is the problem here, be careful when using Spark ML utilities. Some of these (most notably indexers) can bring significant amounts of data to the driver.
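The first check can be written as a small, Spark-free helper. This is a sketch under stated assumptions: the object and function names are hypothetical, both settings are assumed to be already converted to bytes (Spark's defaults are 10 MB for spark.sql.autoBroadcastJoinThreshold and 1 GB for spark.driver.maxResultSize), a negative threshold is treated as "automatic broadcast disabled" and a maxResultSize of 0 as "unlimited", which is how Spark interprets those values.

```scala
object BroadcastConfigCheck {
  // Spark defaults, in bytes.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024          // 10 MB autoBroadcastJoinThreshold
  val DefaultMaxResultSizeBytes: Long = 1L * 1024 * 1024 * 1024 // 1 GB maxResultSize

  /** Hypothetical helper: true when a table small enough to be broadcast
    * automatically should also stay below the driver's result-size limit. */
  def broadcastFitsResultSize(thresholdBytes: Long, maxResultSizeBytes: Long): Boolean =
    if (thresholdBytes < 0) true            // -1 disables automatic broadcast joins
    else if (maxResultSizeBytes == 0) true  // 0 means "no limit" for maxResultSize
    else thresholdBytes < maxResultSizeBytes

  def main(args: Array[String]): Unit = {
    val twoGb = 2L * 1024 * 1024 * 1024
    println(broadcastFitsResultSize(DefaultThresholdBytes, DefaultMaxResultSizeBytes)) // true
    println(broadcastFitsResultSize(3L * 1024 * 1024 * 1024, twoGb))                   // false
    println(broadcastFitsResultSize(-1L, twoGb))                                       // true
  }
}
```

Keep in mind that the threshold is compared against Spark's size estimate for the relation, so this is a rule of thumb rather than a guarantee.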
To determine whether broadcasting is indeed the problem, check the execution plan and, if needed, remove broadcast hints and disable automatic broadcasts:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
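To make the plan check repeatable, one option is to scan the physical plan text for broadcast operators. A minimal sketch, assuming the plan string comes from something like joined.queryExecution.executedPlan.toString (or from the output of joined.explain()); the helper name broadcastOperators is hypothetical. A range condition like the one in the question has no equality keys, so if Spark broadcasts one side it shows up as BroadcastNestedLoopJoin rather than BroadcastHashJoin.

```scala
object BroadcastPlanCheck {
  // Operator names that Spark prints in physical plans when a side is broadcast.
  val BroadcastOperators: Seq[String] =
    Seq("BroadcastHashJoin", "BroadcastNestedLoopJoin", "BroadcastExchange")

  /** Hypothetical helper: returns the broadcast operators mentioned in a plan string,
    * in the order listed above. An empty result means no broadcast was planned. */
  def broadcastOperators(planText: String): Seq[String] =
    BroadcastOperators.filter(op => planText.contains(op))
}
```

Usage, inside a Spark job: BroadcastPlanCheck.broadcastOperators(joined.queryExecution.executedPlan.toString). Re-running it after setting the threshold to -1 should show whether the broadcast disappeared (explicit broadcast hints still have to be removed by hand).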
Solution 2
In theory, the exception is not always related to the customer data.
Technical information about task execution results is sent to the driver node in serialized form, and this information can take more memory than the threshold.
Proof: the error message is built in org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults:
val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
That method is called in org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
If the number of tasks is huge, this exception can occur.
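To see why the task count matters, here is a simplified model of the accumulation that canFetchMoreResults performs: every finished task adds the size of its serialized result to a running total, and the stage is aborted once the total exceeds the limit. This is a sketch, not Spark's code; the names ResultSizeModel and firstFailingTask are hypothetical, and the example assumes every task returns a result of the same size. With the question's numbers (78021 tasks, a 2 GB limit), an average of about 27 KB per task is already enough to fail.

```scala
object ResultSizeModel {
  /** Returns the 1-based index of the task whose result pushes the running
    * total over maxResultSizeBytes, or None if all results fit.
    * A limit of 0 means "unlimited", as for spark.driver.maxResultSize. */
  def firstFailingTask(resultSizes: Iterator[Long], maxResultSizeBytes: Long): Option[Int] = {
    var total = 0L
    var taskCount = 0
    var failedAt: Option[Int] = None
    while (failedAt.isEmpty && resultSizes.hasNext) {
      total += resultSizes.next()  // add this task's serialized result size
      taskCount += 1
      if (maxResultSizeBytes > 0 && total > maxResultSizeBytes) failedAt = Some(taskCount)
    }
    failedAt
  }

  def main(args: Array[String]): Unit = {
    val limit = 2L * 1024 * 1024 * 1024 // spark.driver.maxResultSize=2G
    println(firstFailingTask(Iterator.fill(78021)(27525L), limit)) // Some(78020)
    println(firstFailingTask(Iterator.fill(78021)(27524L), limit)) // None
  }
}
```

Because the total grows with the number of tasks, the two levers this model exposes are reducing the number of tasks in the stage and raising the limit.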
user4601931
Updated on February 09, 2020
Comments
-
user4601931 about 4 years
I have a Spark application that performs a large join:
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
This was happening before without setting spark.driver.maxResultSize, and so I set spark.driver.maxResultSize=2G. Then, I made a slight change to the join condition, and the error resurfaced.
Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame assumes in a .coalesce(256) to a .coalesce(512), so I can't be sure it's not because of that.
My question is, since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?
-
Valentin P. over 6 years
I have the same issue. Have you made any progress here?
-
user4601931 over 6 years
@ValentinP. For my job, I simply increased the option to 3G and it worked. This still doesn't answer the question of what this parameter does and why it's necessary when --deploy-mode client is the setup.
-
moriarty007 over 5 years
@user4601931 Can you paste the actual Scala code you are running? The
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
line won't run any job. You must be running some action that triggers the job.
-
Jasper-M over 5 years
I've had this issue after doing a df.write.csv(...).
-
Jacek Laskowski over 5 years
Could you check how many partitions you have in joined? Something like joined.queryExecution.toRdd.getNumPartitions. I'm curious why you had 78021 tasks. Could it be that a better solution is to lower the number of partitions for the datasets in the join?
-
Jacek Laskowski over 5 years
Could you also attach the physical query plan (from the web UI or df.explain)?
-
user4601931 over 5 years
@JacekLaskowski Unfortunately, I don't have the code for this project anymore, and it's been so long that I forgot most of what it was about anyway. Sorry, but thanks for the renewed interest in the question.
-
Jacek Laskowski over 5 years
@Jasper-M Could you shed more light on your case, which seems similar? Could you edit the question and add more info? Thanks.
-
Jasper-M over 5 years
@JacekLaskowski I can't show the query plan here, but the stage where it crashes consists of 3000+ tasks. It's a lot of FileScanRDD followed by MapPartitionsRDD, then a lot of UnionRDD, and finally a distinct operation on the result of all the unions. But no (broadcast) joins or collect... I can of course see why this execution plan is not ideal, but not where spark.driver.maxResultSize comes in. When --deploy-mode cluster is set, there is no crash.
-
Jasper-M over 5 years
I suppose Spark should be smart enough not to use a broadcast join that exceeds maxResultSize unless you explicitly tell it to, or unless you stupidly change autoBroadcastJoinThreshold to a higher value?
-
Jasper-M over 5 years
I'm pretty sure you are on to something, but could you expand on what that technical information is and how that piece of code proves it? I am not extremely familiar with Spark's internals, so to me it looks like the purpose of TaskResultGetter is to return to the driver the actual results of the computations a task performed, which should only be necessary for actions like collect, take, perhaps count, ...
-
pasha701 over 5 years
How to check: run a small join in debug mode locally, set a breakpoint at the specified place, and some technical information appears (at least some accumulators).
-
thentangler over 3 years
How do you disable automatic broadcasts?
-
alex over 3 years
@thentangler Setting spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) disables automatic broadcasts.