How to use map-function in SPARK with Java

Solution 1

There are two small issues with the program. First, you probably want flatMap rather than map: since you are trying to return an RDD of words rather than an RDD of Lists of words, flatMap flattens the result. Second, the function class requires the type of the input it is called on as well as the return type, which is why the compiler asks for two type arguments. I'd replace the JavaRDD words... statement with:

JavaRDD<String> words = rdd.flatMap(
  new FlatMapFunction<String, String>() {
    // Spark 1.x signature; from Spark 2.0 on, call returns Iterator<String>.
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split("\\s*,\\s*"));
    }
  });
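
To see the difference between map and flatMap without a Spark installation, here is a minimal plain-Java sketch. It is only an analogy: java.util.stream stands in for the RDD API, and java.util.function.Function stands in for Spark's Function, which likewise takes the input type first and the return type second. The class name MapVsFlatMapSketch, the constant SPLIT_LINE, and the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy only: streams play the role of the RDD here.
class MapVsFlatMapSketch {

    // Two type arguments: the input type (String), then the result type (List<String>).
    static final Function<String, List<String>> SPLIT_LINE =
            new Function<String, List<String>>() {
                @Override
                public List<String> apply(String line) {
                    return Arrays.asList(line.split("\\s*,\\s*"));
                }
            };

    // map: one result per input line, so the outcome is a list of lists.
    static List<List<String>> splitWithMap(List<String> lines) {
        return lines.stream().map(SPLIT_LINE).collect(Collectors.toList());
    }

    // flatMap: the per-line lists are merged into one flat list of words.
    static List<String> splitWithFlatMap(List<String> lines) {
        return lines.stream()
                .flatMap(line -> SPLIT_LINE.apply(line).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Germany, Berlin", "France , Paris");
        System.out.println(splitWithMap(lines));     // [[Germany, Berlin], [France, Paris]]
        System.out.println(splitWithFlatMap(lines)); // [Germany, Berlin, France, Paris]
    }
}
```

If you really want one list per CSV line (the "two-dimensional" shape), the map variant is the one to keep; in Spark the result type would then be JavaRDD<List<String>> rather than JavaRDD<String>.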

Solution 2

This is the sample code from the https://opencredo.com/data-analytics-using-cassandra-and-spark/ tutorial, ported to Java.

Scala code :

val includedStatuses = Set("COMPLETED", "REPAID")
val now = new Date();
sc.cassandraTable("cc", "cc_transactions")
  .select("customerid", "amount", "card", "status", "id")
  .where("id < minTimeuuid(?)", now)
  .filter(includedStatuses contains _.getString("status"))
  .keyBy(row => (row.getString("customerid"), row.getString("card")))
  .map { case (key, value) => (key, value.getInt("amount")) }
  .reduceByKey(_ + _)
  .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
  .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))

Java code :

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
        .select("customerid", "amount", "card", "status", "id")
        .where("id < minTimeuuid(?)", date)
        // Note: the Scala version above also keeps rows with status "REPAID".
        .filter(row -> row.getString("status").equals("COMPLETED"))
        // Key each row by (customerid, card), then keep only the amount as the value.
        .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
        .mapToPair(row -> new Tuple2<>(row._1, row._2.getInt("amount")))
        // Sum the amounts per key to get the balance.
        .reduceByKey((i1, i2) -> i1.intValue() + i2.intValue())
        .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                List<Balance> list = new ArrayList<Balance>();
                list.add(new Balance(r._1._1, r._1._2, r._2, reportDate));
                return list.iterator();
            }
        }).cache();

Here ProjectPropertie.context is the SparkContext. You can create it as follows (use only one context per JVM):

   SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]").set("spark.executor.memory", "1g")
                            .set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
                            .set("spark.cassandra.connection.port", "9042")
                            .set("spark.cassandra.auth.username", "cassandra")
                            .set("spark.cassandra.auth.password", "cassandra");
   SparkContext context = new SparkContext(conf);

My data source is Cassandra: 172.17.0.2 is the Docker container where my Cassandra node is running, and 127.0.0.1 is the host (in this case, the local machine).
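
To make the filter / keyBy / mapToPair / reduceByKey steps concrete without a Cassandra cluster, here is a rough plain-Java sketch of the same aggregation. It is an analogy under stated assumptions, not Spark or connector code: the Tx class, the BalanceSketch name, the sample transactions, and the "customerid/card" string key (used in place of a Tuple2) are all hypothetical. It follows the Scala version in keeping both "COMPLETED" and "REPAID" rows.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogy of the pipeline above; no Spark or Cassandra involved.
class BalanceSketch {

    // Hypothetical stand-in for one row of the transactions table.
    static final class Tx {
        final String customerId;
        final String card;
        final String status;
        final int amount;

        Tx(String customerId, String card, String status, int amount) {
            this.customerId = customerId;
            this.card = card;
            this.status = status;
            this.amount = amount;
        }
    }

    static final Set<String> INCLUDED_STATUSES =
            new HashSet<>(Arrays.asList("COMPLETED", "REPAID"));

    // filter -> key by (customer, card) -> sum the amounts per key.
    static Map<String, Integer> balances(List<Tx> txs) {
        return txs.stream()
                .filter(tx -> INCLUDED_STATUSES.contains(tx.status))
                .collect(Collectors.groupingBy(
                        tx -> tx.customerId + "/" + tx.card, // assumed composite key
                        TreeMap::new,                        // sorted for stable output
                        Collectors.summingInt(tx -> tx.amount)));
    }

    public static void main(String[] args) {
        List<Tx> txs = Arrays.asList(
                new Tx("alice", "visa", "COMPLETED", 10),
                new Tx("alice", "visa", "REPAID", 5),
                new Tx("alice", "visa", "PENDING", 100), // dropped by the filter
                new Tx("bob", "amex", "COMPLETED", 7));
        System.out.println(balances(txs)); // {alice/visa=15, bob/amex=7}
    }
}
```

Grouping and summing in one collector plays the role that keyBy followed by reduceByKey plays on the RDD, where the per-key sums are combined across partitions.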

Author by

progNewbie

I am a computer science student from Germany. I spend a lot of my free time with programming apps or webpages.

Updated on August 21, 2020

Comments

  • progNewbie
    progNewbie over 3 years

    I am trying to read a CSV file in Spark, and I want to split the comma-separated lines so that I end up with an RDD holding a two-dimensional array. I am very new to Spark.

    I tried to do this:

    public class SimpleApp 
    {   
        public static void main(String[] args) throws Exception 
        {       
            String master = "local[2]";
            String csvInput = "/home/userName/Downloads/countrylist.csv";
            String csvOutput = "/home/userName/Downloads/countrylist";
    
            JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    
            JavaRDD<String> csvData = sc.textFile(csvInput, 1);
            JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
                  @Override
                  public List<String> call(String s) {
                    return Arrays.asList(s.split("\\s*,\\s*"));
                  }
                });
    
            words.saveAsTextFile(csvOutput);
        }
    }
    

    This should split the lines and return an ArrayList. But I am not sure about this. I get this error:

    SimpleApp.java:[43,58] wrong number of type arguments; required 2