Sequences in Spark dataframe
Edited to answer @Tim's comment + fix patterns of the type "AABE"
Yes, a window function helps, but I created an id column to have an ordering:
import spark.implicits._ // for toDF and the 'colName symbol syntax

val df = List(
(1,"A","X",1),
(2,"B","X",2),
(3,"B","X",3),
(4,"D","X",4),
(5,"E","X",5),
(6,"A","Y",1),
(7,"C","Y",2)
).toDF("id","value","group","ts")
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._ // lag, coalesce, concat, split, lit, locate, udf, array
val w = Window.partitionBy('group).orderBy('id)
Then lag will collect what is needed, but a function is required to generate the Column expression. Note that lag with a negative offset looks forward (like lead), and that the split on "A" truncates each collected sequence at the next "A": this eliminates double counting of "AABE"-type patterns. WARNING: it also rejects patterns of the type "ABAEXX":
// concatenate the next 2*m values and truncate at the first "A"
def createSeq(m: Int) = split(
  concat(
    (1 to 2*m).map(i => coalesce(lag('value, -i).over(w), lit(""))): _*
  ), "A")(0)
val m = 2
val tmp = df.withColumn("seq", createSeq(m))
tmp.show
+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
| 6| A| Y| 1| C|
| 7| C| Y| 2| |
| 1| A| X| 1|BBDE|
| 2| B| X| 2| BDE|
| 3| B| X| 3| DE|
| 4| D| X| 4| E|
| 5| E| X| 5| |
+---+-----+-----+---+----+
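The seq values above can be reproduced in plain Scala, which makes the column expression easier to follow. This is only a sketch, independent of Spark, and seqOf is a hypothetical helper mirroring what createSeq computes:

```scala
// Sketch of what createSeq computes, in plain Scala (no Spark):
// concatenate the next 2*m values in the group, then truncate at the first "A".
def seqOf(m: Int, next: Seq[String]): String =
  next.take(2 * m).mkString("").split("A")(0)

// Row id=1 (group X): the values after it are B, B, D, E
assert(seqOf(2, Seq("B", "B", "D", "E")) == "BBDE")
// Row id=6 (group Y): only C follows (missing values become "")
assert(seqOf(2, Seq("C", "", "", "")) == "C")
```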
Because of the limited set of collection functions available in the Column API, avoiding regex altogether is much easier with a UDF:
def patternInSeq(m: Int) = udf((str: String) => {
  // found iff some chunk after a "B" contains an "E" at index <= m
  str.split("B")
    .exists(s => s.contains("E") && s.indexOf("E") <= m)
})
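The string logic inside the UDF can be checked in plain Scala, without Spark. Here patternFound is a hypothetical stand-in for the UDF body:

```scala
// Core of the UDF: the pattern is found iff some chunk after a "B"
// contains an "E" at index <= m (i.e. at most m rows further on).
def patternFound(m: Int, str: String): Boolean =
  str.split("B").exists(s => s.contains("E") && s.indexOf("E") <= m)

assert(patternFound(2, "BBDE"))    // E one row after the run of B's: found
assert(!patternFound(2, "BDDDE"))  // E three rows after the B: too far for m = 2
assert(!patternFound(2, "C"))      // no E at all
```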
val res = tmp
.filter(('value === "A") && (locate("B",'seq) > 0))
.filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
.filter(patternInSeq(m)('seq))
.groupBy('group)
.count
res.show
+-----+-----+
|group|count|
+-----+-----+
| X| 1|
+-----+-----+
Generalisation (out of scope)
If you want to generalise this to longer sequences of letters, the question itself has to be generalised. It could be trivial, but in this case a pattern of the type "ABAE" should be rejected (see comments). So the easiest way to generalise is to use a pair-wise rule, as in the following implementation (I added a group "Z" to illustrate the behaviour of this algorithm):
val df = List(
(1,"A","X",1),
(2,"B","X",2),
(3,"B","X",3),
(4,"D","X",4),
(5,"E","X",5),
(6,"A","Y",1),
(7,"C","Y",2),
( 8,"A","Z",1),
( 9,"B","Z",2),
(10,"D","Z",3),
(11,"B","Z",4),
(12,"E","Z",5)
).toDF("id","value","group","ts")
First we define the logic for a pair
import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,-i).over(w),lit(""))):_*)
// note: Spark hands array columns to a UDF as Seq, not Array
def filterPairUdf(m: Int, t: (String,String)) = udf((ar: Seq[String]) => {
  val (a,b) = t
  val foundAt = ar
    .dropWhile(_ != a)  // start at the first a
    .dropWhile(_ == a)  // skip the run of repeated a's
    .takeWhile(_ != a)  // a new a restarts the sequence
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})
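The intended pair rule (repeated a's are ignored, a new a restarts the scan) can be exercised on the sample sequences in plain Scala; pairFound below is a hypothetical Spark-free mirror of the UDF body:

```scala
// Pair rule in plain Scala: starting at the first a, skip the run of a's,
// scan until the next a, and require b within the first m+1 positions.
def pairFound(m: Int, a: String, b: String, ar: Seq[String]): Boolean = {
  val foundAt = ar.dropWhile(_ != a).dropWhile(_ == a).takeWhile(_ != a).indexOf(b)
  foundAt != -1 && foundAt <= m
}

// Group X: E comes one step after the run of B's -> pair (B,E) holds
assert(pairFound(2, "B", "E", Seq("A", "B", "B", "D", "E")))
// Group Z: a second B restarts the scan before E is reached -> rejected
assert(!pairFound(2, "B", "E", Seq("A", "B", "D", "B", "E")))
```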
Then we define a function that applies this logic iteratively to the dataframe:
def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame =
  // fold the pair rule over consecutive letters of the pattern
  seq.zip(seq.tail).foldLeft(df) { case (acc, (a, b)) =>
    acc.filter(filterPairUdf(m, (a, b))('seq))
  }
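The iteration over consecutive pairs can likewise be sketched without Spark. seqFound and pairHolds are hypothetical helpers mirroring filterSeq and the pair UDF on an in-memory sequence:

```scala
// Keep a sequence iff every consecutive pair of the pattern satisfies the pair rule.
def pairHolds(m: Int, a: String, b: String, ar: Seq[String]): Boolean = {
  val foundAt = ar.dropWhile(_ != a).dropWhile(_ == a).takeWhile(_ != a).indexOf(b)
  foundAt != -1 && foundAt <= m
}
def seqFound(pattern: List[String], m: Int, ar: Seq[String]): Boolean =
  pattern.zip(pattern.tail).forall { case (a, b) => pairHolds(m, a, b, ar) }

assert(seqFound(List("A", "B", "E"), 2, Seq("A", "B", "B", "D", "E")))  // group X kept
assert(!seqFound(List("A", "B", "E"), 2, Seq("A", "B", "D", "B", "E"))) // group Z rejected
assert(!seqFound(List("A", "B", "E"), 2, Seq("A", "C", "", "", "")))    // group Y rejected
```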
A simplification and an optimisation are obtained by first filtering on rows whose value is the first character of the pattern:
val m = 2
val tmp = df
.filter('value === "A") // reduce problem
.withColumn("seq",createSeq(m))
scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts| seq|
+---+-----+-----+---+---------------+
| 6| A| Y| 1| [A, C, , , ]|
| 8| A| Z| 1|[A, B, D, B, E]|
| 1| A| X| 1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+
val res = tmp.transform(filterSeq(List("A","B","E"),m))
scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts| seq|
+---+-----+-----+---+---------------+
| 1| A| X| 1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+
(transform is simple sugar for applying a DataFrame => DataFrame transformation: df.transform(f) is just f(df).)
res
.groupBy('group)
.count
.show
+-----+-----+
|group|count|
+-----+-----+
| X| 1|
+-----+-----+
As I said, there are different ways to generalise the "resetting rules" when scanning a sequence, but this example hopefully helps in the implementation of more complex ones.
Tim
Updated on October 26, 2020

Comments
-
Tim over 3 years
I have a dataframe in Spark. It looks like this:
+-----+-----+---+
|value|group| ts|
+-----+-----+---+
|    A|    X|  1|
|    B|    X|  2|
|    B|    X|  3|
|    D|    X|  4|
|    E|    X|  5|
|    A|    Y|  1|
|    C|    Y|  2|
+-----+-----+---+
End goal: I'd like to find how many sequences A-B-E there are (a sequence is just a list of subsequent rows), with the added constraint that subsequent parts of the sequence can be at most n rows apart. Let's consider for this example that n is 2.
Consider group X. In this case there is exactly one D between B and E (multiple consecutive Bs are ignored), which means B and E are 1 row apart and thus there is a sequence A-B-E.
I have thought about using collect_list(), creating a string (like DNA) and using substring search with a regex. But I was wondering if there's a more elegant distributed way, perhaps using window functions?
Edit: Note that the provided dataframe is just an example. The real dataframe (and thus the groups) can be arbitrarily long.
-
Tim over 7 years I see what you did. However, the dataframe I provided was just an example. Imagine the groups being any length of 1 to m, where m might be as large as 50. Also I don't see the benefit of using the window here, as in the end you still resort to the regex solution :).
Wilmerton over 7 years For this specific question, the advantage is that you have a distributed algorithm which finally makes you match 4 characters on only a fraction of the original number of lines. Also, such a simple regex can easily be optimised in a few tests. So it is much better than collecting everything. It is true that it would require a little work to generalise it to arbitrarily long sequences, but that is beyond the scope of the question. I'll edit tomorrow.
-
Wilmerton over 7 years My bad, the generalisation to n steps between letters was in the question.
Tim over 7 years It appears to be working great. However, I'm going to need some time to understand what's going on here and to figure out how to generalize it (not part of the question) to any sequence given in a list. Could you (before I try to dissect it) clarify if this solution finds multiple occurrences of a sequence in a group (also not part of the question, but just curious)?
-
Tim over 7 years So, I already figured out that the answer to my last question is 'yes', please ignore that :).
-
Wilmerton over 7 years @Tim yes, this solution is very ad hoc on the provided list. I have already been thinking about a more elegant way to handle the sequence, which would be generalisable. Maybe I'll post something, this is an interesting problem :-)
-
Wilmerton over 7 years @Tim I just realised there is a bug: my algo wouldn't count "ABAE"... That's probably due to the too ad hoc character of my solution. I'll try to update with a more generic approach if I find time to do it.
-
Tim over 7 years Not counting ABAE is actually fine in this case. A new A would restart the sequence, which is what I prefer anyway; it's also something a regex can't do :). I'll accept this as the answer since it perfectly does what I asked. If you find time to post a generic solution, that would be awesome though (even though it's not part of the main question).
Wilmerton over 7 years Ok, cool. But then the requirements for the generalisation are not clear to me: let's say you count sequences of List(Z,A,B,E) with m=2; "ZABAE" should maybe be counted, but not "ZAZBE", which would be weird... So maybe every second occurrence of a letter of the sequence restarts the sequence, except repetitions (which are ignored), but this is very restrictive. Is it desirable? But maybe we can have this discussion outside of this question (maybe in another one).