How can I use a "for" loop in Spark with PySpark?

Your return statement cannot be inside the loop; otherwise, the function returns after the first iteration and never reaches the second.

What you could try is this:

result = []
for i in value:
    result.append((i, label))
return result

and then result would be a list of all of the tuples created inside the loop.
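
As the comments below also point out, the corrected add_label should then be used with flatMap rather than map, so that each returned list is flattened into individual records. Here is a minimal, self-contained sketch of that (assuming the add_label function and p_list RDD names from the question; the sample input is only illustrative):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def add_label(x):
    # x is a pair like ('1', '+1 2,3'): the label is '+1', the values are '2,3'
    label = x[1].split()[0]
    values = x[1].split()[1].split(",")
    # collect every (value, label) tuple instead of returning after the first one
    result = []
    for i in values:
        result.append((i, label))
    return result

p_list = sc.parallelize([('1', '+1 2,3')])
# flatMap unpacks each returned list into individual (value, label) records
print(p_list.flatMap(add_label).collect())
# [('2', '+1'), ('3', '+1')]

With map you would get one list per input pair; flatMap flattens those lists so every (value, label) tuple becomes its own element of the RDD.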

Author: Linghao

Updated on July 09, 2022

Comments

  • Linghao
    Linghao almost 2 years

    I ran into a problem while using Spark with Python 3 in my project. In a key-value pair like ('1','+1 2,3'), the part "2,3" is the content I want to check. So I wrote the following code:
    (Assume this key-value pair is saved in an RDD called p_list.)


    def add_label(x):
        label = x[1].split()[0]
        value = x[1].split()[1].split(",")
        for i in value:
            return (i, label)
    p_list = p_list.map(add_label)
    

    After doing that, I could only get the result ('2','+1'), but it should be ('2','+1') and ('3','+1'). It seems that the "for" loop in the map operation only ran once. How can I make it run multiple times? Or is there any other way to implement something like a "for" loop inside a map or reduce operation?

    I want to mention that what I am really dealing with is a large dataset, so I have to use an AWS cluster and implement the loop with parallelization. The slave nodes in the cluster seem not to handle the loop. How can I make them do so with a Spark RDD function? Or how can I express such a loop operation in another, pipelined way (which is one of the main design ideas of Spark RDD)?

    • OneCricketeer
      OneCricketeer over 7 years
      I'm tempted to remove the spark tags from this question because nothing about this is really spark-specific other than the RDD.map function
    • Linghao
      Linghao over 7 years
      Sorry for not mentioning the details of my project. It is really a Spark application; the problem above is just an abstraction of the main problem I met. There will be a bunch of key-value pairs, like ('1','+1 2,3'), saved in the RDD. I need to compare the label with the following child nodes and return each (child node, label) for all key-value pairs. The whole operation may be RDD.map().filter().reduceByKey() and others. It should be done on AWS with a Spark cluster.
    • Kristian
      Kristian over 7 years
      Have you tried rdd.foreach()?
    • Linghao
      Linghao over 7 years
      rdd.foreach() applies a function to all elements in the RDD. I can also apply a function to every element with rdd.map() or rdd.reduce(). Is there any difference? Actually I tried putting a for loop in rdd.foreach(), but it still only ran once for every element. I need to perform an operation multiple times for every element, not just once.
    • Linghao
      Linghao over 7 years
      I ran this on a single computer, and it worked well. But when I ran it in pseudo-distributed or fully distributed mode (like on AWS), the result showed that this operation was only done once. Why didn't the for loop work in distributed computing?
    • Linghao
      Linghao over 7 years
      Thanks guys, you inspired me. I will try to put the return outside the loop and use rdd.flatMap() to produce multiple results for every element.
    • migas
      migas almost 7 years
      I know I'm late to the party, but you could just replace your return with yield (there is a sketch of that variant after these comments).
  • Linghao
    Linghao over 7 years
    I get your point and will try to use rdd.flatMap() to flatten the list of results for every element in the RDD. Thank you.
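
Following up on the yield suggestion in the comments, here is a minimal sketch of that variant (again assuming the add_label and p_list names from the question):

def add_label(x):
    label = x[1].split()[0]
    for i in x[1].split()[1].split(","):
        # yield turns add_label into a generator, so every (value, label)
        # pair is produced instead of only the first one
        yield (i, label)

# flatMap consumes the generator and flattens the results into the RDD
p_list = p_list.flatMap(add_label)

Note that this still pairs with flatMap rather than map: map would leave one generator object per record, while flatMap iterates over each generator and emits its items.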