How to optimize shuffling/sorting phase in a hadoop job

15,638

To begin with, since you are using a single-node cluster, there is not much optimization you can do. You will have a limited number of containers/slots on a single node, and for the amount of data you are processing (5 to 6 million keys) your jobs will always run slowly and may even fail.

I am going to answer this question for a fully distributed Hadoop setup. There is a section ("Shuffle and Sort") in the book "Hadoop: The Definitive Guide" that you should read for tuning the Shuffle and Sort phase. My answer is mainly based on the contents of that section and on my own experience tuning MapReduce jobs.

You can do the following to make the Shuffle and Sort phase more efficient:

  • Combiner: Using a combiner reduces the amount of data transferred to each of the reducers, since the combiner merges the output on the mapper side.
  • Number of reducers: Choose an optimal number of reducers. If the data size is huge, a single reducer is not a good idea. Setting the number of reducers too high is not a good idea either, since the number of reducers also determines the number of partitions on the mapper side. Look at the link here: https://github.com/paulhoule/infovore/wiki/Choosing-the-number-of-reducers
  • When to start the reducers: You can control when the reduce tasks are started. This is determined by the configuration mapreduce.job.reduce.slowstart.completedmaps in YARN. Reducers are not started until the given fraction of mappers has completed. The default is "0.05" (reducers start after 5% of the mappers have completed). If the reducers are started early, most of them sit idle until all the mappers have completed, and they occupy slots that could otherwise be used by mappers. By tuning this value, you can use the mapper/reducer slots optimally and reduce the time spent in the shuffle.
  • Compress mapper output: It is recommended to compress the mapper outputs (determined by the configuration mapreduce.map.output.compress), so that less data is written to disk and transferred to the reducers.
  • Tune config "mapreduce.task.io.sort.mb": Increase the buffer size used by the mappers during sorting. This reduces the number of spills to disk. The buffer is allocated from the map task's heap, so it must fit within that heap.
  • Tune config "mapreduce.reduce.input.buffer.percent": If your reduce task needs little memory, this value can be set to a high percentage. A larger share of the heap is then used to retain the map outputs during the reduce phase (after the shuffle phase), which reduces the number of spills to disk.
  • Tune config "mapreduce.reduce.shuffle.parallelcopies": Number of threads each reducer uses to copy map outputs in parallel. See also the related question: how to tune mapred.reduce.parallel.copies?
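
As a rough illustration of how the settings above might be passed to a job, here is a minimal Python sketch that builds the -D property=value arguments for a "hadoop jar" command. The property names are the ones listed above, plus mapreduce.job.reduces for the reducer count. Everything else is an assumption made for the example: the values are placeholders rather than recommendations, and the jar name myjob.jar, the class MyDriver and the paths are hypothetical. It also assumes the driver goes through ToolRunner, so that generic -D options are parsed. The combiner is not a property here; it is set in the driver code with job.setCombinerClass(...).

```python
# Placeholder values for illustration only; tune them against your own job counters.
SHUFFLE_SETTINGS = {
    "mapreduce.job.reduces": 8,                            # number of reducers
    "mapreduce.job.reduce.slowstart.completedmaps": 0.80,  # start reducers after 80% of maps
    "mapreduce.map.output.compress": "true",               # compress map output
    "mapreduce.task.io.sort.mb": 256,                      # map-side sort buffer in MB
    "mapreduce.reduce.shuffle.parallelcopies": 10,         # copier threads per reducer
}


def build_hadoop_command(jar, main_class, settings, job_args):
    """Return the argv list for `hadoop jar`, with one -D flag per setting.

    Generic options (-D ...) are placed before the job's own arguments.
    """
    command = ["hadoop", "jar", jar, main_class]
    for key, value in settings.items():
        command += ["-D", f"{key}={value}"]
    return command + list(job_args)


if __name__ == "__main__":
    # myjob.jar, MyDriver, /input and /output are hypothetical names.
    argv = build_hadoop_command("myjob.jar", "MyDriver", SHUFFLE_SETTINGS, ["/input", "/output"])
    print(" ".join(argv))
```

Keeping the settings in one dictionary makes it easy to change a single value between runs and compare the spill and shuffle counters.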

The following configuration parameters can also be tuned to improve the performance of the Shuffle and Sort phase (see the description of these configurations here: https://hadoop.apache.org/docs/r2.4.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml):

  • mapreduce.map.sort.spill.percent: Determines the threshold for the in-memory buffer used by the mapper. When this threshold is reached, the contents of the buffer are spilled to disk. So this value determines the number of spills to disk.
  • mapreduce.task.io.sort.factor: Maximum number of streams to be merged at once during sorting. So, on the reducer side, if there are 50 mapper outputs and this value is set to 10, there will be about 5 rounds of merging (on average 10 files per merge round).
  • mapreduce.shuffle.max.threads: Maximum number of worker threads on the node manager side that serve map outputs to the reducers. The default of 0 means twice the number of available processors.
  • mapreduce.reduce.shuffle.input.buffer.percent: How much of the heap should be used for storing the map output during the shuffle phase in the reducer. This setting determines the amount of mapper output that can be held in memory before it is spilled to disk.
  • mapreduce.reduce.shuffle.merge.percent: Usage threshold, as a fraction of the memory allocated to in-memory map outputs, at which the merge and spill to disk is started.
  • mapreduce.reduce.merge.inmem.threshold: Number of map outputs needed for starting the merge process. When either mapreduce.reduce.shuffle.merge.percent or mapreduce.reduce.merge.inmem.threshold is reached, the map outputs are merged and spilled to disk.
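
To get a feel for how the buffer size, spill threshold and merge factor interact, here is a back-of-the-envelope sketch in Python. It is only an approximation under stated assumptions: a spill is triggered each time the buffer fills to io.sort.mb × spill.percent, record metadata overhead is ignored, and the merge estimate is simply files divided by the merge factor (the real merge algorithm is more subtle). The function names are made up for this example; the default arguments are the values documented in mapred-default.xml (100 MB, 0.80 and 10).

```python
import math


def estimate_map_spills(map_output_mb, io_sort_mb=100, spill_percent=0.80):
    """Rough count of spill files written by one map task.

    Assumes one spill per (io_sort_mb * spill_percent) MB of map output
    and ignores the space taken by record metadata.
    """
    spill_threshold_mb = io_sort_mb * spill_percent
    return max(1, math.ceil(map_output_mb / spill_threshold_mb))


def estimate_merge_rounds(num_files, sort_factor=10):
    """Rough number of merge rounds: files divided by the merge factor, rounded up."""
    return math.ceil(num_files / sort_factor)


if __name__ == "__main__":
    # A map task producing 450 MB of output (hypothetical figure).
    print(estimate_map_spills(450))                  # 6 spills with the defaults
    print(estimate_map_spills(450, io_sort_mb=300))  # 2 spills with a 300 MB buffer
    # 50 map outputs with a merge factor of 10, as in the example above.
    print(estimate_merge_rounds(50, 10))             # 5 rounds
```

Compare such estimates with the "Spilled Records" counter of a real run before changing any setting.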
Author by

HHH

Updated on June 04, 2022

Comments

  • HHH
    HHH about 2 years

    I'm doing some data preparation using a single-node Hadoop job. The mapper/combiner in my job outputs many keys (more than 5M or 6M), and the job proceeds slowly or even fails. The mapping phase runs up to 120 mappers and there is just one reducer (these are determined automatically; I have not set any values for them). I want to optimize the job so that the shuffling/sorting phase runs more efficiently. I increased mapreduce.task.io.sort.mb to 300m, but the job failed because that value was larger than the mapper heap. I then set mapred.child.java.opts to -Xmx1024m, but it failed again because it couldn't initialize an Output Collector. What are the best practices for these scenarios?

  • HHH
    HHH over 8 years
    that's right. but my main concern is about shuffling/sorting phase. It doesn't have anything to do with the Reducing phase.
  • Durga Viswanath Gadiraju
    Durga Viswanath Gadiraju over 8 years
    Shuffling/sorting is related to the number of reducers. As part of shuffling and sorting, the number of final intermediate files generated by each mapper is equal to the number of reducers. If you have only one reducer, there will be only one final intermediate file for each mapper, which is merged from the other intermediate files. The merge operation happens in memory and can run out of memory when there is only one reducer. Try increasing the number of reducers; that may resolve the issue. If you are doing something like a count, which needs to run with only one reducer, then you should consider a combiner.
  • HHH
    HHH over 8 years
    Thanks. I'll try that, but how can I increase the shuffling memory?
  • Durga Viswanath Gadiraju
    Durga Viswanath Gadiraju over 8 years
    What is the configuration of your cluster? Are you using YARN/Classic?
  • HHH
    HHH over 8 years
    Yes, its hortonworks 2.2 running hadoop 2.6. Everything is the default value.
  • Durga Viswanath Gadiraju
    Durga Viswanath Gadiraju over 8 years
    Is it VM or cluster at office?
  • Ravindra babu
    Ravindra babu over 8 years
    Well written summary.