Merging multiple rows in a spark dataframe into a single row


Solution 1

You can simply group and aggregate. Given data such as:

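// Needed for toDF and the $"..." column syntax; spark-shell imports it automatically
// (on Spark 1.x the equivalent is sqlContext.implicits._)
import spark.implicits._

val df = sc.parallelize(Seq(
  (1441637160, 10.0),
  (1441637170, 20.0),
  (1441637180, 30.0),
  (1441637210, 40.0),
  (1441637220, 10.0),
  (1441637230, 0.0))).toDF("timestamp", "value")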

Import the required functions and classes (mean is used in the aggregation below):

import org.apache.spark.sql.functions.{floor, lit, mean}
import org.apache.spark.sql.types.IntegerType

Create an interval column that rounds each timestamp down to the start of its minute:

val tsGroup = (floor($"timestamp" / lit(60)) * lit(60))
  .cast(IntegerType)
  .alias("timestamp")

and use it to perform aggregation:

df.groupBy(tsGroup).agg(mean($"value").alias("value")).show

// +----------+-----+
// | timestamp|value|
// +----------+-----+
// |1441637160| 25.0|
// |1441637220|  5.0|
// +----------+-----+
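The bucket arithmetic can be checked without a cluster. The sketch below is plain Scala, not Spark, and reuses the sample data; `MinuteBuckets`, `bucket` and `averageByMinute` are illustrative names, not part of any API. It uses `Math.floorDiv`, which rounds toward negative infinity like `floor`, so the same bucketing should also hold for negative (pre-1970) timestamps.

```scala
// Plain-Scala sketch (no Spark) of Solution 1's logic:
// bucket = floor(timestamp / 60) * 60, then the mean of each bucket.
object MinuteBuckets {
  // Start of the 60-second interval containing ts. Math.floorDiv rounds
  // toward negative infinity, so negative timestamps round down as well.
  def bucket(ts: Long): Long = Math.floorDiv(ts, 60L) * 60L

  // Average the values per bucket, sorted by bucket start.
  def averageByMinute(rows: Seq[(Long, Double)]): Seq[(Long, Double)] =
    rows
      .groupBy { case (ts, _) => bucket(ts) }
      .map { case (b, group) => (b, group.map(_._2).sum / group.size) }
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (1441637160L, 10.0), (1441637170L, 20.0), (1441637180L, 30.0),
      (1441637210L, 40.0), (1441637220L, 10.0), (1441637230L, 0.0))
    averageByMinute(rows).foreach(println)
    // (1441637160,25.0)
    // (1441637220,5.0)
  }
}
```

The printed rows match the DataFrame output above, which is a quick way to confirm the interval expression before running it on real data.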

Solution 2

First map the timestamp to the minute bucket, then use groupByKey to calculate the averages. For example:

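// rdd: RDD[(Long, Double)] of (timestamp, value) pairs
rdd.map { case (ts, value) => (ts - ts % 60, value) } // start of the minute
  .groupByKey()
  .mapValues(values => values.sum.toDouble / values.size) // mean per minute
  .collect()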
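`groupByKey` gathers every value for a minute in one place before averaging. A common alternative, not part of the original answer, is to carry a (sum, count) pair per key and merge pairs, which is the kind of function `reduceByKey` takes. The sketch below shows only that combining logic with plain Scala collections, so it runs without Spark; `SumCountAverage`, `minuteStart` and `combine` are hypothetical names. It also uses `Math.floorMod` instead of `%`, because `%` on a negative `Long` returns a negative remainder, so `ts - ts % 60` would round pre-epoch timestamps up rather than down.

```scala
// Sketch of the (sum, count) combine that could be passed to reduceByKey.
// Plain Scala collections stand in for an RDD here; no Spark is needed.
object SumCountAverage {
  // Start of the minute containing ts; floorMod keeps the remainder in [0, 60)
  // even for negative timestamps, unlike the % operator.
  def minuteStart(ts: Long): Long = ts - Math.floorMod(ts, 60L)

  // Merge two partial (sum, count) pairs into one.
  def combine(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)

  def averageByMinute(rows: Seq[(Long, Double)]): Map[Long, Double] =
    rows
      .map { case (ts, v) => (minuteStart(ts), (v, 1L)) } // each value counts once
      .groupBy(_._1)
      .map { case (minute, pairs) =>
        // foldLeft with zero (0.0, 0L) merges all pairs for this minute
        val (sum, count) = pairs.map(_._2).foldLeft((0.0, 0L))(combine)
        minute -> sum / count
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      (1441637160L, 10.0), (1441637170L, 20.0), (1441637180L, 30.0),
      (1441637210L, 40.0), (1441637220L, 10.0), (1441637230L, 0.0))
    println(averageByMinute(rows).toSeq.sortBy(_._1))
  }
}
```

On an RDD the same idea would roughly be `mapValues(v => (v, 1L))` followed by `reduceByKey(combine)` and a final division; only one small pair per key needs to be shuffled instead of every value.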
Author: polo

Updated on June 22, 2022

Comments

  • polo

    I have a DataFrame with two columns, timestamp and value. timestamp is the time since the epoch (in seconds) and value is a float. I want to merge rows so that values are averaged per minute: all rows whose timestamp falls in the same round minute (60-second intervals since the epoch) should be merged into a single row whose value column is the mean of all their values.

    To give an example, let's assume that my DataFrame looks like this:

    timestamp      value
    ---------      -----
    1441637160      10.0
    1441637170      20.0
    1441637180      30.0
    1441637210      40.0
    1441637220      10.0
    1441637230      0.0
    

    The first 4 rows belong to the same minute (1441637160 % 60 == 0 and 1441637160 + 60 == 1441637220); the last 2 rows belong to the next minute. I would like to merge all rows of the same minute to get a result that looks like this:

    timestamp      value
    ---------      -----
    1441637160      25.0  (since (10+20+30+40)/4 = 25)
    1441637220      5.0   (since (10+0)/2 = 5)
    

    What's the best way to do that?