What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?

Solution 1

First of all, shuffling is the process of transferring data from the mappers to the reducers, so it is clearly necessary for the reducers; otherwise they would have no input (or would not receive input from every mapper). Shuffling can start even before the map phase has finished, to save some time. That's why you can see a reduce status greater than 0% (but less than 33%) when the map status is not yet 100%.

Sorting saves time for the reducer, helping it easily distinguish when a new reduce task should start. It simply starts a new reduce task, when the next key in the sorted input data is different than the previous, to put it simply. Each reduce task takes a list of key-value pairs, but it has to call the reduce() method which takes a key-list(value) input, so it has to group values by key. It's easy to do so, if input data is pre-sorted (locally) in the map phase and simply merge-sorted in the reduce phase (since the reducers get data from many mappers).
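The grouping step described above can be sketched in plain Python. This is only a simulation, not the Hadoop API: the helper name `grouped_reduce_input` and the sample data are made up for illustration, and each mapper output is assumed to be already sorted by key. A streaming merge yields one sorted stream, and a change of key marks where the values for the next reduce() call begin.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def grouped_reduce_input(sorted_mapper_outputs):
    """Merge per-mapper sorted (key, value) lists and group values by key.

    Hypothetical helper: it mimics what the framework does before calling
    reduce(), assuming every input list is already sorted by key.
    """
    # heapq.merge does a streaming k-way merge of already-sorted inputs.
    merged = heapq.merge(*sorted_mapper_outputs, key=itemgetter(0))
    # In a sorted stream, a change of key marks the start of the next group.
    for key, pairs in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in pairs]


# Made-up outputs of two mappers, each sorted locally by key.
mapper_a = [("apple", 1), ("cherry", 1)]
mapper_b = [("apple", 1), ("banana", 1)]
groups = list(grouped_reduce_input([mapper_a, mapper_b]))
```

Without the sort, the same grouping would need the whole input buffered (or hashed) before the first reduce() call could be made; with it, one pass over the stream is enough.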

Partitioning, that you mentioned in one of the answers, is a different process. It determines in which reducer a (key, value) pair, output of the map phase, will be sent. The default Partitioner uses a hashing on the keys to distribute them to the reduce tasks, but you can override it and use your own custom Partitioner.
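As a rough illustration of hash partitioning (plain Python, not Hadoop code), the sketch below assigns each key to one of `num_reducers` partitions. Using `zlib.crc32` as the hash is an assumption made here to keep the result deterministic between runs; the names `partition_for` and `num_reducers` are hypothetical.

```python
import zlib


def partition_for(key: str, num_reducers: int) -> int:
    """Return the index of the reduce task that should receive this key."""
    # Equal keys give equal hashes, so they always land in the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


# Made-up map output: two pairs share the key "k1".
pairs = [("k1", "v1"), ("k2", "v2"), ("k1", "v4")]
num_reducers = 3
partitions = {i: [] for i in range(num_reducers)}
for key, value in pairs:
    partitions[partition_for(key, num_reducers)].append((key, value))
```

Whatever function is used, the only hard requirement is that it depends on the key alone, so that every pair with the same key reaches the same reduce task.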

A great source of information for these steps is this Yahoo tutorial (archived).

A nice graphical representation of this is the following (shuffle is called "copy" in this figure):

[Figure omitted: diagram of the MapReduce data flow, with the shuffle step labeled "copy"]

Note that shuffling and sorting are not performed at all if you specify zero reducers (setNumReduceTasks(0)). Then, the MapReduce job stops at the map phase, and the map phase does not include any kind of sorting (so even the map phase is faster).

UPDATE: Since you are looking for something more official, you can also read Tom White's book "Hadoop: The Definitive Guide". Here is the interesting part for your question.
Tom White has been an Apache Hadoop committer since February 2007, and is a member of the Apache Software Foundation, so I guess it is pretty credible and official...

Solution 2

Let's revisit the key phases of a MapReduce program.

The map phase is done by mappers. Mappers run on unsorted input key/value pairs. Each mapper emits zero, one, or multiple output key/value pairs for each input key/value pair.

The combine phase is done by combiners. The combiner should combine key/value pairs with the same key. Each combiner may run zero times, once, or multiple times.
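Here is a minimal sketch of what a combiner does for a word-count style job, written in plain Python rather than against the Hadoop API. The function name `combine` and the sample data are illustrative assumptions. Since a combiner may run any number of times, its operation should give the same final result either way, which holds for a sum.

```python
from collections import defaultdict


def combine(mapper_output):
    """Pre-aggregate one mapper's (word, count) pairs by summing per key."""
    totals = defaultdict(int)
    for key, count in mapper_output:
        totals[key] += count
    # Fewer pairs leave the mapper, so less data has to be shuffled.
    return sorted(totals.items())


# Made-up output of a single mapper.
mapper_output = [("the", 1), ("cat", 1), ("the", 1), ("the", 1)]
combined = combine(mapper_output)
```

Running `combine` again on its own output leaves it unchanged, which is the property that makes it safe for the framework to apply it zero, one, or several times.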

The shuffle and sort phase is done by the framework. Data from all mappers are grouped by the key, split among reducers and sorted by the key. Each reducer obtains all values associated with the same key. The programmer may supply custom compare functions for sorting and a partitioner for data split.

The partitioner decides which reducer will get a particular key value pair.

The reducer obtains sorted key/[values list] pairs, sorted by the key. The value list contains all values with the same key produced by mappers. Each reducer emits zero, one or multiple output key/value pairs for each input key/value pair.

Have a look at this javacodegeeks article by Maria Jurcovicova and this mssqltips article by Datta for a better understanding.

Below is an image from a safaribooksonline article:

[Figure omitted]

Solution 3

I thought of adding some points missing from the answers above. This diagram, taken from here, clearly shows what's really going on.

[Figure omitted: MapReduce word count diagram]

To restate the real purpose of each step:

  • Split: Improves parallel processing by distributing the processing load across different nodes (Mappers), which reduces the overall processing time.

  • Combine: Shrinks the output of each Mapper, which reduces the time spent moving data from one node to another.

  • Sort (Shuffle & Sort): Makes it easy for the run-time to schedule (spawn/start) new reducers: while going through the sorted item list, whenever the current key differs from the previous one, it can spawn a new reducer.
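The steps listed above can be put together in a small single-process simulation of word count. It is only a sketch in plain Python: real frameworks run these steps across machines, and every name here (`run_word_count`, `num_reducers`, the use of `zlib.crc32` as the hash) is an assumption for illustration.

```python
import zlib
from collections import Counter
from itertools import groupby
from operator import itemgetter


def run_word_count(splits, num_reducers=2):
    """Simulate split -> map -> combine -> partition -> sort -> reduce."""
    partitions = [[] for _ in range(num_reducers)]
    for text in splits:                  # split: each chunk is processed alone
        local = Counter()
        for word in text.split():        # map: conceptually emits (word, 1)
            local[word] += 1             # combine: pre-sum counts per split
        for word, count in local.items():
            # partition: the key alone decides the target reduce task
            index = zlib.crc32(word.encode("utf-8")) % num_reducers
            partitions[index].append((word, count))

    result = {}
    for partition in partitions:         # one iteration per simulated reduce task
        partition.sort(key=itemgetter(0))            # sort by key
        for word, pairs in groupby(partition, key=itemgetter(0)):
            # reduce: runs once for each distinct key in the partition
            result[word] = sum(count for _, count in pairs)
    return result


counts = run_word_count(["the cat sat", "the dog sat", "the end"])
```

In this sketch, a change of key in the sorted partition triggers the next reduce step, and a single simulated reduce task may handle several keys.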

Solution 4

Some data processing requirements don't need sorting at all. Syncsort has made the sorting in Hadoop pluggable. Here is a nice blog from them on sorting. The process of moving the data from the mappers to the reducers is called shuffling; check this article for more information on the same.

Solution 5

I've always assumed this was necessary because the output from the mapper is the input for the reducer, so it is sorted based on the keyspace and then split into buckets for each reducer input. You want to ensure that all values for the same key end up in the same bucket going to the reducer so they are reduced together. There is no point in sending K1,V2 and K1,V4 to different reducers, as they need to be together in order to be reduced.

Tried explaining it as simply as possible

Author: Nithin

Updated on July 08, 2022

Comments

  • Nithin
    Nithin almost 2 years

    In Map Reduce programming the reduce phase has shuffling, sorting and reduce as its sub-parts. Sorting is a costly affair.

    What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?

  • Nithin
    Nithin about 10 years
    If we want to send k1,v1 and k1,v4 to the same reducer, we can do shuffling. Then what is the purpose of sorting?
  • BasicHorizon
    BasicHorizon about 10 years
    It does the sorting for multiple reasons. One reason is that when a MapReduce job is sending all of the KV pairs to a reducer, if the input is not sorted, it would have to scan all of the mapper outputs to pick up every instance of K1,VX. Whereas if the mapper output is sorted, as soon as K2,VX is picked up you know that all of K1,VX has been picked up, and that set can be sent off to a reducer for processing. The benefit of this is that you don't have to wait for every reducer to be ready in order for each of them to start reducing.
  • BasicHorizon
    BasicHorizon about 10 years
    Also, when it comes to aggregation: if you specify that you want to aggregate all of K1,V1 and the input to the reducer is sorted, then as soon as the reducer picks up K2,V2 it knows that no more instances of K1,V1 exist, so it can finish its aggregation. Whereas if the reducer input is not sorted, it will have to scan the entire input for K1,V1.
  • MaxNevermind
    MaxNevermind almost 8 years
    "Sorting saves time for the reducer, helping it easily distinguish when a new reduce task should start. It simply starts a new reduce task, when the next key in the sorted input data is different than the previous, to put it simply." I don't get this part. Mapper uses a partitioner to divide spills into partitions locally, each partition then send to a reduce. How sorting helps here?
  • vefthym
    vefthym almost 8 years
    @MaxNevermind If you have x reduce tasks (partitions), it doesn't mean that you will end up calling the reduce() method x times. It will be called once for every distinct key. So one reduce task can call the reduce() method several times.
  • MaxNevermind
    MaxNevermind almost 8 years
    "It will be called once for every distinct key" Why? Mapper forms partitions whatever way it wants(not necessary one partition for every distinct key), then each partition goes to reducer, is it wrong?
  • vefthym
    vefthym almost 8 years
    @MaxNevermind Mapper outputs keys and values, it does not form partitions. The partitions are defined by the number of reduce tasks that the user defines and the Partitioner implementation. The outputs of all Mappers that have the same key are going to the same reduce() method. This cannot be changed. But what can be changed is what other keys (if any) will be placed in the same partition and thus, will be handled by the same task. A reduce task can call the reduce() function more than once, but only once for every key.
  • MaxNevermind
    MaxNevermind almost 8 years
    ok I think I've got it. My problem was that I forgot that reduce takes a list of values as an argument not just one key-value pair. I think you should elaborate this in your answer: "Each reduce task takes a list of key-value pairs but it has to call reduce method which takes a key-List<value>, so it has to group values by key, it's easy to do if input data is pre-sorted in a mapper stage"
  • vefthym
    vefthym almost 8 years
    @MaxNevermind thanks, I have added that in my answer.
  • Jeff Evans
    Jeff Evans over 6 years
    I think there is a typo in the image (which I realize is just copied here). I believe the ie strings under Reducers and Output should actually be is.
  • Joel
    Joel almost 4 years
    Where would the partition step come into this graph? After map and before combine?
  • Supun Wijerathne
    Supun Wijerathne almost 4 years
    @Joel I hope you refer to 'split' step?
  • Joel
    Joel almost 4 years
    No, I mean the partition step. It decides which reducer to send the data to, using a simple hash modulo by default. After some more research, I believe it comes after the combine step, before shuffle & sort.
  • Supun Wijerathne
    Supun Wijerathne almost 4 years
    @Joel I'm not quite clear on what you intend to describe. In a nutshell, the exact sequence of steps can be pretty much problem-specific. I can say that for some scenarios even sorting is not necessary. Coming back to your input, if I specifically talk about the above simple wordcount example, I don't really see any need for such partitioning to decide reducers. Here it's quite straightforward to spawn reducers per key. But I can guess that your point can be valid for some scenarios. Frankly, I don't have an exact clear idea about that.
  • rahul sharma
    rahul sharma about 3 years
    Can you tell which node is responsible for shuffle and sort? Is it some specific node we specify, or is it a map node or a reduce node?
  • Supun Wijerathne
    Supun Wijerathne about 3 years
    @rahulsharma the whole map-reduce system follows master-slave co-ordination. So each inter-node action is based on that.
  • user124
    user124 about 3 years
    Can you tell me why the mapper needs the offset, as the value of that offset will be available in the "value" part?
  • MiloMinderbinder
    MiloMinderbinder over 2 years
    I think the reason is this: you hash the keys to a given reducer. So, one scan through the entire key space is enough to map each (k,v) to a reducer in such a way that the same key goes to the same partition. Sorting is done to get (k,v1,v2,v3,v4,...) that the reducer logic will be run on. This is Hadoop's way of doing a group-by.
  • user3426711
    user3426711 about 2 years
    @Joel the partition step doesn't decide which reducer the data is sent to. It just 'partitions' the data, which means it splits it into micro-partitions that contain the same group of keys. After this step there is a sort operation which goes to each partition and sorts it to make the reducer's task even easier. Then these partitions are distributed in a group-by manner to each reducer in order. The partitioning step doesn't explicitly say that partition A goes to Reducer 1.