How can I parallelize a for loop in Spark with Scala?


Solution 1

The program you write runs on the driver ("master") Spark node. Expressions in this program can only be parallelized if they operate on parallel structures (RDDs).

Try this:

    marketdata.rdd
      .map(symbolize)
      .reduceByKey { case (symbol, days) => days.sliding(5).map(makeAvg) }
      .foreach { case (symbol, averages) => averages.save() }

where symbolize takes a Row containing a symbol and a day of data and returns a (symbol, day) tuple.
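A minimal sketch of such a symbolize, assuming the question's schema (DATE, SYMBOL, PRICE) and that DATE and PRICE are stored as a String and a Double; the exact value layout is an assumption:

    import org.apache.spark.sql.Row

    // Hypothetical sketch: key each row by its symbol and keep (date, price)
    // as the value; column names and types follow the question's schema.
    def symbolize(row: Row): (String, (String, Double)) =
      (row.getAs[String]("SYMBOL"), (row.getAs[String]("DATE"), row.getAs[Double]("PRICE")))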

Solution 2

I don't agree with the first part of Carlos's answer: the work does not all happen in the driver ("master") node.

The loop does run sequentially, but for each symbol the execution of:

    marketData.filter(symbol).rdd
      .sliding(5)
      .map(...calculating the avg...)
      .save()

is done in parallel, since marketData is a Spark DataFrame and is therefore distributed across the cluster.
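If the sequential loop itself becomes the bottleneck, one common pattern (an assumption on my part, not something this answer prescribes) is to submit the per-symbol jobs from parallel driver threads, since Spark's scheduler accepts jobs submitted concurrently. A minimal sketch, where movingAverage and the output path are hypothetical placeholders:

    import org.apache.spark.mllib.rdd.RDDFunctions._  // provides rdd.sliding(n)

    // One Spark job per symbol, submitted from parallel driver threads.
    // On Scala 2.13+, `.par` needs the scala-parallel-collections module.
    symbols.par.foreach { row =>
      val symbol = row.getString(0)
      marketData.filter(marketData("SYMBOL") === symbol)
        .rdd
        .sliding(5)                                // 5-day windows
        .map(window => movingAverage(window))      // hypothetical helper
        .saveAsTextFile(s"/tmp/averages/$symbol")  // placeholder path
    }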


Comments

  • Rongjie Zhang (almost 2 years ago)

    For example, we have a Parquet file with 2000 stock symbols' closing prices over the past 3 years, and we want to calculate the 5-day moving average for each symbol.

    So I create a Spark SQLContext and then:

    val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
    

    To get the symbol list,

    val symbols = marketData.select("SYMBOL").distinct().collect()
    

    and here is the for loop:

    for (symbol <- symbols) {
      marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
    }
    

    Obviously, running the for loop on Spark is slow, and calling save() for each small result also slows down the process (I tried defining a var result outside the for loop and union-ing all the outputs to batch the IO, but I got a StackOverflowError). So how can I parallelize the for loop and optimize the IO operation? (A sketch of a flattened-union fix appears at the end of this thread.)

  • Rongjie Zhang (about 8 years ago)
    Thanks for your answer. However, marketdata contains all the market data (2000 symbols × 900 days = 1,800,000 rows), and it seems that calling sliding(5) on this RDD without filter(symbol) would give a wrong moving-average result. Did I make myself clear?
  • Rongjie Zhang (about 8 years ago)
    Thanks for your patience. As far as I know, with a symbolize like { row => (row.getAs[String]("SYMBOL"), row) }, calling reduceByKey on the RDD returned by map(symbolize) forces us to write reduceByKey{ case (row_x, row_y) => ...} instead of reduceByKey{ case (symbol, days) => ...}. In the end I called groupByKey() on the RDD returned by map(symbolize), then mapValues(x => x.sliding(5).map(makeAvg)).save(), and that works (sketched below). Thank you again for your help!
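A sketch of the approach described in the last comment, under assumptions: rows within each symbol are ordered by their DATE string, PRICE is a Double, makeAvg is inlined as a plain window mean, and the output path is a placeholder:

    import org.apache.spark.sql.Row

    // One job for all symbols: key every row by its symbol, group, then
    // compute the 5-day windows per symbol (the commenter's groupByKey fix).
    val averages = marketData.rdd
      .map(row => (row.getAs[String]("SYMBOL"), row))  // the commenter's symbolize
      .groupByKey()                                    // all rows of one symbol
      .mapValues { rows =>
        rows.toSeq
          .sortBy(_.getAs[String]("DATE"))             // assumes DATE sorts lexicographically
          .map(_.getAs[Double]("PRICE"))
          .sliding(5)                                  // 5-day windows
          .map(window => window.sum / window.size)     // makeAvg: mean of the window
          .toVector
      }

    averages.saveAsObjectFile("/tmp/moving-averages")  // placeholder path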
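On the StackOverflowError mentioned in the question: chaining result = result.union(next) two thousand times builds a very deep RDD lineage, which can overflow the stack when the job is submitted; SparkContext.union flattens all of them in a single call. A minimal sketch, reusing the hypothetical movingAverage helper and the sliding import from the earlier sketch, and assuming sc is the underlying SparkContext:

    // Build the per-symbol RDDs first, then union them all at once and
    // save a single result instead of one small file per symbol.
    val perSymbol = symbols.map { row =>
      val symbol = row.getString(0)
      marketData.filter(marketData("SYMBOL") === symbol)
        .rdd
        .sliding(5)
        .map(window => movingAverage(window))  // hypothetical helper, as above
    }
    sc.union(perSymbol).saveAsTextFile("/tmp/all-averages")  // placeholder path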