How can I parallelize a for loop in Spark with Scala?
Solution 1
The program you write runs in the Spark driver ("master") node. Expressions in this program can be parallelized only if you are operating on parallel structures (RDDs).

Try this:

marketData.rdd
  .map(symbolize)
  .reduceByKey { case (symbol, days) => days.sliding(5).map(makeAvg) }
  .foreach { case (symbol, averages) => averages.save() }

where symbolize takes a Row of symbol x day and returns a tuple (symbol, day).
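To make symbolize concrete, here is a minimal sketch in plain Scala. The Row case class and its fields are hypothetical stand-ins for Spark's Row; a real implementation would read the column with row.getAs:

```scala
// Hypothetical stand-in for Spark's Row; a real implementation would
// extract the key with row.getAs[String]("SYMBOL").
case class Row(symbol: String, date: Int, price: Double)

// Key each row by its symbol so per-symbol operations can follow.
def symbolize(r: Row): (String, Row) = (r.symbol, r)

val keyed = symbolize(Row("AAPL", 1, 10.0))
// keyed._1 == "AAPL"
```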
Solution 2
For the first part of the answer I don't agree with Carlos. The program does not run in the driver ("master").

The loop does run sequentially, but for each symbol the execution of

marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()

is done in parallel, since marketData is a Spark DataFrame and is distributed across the cluster.
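Even though each job's stages run in parallel, the jobs themselves are submitted one at a time by the loop. One way to submit them concurrently is to wrap each loop body in a Future. This is only a sketch: processSymbol is a hypothetical stand-in for the real filter(...).save() job.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the per-symbol Spark job
// marketData.filter(symbol)...save().
def processSymbol(symbol: String): String = s"saved $symbol"

val symbols = Seq("AAPL", "GOOG", "MSFT")

// Wrapping each job in a Future submits them concurrently instead of
// waiting for each one to finish before starting the next.
val jobs: Future[Seq[String]] =
  Future.sequence(symbols.map(s => Future(processSymbol(s))))

val results = Await.result(jobs, 1.minute)
// Future.sequence preserves input order:
// results == Seq("saved AAPL", "saved GOOG", "saved MSFT")
```

On a real cluster, Spark jobs submitted from separate threads can run at the same time; the FAIR scheduler (spark.scheduler.mode) can be used to share executors between them instead of running jobs FIFO.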
Rongjie Zhang
Updated on July 03, 2022

Comments
-
Rongjie Zhang almost 2 years

For example, we have a parquet file with 2000 stock symbols' closing prices over the past 3 years, and we want to calculate the 5-day moving average for each symbol.

So I create a Spark SQLContext and then

val marketData = sqlContext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()

To get the symbol list:

val symbols = marketData.select("SYMBOL").distinct().collect()

and here is the for loop:

for (symbol <- symbols) { marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save() }

Obviously, doing the for loop on Spark is slow, and calling save() for each small result also slows down the process. (I have tried defining a var result outside the for loop and unioning all the outputs to batch the IO operation, but I got a StackOverflowError.) So how can I parallelize the for loop and optimize the IO operation?
-
Rongjie Zhang about 8 years

Thanks for your answer. However, marketData contains all the market data (2000 symbols × 900 days = 1,800,000 rows), and calling sliding(5) on this RDD without filter(symbol) seems like it would give a wrong moving-average result, since windows would span symbol boundaries. Did I make myself clear?
-
Rongjie Zhang about 8 years

Thanks for your patience. As far as I know, if we have a symbolize like

{ row => (row.getAs[String]("SYMBOL"), row) }

and call reduceByKey on the rdd that map(symbolize) returns, we have to write reduceByKey{ case (row_x, row_y) => ...} instead of reduceByKey{ case (symbol, days) => ...}. In the end, I called groupByKey() on the rdd that map(symbolize) returns, followed by

mapValues(x => x.sliding(5).map(makeAvg)).save()

and that works. Thank you again for your help!
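The working approach from the last comment (groupByKey, then sliding windows per symbol) can be sketched with plain Scala collections, where groupBy plays the role of RDD groupByKey. The Quote case class, makeAvg, and the sample data are hypothetical:

```scala
// Hypothetical stand-in for one row of the market data.
case class Quote(symbol: String, date: Int, price: Double)

// Average closing price of one window of days.
def makeAvg(window: Seq[Quote]): Double =
  window.map(_.price).sum / window.size

val marketData = Seq(
  Quote("AAPL", 1, 10.0), Quote("AAPL", 2, 12.0), Quote("AAPL", 3, 11.0),
  Quote("AAPL", 4, 13.0), Quote("AAPL", 5, 14.0), Quote("AAPL", 6, 15.0),
  Quote("GOOG", 1, 500.0), Quote("GOOG", 2, 510.0)
)

// groupBy stands in for groupByKey: windows are built per symbol, so no
// window can mix two symbols. filter(_.size == 5) drops the partial
// window sliding emits when a symbol has fewer than 5 days of data.
val movingAvgs: Map[String, Seq[Double]] =
  marketData.groupBy(_.symbol).map { case (sym, days) =>
    sym -> days.sortBy(_.date).sliding(5).filter(_.size == 5).map(makeAvg).toSeq
  }

// movingAvgs("AAPL") == Seq(12.0, 13.0); GOOG has no full 5-day window.
```

On a real RDD the same shape would be map(symbolize), groupByKey(), then mapValues over the grouped days; the per-symbol groups are processed in parallel across the cluster, so no driver-side for loop is needed.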