How do I return multiple key-value pairs in Scala using Spark's map transformation?

Solution 1

You are forgetting the curly braces after the arrow. You can only omit them if the body is a single expression.

file.map(line => {
    //multiple lines of code here
})
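
For comparison, the braces can be dropped when the body is a single expression. A minimal illustration, assuming file is the RDD from your question:

file.map(line => line.split(",").length)  // single expression: braces optional

file.map(line => {                        // multiple statements: braces required
  val arr = line.split(",")
  arr.length
})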

Full answer after edits:

case class Index(i: Integer, j: Integer)
case class Val(x: Double, y: Double)

val data = sc.parallelize(List("1,2,3", "4,5,6", "7,8,9"))
data.flatMap(line => {
  val arr = line.split(",")
  // For every pair of columns (i, j) with i < j, emit an (Index, Val) pair.
  // The nested yield produces a sequence of sequences, so flatten it before
  // handing it back to flatMap.
  val doubleSeq = for (i <- 0 until arr.length) yield {
    val x = arr(i).toDouble
    for (j <- (i + 1) until arr.length) yield {
      val y = arr(j).toDouble
      val k = Index(i, j)
      val v = Val(x, y)
      (k, v)
    }
  }
  doubleSeq.flatten
})

There were a multitude of problems actually:

  • Notice that I changed your classes to case classes, which are serializable by default. Otherwise, you would need to extend Serializable yourself.
  • I changed map to flatMap and flattened the nested result, since flatMap alone would still leave you with an inner sequence per line. The combination of the two yields an RDD[(Index, Val)], which can now be used implicitly with reduceByKey (see the sketch after this list).
  • I turned your for loop into a for comprehension by adding yield. You were getting a final type of Unit because a plain for loop (without yield) returns Unit.
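
To illustrate the last two points, here is a minimal sketch of applying reduceByKey to the resulting RDD[(Index, Val)]. It assumes the case classes and the data RDD defined above; summing x and y component-wise is just an example aggregation, not something from the original question:

val pairs = data.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield (Index(i, j), Val(arr(i).toDouble, arr(j).toDouble))
}

// The element type is a (key, value) tuple, so reduceByKey is available
// through the implicit conversion to pair-RDD operations.
val reduced = pairs.reduceByKey((a, b) => Val(a.x + b.x, a.y + b.y))
reduced.collect().foreach(println)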

Solution 2

Use RDD.flatMap and yield a sequence from the for comprehension:

val file = sc.textFile("/path/to/test.csv")
file.flatMap { line =>
  val arr = line.split(",")
  for {
    i <- 0 until arr.length
    j <- (i + 1) until arr.length
  } yield {
    val x = arr(i).toDouble
    val y = arr(j).toDouble
    val k = new Index(i, j)
    val v = new Val(x, y)
    (k, v)
  }
}.collect
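
As an aside, the for comprehension with two generators desugars into nested flatMap/map calls on the index ranges. The following is a rough equivalent of what the compiler generates (written with the case classes from Solution 1, so new is optional):

file.flatMap { line =>
  val arr = line.split(",")
  // The outer generator becomes flatMap, the inner one becomes map.
  (0 until arr.length).flatMap { i =>
    ((i + 1) until arr.length).map { j =>
      (Index(i, j), Val(arr(i).toDouble, arr(j).toDouble))
    }
  }
}.collect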
Author: Jane Wayne

Updated on June 04, 2022

Comments

  • Jane Wayne
    Jane Wayne almost 2 years

    I'm new to Scala and Spark. I'm trying to return multiple key-value pairs during the map transformation. My input data is a simple CSV file.

    1, 2, 3
    4, 5, 6
    7, 8, 9
    

    My Scala script looks like the following.

    class Key(_i:Integer, _j:Integer) {
     def i = _i
     def j = _j
    }
    class Val(_x:Double, _y:Double) {
     def x = _x
     def y = _y
    }
    val arr = "1,2,3".split(",")
    for(i <- 0 until arr.length) {
     val x = arr(i).toDouble
     for(j <- 0 until arr.length) {
      val y = arr(j).toDouble
      val k = new Key(i, j)
      val v = new Val(x, y)
      //note that i want to return the tuples, (k, v)
     }
    }
    

    I want to be able to use the for loop and data structures above to return multiple tuples (k, v). Something similar to the code below.

    val file = sc.textFile("/path/to/test.csv")
    file.map(line => {
     val arr = line.split(",")
     for(i <- 0 until arr.length) {
      val x = arr(i).toDouble
      for(j <- (i+1) until arr.length) {
       val y = arr(j).toDouble
       val k = new Index(i,j)
       val v = new Val(x,y)
       (k,v)
      }
     }
    }).collect //reduceByKey is not there, reduce is there, but not what i want
    

    When I copy/paste the code above into the lambda expression (and run on the Scala REPL shell) I get the following error:

    error: illegal start of simple expression
    val arr = line.split(",")
    ^
    

    I also realize that I am still stuck in imperative/procedural-style thinking, so please bear with me (I'm a newbie at Scala/Spark).

  • Jane Wayne
    Jane Wayne about 9 years
    Your suggestion helped. Now that error goes away. But when I add the return statement, return (k,v), I get the following: error: return outside method definition.
  • Justin Pihony
    Justin Pihony about 9 years
    I did not see that... DO NOT return in Scala, the final statement is the return value. That will fix it I would think
  • Jane Wayne
    Jane Wayne about 9 years
    Do you know how I can check to see if the lambda function is correct? When I do file.map(line => {...}).collect, all I see is Array[Unit] = Array((),(),...()). What I want to do next is reduce all the values by the same key. However, autocomplete (hitting tab) indicates that reduceByKey is not a member of org.apache.spark.rdd.RDD[Unit]. I'm still stuck in the MapReduce state of mind.
  • Jane Wayne
    Jane Wayne about 9 years
    I posted the code that works now with your help. Note that in the example above, I use collect to try and inspect what's actually in that RDD. Meanwhile, I'm reading this article blog.cloudera.com/blog/2014/09/… and it seems to suggest that the map function in Scala/Spark has 1 input and 1 output, and because of what I want to do, I might have to use the flatMap function.
  • Alexander Aleksandrovič Klimov
    Alexander Aleksandrovič Klimov about 9 years
    Yes, flatMap seems the right thing. Not exactly the same as your code, but this question stackoverflow.com/questions/29472603/… also uses flatMap to generate more than one output row from each input row. It may point you in the right direction?
  • Justin Pihony
    Justin Pihony about 9 years
    @JaneWayne I just posted a full answer...you actually had a multitude of problems. And, the code can probably be cleaned up even further, but should work for your purposes :)
  • Daniel Darabos
    Daniel Darabos about 9 years
    Scala for loops are magic. I've never found the documentation for them and at this point I'm afraid to ask.