How to group by multiple keys in Spark?


Solution 1

My guess is that you want to transpose the data according to multiple fields.

A simple way is to concatenate the fields you want to group by and use the result as the key of a pair RDD. For example:

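lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
# Key each record on "id, pd" and keep the value field as a string
rdd = lines.map(lambda x: x.split(',')) \
           .map(lambda x: (x[0] + ', ' + x[1], x[3])) \
           .reduceByKey(lambda a, b: a + ', ' + b)  # join values that share a key
print(rdd.collect())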

The result is the transposed data (the order of the keys, and of the values inside each string, can vary with partitioning):

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
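A variant of the same idea, sketched under the assumption that you would rather keep the key as a tuple and the values as floats than join everything into strings: Python tuples are hashable, so (id, pd) can be the pair-RDD key directly. To keep the example runnable without a cluster, `local_reduce_by_key` below is a hypothetical plain-Python stand-in for `RDD.reduceByKey`, not part of PySpark; `parse_line` and `merge_lists` are also illustrative names.

```python
def parse_line(line):
    """Turn 'id,pd,t,value' into ((id, pd), [value]) with a tuple key."""
    rec_id, rec_pd, _t, value = line.split(',')
    return (rec_id, rec_pd), [float(value)]


def merge_lists(a, b):
    """Concatenate two partial value lists that share a key."""
    return a + b


def local_reduce_by_key(pairs, func):
    """Hypothetical plain-Python stand-in for RDD.reduceByKey (no Spark needed)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())


lines = ['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1']
result = local_reduce_by_key(map(parse_line, lines), merge_lists)
print(sorted(result))
# [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]
```

With Spark, the same functions would plug in as `sc.parallelize(lines).map(parse_line).reduceByKey(merge_lists)`. As with the string version, the order of values inside each list is then not guaranteed; the local stand-in simply preserves input order.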

Solution 2

My map function keys each record as ((id1, t1), [(p1, 5.0)]), and reduceByKey collects these into ((id1, t1), [(p1, 5.0), (p2, 6.0), ...]) and so on. Finally, map_group builds an array for [p1, p2, ...] and fills in the values at their respective positions.

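import numpy as np

def map_group(pgroup):
    # pgroup is ((id, t), [(feature, value), ...]) after reduceByKey
    x = np.zeros(19)
    x[0] = 1  # the first slot is always 1
    value_list = pgroup[1]
    for val in value_list:
        # The part of the feature name before the first '.' is read as a
        # number n, and the value is stored in slot n - 5
        fno = val[0].split('.')[0]
        x[int(fno)-5] = val[1]
    return x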

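# Key on (id, t), wrap (feature, value) in a list, concatenate the lists
# per key, then turn each group into a fixed-length array
tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
            .reduceByKey(lambda p, q: p + q) \
            .map(lambda d: (d[0], map_group(d)))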

This feels computationally expensive, but it works for now.
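One possibly cheaper variant, offered as an unbenchmarked sketch rather than the author's method: `combineByKey` lets each key accumulate a small dict of {slot: value} in place, instead of building a new list on every `p + q` merge, and only expands to the 19-slot layout at the end. The slot rule copies map_group; the feature names such as '6.txt' are hypothetical, and `local_combine_by_key` is a plain-Python stand-in for `RDD.combineByKey` so the example runs without Spark.

```python
VECTOR_LEN = 19  # same length as np.zeros(19) in map_group


def slot_for(feature):
    """Same rule as map_group: the number before the first '.' minus 5."""
    return int(feature.split('.')[0]) - 5


def create_combiner(feature_value):
    """Start a per-key dict {slot: value} from the first (feature, value)."""
    feature, value = feature_value
    return {slot_for(feature): value}


def merge_value(slots, feature_value):
    """Add one more (feature, value) to an existing dict, in place."""
    feature, value = feature_value
    slots[slot_for(feature)] = value
    return slots


def merge_combiners(a, b):
    """Merge two partial dicts built on different partitions."""
    a.update(b)
    return a


def to_vector(slots):
    """Expand the dict into a dense list laid out like map_group's array."""
    x = [0.0] * VECTOR_LEN
    x[0] = 1.0  # first entry is always 1, as in map_group
    for i, value in slots.items():
        x[i] = value
    return x


def local_combine_by_key(partitions, create, merge_v, merge_c):
    """Hypothetical plain-Python stand-in for RDD.combineByKey."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = merge_v(local[key], value) if key in local else create(value)
        for key, comb in local.items():
            merged[key] = merge_c(merged[key], comb) if key in merged else comb
    return merged


# Hypothetical records already keyed as ((id, t), (feature, value)),
# spread over two "partitions"
partitions = [
    [(('id1', 't1'), ('6.txt', 5.0)), (('id1', 't1'), ('8.txt', 7.5))],
    [(('id1', 't1'), ('7.txt', 6.0)), (('id2', 't1'), ('6.txt', 1.0))],
]
merged = local_combine_by_key(partitions, create_combiner, merge_value, merge_combiners)
vectors = {key: to_vector(slots) for key, slots in merged.items()}
print(vectors[('id1', 't1')][:4])  # [1.0, 5.0, 6.0, 7.5]
```

On an RDD keyed like tfile, the equivalent would be along the lines of `tfile.map(lambda kv: ((kv[0][0], kv[0][2]), (kv[0][1], kv[1]))).combineByKey(create_combiner, merge_value, merge_combiners).mapValues(to_vector)`. The PySpark documentation allows mergeValue and mergeCombiners to modify and return their first argument, which is what makes the in-place dict updates acceptable.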



Updated on June 27, 2022


  • Rahul (3 months)

    I have a collection of tuples made up of composite keys and values. For example,

    >>> tfile.collect()
    [(('id1', 'pd1', 't1'), 5.0),
     (('id1', 'pd1', 't3'), 8.1)]

    I want to perform SQL-like operations on this collection, aggregating the information by id[1..n] or pd[1..n]. I want to implement this with the plain PySpark RDD APIs rather than SQLContext. In my current implementation I read from a set of files and merge the resulting RDDs.

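    def readfile():
        # Files are numbered 6..22; basepath and set_feature are defined elsewhere
        fr = range(6, 23)
        # Build one keyed, reduced RDD per file and union them all.
        # f=f binds the current file number explicitly (avoids Python's
        # late-binding closures inside the comprehension)
        tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                            .map(lambda view, f=f: set_feature(view, f))
                            .reduceByKey(lambda a, b: a + b)
                          for f in fr])
        return tfile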

    I intend to create an aggregated array as a value. For example,

    agg_tfile = [(('id1', 'pd1'), [5.0, 7.5, 8.1])]

    where 5.0, 7.5, 8.1 correspond to [t1, t2, t3]. I currently achieve this with plain Python code using dictionaries. It works fine for small data sets, but I worry that it will not scale to larger ones. Is there an efficient way to achieve the same with the PySpark APIs?

  • Rahul (over 7 years)
    This is definitely an interesting way of solving this. I found another way of achieving the same result, but I suspect your method is much faster than mine. I am sharing my own solution as well.
  • Daniel Darabos (over 7 years)
    Does PySpark not have groupByKey?
  • dapangmao (over 7 years)
    PySpark does have groupByKey. However, the question is about grouping records together based on two fields, rather than an aggregation such as SELECT sum(value) FROM data GROUP BY id, pd, so groupByKey may not help.
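For the exact shape asked for in the question, agg_tfile with values ordered by t, here is a minimal sketch assuming tfile holds ((id, pd, t), value) tuples: re-key each record on (id, pd) with (t, value) as the value, group, then sort each group by t. It uses the sample records from the first answer. `local_group_by_key` is a hypothetical plain-Python stand-in for `RDD.groupByKey` so the example runs without Spark; the other function names are illustrative too.

```python
def to_id_pd_pair(record):
    """((id, pd, t), value) -> ((id, pd), (t, value))"""
    (rec_id, rec_pd, t), value = record
    return (rec_id, rec_pd), (t, value)


def ordered_values(t_value_pairs):
    """Sort (t, value) pairs by t and keep only the values.

    Note: t is compared as a string, so 't10' would sort before 't2'.
    """
    return [value for _t, value in sorted(t_value_pairs)]


def local_group_by_key(pairs):
    """Hypothetical plain-Python stand-in for RDD.groupByKey."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    return groups


# The sample records from the first answer, in the question's tuple form
tfile = [(('id1', 'pd1', 't1'), 5.0), (('id2', 'pd2', 't2'), 6.0),
         (('id1', 'pd1', 't2'), 7.5), (('id1', 'pd1', 't3'), 8.1)]
groups = local_group_by_key(map(to_id_pd_pair, tfile))
agg_tfile = sorted((key, ordered_values(vals)) for key, vals in groups.items())
print(agg_tfile)
# [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]
```

On a real RDD the same functions would plug in as `tfile.map(to_id_pd_pair).groupByKey().mapValues(ordered_values)`. The sort inside `ordered_values` matters there because groupByKey does not guarantee the order of values within a group. Keying on the id alone, for example `tfile.map(lambda r: (r[0][0], r[1])).reduceByKey(operator.add)`, would be the RDD counterpart of `SELECT sum(value) ... GROUP BY id`.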