Write to multiple outputs by key Spark - one Spark job


Solution 1

This includes the compression codec, the necessary imports, and the pimp (extension method), as requested.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.reflect.runtime.universe.TypeTag

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
// TypeTag bounds are needed so toDF can derive the tuple's schema
implicit class PimpedRDD[T1: TypeTag, T2: TypeTag](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    // partitionBy(keyName) gives one directory per distinct key under prefix
    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
      .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Would give:

prefix/key=1/part-00000
prefix/key=2/part-00000

where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.

And

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Would give:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

It should be clear how to edit for parquet.
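
For instance, a minimal sketch of a Parquet variant (the class and method names here are illustrative; Parquet handles its own compression, typically configured via spark.sql.parquet.compression.codec, so the text codec option is dropped):

implicit class PimpedParquetRDD[T1: TypeTag, T2: TypeTag](rdd: RDD[(T1, T2)]) {
  def writeAsMultipleParquet(prefix: String, keyName: String = "key")
                            (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    // Same layout as above, one directory per key, but written as Parquet files
    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
      .format("parquet").save(prefix)
  }
}

myRdd.writeAsMultipleParquet("prefix-parquet")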

Finally, below is an example for Dataset, which is perhaps nicer than using Tuples.

import org.apache.spark.sql.Dataset

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
      .format("text").option("codec", codec).save(prefix)
  }
}
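
A hypothetical usage sketch, assuming Spark 2.x with spark.implicits._ in scope so that toDS() and the case-class encoder are available (the Record class and its values are illustrative):

case class Record(key: String, value: String)

val ds = Seq(Record("1", "a"), Record("1", "b"), Record("2", "c")).toDS()
ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "key")

After partitionBy("key") the key column goes into the directory names, leaving value as the only data column, which is exactly the single string column the text format requires.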

Solution 2

If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)

If you're starting out with an RDD, you'll first need to convert it to a DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

In Python, this same code is:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:

people_df.write.partitionBy("number").text("people")

And you can easily use other output formats if you want:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
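
(As an aside, these partitioned directories can be read straight back in. A minimal sketch, relying on Spark's partition discovery to restore the number column from the directory names; sqlContext here is the one from the shell, and read.text needs Spark 1.6+.)

val people_text_df = sqlContext.read.text("people")               // columns: value, number
val people_parquet_df = sqlContext.read.parquet("people-parquet") // columns: name, number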

Solution 3

I would do it like this, which is scalable:

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Return NullWritable so only the value (not the key) is written to the file
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the output file name, so each key gets its own file
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

I just saw a similar answer above, but actually we don't need customized partitioners. MultipleTextOutputFormat will create a file for each key. It is OK that multiple records with the same key fall into the same partition.

new HashPartitioner(num), where num is the number of partitions you want. If you have a large number of distinct keys, you can set num high; that way each partition will not open too many HDFS file handles.
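
For concreteness, here is a hypothetical version of the pipeline with the placeholder map filled in, assuming tab-separated input whose first field is the key (the input format and the partition count of 100 are purely illustrative):

sc.textFile("input/path")
  .map { line =>
    val Array(k, v) = line.split("\t", 2) // first field is the key, the rest is the value
    (k, v)
  }
  .partitionBy(new HashPartitioner(100))
  .saveAsHadoopFile("output/path", classOf[String], classOf[String],
    classOf[RDDMultipleTextOutputFormat])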

Solution 4

If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close()
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  // One lazily created writer per key; files end up in output/<key>/<partition suffix>
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any): Unit = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close(): Unit = writers.values.foreach(_.close())
}

(Replace PrintWriter with your choice of distributed filesystem operation.)

This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.
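
For instance, a hypothetical HDFS-backed variant of MultiWriter using Hadoop's FileSystem API (the class name and path layout are illustrative, and the Hadoop configuration must be reachable from the executors):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class HdfsMultiWriter(root: String, suffix: String) {
  private val fs = FileSystem.get(new Configuration())
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any): Unit = {
    if (!writers.contains(key)) {
      // FileSystem.create builds any missing parent directories
      val out = fs.create(new Path(root + "/" + key + "/" + suffix))
      writers(key) = new java.io.PrintWriter(out)
    }
    writers(key).println(value)
  }
  def close(): Unit = writers.values.foreach(_.close())
}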

Solution 5

I was in need of the same thing in Java. Posting my translation of Zhang Zhan's Scala answer to Spark Java API users:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}

Comments

  • samthebest
    samthebest over 3 years

    How can you write to multiple outputs dependent on the key using Spark in a single job?

    Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job

    E.g.

    sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
    .writeAsMultiple(prefix, compressionCodecOption)
    

    would ensure cat prefix/1 is

    a
    b
    

    and cat prefix/2 would be

    c
    

    EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.

  • samthebest
    samthebest almost 10 years
    Yes, I'd like to use the hadoop/hdfs api - i.e. use MultipleOutputFormat, but I would like to know how to do that.
  • samthebest
    samthebest almost 10 years
    This is certainly the nicest solution so far and seems to nearly do the trick. I'm a bit concerned that this will result in one file per key, which will cause problems for large data sets. If you could modify your answer so that the number of output files per key is configurable I'd be very grateful.
  • douglaz
    douglaz almost 10 years
    @samthebest, I can do that but it will be a very specific solution. Could you update the question to say you want multiple output files per key? By the way, are you really using integer keys on your job?
  • samthebest
    samthebest almost 10 years
    Well, any key that makes sense to partition on - so something that is reasonable when we call toString on it. I'm not sure I need to update my answer as it's well known bad practice to produce large files on HDFS because it limits the types of compression you can use. If we have very large files we have to pick a splittable compression algo, which might not be the best for the job at hand. Furthermore Spark cannot currently read bzip2 (my fav splittable compression) due to a bug in Hadoop. Nevertheless I'll update my answer to be explicit. Again, many thanks.
  • Daniel Darabos
    Daniel Darabos almost 10 years
    This solution puts all the data through one node, if they all have the same key, correct? Seems like a detriment to its general scalability.
  • samthebest
    samthebest almost 10 years
    Thanks. If we were to use HDFS instead of the local filesystem we would essentially be implementing the shuffle part by hand ourselves, right? Also, what happens when multiple partitions contain pairs that have the same key? Both tasks may try to write to the same file, and therefore we need some kind of synchronized file management system to keep track of creating part-XXXXX. I'm afraid this solution feels very dirty given that I'm sure a solution using MultipleOutputFormat exists.
  • samthebest
    samthebest almost 10 years
    @DanielDarabos' point is correct. Surely it's possible to tweak the IdentityIntPartitioner so that for each possible key there are several partitions, say M, where one is chosen at random. We'd need to use a hash function and modulo the result by numPartitions, though there is then a problem: different keys could end up in the same partition, which I'm assuming will break the saveAsHadoopFile? It's a non-trivial problem.
  • Daniel Darabos
    Daniel Darabos almost 10 years
    You are right that it is kind of implementing shuffle. But there is no bottleneck, I think. There is no single node which is receiving all records with a key. There is no problem with the same key coming from multiple partition, and there is no need for synchronization either. The file name is output/<key>/<partition>. So each partition writes to different files. (The partition index goes to the suffix in the example.)
  • Daniel Darabos
    Daniel Darabos almost 10 years
    MultipleOutputFormat sounds perfect for the job, and would work by the same idea. I've just never used it. I think you would just rewrite my MultiWriter to use MultipleOutputFormat instead of rolling its own key->file mapping. But the mapPartitionsWithIndex bit would be mostly unchanged.
  • samthebest
    samthebest almost 10 years
    Sorry, I misunderstood your solution (tbh I skim read). Thanks for the clarification. Yes I think with some playing around and replacing the writer code with HDFS this would work (and no bottleneck either). Thanks for your answer.
  • samthebest
    samthebest over 9 years
    I'm concerned that when we use mapPartitionsWithIndex and manually write to HDFS, then that particular partition will not necessarily output to the desired location of that partition. Therefore the additional shuffle is unnecessary and can be avoided.
  • samthebest
    samthebest over 9 years
    Please could you add all necessary import statements? I haven't tested this, but accepting the answer as it appears to be what I want. What is the point in partitionBy(new Hashpartitioner(num))?? isn't this the same as repartition(num)??
  • zhang zhan
    zhang zhan over 9 years
    It is different. Hash partitioning will ensure that all records with the same key go to the same partition. As I remember, repartition does not have this functionality.
  • Yiannis Gkoufas
    Yiannis Gkoufas about 9 years
    Thanks a lot for this very good solution. I was just wondering the following: how should I modify your code in order to have the output on each file sorted on the values v?
  • Adrian
    Adrian about 9 years
    You can't make an RDD inside another RDD (your 2nd line). See this ppt slideshare.net/databricks/…
  • maasg
    maasg about 9 years
    @Adrian you're right. I was missing a collect there.
  • user2848932
    user2848932 almost 9 years
    What does generateActualKey mean? Why does it always return null?
  • douglaz
    douglaz almost 9 years
    @user2848932 this is a Hadoop interface. For Hadoop, generateActualKey may generate the key based on the data. For Spark, and for the purpose of this problem, I guess that generateActualKey won't be called, so we can return null there just to satisfy the interface.
  • user2848932
    user2848932 almost 9 years
    @douglaz by the way, when I run your code, it says: java.lang.RuntimeException: java.lang.NoSuchMethodException: $iwC$$iwC$KeyBasedOutput.<init>() Maybe you should add a no-arg constructor for the KeyBasedOutput class?
  • silasdavis
    silasdavis almost 9 years
    I was looking for writing multiple parquet outputs, and a solution along these lines looks promising (only subclassing MultipleOutputFormat directly, not using MultipleTextOutputFormat). Unfortunately MultipleOutputFormat only exists in the old API MR1/mapred, whereas the AvroParquetOutputFormat and ParquetOutputFormat (supporting parquet) are written against the new API MR2/mapreduce, so it seems the same path is not open...
  • Sohaib
    Sohaib almost 9 years
    @zhangzhan This would create a single file for every key. In my case there could be potentially millions of lines for a key in which case I would ideally like a directory with output parts inside it. How does this solution enable that?
  • Sohaib
    Sohaib almost 9 years
    @zhangzhan I removed the hash partitioner as I don't really want to limit my partitions (the number of keys is small; the number of values per key is large) and added the following in the RDD multiple-output format: key.asInstanceOf[String] + Path.SEPARATOR + name. I do not know about the performance hit though.
  • zengr
    zengr over 8 years
    Guys, I am getting this error (posted by someone else): stackoverflow.com/questions/25996822/… any suggestions?
  • perrohunter
    perrohunter over 8 years
    This looks like a great solution, especially because it deals with the result iterables. I'm getting an org.apache.spark.SparkException: Task not serializable; do you think the fs instance is causing this problem?
  • NDavis
    NDavis about 8 years
    Looks great! Is there a python equivalent?
  • Linlin
    Linlin about 8 years
    What is the difference if you remove the .partitionBy(new HashPartitioner(num))?
  • Nick Chammas
    Nick Chammas almost 8 years
    @NDavis - There is a different approach to accomplishing this that is available in Python. See my answer.
  • samthebest
    samthebest almost 8 years
    Can you add the equivalent Dataset code in Scala? Then I'll accept it as the best answer. Yes, some people don't care about types and like running their entire application every few minutes to find out whether they have any bugs, but some of us like to catch typos like 'nubmer' the instant we type them :) Seriously though, good answer.
  • Nick Chammas
    Nick Chammas almost 8 years
    @samthebest - Done. In Spark 2.0, DataFrame will simply be an alias for Dataset[Row], so I think my Scala example should hold the same.
  • samthebest
    samthebest almost 8 years
    Yeah, looks like there isn't a 100% typesafe way to do it, but then the original accepted answer wasn't either. I imagine in Spark 2.0 there will be a purely type safe way to do it (i.e. passing a function into partitionBy rather than a string). Anyway, just tweaked a couple things and accepted.
  • Nick Chammas
    Nick Chammas almost 8 years
    @samthebest - Just FYI, I rolled back your edit because it had a few problems: It didn't fit my style of writing; I don't know much about Datasets, so the note about Dataset[SomeCaseClass] is more appropriate as a comment; finally, Python doesn't have a makeRDD() method.
  • samthebest
    samthebest almost 8 years
    Note that if you had Dataset[SomeCaseClass] then you can just call .toDF() and the column labels will match up to SomeCaseClass's fields. This gives a little more type safety.
  • moustachio
    moustachio almost 8 years
    Is there any way to force this method to write only one file/part per partition?
  • Nick Chammas
    Nick Chammas almost 8 years
    @moustachio - Good question. I think you can force that by coalescing the DataFrame into one partition before the partitionBy(). For example: people_df.coalesce(1).write.partitionBy("number").text("people") This may limit Spark's parallelism when writing out the data, though, depending on your data and cluster configuration.
  • rsmith54
    rsmith54 almost 7 years
    Is there an easy way to read these partitioned RDDs back into another spark job? Something with wholeTextFiles?
  • Nick Chammas
    Nick Chammas almost 7 years
    @rsmith54 - You can read each partition into a DataFrame (remember, these are DataFrames, not plain RDDs) and then union all the DataFrames to get the original DataFrame back. I dunno if there is an easier way to do it, but if there is it probably does not involve wholeTextFiles(), which is meant for small, unstructured files.
  • aks
    aks almost 7 years
    @zhangzhan Hey, I am getting the following error while using this: java.lang.RuntimeException: java.lang.NoSuchMethodException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$RDDMultipleTextOutputFormat.<init>()
  • Aliostad
    Aliostad over 6 years
    Not sure why this does not have +100 upvotes, and actually had zero upvotes. Very helpful, thanks!
  • volni
    volni almost 6 years
    If people already exists then the save will fail, right? What if you want to add more entries to the people directory/prefix?
  • Nick Chammas
    Nick Chammas almost 6 years
    @volni - Yes, the save will fail if the destination path already exists, unless you pass mode="overwrite" as an option. To append new entries to an existing directory, you can try mode="append". You can pass this either as an argument to the various methods like json(), parquet(), etc. or as top-level write option.
  • Aisah Hamzah
    Aisah Hamzah over 5 years
    @Aliostad, look at the dates, this was posted one and a half years later. Also, it is not customary (and sometimes considered rude) at SO to post an answer to your own question (after it already has one or more valid ones) and accept it. Sometimes a situation warrants multiple answers, but then you typically keep the original answer accepted (unless it turns out to be wrong, or a new answer from another user is just so much better, but that's not the case here, the OP clearly considered the original answer correct). I can only assume the OP wasn't aware of the guidelines in this case.
  • samthebest
    samthebest over 5 years
    @Abel I'm aware of the guidelines, but I felt it necessary to post my own answer as mine "is just so much better" than all here because it's the only answer that: 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. It's sad that it's nearly 2019 and not everyone can write code that compiles, let alone in the correct style.
  • eggie5
    eggie5 over 4 years
    I like this solution, as it doesn't use DataFrames. It works for me. I worry that it only writes 1 file per group which could be troublesome for large datasets right? For example my groups are about 150MB which is fine...
  • JP Silvashy
    JP Silvashy over 4 years
    The top answer is actually the best; it appears you basically copied it.
  • samthebest
    samthebest over 4 years
    @JPSilvashy I did try to edit the answer so that it 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. The poster rejected my edits, so I created a new answer. At least dozen people have found my answer more helpful than the top answer.
  • Ayoub Omari
    Ayoub Omari over 3 years
    I think this solution doesn't work for a huge amount of data in each key.