How to use the map function in Spark with Java


Solution 1

There are two small issues with the program. First, you probably want flatMap rather than map: since you are trying to return an RDD of words rather than an RDD of Lists of words, flatMap can be used to flatten the result. Second, the function class also requires the type of the input it is called on, not only the type it returns. I'd replace the JavaRDD words... with:

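JavaRDD<String> words = rdd.flatMap(
  // Spark 1.x signature; since Spark 2.0, call must return Iterator<String> (append .iterator())
  new FlatMapFunction<String, String>() { public Iterable<String> call(String s) {
      return Arrays.asList(s.split("\\s*,\\s*"));
  }});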
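
To see the difference between map and flatMap without starting Spark, here is a minimal sketch that uses plain Java streams as a stand-in for an RDD. The class name MapVsFlatMap and both method names are made up for illustration; this is an analogy to the RDD operations, not Spark code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapVsFlatMap {
    // map: one output element per input line, so the result is a list of lists.
    static List<List<String>> mapLines(List<String> lines) {
        return lines.stream()
                .map(line -> Arrays.asList(line.split("\\s*,\\s*")))
                .collect(Collectors.toList());
    }

    // flatMap: the words of every line are merged into one flat list.
    static List<String> flatMapLines(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s*,\\s*")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not taken from countrylist.csv.
        List<String> lines = Arrays.asList("Germany, Berlin", "France ,Paris");
        System.out.println(mapLines(lines));     // [[Germany, Berlin], [France, Paris]]
        System.out.println(flatMapLines(lines)); // [Germany, Berlin, France, Paris]
    }
}
```

If the goal is one row per CSV line (a two-dimensional structure), the map shape is the one to keep; flatMap fits when a single collection of words is wanted.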

Solution 2

This is sample code from a tutorial, shown first in Scala and then translated to Java.

Scala code:

    val includedStatuses = Set("COMPLETED", "REPAID")
    val now = new Date();
    sc.cassandraTable("cc", "cc_transactions")
      .select("customerid", "amount", "card", "status", "id")
      .where("id < minTimeuuid(?)", now)
      .filter(includedStatuses contains _.getString("status"))
      .keyBy(row => (row.getString("customerid"), row.getString("card")))
      .map { case (key, value) => (key, value.getInt("amount")) }
      .reduceByKey(_ + _)
      .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
      .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))

Java code:

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
        JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
                .select("customerid", "amount", "card", "status", "id")
                .where("id < minTimeuuid(?)", date)
                .filter( row -> row.getString("status").equals("COMPLETED") )
                .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
                .mapToPair( row -> new Tuple2<>(row._1,row._2.getInt("amount")))
                .reduceByKey( (i1,i2) -> i1.intValue()+i2.intValue())
                .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {

                    private static final long serialVersionUID = 1L;

                    public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                        List<Balance> list = new ArrayList<Balance>();
                        list.add(new Balance(r._1._1, r._1._2, r._2,reportDate));
                        return list.iterator();
                    }
                });
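
The filter, keyBy and reduceByKey steps can be tried out on an ordinary list before running them against Cassandra. The sketch below is only a local analogue built on Java streams: the Tx class, the BalanceSketch name and the sample rows are hypothetical, and nothing here touches Spark or the connector. It keeps both statuses from the Scala version, whereas the Java translation above checks only "COMPLETED".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class BalanceSketch {
    // Hypothetical stand-in for one row of the transactions table.
    static final class Tx {
        final String customerId;
        final String card;
        final String status;
        final int amount;

        Tx(String customerId, String card, String status, int amount) {
            this.customerId = customerId;
            this.card = card;
            this.status = status;
            this.amount = amount;
        }
    }

    // Same two statuses as the Scala includedStatuses set.
    static final Set<String> INCLUDED =
            new HashSet<>(Arrays.asList("COMPLETED", "REPAID"));

    // Local equivalent of filter + keyBy((customerid, card)) + reduceByKey(_ + _).
    static Map<List<String>, Integer> balances(List<Tx> txs) {
        return txs.stream()
                .filter(t -> INCLUDED.contains(t.status))
                .collect(Collectors.groupingBy(
                        t -> Arrays.asList(t.customerId, t.card),
                        Collectors.summingInt(t -> t.amount)));
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration only.
        List<Tx> txs = Arrays.asList(
                new Tx("c1", "visa", "COMPLETED", 10),
                new Tx("c1", "visa", "REPAID", 5),
                new Tx("c1", "visa", "PENDING", 100),
                new Tx("c2", "amex", "COMPLETED", 7));
        System.out.println(balances(txs).get(Arrays.asList("c1", "visa"))); // 15
    }
}
```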

Here ProjectPropertie.context is the SparkContext. This is how you can create one (you should use only one context per JVM):

   SparkConf conf = new SparkConf(true)
           .setAppName("App_name")
           .setMaster("local[2]")
           .set("spark.executor.memory", "1g")
           .set("", ",")
           .set("spark.cassandra.connection.port", "9042")
           .set("spark.cassandra.auth.username", "cassandra")
           .set("spark.cassandra.auth.password", "cassandra");
   SparkContext context = new SparkContext(conf);

For the data source I'm using Cassandra, with the Cassandra node running in a Docker container and the host being local in this case.


Updated on August 21, 2020


  • progNewbie, over 3 years ago

    I am trying to read a CSV file in Spark and want to split the lines, which are comma-separated, so that I end up with an RDD holding a two-dimensional array. I am very new to Spark.

    I tried to do this:

    public class SimpleApp {
        public static void main(String[] args) throws Exception {
            String master = "local[2]";
            String csvInput = "/home/userName/Downloads/countrylist.csv";
            String csvOutput = "/home/userName/Downloads/countrylist";
            JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
            JavaRDD<String> csvData = sc.textFile(csvInput, 1);
            JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
                  public List<String> call(String s) {
                    return Arrays.asList(s.split("\\s*,\\s*"));
                  }
            });
        }
    }

    This should split the lines and return an ArrayList, but I am not sure about this. I get this error: [43,58] wrong number of type arguments; required 2
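
    The compiler message points at the function type: it needs two type arguments, the input type first and the result type second. Spark's Function interface is parameterised this way, and so is java.util.function.Function from the standard library, which the sketch below uses as a stand-in so it runs without Spark. The class name SplitterSketch, the constant SPLIT_LINE and the sample line are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SplitterSketch {
    // Two type arguments: String is what goes in, List<String> is what comes out.
    static final Function<String, List<String>> SPLIT_LINE =
            s -> Arrays.asList(s.split("\\s*,\\s*"));

    public static void main(String[] args) {
        // The regex also swallows whitespace around each comma.
        System.out.println(SPLIT_LINE.apply("Germany , Berlin,DE")); // [Germany, Berlin, DE]
    }
}
```

    With Spark's map, a function from String to List<String> would produce a JavaRDD<List<String>> rather than a JavaRDD<String>, so the declared type of the result has to change as well.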