How do I return multiple key-value pairs in Scala using Spark's map transformation?
Solution 1
You are forgetting the curly braces after the arrow. You can omit them only when the body is a single expression.
file.map(line => {
//multiple lines of code here
})
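As a small illustration of the brace rule, here is a sketch that uses a plain Scala List in place of the RDD, so it can be pasted into any Scala REPL without Spark. The names sampleLines, lengths, and firstFields are made up for this example and are not part of the original question.

```scala
// Plain Scala collection standing in for the RDD (assumption: no Spark needed here).
val sampleLines = List("1,2,3", "4,5,6", "7,8,9")

// Single expression: no braces are needed after the arrow.
val lengths = sampleLines.map(line => line.length)

// Several statements: wrap the body in braces; the last expression is the result.
val firstFields = sampleLines.map(line => {
  val arr = line.split(",")
  arr(0).toDouble
})
```

The same two forms should apply to RDD.map, since it also takes an ordinary Scala function.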
Full answer after edits:
case class Index(i:Integer, j:Integer)
case class Val(x:Double, y:Double)
val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line=>{
  val arr = line.split(",")
  val doubleSeq = for(i <- 0 until arr.length) yield {
    val x = arr(i).toDouble
    for(j <- (i+1) until arr.length) yield {
      val y = arr(j).toDouble
      val k = Index(i,j)
      val v = Val(x,y)
      (k,v)
    }
  }
  doubleSeq.flatten
})
There were a multitude of problems actually:
- Notice that I changed your classes to be case classes as they are serializable. Otherwise, you will need to implement Serializable.
- I changed map to flatMap, as well as flattened your array, as one flatMap would still leave you with an inner array. Now, the combination of the two will yield you your RDD[(Index, Val)], which can now be implicitly used with reduceByKey.
- I turned your for loop into a for comprehension by using yield. You were getting a final type of Unit because the return type of a for loop is Unit.
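The last two points can be tried out on plain Scala collections, without Spark. The sketch below is only an illustration under that assumption; the names parts, loopResult, nested, and pairs are invented for it.

```scala
val parts = "1,2,3".split(",")

// A for loop without yield runs only for its side effects; its type is Unit.
val loopResult: Unit = for (i <- 0 until parts.length) { parts(i).toDouble }

// Adding yield turns the loop into a comprehension that builds a collection.
// Nesting two of them gives a sequence of sequences.
val nested = for (i <- 0 until parts.length) yield {
  for (j <- (i + 1) until parts.length) yield (i, j)
}

// flatten removes the inner level, leaving one flat sequence of pairs.
val pairs = nested.flatten
```

Here nested has one inner sequence per value of i (the last one is empty), and pairs holds only the (i, j) combinations with i < j.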
Solution 2
Use RDD.flatMap and yield a list from the for loop:
val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}.collect
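A for comprehension with two generators is, roughly, rewritten by the compiler into a flatMap over the first generator and a map over the second, which is why it already yields a flat sequence. The following sketch shows that equivalence on a plain Scala range rather than an RDD; fields, viaFor, and viaFlatMap are hypothetical names, and plain tuples stand in for Index and Val.

```scala
val fields = "1,2,3".split(",")

// Two generators in one for comprehension...
val viaFor = for {
  i <- 0 until fields.length
  j <- (i + 1) until fields.length
} yield ((i, j), (fields(i).toDouble, fields(j).toDouble))

// ...correspond roughly to this explicit flatMap/map chain.
val viaFlatMap = (0 until fields.length).flatMap { i =>
  ((i + 1) until fields.length).map { j =>
    ((i, j), (fields(i).toDouble, fields(j).toDouble))
  }
}
```

Both values contain the same three key-value pairs, so no separate flatten step is needed in this form.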
Jane Wayne
Updated on June 04, 2022
Comments
-
Jane Wayne almost 2 years
I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.
1, 2, 3
4, 5, 6
7, 8, 9
My Scala script looks like the following.
class Key(_i:Integer, _j:Integer) {
  def i = _i
  def j = _j
}
class Val(_x:Double, _y:Double) {
  def x = _x
  def y = _y
}
val arr = "1,2,3".split(",")
for(i <- 0 until arr.length) {
  val x = arr(i).toDouble
  for(j <- 0 until arr.length) {
    val y = arr(j).toDouble
    val k = new Key(i, j)
    val v = new Val(x, y)
    //note that i want to return the tuples, (k, v)
  }
}
I want to be able to use the for loop and data structures above to return multiple tuples (k, v). Something similar to the code below.
val file = sc.textFile("/path/to/test.csv")
file.map(line => {
  val arr = line.split(",")
  for(i <- 0 until arr.length) {
    val x = arr(i).toDouble
    for(j <- (i+1) until arr.length) {
      val y = arr(j).toDouble
      val k = new Index(i,j)
      val v = new Val(x,y)
      (k,v)
    }
  }
}).collect //reduceByKey is not there, reduce is there, but not what i want
When I copy/paste the code above into the lambda expression (and run on the Scala REPL shell) I get the following error:
error: illegal start of simple expression
val arr = line.split(",")
^
I realize also that I am still stuck in imperative/procedural style programming thinking, so please bear with me (and a newbie at Scala/Spark).
-
Jane Wayne about 9 years
Your suggestion helped. Now that error goes away. But when I add the return statement, return (k,v), I get the following: error: return outside method definition.
-
Justin Pihony about 9 years
I did not see that...DO NOT return in scala, the final statement is the return value. That will fix it I would think
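To illustrate the point about return, here is a minimal plain-Scala sketch (no Spark involved). The function toPair and its input string are invented for this example.

```scala
// The value of a block is its last expression, so no return keyword is needed.
val toPair = (line: String) => {
  val parts = line.split(",")
  (parts(0).toInt, parts(1).toDouble) // this tuple is the function's result
}

val examplePair = toPair("1,2.5")
```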
-
Jane Wayne about 9 years
Do you know how I can check to see if the lambda function is correct? When I do file.map(line => {...}).collect, all I see is Array[Unit] = Array((),(),...()). What I want to do next is reduce all the values by the same key. However, autocomplete (hitting tab) indicates that reduceByKey is not a member of org.apache.spark.rdd.RDD[Unit]. I'm still stuck in the MapReduce state of mind.
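For intuition about reducing values by key, the sketch below uses a local Scala List of (key, value) tuples. This is only an analogy on plain collections, not Spark's reduceByKey; the data and the names keyed and summed are made up for illustration.

```scala
// Local stand-in for a collection of (key, value) pairs; keys and values are made up.
val keyed = List(((0, 1), 1.0), ((0, 2), 2.0), ((0, 1), 3.0))

// Group by key, then combine each group's values with the same kind of
// function one would pass to a by-key reduction (here: addition).
val summed = keyed
  .groupBy { case (k, _) => k }
  .map { case (k, kvs) => k -> kvs.map(_._2).reduce(_ + _) }
```

The reduction only makes sense once each element is a (key, value) tuple, which is consistent with reduceByKey being unavailable on an RDD[Unit].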
-
Jane Wayne about 9 years
I posted the code that works now with your help. Note that in the example above, I use collect to try and inspect what's actually in that RDD. Meanwhile, I'm reading this article blog.cloudera.com/blog/2014/09/… and it seems to suggest that the map function in Scala/Spark has 1 input and 1 output, and because of what I want to do, I might have to use the flatMap function.
-
Alexander Aleksandrovič Klimov about 9 years
Yes, flatMap seems the right thing. Not exactly the same as your code, but this question stackoverflow.com/questions/29472603/… also uses flatMap to generate more than one output row from each input row. It may point you in the right direction?
-
Justin Pihony about 9 years
@JaneWayne I just posted a full answer...you actually had a multitude of problems. And, the code can probably be cleaned up even further, but should work for your purposes :)
-
Daniel Darabos about 9 years
Scala for loops are magic. I've never found the documentation for them and at this point I'm afraid to ask.