Iterate through a Java RDD by row


Solution 1

As mattinbits said in the comments, you want a map instead of a foreach, since you want to return values. A map transforms your data: for each row of the RDD you perform an operation and return one value per row. What you need can be achieved like this:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

...

SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);

JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
    @Override
    public double[] call(String row) throws Exception {
        return splitStringtoDoubles(row);
    }

    private double[] splitStringtoDoubles(String s) {
        String[] splitVals = s.split("\\t");
        double[] vals = new double[splitVals.length];
        for(int i=0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
    }
});

List<double[]> whatYouWant = whatYouWantRdd.collect();
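
collect returns the results to the driver as a List<double[]>. If you specifically need the double[][] from the question, one final step could be (a small sketch, plain Java, nothing Spark-specific):

// Convert the collected List<double[]> into the double[][] the question asks for.
double[][] matrix = whatYouWant.toArray(new double[whatYouWant.size()][]);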

To understand how Spark works: you perform either actions or transformations on your RDD. In the code above, we transform the RDD with map. You need to create this function yourself, in this case as an anonymous org.apache.spark.api.java.function.Function, which forces you to override the call method, where you receive one row of the RDD and return a value.
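
For instance (a minimal sketch, reusing the imports and the data RDD above; the lengths variable is just illustrative): transformations like map are lazy, so nothing runs on the cluster until you call an action such as count or collect.

// Transformation: lazily describes how to turn each line into its length.
// No Spark job runs yet.
JavaRDD<Integer> lengths = data.map(new Function<String, Integer>() {
    @Override
    public Integer call(String row) throws Exception {
        return row.length();
    }
});

// Action: triggers the actual computation and returns a result to the driver.
long numRows = lengths.count();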

Solution 2

Just because it's interesting to compare the verbosity of Spark's Java and Scala APIs, here's a Scala version:

import org.apache.spark.{SparkContext, SparkConf}

object Example extends App {
  val conf = new SparkConf().setMaster("local").setAppName("Spark example")
  val sc = new SparkContext(conf)

  val inputData = List(
    "1.2\t2.7\t3.8",
    "4.3\t5.1\t6.3"
  )

  val inputRDD = sc.parallelize(inputData)
  val arrayOfDoubleRDD = inputRDD.map(_.split("\t").map(_.toDouble))
}

Comments

  • Katya Willard
    Katya Willard over 1 year

    I would like to iterate through an RDD of strings and "do something" to each string. The output should be double[][]. Below is an example with a for loop. I understand I need to use (I think) the foreach function for Java RDDs, but I have no idea how to work with the syntax. The documentation is not particularly helpful, and I do not have Java 8.

    Here is an example of what I would like to do if I could use a regular for loop.

    public class PCA {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("PCA Example");
            SparkContext sc = new SparkContext(conf);
    
            RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);
    
            // here is the "type" of code I would like to execute
            // 30 because I have 30 variables
            double[][] vals = new double[data.count()][30];
    
            double[] temp;
            for (int i = 0; i < data.count(); i++) {
                temp = splitStringtoDoubles(data[i]);
                vals[i] = temp;
            }
        }
    
        private static double[] splitStringtoDoubles(String s) {
            String[] splitVals = s.split("\\t");
        double[] vals = new double[splitVals.length];
        for (int i = 0; i < splitVals.length; i++) {
            vals[i] = Double.parseDouble(splitVals[i]);
        }
        return vals;
        }
    
    }
    

    I understand that foreach seems to require a function that has a void return type. Not sure how to work with that. Here is what I have attempted so far (obviously the syntax is wrong):

        double[][] matrix = new double[data.count()][30];
        foreach(String s : data) {
            String[] splitvals = s.split("\\t");
            double[] vals = Double.parseDouble(splitvals);
            matrix[s] = vals; 
        }
    
  • Katya Willard
    Katya Willard over 8 years
    Can't agree with you more that Scala is better. I've done some work in Python for Spark and it's amazing how easy it is to use functions like map. Alas, I have to use Java for this case.
  • Balduz
    Balduz over 8 years
    I too prefer Scala for Spark, as Java is much more verbose... Unfortunately my Scala skills are nonexistent :(
  • Balaji Reddy
    Balaji Reddy almost 8 years
    The collect action is not recommended since it pulls all the data to the driver program, which is highly expensive (see the sketch below for alternatives).
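
    A minimal sketch of alternatives, reusing whatYouWantRdd from Solution 1, that avoid pulling the entire dataset back to the driver:

        // Inspect only a handful of rows instead of collecting everything.
        List<double[]> firstRows = whatYouWantRdd.take(10);

        // Or keep the computation on the cluster and bring back only a small
        // summary, for example the number of parsed rows.
        long numRows = whatYouWantRdd.count();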