How to save all the output of spark sql query into a text file

17,146

Solution 1

I found out why it does that, in case anybody else has the same problem. When you do foreachRDD it essentially executes your function on each RDD of the DStream you save it all to the same file. So they overwrite each others data and the first or last writer wins. The easiest fix is to save them in a file with a unique name. So I used saveAsTextFile(path + time().milliseconds().toString()) and fixed the problem. But, you could have the same timestamp twice so I make this even more unique by adding a random number.

Solution 2

That might be because you are not specifying the mode of writing. Instead use this,

df.write.mode('append').text("/path/to/file")

P.s: I am not used to doing it in java, the one I gave is a scala/python equivalent

Solution 3

val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("select * from tableName)
df.write.text("/path/to/file")

it will be written as a partitioned text file, so you'll have your results spaced amongst a bunch of files labeled part-00000, but it will be there.

Share:
17,146
Hoda Moradi
Author by

Hoda Moradi

Updated on June 17, 2022

Comments

  • Hoda Moradi
    Hoda Moradi almost 2 years

    I am writing a simple consumer program using spark streaming. My code save some of the data in to the file but not ALL of the data. Can anyone help me how to fix this. I am not sure where I am losing the data. I get the data from kafka topic then I apply my schema from java Bean class.

    public class ConsumerFile {
    public static void main(String[] args){
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
    String topic = args[0];
    final String path=new String(args[2]);
    String broker = args[1];
    SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
    
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
    
        kafkaParams.put("metadata.broker.list", broker);
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
        ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
    topicsSet
    );
    
    JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>() 
    
                                             {
                       public String call(Tuple2<String, String> message)
    
                                                 {
                                                     return message._2();}});
    words.foreachRDD(
              new Function2<JavaRDD<String>, Time, Void>() {
           public Void call(JavaRDD<String> rdd, Time time) {
       SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
                  // Convert RDD[String] to RDD[case class] to DataFrame
                  JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
                    public JavaRow call(String line) throws Exception{
                    String[] fields = line.split(",");
                      JavaRow record = new JavaRow(fields[0], fields[1],fields[2]  );
    
                      return record;
    
                    }
    
                  });
    
                  DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
                  wordsDataFrame.registerTempTable("Data");
                  DataFrame wDataFrame = sqlContext.sql(" select * from Data");  
                  if(!wDataFrame.rdd().isEmpty()){
                 wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
                  return null;
                }} );
    ssc.start();
     ssc.awaitTermination();}
    

    }