pyspark, Compare two rows in dataframe


Solution 1

The comment by @ShuaiYuan on the original answer is correct. Over the last year I've developed a much better understanding of how Spark works and have actually rewritten the program I was working on for this post.

NEW ANSWER (2017/03/27)
To compare the two rows of the dataframe I ended up using an RDD. I group the data by key (in this case the item id) and ignore eventid, as it's irrelevant here. I then map a lambda function onto the rows, returning a tuple of the key and a list of tuples containing the start and end of event gaps, produced by the findGaps function, which iterates over the list of values (sorted timestamps) linked to each key. Once this is complete I filter out keys with no time gaps and then flatMapValues to return the data to a more SQL-like format. This is done with the following code:

# Find time gaps in a list of datetimes where the gap between consecutive
# firings is longer than the given duration (in seconds).
def findGaps(dates, duration):
    result = []
    length = len(dates)

    # first/last firing as dates for comparison (toDate is a helper defined
    # elsewhere; these values aren't used further in this snippet)
    first = toDate(dates[0])
    last = toDate(dates[length - 1])
    for index, item in enumerate(dates):
        if index < length - 1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format (start, stop, duration); formatResult is a helper defined elsewhere
            result.append(formatResult(item, dates[index + 1], duration))
    return result

outage_list = outage_join_df.rdd\
                            .groupByKey()\
                            .map(lambda row: (
                                     row[0],
                                     findGaps(
                                         sorted(list(row[1])), 
                                         limit
                                     )
                                  )
                            )\
                            .filter(lambda row: len(row[1]) > 0)\
                            .flatMapValues(lambda row: row)\
                            .map(lambda row: (
                                 row[0]['itemid'],     # itemid
                                 row[1][0].date(),     # date
                                 row[1][0],            # start
                                 row[1][1],            # stop
                                 row[1][2]             # duration
                            ))\
                            .collect()
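
For reference, limit is the gap threshold in seconds (four hours would be 14400, as in the question), and toDate and formatResult are small helpers defined elsewhere in my program. A minimal sketch of what they could look like, consistent with how they're called above (the exact implementations here are assumptions, not the originals):

# Hypothetical versions of the helpers referenced in findGaps.
def toDate(stamp):
    # the sorted values are datetime objects, so just return the calendar date
    return stamp.date()

def formatResult(start, stop, duration):
    # build the (start, stop, duration) tuple consumed by the final map;
    # the threshold passed down from findGaps isn't needed for the formatting
    return (start, stop, (stop - start).total_seconds())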

ORIGINAL ANSWER (WRONG)
I managed to solve it using mapPartitions, but this approach is wrong: each partition is processed independently, so consecutive events that land in different partitions are never compared and gaps spanning partition boundaries are missed:

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()

Thanks everyone for the help!

Solution 2

Yes, you're using the map function in the wrong way. map operates on a single element at a time. You can try to use window functions like this:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)

Comments

  • ivywit
    ivywit over 1 year

    I'm attempting to compare one row in a dataframe with the next to see the difference in timestamp. Currently the data looks like:

     itemid | eventid | timestamp
     ----------------------------
     134    | 30      | 2016-07-02 12:01:40
     134    | 32      | 2016-07-02 12:21:23
     125    | 30      | 2016-07-02 13:22:56
     125    | 32      | 2016-07-02 13:27:07
    

    I've tried mapping a function onto the dataframe to compare consecutive rows, like this (note: I'm trying to get rows with a difference greater than 4 hours):

    items = df.limit(10)\
              .orderBy('itemid', desc('stamp'))\
              .map(lambda x,y: (x.stamp - y.stamp) > 14400).collect()
    

    But I'm getting the following error:

    Py4JJavaError: An error occurred while calling 
    z:org.apache.spark.api.python.PythonRDD.collectAndServe
    

    Which I believe is due to my using the map function incorrectly. Help with using map, or a different solution would be appreciated.

    UPDATE: @zero323's answer was informative on my improper use of mapping, however the system I'm using is running a Spark version before 2.02 and I'm working with data in Cassandra.

    I managed to solve it with mapPartitions. See my answer below.

    UPDATE(2017/03/27): Since originally marking the answer on this post my understanding of Spark has improved significantly. I've updated my answer below to show my current solution.

  • Jeff
    Jeff over 7 years
    Although make sure you're either using a HiveContext or Spark 2.02 here
  • ivywit
    ivywit over 7 years
    Thanks for the insight into map, but apparently a Hive context is required to use window functions. The system I'm working on is just Spark with Cassandra. I'll update the question to note that.
  • zero323
    zero323 over 7 years
    HiveContext doesn't require Hive per se. It only needs Spark built with Hive support (which is the default for the pre-built binaries).
  • Matthias
    Matthias over 7 years
    I have a question about the "diff = " line. Isn't there a range function on the window itself with which you can specify that you get the current and the next row? But you solved it differently.
  • zero323
    zero323 over 7 years
    @Matthias How would it help here?
  • Matthias
    Matthias over 7 years
    I'm just trying to understand the usage. I see that there is a Window.rowsBetween function that can be used to define a frame over rows, and that there is a function lag(..., 1) that refers to the neighbouring row. So do I need Window.rowsBetween or not, and when? I'm following the Databricks post and this one. (A short sketch contrasting the two follows at the end of this thread.)
  • shuaiyuancn
    shuaiyuancn over 7 years
    You need to make sure the dataset is partitioned by timestamp.
  • zero323
    zero323 over 7 years
    @Matthias Short answer, you don't. Long one: SQL:2003, if I remember correctly :)
  • Matthias
    Matthias over 7 years
    Works, but I get strange results. For timestamp T1: 2015-04-23 00:00:38.0 and T2: 2015-04-23 00:05:44.0 I get a delta of 306. But it should be 506, right? The timestamps come in as datetime type, not string.
  • zero323
    zero323 over 7 years
    @Matthias Why? 5 minutes, 6 seconds => 306 seconds in total.
  • Matthias
    Matthias over 7 years
    ahhh, I was thinking in steps of 10 instead of 60 ;) thx for the hint
  • ivywit
    ivywit over 6 years
    @ShuaiYuan you're correct. I've updated my answer to show my current solution to the problem.
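
Following up on the rowsBetween question in the comment thread above, here is a minimal sketch (not from the original discussion) that reuses the df and window spec from Solution 2. Both expressions pull the previous row's timestamp within each itemid, which is why lag on its own is enough here and an explicit rowsBetween frame isn't required:

from pyspark.sql.functions import lag, min as min_
from pyspark.sql.window import Window

w = Window.partitionBy("itemid").orderBy("timestamp")

# previous row's timestamp via lag()
prev_via_lag = lag("timestamp", 1).over(w)

# the same value via an explicit one-row frame ending at the previous row
prev_via_frame = min_("timestamp").over(w.rowsBetween(-1, -1))

df.select(
    "itemid", "timestamp",
    prev_via_lag.alias("prev_via_lag"),
    prev_via_frame.alias("prev_via_frame"),
).show()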