Sequences in Spark dataframe

Edited to answer @Tim's comment + fix patterns of the type "AABE"

Yep, using a window function helps, but I created an id to have an ordering:

import spark.implicits._ // for toDF and the 'symbol column syntax (pre-imported in spark-shell)

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._ // lag, lead, split, concat, coalesce, lit, locate, udf, array (pre-imported in spark-shell)

val w = Window.partitionBy('group).orderBy('id)

Then lag will collect what is needed, but a function is required to generate the Column expression. Note the split on "A": it eliminates double counting of patterns like "AABE", but it also truncates the sequence at the next "A". WARNING: as a consequence, patterns of the type "ABAEXX" are rejected:

// Builds, for each row, the string of the next 2*m values (lag with a
// negative offset reads forward), then keeps only the part before the
// next "A", so that a new "A" restarts the sequence.
def createSeq(m: Int) = split(
  concat(
    (1 to 2*m).map(i => coalesce(lag('value, -i).over(w), lit(""))): _*
  ), "A")(0)


val m=2
val tmp = df
  .withColumn("seq",createSeq(m))

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+

Because the Column API offers only a poor set of collection functions, avoiding regex altogether is much easier with a UDF:

// True if some segment between "B"s contains an "E" at most m positions
// after the "B" that opens it.
def patternInSeq(m: Int) = udf((str: String) => {
  str
    .split("B")                  // segments delimited by the Bs
    .filter(_.contains("E"))
    .exists(_.indexOf("E") <= m) // E close enough to the preceding B
})
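As a quick sanity check of that logic on plain strings (outside Spark; the second input is made up for illustration):

def patternFound(m: Int)(str: String): Boolean =
  str.split("B").filter(_.contains("E")).exists(_.indexOf("E") <= m)

patternFound(2)("BBDE")  // true:  split gives ["", "", "DE"], E at index 1 <= 2
patternFound(2)("BDXXE") // false: split gives ["", "DXXE"], E at index 3 > 2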

val res = tmp
  .filter(('value === "A") && (locate("B",'seq) > 0))      // an A with a B somewhere after it
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1)) // B close enough, E not first (locate is 1-based)
  .filter(patternInSeq(m)('seq))                           // E at most m letters after a B
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

Generalisation (out of scope)

If you want to generalise this to longer sequences of letters, the question itself has to be generalised. It could be trivial, but in this case a pattern of the type "ABAE" should be rejected (see comments). So the easiest way to generalise is to have a pair-wise rule as in the following implementation (I added a group "Z" to illustrate the behaviour of this algorithm):

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2),
  ( 8,"A","Z",1),
  ( 9,"B","Z",2),
  (10,"D","Z",3),
  (11,"B","Z",4),
  (12,"E","Z",5)
).toDF("id","value","group","ts")

First we define the logic for a pair:

import org.apache.spark.sql.DataFrame

// Keep the window as an array this time (the current value included at index 0).
def createSeq(m: Int) = array((0 to 2*m).map(i => coalesce(lag('value, -i).over(w), lit(""))): _*)

// True if b occurs at most m positions after the first run of a's,
// scanning only until the next a (a reappearing a resets the sequence).
// Note: Spark hands array columns to UDFs as Seq, not Array.
def filterPairUdf(m: Int, t: (String, String)) = udf((ar: Seq[String]) => {
  val (a, b) = t
  val foundAt = ar
    .dropWhile(_ != a) // skip to the first a
    .dropWhile(_ == a) // ignore consecutive repetitions of a
    .takeWhile(_ != a) // stop if a comes back: it restarts the sequence
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})
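A quick check of the scanning logic outside Spark, on sequences mirroring groups X and Z:

def foundAt(ar: Seq[String], a: String, b: String): Int =
  ar.dropWhile(_ != a).dropWhile(_ == a).takeWhile(_ != a).indexOf(b)

foundAt(Seq("A","B","B","D","E"), "B", "E") //  1: E within reach of the Bs (group X)
foundAt(Seq("A","B","D","B","E"), "B", "E") // -1: the second B resets the scan (group Z)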

Then we define a function that applies this logic iteratively on the dataframe:

// Apply the pair rule once for every pair of consecutive letters in seq.
def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame =
  seq.zip(seq.tail).foldLeft(df) { case (acc, (a, b)) =>
    acc.filter(filterPairUdf(m, (a, b))('seq))
  }
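For List("A","B","E") the foldLeft simply unrolls into one filter per consecutive pair of letters:

// filterSeq(List("A","B","E"), m)(df) amounts to:
df.filter(filterPairUdf(m, ("A","B"))('seq))
  .filter(filterPairUdf(m, ("B","E"))('seq))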

A simplification and an optimisation are obtained because we first keep only the rows where the sequence can begin, i.e. those whose value is the first character:

val m = 2
val tmp = df
  .filter('value === "A") // reduce problem
  .withColumn("seq",createSeq(m))

scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  6|    A|    Y|  1|   [A, C, , , ]|
|  8|    A|    Z|  1|[A, B, D, B, E]|
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

val res = tmp.transform(filterSeq(List("A","B","E"),m))

scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

(transform is simple syntactic sugar for applying a DataFrame => DataFrame transformation)
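Since transform(t) just returns t(df), the two calls below are interchangeable:

tmp.transform(filterSeq(List("A","B","E"), m))
filterSeq(List("A","B","E"), m)(tmp) // same result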

res
  .groupBy('group)
  .count
  .show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

As I said, there are different ways to generalise the "resetting rules" when scanning a sequence, but this example hopefully helps with the implementation of more complex ones.
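For instance, echoing the discussion in the comments below, here is a hypothetical variant (illustrative only, not used above) in which any letter of the target sequence resets the scan, so that e.g. "ZAZBE" would not count as Z-A-B-E:

def filterPairStrictUdf(m: Int, t: (String, String), letters: Set[String]) = udf((ar: Seq[String]) => {
  val (a, b) = t
  val foundAt = ar
    .dropWhile(_ != a)
    .dropWhile(_ == a)
    .takeWhile(x => x == b || !letters.contains(x)) // any other sequence letter resets the scan
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})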

Author by Tim

Updated on October 26, 2020

Comments

  • Tim
    Tim over 3 years

    I have a dataframe in Spark that looks like this:

    +-------+----------+-------+
    |  value|     group|     ts|
    +-------+----------+-------+
    |      A|         X|      1|
    |      B|         X|      2|
    |      B|         X|      3|
    |      D|         X|      4|
    |      E|         X|      5|
    |      A|         Y|      1|
    |      C|         Y|      2|
    +-------+----------+-------+
    

    End goal: I'd like to find how many A-B-E sequences there are (a sequence is just a list of subsequent rows), with the added constraint that subsequent parts of the sequence can be at most n rows apart. Let's say for this example that n is 2.

    Consider group X. In this case there is exactly 1 D between B and E (multiple consecutive Bs are ignored), which means B and E are 1 row apart, and thus there is a sequence A-B-E.

    I have thought about using collect_list(), creating a string (like DNA) and using substring search with regex. But I was wondering if there's a more elegant distributed way, perhaps using window functions?

    Edit:

    Note that the provided dataframe is just an example. The real dataframe (and thus groups) can be arbitrary long.

  • Tim
    Tim over 7 years
    I see what you did. However, the dataframe I provided was just an example. Imagine the groups being any length from 1 to m, where m might be as large as 50. Also, I don't see the benefit of using the window here, as in the end you still resort to the regex solution :).
  • Wilmerton
    Wilmerton over 7 years
    For this specific question, the advantage is that you have a distributed algorithm which finally makes you match 4 characters on only a fraction of the original number of lines. Also, such a simple regex can easily be optimised in a few tests. So it is much better than collecting everything. It is true that it would require a little work to generalise it to arbitrarily long sequences, but it is beyond the scope of the question. I'll edit tomorrow.
  • Wilmerton
    Wilmerton over 7 years
    My bad, the generalisation to n steps between letters was in the question.
  • Tim
    Tim over 7 years
    It appears to be working great. However, I'm going to need some time to understand what's going on here and to figure out how to generalize it (not part of the question) to any sequence given in a list. Could you (before I try to dissect it) clarify whether this solution finds multiple occurrences of a sequence in a group (also not part of the question, but just curious)?
  • Tim
    Tim over 7 years
    So, I already figured out that the answer to my last question is 'yes', please ignore that :).
  • Wilmerton
    Wilmerton over 7 years
    @Tim yes, this solution is very ad hoc on the provided list. I have already been thinking about a more elegant way to handle the sequence, which would be generalisable. Maybe I'll post something, this is an interesting problem :-)
  • Wilmerton
    Wilmerton over 7 years
    @Tim I just realised there is a bug: my algo wouldn't count "ABAE"... That's probably due to the overly ad hoc character of my solution. I'll try to update with a more generic approach if I find time to do it.
  • Tim
    Tim over 7 years
    Not counting ABAE is actually fine in this case. A new A would restart the sequence, which is what I prefer anyway; it's also something a regex can't do :). I'll accept this as the answer since it perfectly does what I asked. If you find time to post a generic solution, that would be awesome though (even though it's not part of the main question).
  • Wilmerton
    Wilmerton over 7 years
    Ok, cool. But then the requirements for the generalisation are not clear to me: let's say you count sequences of List(Z,A,B,E) with m=2, "ZABAE" should maybe be counted, but not "ZAZBE", which would be weird... So maybe every second occurrence of a letter of the sequence restarts the sequence, except repetitions (which are ignored), but this is very restrictive. Is it desirable? But maybe we can have this discussion outside of this question (maybe in another one).