How to group by multiple keys in Spark?
My guess is that you want to transpose the data according to multiple fields.
A simple way is to concatenate the target fields that you will group by, and use the result as the key of a pair RDD. For example:
    lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
    rdd = lines.map(lambda x: x.split(',')) \
               .map(lambda x: (x[0] + ', ' + x[1], x[3])) \
               .reduceByKey(lambda a, b: a + ', ' + b)
    print(rdd.collect())
Then you will get the transposed result.
[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
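If the downstream code needs the original fields back, a tuple key avoids re-parsing the concatenated string. Below is a minimal local sketch of the same logic without a Spark session; `reduce_by_key` is a stand-in I wrote for `RDD.reduceByKey` over a plain list of pairs:

```python
def reduce_by_key(pairs, f):
    # stand-in for RDD.reduceByKey: fold values that share a key
    acc = {}
    for k, v in pairs:
        acc[k] = f(acc[k], v) if k in acc else v
    return sorted(acc.items())

lines = ['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1']
rows = [l.split(',') for l in lines]
# key on the (id, pd) tuple instead of a concatenated string
pairs = [((r[0], r[1]), r[3]) for r in rows]
grouped = reduce_by_key(pairs, lambda a, b: a + ', ' + b)
print(grouped)
# [(('id1', 'pd1'), '5.0, 7.5, 8.1'), (('id2', 'pd2'), '6.0')]
```

In real PySpark the same two `map` / `reduceByKey` calls apply unchanged, since any hashable tuple is a valid key.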
I grouped the records as ((id1,t1), [(p1,5.0), (p2,6.0)]) and so on in my map function. Later, I reduce by concatenating the lists, and map_group creates an array for [p1, p2, ...] and fills in the values at their respective positions.
    import numpy as np

    def map_group(pgroup):
        x = np.zeros(19)
        x[0] = 1
        value_list = pgroup[1]
        for p, val in value_list:
            fno = p.split('.')[0]
            x[int(fno) - 5] = val
        return x

    tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
                .reduceByKey(lambda p, q: p + q) \
                .map(lambda d: (d[0], map_group(d)))
This does feel like an expensive solution in terms of computation. But works for now.
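The reduce-then-fill idea above can be checked without a cluster. Here is a plain-Python sketch under my own assumptions: `reduce_by_key` stands in for `reduceByKey`, the 19-slot array and the field-number-to-slot mapping `fno - 5` are taken from the code above, and the sample field numbers ('6', '7') are made up for illustration:

```python
def reduce_by_key(pairs, f):
    # stand-in for RDD.reduceByKey over a list of (key, value) pairs
    acc = {}
    for k, v in pairs:
        acc[k] = f(acc[k], v) if k in acc else v
    return acc

def map_group(value_list):
    # fill a fixed-width slot array: field number fno maps to slot fno - 5
    x = [0.0] * 19
    for fno, val in value_list:
        x[int(fno) - 5] = val
    return x

# hypothetical records: key (id, fno, t), value the measurement
tfile = [(('id1', '6', 't1'), 5.0), (('id1', '7', 't1'), 7.5)]
pairs = [((k[0], k[2]), [(k[1], v)]) for k, v in tfile]
grouped = reduce_by_key(pairs, lambda p, q: p + q)
agg = {k: map_group(v) for k, v in grouped.items()}
# agg[('id1', 't1')] has 5.0 in slot 1 and 7.5 in slot 2
```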
Rahul 3 months
I have a bunch of tuples which are in form of composite keys and values. For example,
    tfile.collect() = [(('id1', 'pd1', 't1'), 5.0),
                       (('id2', 'pd2', 't2'), 6.0),
                       (('id1', 'pd1', 't2'), 7.5),
                       (('id1', 'pd1', 't3'), 8.1)]
I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using the vanilla PySpark APIs, not SQLContext. In my current implementation I am reading from a bunch of files and merging the RDDs.
    def readfile():
        fr = range(6, 23)
        tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                            .map(lambda view: set_feature(view, f))
                            .reduceByKey(lambda a, b: a + b)
                          for f in fr])
        return tfile
I intend to create an aggregated array as a value. For example,
agg_tfile = [((id1,pd1),[5.0,7.5,8.1])]
where 5.0, 7.5, 8.1 represent [t1, t2, t3]. I am currently achieving the same with vanilla Python code using dictionaries. It works fine for smaller data sets, but I worry that it may not scale for larger ones. Is there an efficient way of achieving the same using the PySpark APIs?
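For reference, the dictionary-based version described above might look roughly like this; this is my sketch of the approach, not the asker's actual code, using the example data and a `setdefault`-based accumulator:

```python
tfile = [(('id1', 'pd1', 't1'), 5.0), (('id2', 'pd2', 't2'), 6.0),
         (('id1', 'pd1', 't2'), 7.5), (('id1', 'pd1', 't3'), 8.1)]

# collect values per (id, pd) key in plain Python
agg = {}
for (i, pd, t), value in tfile:
    agg.setdefault((i, pd), []).append(value)

print(agg)
# {('id1', 'pd1'): [5.0, 7.5, 8.1], ('id2', 'pd2'): [6.0]}
```

This is the single-machine baseline the question wants to replace with a distributed equivalent.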
Rahul over 7 years: This is definitely an interesting way of solving this. I figured out another way of achieving the same, but I guess your method might be much faster than mine. I am sharing my own solution as well.
Daniel Darabos over 7 years: Does PySpark not have groupByKey?
dapangmao over 7 years: PySpark has the method groupByKey. However, the question tends to group records together based on two fields, instead of doing an aggregation such as SELECT sum(value) FROM data GROUP BY id, pd. So groupByKey may not help.
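That said, grouping on two fields only requires keying on a tuple and reducing lists, which produces exactly the agg_tfile shape from the question. A minimal local simulation (my `reduce_by_key` helper stands in for `RDD.reduceByKey`):

```python
def reduce_by_key(pairs, f):
    # stand-in for RDD.reduceByKey over a list of (key, value) pairs
    acc = {}
    for k, v in pairs:
        acc[k] = f(acc[k], v) if k in acc else v
    return acc

tfile = [(('id1', 'pd1', 't1'), 5.0), (('id2', 'pd2', 't2'), 6.0),
         (('id1', 'pd1', 't2'), 7.5), (('id1', 'pd1', 't3'), 8.1)]
# re-key on (id, pd) and wrap each value in a singleton list
pairs = [((k[0], k[1]), [v]) for k, v in tfile]
agg_tfile = reduce_by_key(pairs, lambda a, b: a + b)
# {('id1', 'pd1'): [5.0, 7.5, 8.1], ('id2', 'pd2'): [6.0]}
```

In PySpark the same shape comes from `tfile.map(lambda kv: ((kv[0][0], kv[0][1]), [kv[1]])).reduceByKey(lambda a, b: a + b)`.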