Efficiently running a "for" loop in Apache Spark so that execution is parallel
A loop in Spark driver code always runs sequentially, so it is not a good idea to use one in your code. Your code uses a while loop and reads a single record at a time, which does not allow Spark to run in parallel.
Spark code should be designed without for and while loops if you have a large data set.
As I understand your problem, I have written sample code in Scala that gives your desired output without using any loop. Please take the code below as a reference and try to design your code in the same way.
Note: I have written the code in Scala, but the same logic can be implemented in Python.
scala> import org.apache.spark.sql.expressions.UserDefinedFunction
//UDF that turns an item list such as "1,2,3" into the pair list "1~2,1~3,2~3"
scala> def sampleUDF: UserDefinedFunction = udf((flagCol: String) => {
     |   var out = ""
     |   val flagColList = flagCol.reverse.split(",").map(x => x.trim).mkString(",").reverse.split(",").toList
     |   var i = 0
     |   val ss = flagColList.size
     |   flagColList.foreach { x =>
     |     i = i + 1
     |     val xs = List(flagColList(i - 1))
     |     val ys = flagColList.slice(i, ss)
     |     for (x <- xs; y <- ys)
     |       out = out + "," + x + "~" + y
     |   }
     |   if (out == "") { out = flagCol }
     |   out.replaceFirst(",", "")
     | })
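As a rough illustration of what the UDF above produces for one bill, the following plain-Python sketch builds the same `~`-joined pair string from a comma-separated item list. The function name `pair_string` is hypothetical and not part of the original answer, and it uses `itertools.combinations` in place of the manual index loop.

```python
from itertools import combinations


def pair_string(items_csv: str) -> str:
    """Mirror the Scala UDF: "1,2,3" -> "1~2,1~3,2~3"."""
    # Split on commas and trim whitespace, as the UDF does.
    items = [part.strip() for part in items_csv.split(",")]
    # Every unordered pair, keeping the original left-to-right order.
    pairs = [f"{x}~{y}" for x, y in combinations(items, 2)]
    # A single-item bill has no pairs; the UDF returns the input unchanged.
    return ",".join(pairs) if pairs else items_csv


print(pair_string("1,2,3"))  # 1~2,1~3,2~3
print(pair_string("1"))      # 1
```

The single-item case matters later in the pipeline: such a value has no `~`, so its second split element is null and the row is filtered out.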
//Input DataSet
scala> df.show
+-------+-------+
|bill_id|item_id|
+-------+-------+
|    ABC|      1|
|    ABC|      2|
|    DEF|      1|
|    DEF|      2|
|    DEF|      3|
|    GHI|      1|
+-------+-------+
//Collecting all item_id values for each bill_id
scala> val df1 = df.groupBy("bill_id")
.agg(concat_ws(",",collect_list(col("item_id"))).alias("item"))
scala> df1.show
+-------+-----+
|bill_id| item|
+-------+-----+
|    DEF|1,2,3|
|    GHI|    1|
|    ABC|  1,2|
+-------+-----+
//Generating all item_id pair combinations and filtering out rows that have no second item
scala> val df2 = df1.withColumn("item", sampleUDF(col("item")))
.withColumn("item", explode(split(col("item"), ",")))
.withColumn("Item_1", split(col("item"), "~")(0))
.withColumn("Item_2", split(col("item"), "~")(1))
.groupBy(col("Item_1"),col("Item_2"))
.agg(count(lit(1)).alias("Num_of_bills"))
.filter(col("Item_2").isNotNull)
scala> df2.show
+------+------+------------+
|Item_1|Item_2|Num_of_bills|
+------+------+------------+
|     2|     3|           1|
|     1|     2|           2|
|     1|     3|           1|
+------+------+------------+
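To sanity-check the table above on a small sample without a cluster, the same group, pair, and count logic can be sketched in plain Python. This is only a local reference that assumes the data fits in memory; the name `count_pairs` is hypothetical, and unlike `collect_list` in the answer it deduplicates items within a bill.

```python
from collections import Counter, defaultdict
from itertools import combinations


def count_pairs(rows):
    """Count, for each item pair, the number of bills containing both items."""
    # Step 1: collect the distinct items of every bill (the groupBy step).
    items_by_bill = defaultdict(set)
    for bill_id, item_id in rows:
        items_by_bill[bill_id].add(item_id)

    # Steps 2 and 3: emit each sorted pair once per bill and count the bills.
    counts = Counter()
    for items in items_by_bill.values():
        counts.update(combinations(sorted(items), 2))
    return dict(counts)


rows = [("ABC", 1), ("ABC", 2), ("DEF", 1), ("DEF", 2), ("DEF", 3), ("GHI", 1)]
print(count_pairs(rows))  # {(1, 2): 2, (1, 3): 1, (2, 3): 1}
```

Bill GHI contributes nothing because it holds a single item, which matches the rows removed by the `isNotNull` filter.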
Kamal Nandan
Updated on August 21, 2022
Comments
-
Kamal Nandan over 1 year
How can we parallelize a loop in Spark so that the processing is parallel rather than sequential? As an example, I have a CSV file (called 'bill_item.csv') that contains the following data:
|-----------+------------|
| bill_id   | item_id    |
|-----------+------------|
| ABC       | 1          |
| ABC       | 2          |
| DEF       | 1          |
| DEF       | 2          |
| DEF       | 3          |
| GHI       | 1          |
|-----------+------------|
I have to get the output as follows:
|-----------+-----------+--------------|
| item_1    | item_2    | Num_of_bills |
|-----------+-----------+--------------|
| 1         | 2         | 2            |
| 2         | 3         | 1            |
| 1         | 3         | 1            |
|-----------+-----------+--------------|
We see that items 1 and 2 have been found under 2 bills 'ABC' and 'DEF', hence the 'Num_of_bills' for items 1 and 2 is 2. Similarly items 2 and 3 have been found only under bill 'DEF' and hence 'Num_of_bills' column is '1' and so on.
I am using spark to process the CSV file 'bill_item.csv' and I am using the following approaches:
Approach 1:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# define the schema for the data
bi_schema = StructType([
    StructField("bill_id", StringType(), True),
    StructField("item_id", IntegerType(), True)
])

bi_df = (spark.read.schema(bi_schema).csv('bill_item.csv'))

# find the list of all items in sorted order
item_list = bi_df.select("item_id").distinct().orderBy("item_id").collect()
item_list_len = len(item_list)

i = 0
# for each pair of items, e.g. (1,2), (1,3), (1,4), (1,5), (2,3), (2,4), (2,5), ...... (4,5)
while i < item_list_len - 1:
    # find the list of all bill IDs that contain the first item of the pair
    bill_id_list1 = bi_df.filter(bi_df.item_id == item_list[i].item_id).select("bill_id").collect()
    j = i + 1
    while j < item_list_len:
        # find the list of all bill IDs that contain the second item of the pair
        bill_id_list2 = bi_df.filter(bi_df.item_id == item_list[j].item_id).select("bill_id").collect()
        # find the bill IDs common to bill_id_list1 and bill_id_list2, then the number of common bills
        common_elements = set(bill_id_list1).intersection(bill_id_list2)
        num_bils = len(common_elements)
        if num_bils > 0:
            print(item_list[i].item_id, item_list[j].item_id, num_bils)
        j += 1
    i += 1
However, this approach is not efficient: in real life we have millions of records, and there may be the following issues:
- There may not be enough memory to load the list of all items or bills.
- It may take too long to get the results because the execution is sequential (because of the loop). (I ran the above algorithm with ~200000 records and it took more than 4 hours to produce the desired result.)
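A rough count of the driver round trips in Approach 1 shows why it scales poorly: with n distinct items, the loops issue one `collect()` per outer iteration and one per item pair, on top of the initial `collect()` for the item list. The helper name `collect_calls` below is hypothetical and exists only for this arithmetic.

```python
def collect_calls(n_items: int) -> int:
    """Number of collect() calls Approach 1 makes for n_items distinct items."""
    initial = 1                            # item_list = ... .collect()
    outer = max(n_items - 1, 0)            # bill_id_list1, once per value of i
    inner = n_items * (n_items - 1) // 2   # bill_id_list2, once per (i, j) pair
    return initial + outer + inner


for n in (3, 100, 1000):
    print(n, collect_calls(n))
# 3 6
# 100 5050
# 1000 500500
```

Each of those calls triggers its own Spark job and returns rows to the driver, so the number of jobs grows quadratically with the number of distinct items.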
Approach 2:
I further optimized this by splitting the data on the basis of "item_id" and I used the following block of code to split the data:
bi_df = (spark.read.schema(bi_schema).csv('bill_item.csv'))
outputPath = '/path/to/save'
bi_df.write.partitionBy("item_id").csv(outputPath)
After splitting, I ran the same algorithm as in "Approach 1". With 200000 records it still takes 1.03 hours to get the final output (a significant improvement over the 4 hours of "Approach 1").
The bottleneck is the sequential 'for' loop (and also the 'collect()' method). So my questions are:
- Is there a way to parallelize the for loop?
- Or is there any other efficient way?
-
thebluephantom over 4 years
I think you are missing the point of Spark: it is inherently parallel by default, in a given manner. Can you elaborate on item_1 and item_2, please? Will you have an item_3?
-
Kamal Nandan over 4 years
Item_1 and Item_2 are pairs of products that are bought in the same basket (bill). E.g. if a person p1 buys 3 products 1, 2, 3, then the pairs of products are (1,2), (1,3), (2,3). If a person p2 buys 3 products 1, 2, 4, then the pairs are (1,2), (1,4), (2,4). So if we consider that only 2 persons p1 and p2 bought from the store, then the pairs are (1,2), (1,3), (2,3), (1,4), (2,4) and the count of each pair is: (1,2) - 2 (because both p1 and p2 bought this pair); (1,3) - 1 (only p1 bought); (2,3) - 1 (only p1 bought); (1,4) - 1 (only p2 bought); (2,4) - 1 (only p2 bought).
-
thebluephantom over 4 years
Sure the example is correct?
-
Kamal Nandan over 4 years
Yes, Spark is inherently parallel and that's the objective. However, the way I have implemented it, it looks like it has become sequential. How do I fix this?
-
thebluephantom over 4 years
I cannot follow the example.
-
Kamal Nandan over 4 years
Yes, the example is correct. It's a market-basket analysis problem where the retailer wants to find the pairs of products that are bought together very frequently (so that the retailer may take some action, e.g. the recommendation engine may recommend items/products to future customers according to their chosen items). I hope it makes sense now :)
-
thebluephantom over 4 years
Not well explained with the example then. I am familiar with that, then. The title should have reflected that. But only 2 items max?
-
Kamal Nandan over 4 years
Yes, only a pair of items. I will do it for larger item sets too, such as 3 items or 4 items and so on. But right now I am focusing on pairs. If I am able to do it efficiently, I will follow the same approach for more items too. (The approach and solution that I have described above function 'correctly', but they are not efficient. I want the big-data/Spark approach to make it efficient so that the results are quicker.)
-
thebluephantom over 4 years
If I remember correctly, this is an expensive operation; it is in the FP-growth domain.
-
Kamal Nandan over 4 years
I just had a quick look at the term FP-growth. I had never heard of it. Indeed it's an FP-growth problem. But I am wondering if there is a way to solve this using a conventional or Spark approach.
-
thebluephantom over 4 years
Spark offers FP-growth.
-
jxc over 4 years
@KamalNandan, if you just need pairs, then a self-join could be enough.
-
thebluephantom over 4 years
But he may need more than that...
-
Kamal Nandan over 4 years
@jxc, yes, for the pairs I could do it using a self-join. But I will need it for sets of 3 too.
-
Kamal Nandan over 4 years
@thebluephantom, I will look for FP-growth in Spark. That's interesting. Thanks for letting me know about it.
-
thebluephantom over 4 years
See the answer!!!
-
Kamal Nandan over 4 years
Thanks a lot Nikk for the elegant solution! I am sorry I didn't see the solution sooner, since I was on vacation. However, I have also implemented a solution of my own without the loops (using the self-join approach). I will post that in a day or two.
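The self-join idea raised in the comments could look roughly like the PySpark sketch below. It is not the asker's eventual solution, which does not appear in this thread; the function name `pair_counts_self_join` is hypothetical, and it assumes a DataFrame with the `bill_id` and `item_id` columns from the question.

```python
def pair_counts_self_join(bi_df):
    """Count bills per item pair with a self-join on bill_id."""
    # Imported lazily so this sketch only needs PySpark when it is actually called.
    from pyspark.sql import functions as F

    a = bi_df.alias("a")
    b = bi_df.alias("b")
    # Same bill on both sides; a.item_id < b.item_id keeps each pair once
    # and drops an item paired with itself.
    same_bill = (F.col("a.bill_id") == F.col("b.bill_id")) & (
        F.col("a.item_id") < F.col("b.item_id")
    )
    return (
        a.join(b, same_bill)
        .select(
            F.col("a.bill_id").alias("bill_id"),
            F.col("a.item_id").alias("item_1"),
            F.col("b.item_id").alias("item_2"),
        )
        .groupBy("item_1", "item_2")
        .agg(F.countDistinct("bill_id").alias("Num_of_bills"))
    )
```

Called as `pair_counts_self_join(bi_df).show()` on the sample data, this should list the pairs (1, 2), (1, 3) and (2, 3) with counts 2, 1 and 1. The join and aggregation run on the executors, so no driver-side loop or `collect()` is involved.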