How does Spark achieve sort order?
Sorting in Spark is a multi-phase process which requires shuffling:

- the input RDD is sampled, and this sample is used to compute boundaries for each output partition (`sample` followed by `collect`)
- the input RDD is partitioned using a `RangePartitioner` with the boundaries computed in the first step (`partitionBy`)
- each partition from the second step is sorted locally (`mapPartitions`)
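The three phases can be sketched on a single machine in plain Scala (no Spark; the object and method names here are illustrative, not Spark API). Each inner `Seq` stands in for one partition:

```scala
import scala.util.Random

// Hypothetical single-machine sketch of the three sort phases.
object RangeSortSketch {
  // Phase 1: sample the input and pick one boundary per partition gap.
  def sampleBoundaries(data: Seq[Int], numPartitions: Int, sampleSize: Int): Seq[Int] = {
    val sample = Random.shuffle(data).take(sampleSize).sorted
    (1 until numPartitions).map(i => sample(i * sample.size / numPartitions))
  }

  // Phase 2: route each element to the partition whose range contains it,
  // standing in for partitionBy with a range partitioner.
  def rangePartition(data: Seq[Int], boundaries: Seq[Int]): Seq[Seq[Int]] = {
    val parts = Array.fill(boundaries.size + 1)(Seq.newBuilder[Int])
    data.foreach { x =>
      val idx = boundaries.indexWhere(x < _) match {
        case -1 => boundaries.size // above the last boundary
        case i  => i
      }
      parts(idx) += x
    }
    parts.map(_.result()).toSeq
  }

  // Phase 3: sort each partition locally; because partitions are range-
  // disjoint and ordered, their concatenation is globally sorted.
  def sort(data: Seq[Int], numPartitions: Int): Seq[Int] = {
    val boundaries = sampleBoundaries(data, numPartitions, sampleSize = data.size min 100)
    rangePartition(data, boundaries).flatMap(_.sorted)
  }
}
```

The key property is that no merge step is needed at the end: partition i only holds keys smaller than every key in partition i + 1, so reading the partitions in order yields the total order.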
When the data is collected, all that is left is to follow the order defined by the partitioner.
The above steps are clearly reflected in the debug string:
scala> val rdd = sc.parallelize(Seq(4, 2, 5, 3, 1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at ...
scala> rdd.sortBy(identity).toDebugString
res1: String =
(6) MapPartitionsRDD[10] at sortBy at <console>:24 [] // Sort partitions
| ShuffledRDD[9] at sortBy at <console>:24 [] // Shuffle
+-(8) MapPartitionsRDD[6] at sortBy at <console>:24 [] // Pre-shuffle steps
| ParallelCollectionRDD[0] at parallelize at <console>:21 [] // Parallelize
Author: dveim
Updated on July 09, 2022

Comments
- dveim, almost 2 years: Assume I have a list of Strings. I filter & sort them, and collect the result to the driver. However, things are distributed, and each RDD has its own part of the original list. So, how does Spark achieve the final sorted order? Does it merge results?
- Juh_, over 2 years: Great answer. If I understand correctly, before collecting, Spark shuffles data into "sorted" partitions that are separated by boundaries, which are quantiles from a data sample? For example, with 50 partitions, there will be a 1st partition for all items below the "sampled 2% quantile item", a 2nd partition from the 2% quantile to the 4% one, etc.?
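Juh_'s reading is essentially the mechanism: with k partitions, boundary i is roughly the (i/k)-quantile of the sample. A hypothetical plain-Scala illustration (`quantileBoundaries` is an illustrative name, not a Spark API):

```scala
// Pick k - 1 evenly spaced quantiles from a sorted sample; with k = 50,
// the first boundary sits near the 2% quantile, the next near 4%, etc.
def quantileBoundaries(sample: Seq[Double], k: Int): Seq[Double] = {
  val sorted = sample.sorted
  (1 until k).map(i => sorted(i * sorted.size / k))
}
```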
- Juh_, over 2 years: One other question: how do you find this information? Looking into the Spark code, I find the logical plan nodes, such as `Sort(...) extends UnaryNode`, which are the plan definition, but not the sorting algorithm.