Exceeding `spark.driver.maxResultSize` without bringing any data to the driver


Solution 1

Just because you don't collect anything explicitly doesn't mean that nothing is collected. Since the problem occurs during a join, the most likely explanation is that the execution plan uses a broadcast join. In that case Spark collects the data on the driver first, and then broadcasts it.
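
As a minimal, hypothetical sketch (the large and small DataFrames below are made up, not from the question), these are the two ways a broadcast join typically gets introduced; in both cases Spark collects the broadcast side on the driver before shipping it to the executors:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("broadcast-demo").getOrCreate()
import spark.implicits._

// Hypothetical data standing in for the real tables
val large = (1 to 1000000).toDF("id")
val small = (1 to 100).toDF("id")

// Automatic: picked when the optimizer estimates `small` to be below
// spark.sql.autoBroadcastJoinThreshold (10 MB by default)
val auto = large.join(small, Seq("id"))

// Explicit: the broadcast hint forces the behaviour regardless of estimated size
val forced = large.join(broadcast(small), Seq("id"))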

Depending on the configuration and pipeline:

  • Make sure that spark.sql.autoBroadcastJoinThreshold is smaller than spark.driver.maxResultSize (a configuration sketch follows this list).
  • Make sure you don't force a broadcast join on data of unknown size.
  • While nothing indicates it is the problem here, be careful when using Spark ML utilities. Some of these (most notably indexers) can bring significant amounts of data to the driver.
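
For the first two points, a rough configuration sketch (the values are placeholders, not recommendations; note that spark.driver.maxResultSize has to be in place before the application starts):

import org.apache.spark.sql.SparkSession

// Keep the auto-broadcast threshold well below the driver's result-size cap, so a
// table that qualifies for automatic broadcasting cannot by itself exceed
// spark.driver.maxResultSize. Values are illustrative only.
val spark = SparkSession.builder()
  .appName("join-config-sketch")
  .config("spark.driver.maxResultSize", "2g")
  .config("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)  // 100 MB
  .getOrCreate()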

To determine if broadcasting is indeed the problem please check the execution plan, and if needed, remove broadcast hints and disable automatic broadcasts:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
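
For example (just a sketch; the exact plan text differs between Spark versions), using the joined DataFrame from the question:

// Print the physical plan before triggering an action
joined.explain()

// A broadcast join typically shows up as BroadcastHashJoin / BroadcastExchange
// nodes; with a purely non-equi condition like the one in the question it may
// appear as BroadcastNestedLoopJoin instead. A shuffle-based join shows
// SortMergeJoin / Exchange nodes and does not collect the joined data to the driver.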

Solution 2

In theory, the exception is not always related to the actual data.

Technical information about task execution results is sent to the driver node in serialized form, and this information can take more memory than the threshold allows.

Proof: the error message is produced in org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults:

val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +

That method is called from org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask:

val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  case directResult: DirectTaskResult[_] =>
    if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
      return
    }

If the number of tasks is huge, this exception can occur even though no user data is collected on the driver.
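
If that is the case, the usual workarounds are to raise the limit (it has to be set before the application starts) or to run fewer tasks, so less per-task metadata accumulates on the driver. A rough sketch, reusing the joined DataFrame from the question:

// Option 1: raise the cap at submit time (it cannot be changed on a running application):
//   spark-submit --conf spark.driver.maxResultSize=4g ...

// Option 2: run fewer tasks, so fewer serialized task results (status, metrics,
// accumulator updates) are reported back to the driver.
val fewerTasks = joined.coalesce(512)  // cut down an over-partitioned DataFrame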

Author: user4601931

Updated on February 09, 2020

Comments

  • user4601931, about 4 years ago

    I have a Spark application that performs a large join

    val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
    

    and then aggregates the resulting DataFrame down to one with maybe 13k rows. In the course of the join, the job fails with the following error message:

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
    

    This was happening before without setting spark.driver.maxResultSize, and so I set spark.driver.maxResultSize=2G. Then, I made a slight change to the join condition, and the error resurfaces.

    Edit: In resizing the cluster, I also doubled the number of partitions the DataFrame assumes in a .coalesce(256) to a .coalesce(512), so I can't be sure it's not because of that.

    My question is, since I am not collecting anything to the driver, why should spark.driver.maxResultSize matter at all here? Is the driver's memory being used for something in the join that I'm not aware of?

    • Valentin P., over 6 years ago
      I have the same issue; have you made any progress here?
    • user4601931, over 6 years ago
      @ValentinP. For my job, I simply increased the option to 3G and it worked. This still doesn't answer the question of what this parameter does and why it's necessary when --deploy-mode client is the setup.
    • moriarty007, over 5 years ago
      @user4601931 Can you paste the actual Scala code which you are running? The val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date") line won't run any job; you must be doing some transformation which triggers the job.
    • Jasper-M, over 5 years ago
      I've had this issue after doing a df.write.csv(...).
    • Jacek Laskowski, over 5 years ago
      Could you check how many partitions you have in joined? Something like joined.queryExecution.toRdd.getNumPartitions. I'm curious why you had 78021 tasks. Could it be that a better solution is to lower the number of partitions for the datasets in the join?
    • Jacek Laskowski, over 5 years ago
      Could you also attach the physical query plan (from web UI or df.explain)?
    • user4601931, over 5 years ago
      @JacekLaskowski Unfortunately, I don't have the code for this project anymore, and it's been so long that I forgot most of what it was about anyway. Sorry, but thanks for the renewed interest in the question.
    • Jacek Laskowski, over 5 years ago
      @Jasper-M Could you shed more light on your case that seems similar? Could you edit the question and throw in more info? Thanks.
    • Jasper-M, over 5 years ago
      @JacekLaskowski I can't show the query plan here, but the stage where it crashes consists of +3000 tasks. It's a lot of FileScanRDD followed by MapPartitionsRDD. Then a lot of UnionRDD. With finally a distinct operation on the result of all the unions. But no (broadcast) joins or collect... I can of course see why this execution plan is not ideal, but not where spark.driver.maxResultSize comes in. When --deploy-mode cluster is set there is no crash.
  • Jasper-M, over 5 years ago
    I suppose Spark should be smart enough not to use a broadcast join that exceeds maxResultSize unless you explicitly tell it to or unless you stupidly change autoBroadcastJoinThreshold to a higher value?
  • Jasper-M, over 5 years ago
    I'm pretty sure you are on to something, but could you expand on what that technical information is and how that piece of code proves it? I am not extremely familiar with Spark's internals, so to me it looks like the purpose of TaskResultGetter is returning the actual results of the computations that a task performed to the driver. Which should only be necessary for actions like collect, take, perhaps count, ...
  • pasha701, over 5 years ago
    How to check: just run a small join in debug in local mode, set a breakpoint at the specified place, and some technical information appears (at least some accumulators).
  • thentangler, over 3 years ago
    How do you disable automatic broadcasts?
  • alex, over 3 years ago
    @thentangler setting spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) disables automatic broadcasts