Spark Standalone Mode: How to compress spark output written to HDFS

Solution 1

The method saveAsTextFile takes an additional optional parameter of the codec class to use. So for your example it should be something like this to use gzip:

import org.apache.hadoop.io.compress.GzipCodec

someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])
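
The other standard Hadoop codecs can be passed the same way. A minimal sketch (the output paths are placeholders; the codec classes come from hadoop-common):

import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}

// Same call as above, just a different codec class.
someMap.saveAsTextFile("hdfs://HOST:PORT/out-bzip2", classOf[BZip2Codec])
someMap.saveAsTextFile("hdfs://HOST:PORT/out-snappy", classOf[SnappyCodec])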

UPDATE

Since you're using 0.7.2, you might be able to port the compression settings via configuration options that you set at startup. I'm not sure whether this will work exactly, but you need to go from this:

conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

to something like this:

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")

If you get it to work, posting your config would probably be helpful to others as well.
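
For context, here's a minimal sketch of how these properties could be wired into a 0.7.x-era driver program; the object name, "local" master, sample data, and HDFS path are placeholders, and the package name follows that era:

import spark.SparkContext // Spark 0.7.x package; org.apache.spark in later releases

object CompressedOutputExample {
  def main(args: Array[String]): Unit = {
    // Set the properties before the SparkContext is created so they are
    // reliably picked up when Spark builds the Hadoop JobConf for the save.
    System.setProperty("spark.hadoop.mapred.output.compress", "true")
    System.setProperty("spark.hadoop.mapred.output.compression.codec",
      "org.apache.hadoop.io.compress.GzipCodec")
    System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")

    val sc = new SparkContext("local", "compressed-output")
    sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("hdfs://HOST:PORT/out")
  }
}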

Solution 2

Another way to save gzipped files to HDFS or to Amazon S3 is to use the saveAsHadoopFile method.

saveAsHadoopFile expects an RDD of pairs (RDD[(K, V)]). If someMap is an RDD[V], you can call someMap.map(line => (line, "")) to convert it before using the saveAsHadoopFile method.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])
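
If the multiple-output behavior of MultipleTextOutputFormat isn't needed, the plain TextOutputFormat works the same way. A minimal sketch, assuming someMap is an RDD[String] (the output path is a placeholder):

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat

// Pair each line with an empty value, then write gzipped text files.
val pairs = someMap.map(line => (line, ""))
pairs.saveAsHadoopFile(
  "hdfs://HOST:PORT/out",
  classOf[String],
  classOf[String],
  classOf[TextOutputFormat[String, String]],
  classOf[GzipCodec])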

Solution 3

For newer Spark releases, please do the following in your spark-defaults.xml file (the mapred.* properties are deprecated in favor of mapreduce.*).

<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
</property>
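
Alternatively, the same properties can be kept in Spark's own configuration by prefixing them with spark.hadoop., which Spark copies into the Hadoop Configuration of every job. A minimal sketch using SparkSession (the session-builder setup is an assumption, not part of the original answer):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Any "spark.hadoop.*" entry is copied into the Hadoop Configuration,
// mirroring the XML properties above.
val conf = new SparkConf()
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.GzipCodec")
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")

val spark = SparkSession.builder().config(conf).getOrCreate()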

Solution 4

This is the simplest/shortest way to enable compression quickly, and it works for almost all versions of Spark.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType

/**
 * Sets compression configurations on a Hadoop `Configuration`.
 * `codec` should be a full codec class path.
 */
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
  if (codec != null) {
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) // "BLOCK" as string
    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
    conf.set("mapreduce.map.output.compress", "true")
    conf.set("mapreduce.map.output.compress.codec", codec)
  } else {
    // This implies the option `compression` was set to `uncompressed` or `none`.
    conf.set("mapreduce.output.fileoutputformat.compress", "false")
    conf.set("mapreduce.map.output.compress", "false")
  }
}

where conf is spark.sparkContext.hadoopConfiguration

The codec String parameter in the above method accepts the following options (see the sketch after this list for mapping them to class names):

 1. none
 2. uncompressed
 3. bzip2
 4. deflate
 5. gzip
 6. lz4
 7. snappy
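
Since the method expects a full class path, the short names above have to be translated before calling it. A minimal sketch of one way to do the mapping; the lookup table and helper are assumptions of mine, with the class names taken from the standard Hadoop codec implementations:

// Hypothetical short-name -> class-name lookup.
val codecClasses = Map(
  "bzip2"   -> "org.apache.hadoop.io.compress.BZip2Codec",
  "deflate" -> "org.apache.hadoop.io.compress.DeflateCodec",
  "gzip"    -> "org.apache.hadoop.io.compress.GzipCodec",
  "lz4"     -> "org.apache.hadoop.io.compress.Lz4Codec",
  "snappy"  -> "org.apache.hadoop.io.compress.SnappyCodec")

def codecClassName(name: String): String =
  if (name == "none" || name == "uncompressed") null
  else codecClasses(name.toLowerCase)

// Usage: configure gzip on the active session's Hadoop configuration.
setCodecConfiguration(spark.sparkContext.hadoopConfiguration, codecClassName("gzip"))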

Comments

  • ptikobj
    ptikobj almost 2 years

    Related to my other question, but distinct:

    someMap.saveAsTextFile("hdfs://HOST:PORT/out")
    

    If I save an RDD to HDFS, how can I tell spark to compress the output with gzip? In Hadoop, it is possible to set

    mapred.output.compress = true
    

    and choose the compression algorithm with

    mapred.output.compression.codec = <<classname of compression codec>>
    

    How would I do this in spark? Will this work as well?

    edit: using spark-0.7.2

  • ptikobj
    ptikobj almost 11 years
    with which version of spark does this work? I'm using spark-0.7.2 and I get an error at compile time: error: too many arguments for method saveAsTextFile. I saw that this was discussed, though.
  • ptikobj
    ptikobj almost 11 years
    I see that it is in the newest spark-0.8.0. It seems I will have to pull it, since this is a rather important feature.
  • Noah
    Noah almost 11 years
    ah, that makes sense. I've been working with the master branch, not 0.7.2.
  • ptikobj
    ptikobj almost 11 years
    I've tested your second snippet (System.setProperty(...) [...]) and it immediately worked with 0.7.2. Thanks :)
  • sw1nn
    sw1nn almost 9 years
    @noah You're setting spark.hadoop.mapred.output.compression.codec twice, which is redundant unless I'm missing something?
  • lisak
    lisak almost 8 years
    Wondering whether it is possible to avoid the hadoopish format when storing data to a file. I can't use the directory with the _SUCCESS and part-* files. I just need a specifically named single file... I'm using S3 storage.
  • nikk
    nikk over 7 years
    Is it possible to set these parameters in a similar manner in spark-defaults.xml instead, so every job could use it? I tried replicating the settings into spark-defaults.xml but the settings seem not to be picked up.
