Reduce a key-value pair into a key-list pair with Apache Spark

Solution 1

Map and ReduceByKey

The input type and output type of reduce must be the same, so if you want to aggregate values into a list, you have to map the values to lists first. Afterwards you combine the lists into one list.

Combining lists

You'll need a method that combines two lists into one. Python provides several ways to combine lists.

append modifies the list in place, adds its argument as a single element, and always returns None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend also modifies the list in place and returns None, but it appends the elements of its argument one by one:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Both methods return None, but you need a function that returns the combined list, so just use the + operator.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Spark

file = sc.textFile("hdfs://...")
counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              # transform each value into a single-element list
              .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
              # combine lists: [1, 2, 3] + [4, 5] becomes [1, 2, 3, 4, 5]
              .reduceByKey(lambda a, b: a + b))
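
As a quick sanity check, the same pipeline can be run on a small in-memory RDD. The sample lines below are hypothetical, assuming tokens of the form "lastname,firstname":

sample = sc.parallelize(["Bacon,Kevin Hopkins,Anthony", "Bacon,Michael"])

grouped = (sample.flatMap(lambda line: line.split(" "))
                 .map(lambda actor: (actor.split(",")[0], actor))
                 .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))
                 .reduceByKey(lambda a, b: a + b))

print(grouped.collect())
# e.g. [('Bacon', ['Bacon,Kevin', 'Bacon,Michael']), ('Hopkins', ['Hopkins,Anthony'])]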

CombineByKey

It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the solution above.
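
For reference, a minimal combineByKey sketch of the same pipeline could look like the following. The three lambdas are my own illustration, not code from the quoted answer:

counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              # createCombiner: wrap the first value in a list
              # mergeValue: add another value to an existing list
              # mergeCombiners: concatenate two partial lists
              .combineByKey(lambda v: [v],
                            lambda acc, v: acc + [v],
                            lambda acc1, acc2: acc1 + acc2))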

GroupByKey

It's also possible to solve this with groupByKey, but it reduces parallelization and therefore could be much slower for big data sets.
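
For comparison, a groupByKey sketch of the same pipeline might look like this; mapValues(list) turns Spark's per-key iterable into a plain list:

counts = (file.flatMap(lambda line: line.split(" "))
              .map(lambda actor: (actor.split(",")[0], actor))
              .groupByKey()
              # groupByKey yields an iterable per key; convert it to a list
              .mapValues(list))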

Solution 2

tl;dr If you really require an operation like this, use groupByKey as suggested by @MariusIon. Every other solution proposed here is either bluntly inefficient or at least suboptimal compared to direct grouping.

reduceByKey with list concatenation is not an acceptable solution because:

  • Requires initialization of O(N) lists.
  • Each application of + to a pair of lists requires a full copy of both lists (O(N)), effectively increasing the overall complexity to O(N²).
  • Doesn't address any of the problems introduced by groupByKey. The amount of data that has to be shuffled, as well as the size of the final structure, are the same.
  • Contrary to what one of the answers suggests, there is no difference in the level of parallelism between implementations using reduceByKey and groupByKey.

combineByKey with list.extend is a suboptimal solution because:

  • Creates O(N) list objects in MergeValue (this could be optimized by using list.append directly on the new item; see the sketch after this list).
  • If optimized with list.append, it is exactly equivalent to the old (Spark <= 1.3) implementation of groupByKey and ignores all the optimizations introduced by SPARK-3074, which enabled external (on-disk) grouping of larger-than-memory structures.
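
A minimal sketch of that append-based combineByKey variant, assuming rdd is a pair RDD of (key, value); this is my own illustration, not code from the answer:

def create_combiner(value):
    # start a new single-element list for the first value of a key
    return [value]

def merge_value(acc, value):
    # append the new value in place instead of building a fresh list
    acc.append(value)
    return acc

def merge_combiners(acc1, acc2):
    # concatenate the per-partition lists
    acc1.extend(acc2)
    return acc1

grouped = rdd.combineByKey(create_combiner, merge_value, merge_combiners)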

Solution 3

I'm kind of late to the conversation, but here's my suggestion:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda kv: (kv[0], [kv[1]])).reduceByKey(lambda p, q: p + q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]

Solution 4

You can use the RDD groupByKey method.

Input:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
# groupByKey returns an iterable per key; mapValues(list) makes it a plain list
result = rdd.groupByKey().mapValues(list).collect()

Output:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]

Solution 5

If you want to do a reduceByKey where the type in the reduced KV pairs is different from the type in the original KV pairs, you can use the function combineByKey. What the function does is take KV pairs and combine them (by key) into KC pairs, where C is a different type than V.

You specify 3 functions: createCombiner, mergeValue, and mergeCombiners. The first specifies how to transform a type V into a type C, the second describes how to combine a type C with a type V, and the last specifies how to combine a type C with another type C.

Define the 3 functions as follows:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

Then, My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
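
A short end-to-end sketch of how this might be called, with hypothetical sample data (the answer's original input is not shown):

My_KV = sc.parallelize([(1, ('a', 'b')), (2, ('c', 'd')), (1, ('x', 'y'))])
My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)
print(My_KMV.collect())
# e.g. [(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]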

The best resource I found on using this function is: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

As others have pointed out, a.append(b) or a.extend(b) return None. So the reduceByKey(lambda a, b: a.append(b)) returns None on the first pair of KV pairs, then fails on the second pair because None.append(b) fails. You could work around this by defining a separate function:

def My_Extend(a, b):
    a.extend(b)
    return a

Then call reduceByKey(lambda a, b: My_Extend(a, b)). (The lambda function here may be unnecessary, but I have not tested this case.)

Comments

  • TravisJ
    TravisJ over 2 years

    I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:

    My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
    

    The error that I get when this occurs is:

'NoneType' object has no attribute 'append'.

    My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).

  • TravisJ
    TravisJ over 9 years
So you have exactly the right idea for what I have, in terms of kv_input, and what I want, kmv_output. I believe your code would work fine for serial Python, but because I'm using Spark to do things in parallel, my kv_input has type RDD (Resilient Distributed Dataset)... which is not iterable (so I cannot do something like for k, v in kv_input).
  • Dave J
    Dave J over 9 years
Ahh, OK, my fault, I don't know Spark. I'll leave the answer here for those who don't know/notice that. Like me :P
  • TravisJ
    TravisJ over 9 years
    No worries. I'm quite new to it and I appreciate that you took the time to demonstrate this solution.
  • TravisJ
    TravisJ over 9 years
    The P.S. is very helpful. I did a quick change to retList = a.append([b]) then return retList and this fixes the first problem, but I have a new minor problem which I should be able to fix (the code generates a list which contains both tuples and lists).
  • Christian Strempfer
    Christian Strempfer over 9 years
    @TravisJ: You need to use extend instead of append, like I did in my answer. See also Python - append vs. extend.
  • nikosd
    nikosd almost 9 years
    Using groupByKey is discouraged because it leads to excessive shuffling. You should use reduceByKey (see this link) or combineByKey instead, as suggested by @Christian_Strempfer
  • syfantid
    syfantid over 8 years
    Is ReduceByKey in this case faster than GroupByKey? It produces the same result, so which is better? Is there a way to remove duplicates from the final list produced by ReduceByKey?
  • Christian Strempfer
    Christian Strempfer over 8 years
@Sofia: As said, GroupByKey reduces parallelization, but if you're working with small data sets, that might not be a problem. Only a performance test can give you a specific answer. Removing duplicate values isn't built in when using ReduceByKey, but you could easily add another step which does that or create your own Create method which takes care of it.
  • Christian Strempfer
    Christian Strempfer about 8 years
    Oops, I meant "you can create your own Combine method".
  • Mj1992
    Mj1992 over 7 years
    Hi, can you also help with an equivalent Java code for this. I want to achieve a similar kind of thing in Java
  • Davis Herring
    Davis Herring over 6 years
    Using + forces the growing list to be copied on every append, taking time quadratic in the final length of each list. extend() is the right answer--you wrap it in a function that returns the (growing) left-hand-side list.
  • ascetic652
    ascetic652 over 5 years
    Will the order of the list be maintained?
  • Christian Strempfer
    Christian Strempfer over 5 years
    @ascetic652: no, you could sort the items in the reduce step. Normally when working with big data sets you don't sort data during computation. You could store the results in a database and only sort them when querying from the front end.
  • Admin
    Admin almost 5 years
GroupByKey is the correct solution here. People often misuse GroupByKey followed by Map to imitate ReduceByKey in an inefficient manner, which is why GroupByKey is said to be less efficient than ReduceByKey. However, this is exactly the problem that GroupByKey has been designed and optimized for, and therefore for this problem it is actually the right solution.
  • notilas
    notilas over 4 years
    map(lambda (x,y): (x, [y])) has solved the concatenation problem (instead of merging). Thanks.