How can I compute the average from a Spark RDD?


Solution 1

You can use aggregateByKey.

val rdd = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
// Accumulate a (sum, count) pair per key: add each value to the sum and bump the count,
// then merge partial (sum, count) pairs across partitions
val agg_rdd = rdd.aggregateByKey((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val sum = agg_rdd.mapValues(x => x._1 / x._2)   // average = sum / count (integer division)
sum.collect
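
For reference, here is a minimal sketch of the same (sum, count) aggregation that divides as Double, in case you want fractional averages instead of the truncated integer division above (avgDouble is just an illustrative name, and it reuses the rdd defined above):

// Same (sum, count) aggregation, dividing as Double so non-exact averages are not truncated
val avgDouble = rdd
  .aggregateByKey((0, 0))(
    (acc, value) => (acc._1 + value, acc._2 + 1),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  .mapValues { case (s, c) => s.toDouble / c }
avgDouble.collect()   // e.g. Array((2,120.0), (3,204.0), (4,160.0))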

Solution 2

You can use groupByKey in this case, like this:

val rdd = spark.sparkContext.parallelize(List((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))
// Group all values for each key, then average each group (integer division)
val processedRDD = rdd.groupByKey.mapValues { iterator => iterator.sum / iterator.size }
processedRDD.collect.toList

Here, groupByKey returns an RDD[(Int, Iterable[Int])], and you can then simply apply the average operation to each Iterable.
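
If you would rather not walk each group twice (once for the sum and once for the size), here is a minimal sketch that folds every group into a (sum, count) pair in a single pass (singlePassAvg is just an illustrative name, reusing the rdd defined above):

// One pass per group: fold the Iterable into (sum, count), then divide
val singlePassAvg = rdd.groupByKey.mapValues { values =>
  val (sum, count) = values.foldLeft((0, 0)) { case ((s, c), v) => (s + v, c + 1) }
  sum / count
}
singlePassAvg.collect.toList   // e.g. List((2,120), (3,204), (4,160))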

Hope this works for you

Thanks

Solution 3

You can use .combineByKey() to compute the average:

val data = sc.parallelize(Seq((2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)))

val sumCountPair = data.combineByKey(
  (x: Int) => (x.toDouble, 1),                                              // createCombiner: first value becomes a (sum, count) pair
  (pair1: (Double, Int), x: Int) => (pair1._1 + x, pair1._2 + 1),           // mergeValue: fold another value into the pair
  (pair1: (Double, Int), pair2: (Double, Int)) => (pair1._1 + pair2._1, pair1._2 + pair2._2)) // mergeCombiners: combine pairs across partitions

val average = sumCountPair.map(x => (x._1, x._2._1 / x._2._2))   // (key, sum / count)
average.collect()

Here, sumCountPair has type RDD[(Int, (Double, Int))], denoting (Key, (SumValue, CountValue)). The next step just divides the sum by the count and returns (Key, AverageValue).
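
For comparison, here is a minimal sketch of the same (sum, count) idea written with reduceByKey instead of combineByKey (reducedAvg is an illustrative name; it assumes the same data RDD as above):

// Map each value to (value, 1), reduce to (sum, count) per key, then divide
val reducedAvg = data
  .mapValues(v => (v.toDouble, 1))
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (s, c) => s / c }
reducedAvg.collect()   // e.g. Array((2,120.0), (3,204.0), (4,160.0))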

Author: lee

Updated on June 25, 2022

Comments

  • lee (over 1 year ago)

    I have a problem with Spark Scala where I want to compute the average from RDD data. I created a new RDD like this:

    [(2,110),(2,130),(2,120),(3,200),(3,206),(3,206),(4,150),(4,160),(4,170)]
    

    I want to average them per key like this:

    [(2,(110+130+120)/3),(3,(200+206+206)/3),(4,(150+160+170)/3)]
    

    then get the result like this:

       [(2,120),(3,204),(4,160)]
    

    How can I do this in Scala from an RDD? I am using Spark version 1.6.