Efficiently running a "for" loop in Apache Spark so that execution is parallel


Looping in Spark is always sequential, and it is generally not a good idea to use loops in your code. In your code, you are using a while loop and reading a single record at a time, which does not allow Spark to run the work in parallel.

Spark code should be designed without for and while loops if you have a large data set.

As per my understanding of your problem, I have written sample code in Scala that gives your desired output without using any loop. Please take the code below as a reference and try to design your code in the same way.

Note: I have written the code in Scala; the same logic can be implemented in Python as well.

scala> import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.expressions.UserDefinedFunction

//UDF: takes the comma-separated item list of one bill (e.g. "1,2,3") and
//returns every item pair as "x~y", joined by commas (e.g. "1~2,1~3,2~3").
//A single item is returned unchanged so that it can be filtered out later.
scala> def sampleUDF: UserDefinedFunction = udf((flagCol: String) => {
     |   val items = flagCol.split(",").map(_.trim).toList
     |   val pairs = for {
     |     (x, i) <- items.zipWithIndex
     |     y      <- items.drop(i + 1)
     |   } yield s"$x~$y"
     |   if (pairs.isEmpty) flagCol else pairs.mkString(",")
     | })

//Input DataSet 
scala> df.show
+-------+-------+
|bill_id|item_id|
+-------+-------+
|    ABC|      1|
|    ABC|      2|
|    DEF|      1|
|    DEF|      2|
|    DEF|      3|
|    GHI|      1|
+-------+-------+

//Collecting all item_id values corresponding to each bill_id

scala> val df1 = df.groupBy("bill_id")
               .agg(concat_ws(",",collect_list(col("item_id"))).alias("item"))

scala> df1.show
+-------+-----+
|bill_id| item|
+-------+-----+
|    DEF|1,2,3|
|    GHI|    1|
|    ABC|  1,2|
+-------+-----+


//Generating all pairwise combinations of item_id and filtering out incomplete pairs

scala>   val df2 = df1.withColumn("item", sampleUDF(col("item")))
                      .withColumn("item", explode(split(col("item"), ",")))
                      .withColumn("Item_1", split(col("item"), "~")(0))
                      .withColumn("Item_2", split(col("item"), "~")(1))
                      .groupBy(col("Item_1"),col("Item_2"))
                      .agg(count(lit(1)).alias("Num_of_bills"))
                      .filter(col("Item_2").isNotNull)

scala> df2.show
+------+------+------------+
|Item_1|Item_2|Num_of_bills|
+------+------+------------+
|     2|     3|           1|
|     1|     2|           2|
|     1|     3|           1|
+------+------+------------+
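
The note above mentions that the same logic can be implemented in Python. The snippet below is only a rough PySpark sketch of that idea (the helper name item_pairs is mine, not from the answer); it assumes df is the bill_id/item_id DataFrame shown at the start:

# Rough PySpark sketch of the same approach (illustrative only):
# collect the items of each bill, emit every "x~y" pair with a UDF,
# explode the pairs, and count in how many bills each pair occurs.
from itertools import combinations

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

@F.udf(returnType=ArrayType(StringType()))
def item_pairs(items):
    # bills with a single item produce no pairs and are dropped by explode()
    return ["{}~{}".format(a, b) for a, b in combinations(sorted(items), 2)]

result = (
    df.groupBy("bill_id")
      .agg(F.collect_set("item_id").alias("items"))
      .withColumn("pair", F.explode(item_pairs(F.col("items"))))
      .withColumn("Item_1", F.split(F.col("pair"), "~").getItem(0))
      .withColumn("Item_2", F.split(F.col("pair"), "~").getItem(1))
      .groupBy("Item_1", "Item_2")
      .agg(F.count(F.lit(1)).alias("Num_of_bills"))
)
result.show()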

Author: Kamal Nandan

Updated on August 21, 2022

Comments

  • Kamal Nandan
    Kamal Nandan over 1 year

    How can we parallelize a loop in Spark so that the processing is not sequential but parallel? To take an example, I have the following data in a CSV file (called 'bill_item.csv'):

        |-----------+------------|
        | bill_id   | item_id    |
        |-----------+------------|
        | ABC       | 1          |
        | ABC       | 2          |
        | DEF       | 1          |
        | DEF       | 2          |
        | DEF       | 3          |
        | GHI       | 1          |
        |-----------+------------|
    

    I have to get the output as follows:

        |-----------+-----------+--------------|
        | item_1    | item_2    | Num_of_bills |
        |-----------+-----------+--------------|
        | 1         | 2         | 2            |
        | 2         | 3         | 1            |
        | 1         | 3         | 1            |
        |-----------+-----------+--------------|
    

    We see that items 1 and 2 have been found under the 2 bills 'ABC' and 'DEF', hence the 'Num_of_bills' for items 1 and 2 is 2. Similarly, items 2 and 3 have been found only under bill 'DEF', so their 'Num_of_bills' is '1', and so on.
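
    For clarity, the expected counts can be reproduced locally with a tiny plain-Python illustration (not Spark, just to pin down the definition):

    # Count, for every unordered item pair, how many bills contain both items
    from itertools import combinations
    from collections import Counter

    bills = {"ABC": [1, 2], "DEF": [1, 2, 3], "GHI": [1]}

    pair_counts = Counter()
    for items in bills.values():
        pair_counts.update(combinations(sorted(items), 2))

    print(pair_counts)  # Counter({(1, 2): 2, (1, 3): 1, (2, 3): 1})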

    I am using Spark to process the CSV file 'bill_item.csv' with the following approaches:

    Approach 1:

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    # define the schema for the data 
    bi_schema = StructType([
        StructField("bill_id", StringType(), True), 
        StructField("item_id", IntegerType(), True) 
    ]) 
    
    bi_df = (spark.read.schema(bi_schema).csv('bill_item.csv'))
    
    # find the list of all items in sorted order
    item_list = bi_df.select("item_id").distinct().orderBy("item_id").collect()
    
    item_list_len = len(item_list)
    i = 0
    # for each pair of items for e.g. (1,2), (1,3), (1,4), (1,5), (2,3), (2,4), (2,5), ...... (4,5)
    while i < item_list_len - 1:
        # find the list of all bill IDs that contain item '1'
        bill_id_list1 = bi_df.filter(bi_df.item_id == item_list[i].item_id).select("bill_id").collect()
        j = i+1
        while j < item_list_len:
            # find the list of all bill IDs that contain item '2'
            bill_id_list2 = bi_df.filter(bi_df.item_id == item_list[j].item_id).select("bill_id").collect()
    
            # find the common bill IDs in list bill_id_list1 and bill_id_list2 and then the no. of common items
            common_elements = set(bill_id_list1).intersection(bill_id_list2)
            num_bils = len(common_elements)
            if(num_bils > 0):
                print(item_list[i].item_id, item_list[j].item_id, num_bils)
            j += 1    
        i+=1
    

    However, this is not an efficient approach given that in real life we have millions of records, and it has the following issues:

    1. There may not be enough memory to load the list of all items or bills
    2. It may take too long to get the results because the execution is sequential (thanks to the 'for' loop). (I ran the above algorithm with ~200000 records and it took more than 4 hrs to come up with the desired result. )

    Approach 2:

    I further optimized this by partitioning the data on the basis of "item_id", using the following block of code:

    bi_df = (spark.read.schema(bi_schema).csv('bill_item.csv'))
    outputPath='/path/to/save'
    bi_df.write.partitionBy("item_id").csv(outputPath)
    

    After splitting, I executed the same algorithm that I used in 'Approach 1', and for 200000 records it still takes 1.03 hours (a significant improvement over the 4 hours under 'Approach 1') to get the final output.

    The bottleneck above is the sequential 'for' loop (and also the 'collect()' calls). So my questions are:

    • Is there a way to parallelize the for loop?
    • Or is there any other efficient way?
    • thebluephantom
      thebluephantom over 4 years
      I think you are missing the point of Spark, it is inherently parallel processing by default - in a given manner. Can you elaborate on item-1 and item-2 pls? Will you have an item-3?
    • Kamal Nandan
      Kamal Nandan over 4 years
      Item_1 and Item_2 are pairs of products that are bought in the same basket (bill). For e.g. if a person p1 buys 3 products 1, 2, 3 then the pairs of products are (1,2), (1,3), (2,3). If a person p2 buys 3 products 1, 2, 4 then the pairs are (1,2), (1,4), (2,4). So if we consider that only 2 persons p1 and p2 bought from the store then the pairs are (1,2), (1,3), (2,3), (1,4), (2,4) and the counts of the pairs are: (1,2) - 2 (because both p1 and p2 bought this pair); (1,3) - 1 (only p1 bought); (2,3) - 1 (only p1 bought); (1,4) - 1 (only p2 bought); (2,4) - 1 (only p2 bought).
    • thebluephantom
      thebluephantom over 4 years
      sure example is correct?
    • Kamal Nandan
      Kamal Nandan over 4 years
      Yes, Spark is inherently parallel and that's the objective. However, the way I have implemented it, it looks like it has become sequential. How do I fix this?
    • thebluephantom
      thebluephantom over 4 years
      I cannot follow the example.
    • Kamal Nandan
      Kamal Nandan over 4 years
      Yes, the example is correct. It's a market-basket analysis problem where the retailer wants to find out the pairs of products that are bought very frequently (so that the retailer may take some action, such as having the recommendation engine suggest items/products to future customers according to their chosen items). I hope it makes sense now :)
    • thebluephantom
      thebluephantom over 4 years
      Not well explained with the example, then. I am familiar with that. The title should have reflected that. But only 2 items max?
    • Kamal Nandan
      Kamal Nandan over 4 years
      yes, only a pair of items. I will do it for more than 2 item sets too such as 3 items, or 4 items and so on. But right now I am focusing on pairs. If I am able to do it efficiently, I will follow the same approach for more items too. (The approach and solution that I have described above functions 'correctly', but its not efficient. I want the big-data/spark approach to make it efficient so that the results are quicker.)
    • thebluephantom
      thebluephantom over 4 years
      If I remember correctly this is an expensive operation; it is in the FP-growth domain.
    • Kamal Nandan
      Kamal Nandan over 4 years
      I just had a quick look at the FP-growth term. I had never heard of this. Indeed it's an FP-growth problem. But I am wondering if there is a way to fix this using a conventional or Spark approach.
    • thebluephantom
      thebluephantom over 4 years
      Spark offers FP-growth (see the sketches after the comments).
    • jxc
      jxc over 4 years
      @KamalNandan, if you just need pairs, then a self-join could be enough.
    • thebluephantom
      thebluephantom over 4 years
      But he may need more than that...
    • Kamal Nandan
      Kamal Nandan over 4 years
      @jxc, yes, for the pairs I could do it using a self-join. But I will need it for 3-item sets too.
    • Kamal Nandan
      Kamal Nandan over 4 years
      @thebluephantom, I will look for fp growth in spark. That's interesting. Thanks for letting me know about it.
    • thebluephantom
      thebluephantom over 4 years
      See the answer!!!
  • Kamal Nandan
    Kamal Nandan over 4 years
    Thanks a lot, Nikk, for the elegant solution! I am sorry I didn't see the solution sooner since I was on vacation. However, I have also implemented a solution of my own without the loops (using a self-join approach). I will post that in a day or two.
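
For reference, here are rough sketches of the two alternatives raised in the comments above. Both are illustrative only (the variable name bi_df is taken from the question's code; nothing below is the commenters' or the author's actual code).

The self-join idea suggested by jxc: join the table to itself on bill_id, keep each unordered pair once, and count the distinct bills per pair.

from pyspark.sql import functions as F

a = bi_df.alias("a")
b = bi_df.alias("b")

pair_counts = (
    a.join(b, on="bill_id")                            # pair items that share a bill
     .where(F.col("a.item_id") < F.col("b.item_id"))   # keep each unordered pair once
     .groupBy(F.col("a.item_id").alias("Item_1"),
              F.col("b.item_id").alias("Item_2"))
     .agg(F.countDistinct("bill_id").alias("Num_of_bills"))
)
pair_counts.show()

The FP-growth route mentioned by thebluephantom, using Spark ML's FPGrowth. It expects one row per bill with a deduplicated array of items; frequent itemsets of size 2 are exactly the item pairs with their bill counts (the minSupport value below is an arbitrary small threshold).

from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth

baskets = bi_df.groupBy("bill_id").agg(F.collect_set("item_id").alias("items"))

fp = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.5)
model = fp.fit(baskets)

# itemsets of size 2 correspond to item pairs; 'freq' is the number of bills
model.freqItemsets.filter(F.size("items") == 2).show()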