How does Spark aggregate function - aggregateByKey work?


Solution 1

aggregateByKey() is almost identical to reduceByKey() (both calling combineByKey() behind the scenes), except you give a starting value for aggregateByKey(). Most people are familiar with reduceByKey(), so I will use that in the explanation.

The reason reduceByKey() is so much better than groupByKey() is that it makes use of a MapReduce feature called a combiner. Any associative and commutative function like + or * can be used in this fashion, because the order in which the elements are combined doesn't matter. This allows Spark to start "reducing" values with the same key even if they are not all in the same partition yet.

On the flip side, groupByKey() gives you more versatility, since it hands you an Iterable of all the values for a key, meaning you could even pull all the elements into an array. However, it is inefficient, because for it to work the full set of (K,V) pairs for a key has to be in one partition.
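As a hedged illustration of the contrast (the words RDD here is made up for this sketch, and sc is assumed to be an existing SparkContext, e.g. the one provided by spark-shell), both approaches below compute per-key sums, but only the reduceByKey() version can combine values inside each partition before the shuffle:

//hypothetical data, not from the original question
val words = sc.parallelize(Seq(("spark", 1), ("hadoop", 1), ("spark", 1), ("hadoop", 1)))

//groupByKey: every (K,V) pair for a key is shuffled to one partition before anything is summed
val viaGroup = words.groupByKey().mapValues(_.sum)

//reduceByKey: partial sums are computed inside each partition first (the combiner),
//so far less data crosses the network during the shuffle
val viaReduce = words.reduceByKey(_ + _)

viaGroup.collect  //Array((spark,2), (hadoop,2)) -- same result, different cost
viaReduce.collect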

The step that moves the data around in a reduce-type operation is generally called the shuffle. At the very simplest level, the data is partitioned across the nodes (often with a hash partitioner) and then sorted on each node.
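To make the partitioning step concrete, here is a small sketch (not from the original answer; the keys k1..k4 are borrowed from the question below) showing how Spark's HashPartitioner decides which partition, and therefore which node, a key is shuffled to:

import org.apache.spark.HashPartitioner

//3 shuffle partitions, roughly "one per node" in the question's setup
val partitioner = new HashPartitioner(3)

//each key goes to a partition derived from key.hashCode modulo 3,
//so all records with the same key end up together after the shuffle
Seq("k1", "k2", "k3", "k4").foreach { k =>
  println(s"$k -> partition ${partitioner.getPartition(k)}")
}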

Solution 2

aggregateByKey() is quite different from reduceByKey(). What happens is that reduceByKey() is essentially a particular case of aggregateByKey().

aggregateByKey() combines the values for a particular key, and the result of that combination can be any type you specify. You have to specify how the values are combined ("added") inside one partition (which is executed on a single node) and how the results from different partitions (which may be on different nodes) are combined. reduceByKey() is a particular case, in the sense that the result of the combination (e.g. a sum) is of the same type as the values, and the operation that combines results from different partitions is the same as the operation that combines values inside a partition.
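For reference, this is roughly the shape of the two operations being described, written as a simplified sketch of Spark's PairRDDFunctions (the trait name PairOps is just for illustration):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

trait PairOps[K, V] {
  //zeroValue: the initial accumulator, of the result type U
  //seqOp:     folds one value V into an accumulator U, inside a partition
  //combOp:    merges two accumulators U coming from different partitions
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
                                                combOp: (U, U) => U): RDD[(K, U)]

  //reduceByKey is the special case where U == V and both operations are func
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
}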

An example: Imagine you have a list of pairs. You parallelize it:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

Now you want to "combine" them by key producing a sum. In this case reduceByKey and aggregateByKey are the same:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Now, imagine that you want the aggregation to be a Set of the values, that is, a different type than the values, which are integers (the sum of integers is also an integer):

import scala.collection.mutable.HashSet
//the initial value is an empty Set; adding an element to a set is the
//first operation (_ + _), joining two sets is the second (_ ++ _)
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] = Array((b,Set(7)), (a,Set(1, 5, 3)))
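Solution 1 mentions that both methods call combineByKey() behind the scenes. As a hedged sketch, the same Set aggregation can be written directly with combineByKey(), reusing the pairs RDD from above:

import scala.collection.mutable.HashSet

//createCombiner: build a one-element set from the first value seen for a key in a partition
//mergeValue:     add another value to that partition-local set
//mergeCombiners: union the sets produced by different partitions
val setsViaCombine = pairs.combineByKey(
  (v: Int) => HashSet(v),
  (acc: HashSet[Int], v: Int) => acc += v,
  (a: HashSet[Int], b: HashSet[Int]) => a ++= b
)
setsViaCombine.collect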


Updated on July 15, 2022

Comments

  • EdwinGuo
    EdwinGuo almost 2 years

    Say I have a distributed system with 3 nodes, and my data is distributed among those nodes. For example, I have a test.csv file which exists on all 3 nodes and contains 2 columns:

    row   | id , c
    ---------------
    row1  | k1 , c1  
    row2  | k1 , c2  
    row3  | k1 , c3  
    row4  | k2 , c4  
    row5  | k2 , c5  
    row6  | k2 , c6  
    row7  | k3 , c7  
    row8  | k3 , c8  
    row9  | k3 , c9  
    row10 | k4 , c10   
    row11 | k4 , c11  
    row12 | k4 , c12 
    

    Then I use SparkContext.textFile to read the file out as an RDD. As far as I understand, each Spark worker node will read a portion of the file. So right now let's say each node will store:

    • node 1: row 1~4
    • node 2: row 5~8
    • node 3: row 9~12

    My question is: let's say I want to do some computation on that data, and there is one step where I need to group by key, so the key-value pairs would be [k1 [{k1 c1} {k1 c2} {k1 c3}]] and so on.

    There is a function called groupByKey(), which is very expensive to use, and aggregateByKey() is recommended instead. So I'm wondering: how do groupByKey() and aggregateByKey() work under the hood? Can someone explain using the example I provided above? After shuffling, where do the rows reside on each node?

  • EdwinGuo
    EdwinGuo almost 10 years
    ok, so let's go back to my example: if node1 has row1~row3, node2 has row4~row6, and node3 has row7~row12, when I do groupByKey, will data move around at all, or will nothing move since the records with the same key are already on the same node? thanks
  • aaronman
    aaronman almost 10 years
    @EdwinGuo no, the data could still move around. Let's say you're using a hash partitioner: if all of k1 is on node 1 but k1's hash partitioner result is 3, it will still go to the third node
  • Adriano Almeida
    Adriano Almeida almost 9 years
    But what if I don't care about the order and just want to return an array with all the values, like groupByKey does? Is it possible with syntax other than groupByKey?
  • aaronman
    aaronman almost 9 years
    @AdrianoAlmeida if you don't even want to put identical keys into the same array you can use glom
  • SparkleGoat
    SparkleGoat almost 7 years
    Very thorough answer on how the two work, appreciate it!
  • rohanagarwal
    rohanagarwal almost 7 years
    can you please also post Java code? it is hard to understand Scala