How can I make (Spark 1.6) saveAsTextFile append to an existing file?


Solution 1

This worked for me on Spark 1.5; I think this is the right usage:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
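For reference, the same call in Scala (a minimal sketch; dataframe, FILE_FORMAT, and path are placeholders carried over from the line above):

import org.apache.spark.sql.SaveMode

// SaveMode.Append adds new part files under the existing path instead of
// failing or overwriting when the directory is already there
dataframe.write
  .mode(SaveMode.Append)
  .format(FILE_FORMAT)
  .partitionBy("parameter1", "parameter2")
  .save(path)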

Solution 2

Since Spark writes its output through HDFS, this is the typical output it produces: one part file per partition. You can use FileUtil to merge the part files back into one. It is an efficient solution because it doesn't require Spark to repartition the data down to a single partition and pull the whole dataset into one executor's memory. This is the approach I follow.

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SaveMode

val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
val hdfs = FileSystem.get(hadoopConf)

// Remove any previous merge target so copyMerge can create it fresh
val mergedPath = "merged-" + filePath + ".json"
val merged = new Path(mergedPath)
if (hdfs.exists(merged)) {
  hdfs.delete(merged, true)
}

// Append the new data, then concatenate all part files into a single file
df.write.mode(SaveMode.Append).json(filePath)
FileUtil.copyMerge(hdfs, new Path(filePath), hdfs, merged, false, hadoopConf, null)

You can then read the single file from the mergedPath location. Hope it helps.
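Reading the merged result back is then a plain single-file read (a sketch, assuming the same sqlContext as above):

// mergedPath points at one JSON file, so a normal read picks it up directly
val mergedDf = sqlContext.read.json(mergedPath)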

Solution 3

You can try this method, which I found elsewhere: "Process Spark Streaming rdd and store to single HDFS file".

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.rdd.RDD

// Write the RDD to a temporary directory, then merge its part files
// into a single named file under dstPath
def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
  val sourceFile = hdfsServer + "/tmp/"
  rdd.saveAsTextFile(sourceFile)
  val dstPath = hdfsServer + "/final/"
  merge(sourceFile, dstPath, fileName)
}

def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val destinationPath = new Path(dstPath)
  if (!hdfs.exists(destinationPath)) {
    hdfs.mkdirs(destinationPath)
  }
  // Concatenate every part file under srcPath into dstPath/fileName;
  // the source files are kept (deleteSource = false)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}
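A call might look like this (a sketch; the HDFS URI, file name, and RDD are hypothetical):

// Writes someRdd to hdfs://namenode:8020/tmp/, then merges the part files
// into hdfs://namenode:8020/final/results.txt
saveAsTextFileAndMerge("hdfs://namenode:8020", "results.txt", someRdd)

Note that saveAsTextFile fails if the /tmp/ output directory already exists, so clear it between runs.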
Author: yjxyjx

Updated on June 19, 2022
Comments

  • yjxyjx
    yjxyjx almost 2 years

    In Spark SQL I use df.write.mode(SaveMode.Append).json(xxxx), but this method produces files like these: [screenshot of the generated part files]

    The filenames are too complex and random, so I can't fetch them through the API. That's why I want to use saveAsTextFile, because its filenames are simple and regular, but I don't know how to append files in the same directory. Appreciate your time.

  • yjxyjx
    yjxyjx almost 8 years
    Thanks. What I want to achieve, for example: in HDFS I have 3 files like part-00000, part-00001, part-00002, and I want to merge them into a single part-00000. So can I use copyMerge to merge into an existing file?
  • NehaM
    NehaM almost 8 years
    I am not very clear about your question. If you are asking whether you can merge part-00000, part-00001, and part-00002 into part-00000, that is what the above code does. You just need to formulate the mergedPath as you wish. Is that what you are looking for?
  • UninformedUser
    UninformedUser almost 7 years
    There is the coalesce function in Spark to merge everything into a single file (see the sketch after these comments).
  • Borja
    Borja over 5 years
    I think you forgot to import: "import org.apache.hadoop.conf.Configuration"