Why does Yarn on EMR not allocate all nodes to running Spark jobs?

11,051

Solution 1

Okay, with the help of @sean_r_owen, I was able to track this down.

The problem was this: when setting spark.dynamicAllocation.enabled to true, spark.executor.instances shouldn't be set - an explicit value for that will override dynamic allocation and turn it off. It turns out that EMR sets it in the background if you do not set it yourself. To get the desired behaviour, you need to explicitly set spark.executor.instances to 0.

For the records, here is the contents of one of the files we pass to the --configurations flag when creating an EMR cluster:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },

    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0"
        }
    } 
]

This gives us an EMR cluster where Spark uses all the nodes, including added nodes, when running jobs. It also appears to use all/most of the memory and all (?) the cores.

(I'm not entirely sure that it's using all the actual cores; but it is definitely using more than 1 VCore, which it wasn't before, but following Glennie Helles's advice it is now behaving better and using half of the listed VCores, which seems to equal the actual number of cores...)

Solution 2

I observed the same behavior with nearly the same settings using emr-5.20.0. I didn't try to add nodes when the cluster is already running but using TASK nodes (together with just one CORE node). I'm using InstanceFleets to define MASTER, CORE and TASK nodes (with InstanceFleets I don't know which exact InstanceTypes I get and that is why I don't want to define the number of executors, cores and memory per executor myself but want that to be maximized/optimized automatically).

With this, it only uses two TASK nodes (probably the first two nodes which are ready to use?) but never scales up while more TASK nodes get provisioned and finishing the bootstrap phase.

What made it work in my case was to set the spark.default.parallelism parameter (to the number of total number of cores of my TASK nodes), which is the same number used for the TargetOnDemandCapacity or TargetSpotCapacity of the TASK InstanceFleet:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>
        }
    } 
]

For the sake of completeness: I'm using one CORE node and several TASK nodes mainly to make sure the cluster has at least 3 nodes (1 MASTER, 1 CORE and at least one TASK node). Before I tried to used only CORE nodes, but as in my case the number of cores is calculated depending on the actual task it was possible to end up with a cluster consisting of just one MASTER and one CORE node. Using the maximizeResourceAllocation option such a cluster runs for ever doing nothing because the executor running the yarn application master is occupying that single CORE node completely.

Share:
11,051

Related videos on Youtube

retnuH
Author by

retnuH

Updated on March 26, 2020

Comments

  • retnuH
    retnuH about 4 years

    I'm running a job on Apache Spark on Amazon Elastic Map Reduce (EMR). Currently I'm running on emr-4.1.0 which includes Amazon Hadoop 2.6.0 and Spark 1.5.0.

    When I start the job, YARN correctly has allocated all the worker nodes to the spark job (with one for the driver, of course).

    I have the magic "maximizeResourceAllocation" property set to "true", and the spark property "spark.dynamicAllocation.enabled" also set to "true".

    However, if I resize the emr cluster by adding nodes to the CORE pool of worker machines, YARN only adds some of the new nodes to the spark job.

    For example, this morning I had a job that was using 26 nodes (m3.2xlarge, if that matters) - 1 for the driver, 25 executors. I wanted to speed up the job so I tried adding 8 more nodes. YARN has picked up all of the new nodes, but only allocated 1 of them to the Spark job. Spark did successfully pick up the new node and is using it as an executor, but my question is why is YARN letting the other 7 nodes just sit idle?

    It's annoying for obvious reasons - I have to pay for the resources even though they're not being used, and my job hasn't sped up at all!

    Anybody know how YARN decides when to add nodes to running spark jobs? What variables come into play? Memory? V-Cores? Anything?

    Thanks in advance!

    • Glennie Helles Sindholt
      Glennie Helles Sindholt over 8 years
      Yes, welcome to the annoying world of YARN! Have you set yarn.scheduler.capacity.resource-calculator=org.apache.hadoo‌​p.yarn.util.resource‌​.DominantResourceCal‌​culator in the capacity-scheduler.xml?
    • retnuH
      retnuH over 8 years
      I have not! I can give that a try (probably not until next week) but I am starting to suspect that Spark itself won't request more nodes than there are at the time it is started - but I could be wrong!
    • Glennie Helles Sindholt
      Glennie Helles Sindholt over 8 years
      Good luck :) Personally, I think that YARN - not Spark - is the problem. I have never had any problems with resources not being utilized when I ran Spark in Standalone mode (before EMR 4.x). However, since upgrading to EMR 4.x (and hence YARN) I have had a million problems - including underutilization of cores...
  • Nipun
    Nipun about 8 years
    I am using EMR 4.4. I tried this configuration with c4.xlarge machines with 2 worker nodes and 1 master and running with yarn client mode but my resources are underutilized. Also spark only runs 1 executor while there are 2 executors will all 4 cores used for 1 executor and other executor is sitting idle
  • retnuH
    retnuH about 8 years
    Keep in mind that the driver of your program will take one worker node. So if you only have 2 worker nodes, 1 will be allocated to the driver, and the other will be used as an executor. If you want two executors, you need 3 workers. You basically end up paying for an almost entirely idle master and, depending on your spark program, a largely idle worker as well.
  • Madhava Carrillo
    Madhava Carrillo over 7 years
    dynamicAllocation is now the default configuration for EMRs > 4.0.0 (docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/…)
  • Jivan
    Jivan about 6 years
    Am I the only one getting the error Number of executors must be a positive number ?
  • retnuH
    retnuH about 6 years
    @Jivan I'm not working with this stack anymore at the moment; all I can attest to was the versions of things specified in the original questions. Everything worked with those versions of things at the time of the question + answer...
  • Jivan
    Jivan about 6 years
    Yeah that’s quite a common problem with this stack: what was valid 6 months ago may not be anymore
  • Hiranya Deka
    Hiranya Deka over 5 years
    I tried with : { "Classification": "spark-defaults", "Properties": { "spark.dynamicAllocation.enabled": "true", "spark.executor.instances": "0" } but it failed to start the spark session saying number of executor should be positive non zero number.Can let me know if it worked for you?
  • cozos
    cozos over 4 years
    @Jivan Number of executors must be a positive number should be fixed in Spark 2.4 issues.apache.org/jira/browse/SPARK-24241