Using Scala to dump results processed by Spark to HDFS


Solution 1

To save locally just do

some_RDD.collect()

Then save the resulting array using something like the approach from this question. And yes, if the data set is small and can easily fit in memory, you should collect it and bring it to the driver of the program. Another option, if the data is a little too large to store in memory, is just some_RDD.coalesce(numPartitionsToStoreOn). Keep in mind that coalesce also takes a boolean shuffle parameter; if you are doing calculations on the data before coalescing, you should set it to true to get more parallelism for those calculations. Coalesce will reduce the number of nodes that store data when you call some_RDD.saveAsTextFile("hdfs://namenode/path"). If the file is very small but you need it on HDFS, call repartition(1), which is the same as coalesce(1, true); this will ensure that your data is saved on only one node.
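
For illustration, here is a minimal sketch of the coalesce-then-save pattern described above (the partition count and HDFS paths are placeholders, not taken from the original question):

// Reduce the number of partitions before writing; shuffle = true keeps the
// upstream computation parallel and only merges partitions at the end.
val condensed = some_RDD.coalesce(4, shuffle = true)
condensed.saveAsTextFile("hdfs://namenode/path/output")

// For a very small result that must end up as a single part file on HDFS:
some_RDD.repartition(1).saveAsTextFile("hdfs://namenode/path/single_file_output")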

UPDATE: So if all you want to do is save three values to HDFS, you can do this:

sc.parallelize(List((min_value, max_value, SD)), 1).saveAsTextFile("pathTofile")

Basically you are just putting the 3 variables in a tuple, wrapping that in a List, and setting the parallelism to one, since the data is very small.
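
For instance, with hypothetical values min_value = 1.0, max_value = 9.5 and SD = 2.3, the single part file written under pathTofile would contain one line, the tuple's toString form:

(1.0,9.5,2.3)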

Solution 2

Answer 1: Since you just need a few scalars, I'd suggest storing them in the local file system. You can first do val localValue = rdd.collect(), which will collect all the data from the workers to the driver. Then you can use java.io to write it to disk.
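
A minimal sketch of that approach, assuming the result RDD is called rdd and the local output path is a placeholder:

import java.io.PrintWriter

// Bring the small result to the driver, then write it with plain Java I/O.
val localValue = rdd.collect()
val writer = new PrintWriter("/tmp/field_stats.txt")
try {
  localValue.foreach(v => writer.println(v))
} finally {
  writer.close()
}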

Answer 2: You can do sc.parallelize(Seq(yourString)).saveAsTextFile("hdfs://host/yourFile"). This will write things to part-000* files. If you want everything in one file, hdfs dfs -getmerge is there to help you.
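
A sketch of that flow, assuming yourString already holds the formatted result and the paths are placeholders:

// Wrap the string in a Seq so it is written as a single record, in one partition.
sc.parallelize(Seq(yourString), 1).saveAsTextFile("hdfs://host/yourFile")

// Afterwards, merge the part-000* files into one local file from the shell:
// hdfs dfs -getmerge hdfs://host/yourFile /local/path/yourFile.txt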

Author by
user2773013

Updated on June 04, 2022

Comments

  • user2773013 almost 2 years

    I'm a bit confused about the right way to save data into HDFS after processing it with Spark.

    This is what I'm trying to do. I'm calculating the min, max, and SD of numeric fields. My input files have millions of rows, but the output will have only around 15-20 fields. So, the output is a single value (scalar) for each field.

    For example: I will load all the rows of FIELD1 into an RDD, and at the end I will get 3 single values for FIELD1 (MIN, MAX, SD). I concatenated these three values into a temporary string. In the end, I will have 15 to 20 rows, containing 4 columns in the following format:

    FIELD_NAME_1  MIN  MAX  SD
    FIELD_NAME_2  MIN  MAX  SD
    

    This is a snippet of the code:

    //create rdd
    val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
    //just get the first column
    val values = data.map(_.split(",",-1)(1))
    
    val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
    val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
    val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
    val SD = data_double.stdev
    

    So, I have 3 variables, min_value, max_value, and SD, that I want to store back to HDFS.

    Question 1: Since the output will be rather small, do I just save it locally on the server, or should I dump it to HDFS? It seems to me that saving the file locally makes more sense.

    Question 2: In Spark, I can just call the following to save an RDD to a text file:

    some_RDD.saveAsTextFile("hdfs://namenode/path")
    

    How do I accomplish the same thing for a String variable that is not an RDD in Scala? Should I parallelize my result into an RDD first and then call saveAsTextFile?