Write to multiple outputs by key Spark - one Spark job
Solution 1
This answer includes the compression codec and the pimp (extension method) requested in the question, plus the necessary imports.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
      .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Would give:
prefix/key=1/part-00000
prefix/key=2/part-00000
where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.
And
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
Would give:
prefix/foo=1/part-00000
prefix/foo=2/part-00000
It should be clear how to edit for parquet.
Finally, below is an example for Dataset, which is perhaps nicer than using Tuples.
import org.apache.spark.sql.Dataset

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
      .format("text").option("codec", codec).save(prefix)
  }
}
Solution 2
If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)
If you're starting out with an RDD, you'll first need to convert it to a DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
In Python, this same code is:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:
people_df.write.partitionBy("number").text("people")
And you can easily use other output formats if you want:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:
people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
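To make the layout above concrete, here is a minimal plain-Python sketch (no Spark involved) that maps records to the partition-style directory each one would land under. The helper name partition_paths and the fixed file name part-00000 are assumptions made for illustration; real part-file names differ, and a key's rows may be spread over several part files.

```python
from collections import defaultdict


def partition_paths(records, prefix, key_name):
    """Group (key, value) records by the partition directory they belong to.

    Hypothetical helper: it mimics the <key_name>=<key> directory naming
    shown above, but always uses a single illustrative file name.
    """
    layout = defaultdict(list)
    for key, value in records:
        path = f"{prefix}/{key_name}={key}/part-00000"
        layout[path].append(value)
    return dict(layout)


people = [(1, "alice"), (1, "bob"), (2, "charlie")]
layout = partition_paths(people, "people", "number")
for path, values in sorted(layout.items()):
    print(path, values)
```

Note that only the values are kept in each file: the key lives in the directory name, which is also how the partitioned output above stores it.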
Solution 3
I would do it like this, which is scalable:
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the written records so that only the value is written.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key as the output file name.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
      .map(a => (k, v)) // Your own implementation
      .partitionBy(new HashPartitioner(num)) // num: the number of partitions you want
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}
I just saw a similar answer above, but we don't actually need customized partitions. MultipleTextOutputFormat will create a file for each key, and it is fine for multiple records with the same key to fall into the same partition.
In new HashPartitioner(num), num is the number of partitions you want. If you have a large number of distinct keys, you can set num to a large value so that each partition does not open too many HDFS file handles.
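The effect of hash partitioning on open file handles can be sketched in plain Python. This is only a model under stated assumptions: hash_partition is a hypothetical helper, and it uses CRC32 rather than Spark's own hashing so that the result is reproducible. It shows that every record with a given key lands in the same partition, so each partition only needs one output file per distinct key it holds.

```python
import zlib
from collections import defaultdict


def hash_partition(records, num_partitions):
    """Assign each (key, value) record to a partition by hashing its key.

    Uses CRC32 so results are reproducible; this is an illustrative
    stand-in, not Spark's HashPartitioner.
    """
    partitions = defaultdict(list)
    for key, value in records:
        index = zlib.crc32(str(key).encode("utf-8")) % num_partitions
        partitions[index].append((key, value))
    return dict(partitions)


# 100 records spread over 20 distinct keys.
records = [(f"key{i % 20}", i) for i in range(100)]
partitions = hash_partition(records, num_partitions=4)

# Each partition would open one output file per distinct key it holds.
files_per_partition = {
    index: len({key for key, _ in rows}) for index, rows in partitions.items()
}
print(files_per_partition)
```

Raising num_partitions spreads the distinct keys over more partitions, which is why a larger value keeps the number of files open per task down.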
Solution 4
If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}
(Replace PrintWriter with your choice of distributed filesystem operation.)
This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.
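For readers who want to try the idea without a cluster, here is a rough plain-Python port of the same approach. It is a sketch under assumptions: two hand-made lists stand in for RDD partitions, the output goes to a temporary local directory, and the extra root argument is an addition for illustration. Each partition writes to output/<key>/<partition>, so no two partitions ever touch the same file.

```python
import os
import tempfile


class MultiWriter:
    """Keep one open file per key, all sharing the same partition suffix."""

    def __init__(self, root, suffix):
        self.root = root
        self.suffix = suffix
        self.writers = {}

    def write(self, key, value):
        # Open the file for this key lazily, the first time the key is seen.
        if key not in self.writers:
            directory = os.path.join(self.root, key)
            os.makedirs(directory, exist_ok=True)
            path = os.path.join(directory, self.suffix)
            self.writers[key] = open(path, "w")
        self.writers[key].write(f"{value}\n")

    def close(self):
        for writer in self.writers.values():
            writer.close()


# Two hand-made "partitions" stand in for the partitions of an RDD.
partitions = [[(1, "a"), (2, "c")], [(1, "b")]]
root = tempfile.mkdtemp()
for index, rows in enumerate(partitions):
    outputs = MultiWriter(root, str(index))
    for key, value in rows:
        outputs.write(str(key), value)
    outputs.close()

# List every file that was written, relative to the output root.
written = sorted(
    os.path.relpath(os.path.join(folder, name), root)
    for folder, _, names in os.walk(root)
    for name in names
)
print(written)
```

Key 1 appears in both partitions and therefore ends up with two files in its directory, one per partition, which is the layout described above.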
Solution 5
I was in need of the same thing in Java. Posting my translation of Zhang Zhan's Scala answer to Spark Java API users:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}
samthebest
Updated on December 16, 2020
Comments
-
samthebest over 3 years
How can you write to multiple outputs dependent on the key using Spark in a single Job?
Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job
E.g.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .writeAsMultiple(prefix, compressionCodecOption)
would ensure cat prefix/1 is
a
b
and cat prefix/2 would be
c
EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.
-
samthebest almost 10 years
Yes, I'd like to use the hadoop/hdfs api - i.e. use MultipleOutputFormat, but I would like to know how to do that.
-
samthebest almost 10 years
This is certainly the nicest solution so far and seems to nearly do the trick. I'm a bit concerned that this will result in one file per key, which will cause problems for large data sets. If you could modify your answer so that the number of output files per key is configurable I'd be very grateful.
-
douglaz almost 10 years
@samthebest, I can do that but it will be a very specific solution. Could you update the question to say you want multiple output files per key? By the way, are you really using integer keys on your job?
-
samthebest almost 10 years
Well, any key that makes sense to partition on - so something that is reasonable when we call toString on it. I'm not sure I need to update my answer as it's well known bad practice to produce large files on HDFS because it limits the types of compression you can use. If we have very large files, we have to pick a splittable compression algo, which might not be best for the job at hand. Furthermore Spark cannot currently read bzip2 (my fav splittable compression) due to a bug in Hadoop. Nevertheless I'll update my answer to be explicit. Again, many thanks.
-
Daniel Darabos almost 10 years
This solution puts all the data through one node, if they all have the same key, correct? Seems like a detriment to its general scalability.
-
samthebest almost 10 years
Thanks. If we were to use HDFS instead of the local filesystem, we would essentially be implementing the shuffle part by hand ourselves, right? Also, what happens when multiple partitions contain pairs that have the same key? Both tasks may try to write to the same file, and therefore we need some kind of synchronized file management system to keep track of creating part-XXXXX. I'm afraid this solution feels very dirty given that I'm sure a solution using MultipleOutputFormat exists.
-
samthebest almost 10 years
@DanielDarabos point is correct. Surely it's possible to tweak the IdentityIntPartitioner so that for each possible key there are several partitions, say M, where one is chosen at random. We'd need to use a hash function and modulo the result by numPartitions, though there is then a problem - different keys could end up in the same partition, which I'm assuming will break the saveAsHadoopFile? It's a non-trivial problem.
-
Daniel Darabos almost 10 years
You are right that it is kind of implementing shuffle. But there is no bottleneck, I think. There is no single node which is receiving all records with a key. There is no problem with the same key coming from multiple partitions, and there is no need for synchronization either. The file name is output/<key>/<partition>. So each partition writes to different files. (The partition index goes to the suffix in the example.)
-
Daniel Darabos almost 10 years
MultipleOutputFormat sounds perfect for the job, and would work by the same idea. I've just never used it. I think you would just rewrite my MultiWriter to use MultipleOutputFormat instead of rolling its own key->file mapping. But the mapPartitionsWithIndex bit would be mostly unchanged.
-
samthebest almost 10 years
Sorry, I misunderstood your solution (tbh I skim read). Thanks for the clarification. Yes I think with some playing around and replacing the writer code with HDFS this would work (and no bottleneck either). Thanks for your answer.
-
samthebest over 9 years
I'm concerned that when we use mapPartitionsWithIndex and manually write to HDFS, then that particular partition will not necessarily output to the desired location of that partition. Therefore the additional shuffle is unnecessary and can be avoided.
-
samthebest over 9 years
Please could you add all necessary import statements? I haven't tested this, but accepting the answer as it appears to be what I want. What is the point in partitionBy(new HashPartitioner(num))? Isn't this the same as repartition(num)?
-
zhang zhan over 9 years
It is different. Hash partitioning will ensure that all records with the same key go to the same partition. As I remember, repartition does not have this functionality.
-
Yiannis Gkoufas about 9 years
Thanks a lot for this very good solution. I was just wondering the following: how should I modify your code in order to have the output on each file sorted on the values v?
-
Adrian about 9 years
You can't make an RDD inside another RDD (your 2nd line). See this ppt slideshare.net/databricks/…
-
maasg about 9 years
@Adrian you're right. I was missing a collect there.
-
user2848932 almost 9 years
What does generateActualKey mean? Why does it always return null?
-
douglaz almost 9 years
@user2848932 this is a Hadoop interface. For Hadoop, generateActualKey may generate the key based on the data. For Spark and for the purpose of this problem, I guess that generateActualKey won't be called so we can return null there just to satisfy the interface.
-
user2848932 almost 9 years
@douglaz by the way, when I run your code, it says: java.lang.RuntimeException: java.lang.NoSuchMethodException: $iwC$$iwC$KeyBasedOutput.<init>() Maybe you should add an initial function for the KeyBasedOutput class?
-
silasdavis almost 9 years
I was looking for a way to write multiple parquet outputs, and a solution along these lines looks promising (only subclassing MultipleOutputFormat directly, not using MultipleTextOutputFormat). Unfortunately MultipleOutputFormat only exists in the old API MR1/mapred, whereas the AvroParquetOutputFormat and ParquetOutputFormat (supporting parquet) are written against the new API MR2/mapreduce, so it seems the same path is not open...
-
Sohaib almost 9 years
@zhangzhan This would create a single file for every key. In my case there could be potentially millions of lines for a key, in which case I would ideally like a directory with output parts inside it. How does this solution enable that?
-
Sohaib almost 9 years
@zhangzhan I removed the hash partitioner as I don't really want to limit my partitions (the number of keys is small, the number of values in each key is large) and added the following in the RDD Multiout: key.asInstanceOf[String] + Path.SEPARATOR + name. I do not know about the performance hits though.
-
zengr over 8 years
Guys, I am getting this error (posted by someone else): stackoverflow.com/questions/25996822/… any suggestions?
-
perrohunter over 8 years
This looks like a great solution, especially because it deals with the result iterables. I'm getting an org.apache.spark.SparkException: Task not serializable; do you think the fs instance is causing this problem?
-
NDavis about 8 years
Looks great! Is there a python equivalent?
-
Linlin about 8 years
What is the difference if the .partitionBy(new HashPartitioner(num)) is removed?
-
Nick Chammas almost 8 years
@NDavis - There is a different approach to accomplishing this that is available in Python. See my answer.
-
samthebest almost 8 years
Can you add the equivalent Datasets code in Scala? And I'll accept it as best answer. Yes some people don't care about types and like running their entire application every few minutes to find out if they have any bugs, but some of us like to catch typos like 'nubmer' the instant we typed it :) Seriously though, good answer.
-
Nick Chammas almost 8 years
@samthebest - Done. In Spark 2.0, DataFrame will simply be an alias for Dataset[Row], so I think my Scala example should hold the same.
-
samthebest almost 8 years
Yeah, looks like there isn't a 100% typesafe way to do it, but then the original accepted answer wasn't either. I imagine in Spark 2.0 there will be a purely type safe way to do it (i.e. passing a function into partitionBy rather than a string). Anyway, just tweaked a couple things and accepted.
-
Nick Chammas almost 8 years
@samthebest - Just FYI, I rolled back your edit because it had a few problems: It didn't fit my style of writing; I don't know much about Datasets, so the note about Dataset[SomeCaseClass] is more appropriate as a comment; finally, Python doesn't have a makeRDD() method.
-
samthebest almost 8 years
Note that if you had Dataset[SomeCaseClass] then you can just call .toDF() and the column labels will match up to SomeCaseClass's fields. This gives a little more type safety.
-
moustachio almost 8 years
Is there any way to force this method to write only one file/part per partition?
-
Nick Chammas almost 8 years
@moustachio - Good question. I think you can force that by coalescing the DataFrame into one partition before the partitionBy(). For example:
people_df.coalesce(1).write.partitionBy("number").text("people")
This may limit Spark's parallelism when writing out the data, though, depending on your data and cluster configuration.
-
rsmith54 almost 7 years
Is there an easy way to read these partitioned RDDs back into another spark job? Something with wholeTextFiles?
-
Nick Chammas almost 7 years
@rsmith54 - You can read each partition into a DataFrame (remember, these are DataFrames, not plain RDDs) and then union all the DataFrames to get the original DataFrame back. I dunno if there is an easier way to do it, but if there is it probably does not involve wholeTextFiles(), which is meant for small, unstructured files.
-
aks almost 7 years
@zhangzhan Hey, I am getting the following error while using this: java.lang.RuntimeException: java.lang.NoSuchMethodException : $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$RDDMultipleTextOutputFormat.<init>()
-
Aliostad over 6 years
Not sure why this does not have +100 upvotes, and actually had zero upvotes. Very helpful, thanks!
-
volni almost 6 years
If people already exists then the save will fail, right? What if you want to add more entries to the people directory/prefix?
-
Nick Chammas almost 6 years
@volni - Yes, the save will fail if the destination path already exists, unless you pass mode="overwrite" as an option. To append new entries to an existing directory, you can try mode="append". You can pass this either as an argument to the various methods like json(), parquet(), etc. or as a top-level write option.
-
Aisah Hamzah over 5 years
@Aliostad, look at the dates, this was posted one-and-a-half years later. Also, it is not customary (and sometimes considered rude) at SO to post an answer to your own question (after it already has one or more valid ones) and accept it. Sometimes a situation warrants multiple answers, but then you typically keep the original answer accepted (unless it turns out to be wrong, or a new answer from another user is just so much better, but that's not the case here, the OP clearly considered the original answer correct). I can only assume the OP wasn't aware of the guidelines in this case.
-
samthebest over 5 years
@Abel I'm aware of the guidelines, but I felt it necessary to post my own answer as mine "is just so much better" than all here because it's the only answer that: 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. It's sad that it's nearly 2019 and not everyone can even write code that compiles nor is even correct style.
-
eggie5 over 4 years
I like this solution, as it doesn't use DataFrames. It works for me. I worry that it only writes 1 file per group, which could be troublesome for large datasets, right? For example my groups are about 150MB which is fine...
-
JP Silvashy over 4 years
The top answer is actually the best, it appears you basically copied his.
-
samthebest over 4 years
@JPSilvashy I did try to edit the answer so that it 1. Includes how to specify the compression codec (as requested in the OP), 2. includes how to add it as a pimp/extension method (as requested in the OP), 3. actually compiles! (includes necessary imports), 4. uses correct Scala style and formatting. The poster rejected my edits, so I created a new answer. At least a dozen people have found my answer more helpful than the top answer.
-
Ayoub Omari over 3 years
I think this solution doesn't work for a huge amount of data in each key.