How does the range partitioner work in Spark?
Background on Range Partitioning
The code you're posting comes from the method used to take an unpartitioned RDD and partition it by a new range partitioner. This involves three steps:
1. Compute reasonable range boundaries
2. Construct a partitioner from these range boundaries, which gives you a function from a key K to a partition index
3. Shuffle the RDD against this new partitioner
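As a rough, self-contained illustration of the three steps (plain Python rather than Spark's Scala, with invented helper names like make_range_partitioner), they might be sketched like this:

```python
import bisect
import random

def make_range_partitioner(bounds):
    """Step 2: map a key to a partition index via binary search over the bounds."""
    def partition(key):
        # Index of the first bound that is >= key; keys above all bounds
        # land in the last partition.
        return bisect.bisect_left(bounds, key)
    return partition

# Step 1 (idealized, not how Spark actually does it): sort everything and
# pick boundaries that split the data into equal chunks.
data = random.Random(42).sample(range(10_000), 1_000)
n_partitions = 4
sorted_data = sorted(data)
step = len(sorted_data) // n_partitions
bounds = [sorted_data[i * step] for i in range(1, n_partitions)]

# Step 3: "shuffle" each record to its target partition.
partitioner = make_range_partitioner(bounds)
partitions = [[] for _ in range(n_partitions)]
for x in data:
    partitions[partitioner(x)].append(x)

# Every element of partition i is <= every element of partition i+1.
assert all(max(partitions[i]) <= min(partitions[i + 1])
           for i in range(n_partitions - 1))
```

Real Spark performs step 1 by sampling rather than sorting the whole dataset, which is what the rest of this answer is about.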
Your question concerns the first of these steps. Ideally you could just collect all the RDD data, sort it, and determine range bounds that divide the sorted collection into nPartitions equal chunks. Easy!
Not so much. This algorithm is O(n log n) in compute, and requires memory proportional to the collection. These facts (the second in particular) make it impractical to execute in the distributed Spark framework. But we don't need our partitions to be exactly balanced, as they would be after my terrible collect-and-sort implementation. As long as our partitions end up reasonably balanced, we're in the clear. If we can use an algorithm that gives us approximate quantile boundaries but is faster to run, that's probably a win.
Okay, so we've got the motivation: an efficient algorithm that runs quickly and doesn't take too much memory. Reservoir sampling turns out to be a great way to do this. If your collection has 1B elements and you sample 1M, the 10th percentile of your 1M elements is approximately equal to the 10th percentile of your 1B. You can run exactly the same collect-and-sort algorithm to determine range bounds, but on a reduced, randomly-sampled subset of the full data.
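Here is a minimal Python sketch of that sampling idea, using the classic reservoir-sampling "Algorithm R"; it illustrates the technique, not Spark's actual Scala implementation:

```python
import random

def reservoir_sample(stream, k, rng):
    """Algorithm R: uniform sample of k items from a stream of unknown length."""
    sample = []
    for i, x in enumerate(stream):
        if i < k:
            sample.append(x)
        else:
            # Replace a random slot with probability k / (i + 1).
            j = rng.randint(0, i)
            if j < k:
                sample[j] = x
    return sample

rng = random.Random(0)
population = range(1_000_000)
sample = sorted(reservoir_sample(population, 10_000, rng))

# The sample's 10th percentile approximates the population's 10th percentile,
# so quantile-based range bounds computed on the sample are close to the
# bounds we would get from the full data.
approx_p10 = sample[len(sample) // 10]
true_p10 = 100_000
assert abs(approx_p10 - true_p10) < 10_000
```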
Your specific question on the multiplication by 3
The first line (sampleSize) estimates the number of samples required to represent the true range of values adequately. This is somewhat arbitrary, and probably based on trial-and-error. But since you want to sample in parallel, you need to know how many values to take from each distributed partition, not how many values to take overall. The second line (sampleSizePerPartition) estimates this number.
Earlier I mentioned that we want partitions to be approximately balanced. This is because a huge number of Spark functions rely on this property, the sampleSizePerPartition code included. We know that partition sizes vary a bit, but assume that they don't vary too much. By sampling 3x more values from each partition than we would need if they were perfectly balanced, we can tolerate more partition imbalance.
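To see how the 3x slack helps, here is a hedged Python sketch: each partition can contribute at most its own size in samples, and the partition sizes below are invented purely for illustration. Without oversampling, small partitions drag the total below the target; with the 3x factor, the large partitions make up the shortfall.

```python
import math

sample_size = 200_000    # target total number of samples
num_partitions = 10_000
per_partition = math.ceil(3.0 * sample_size / num_partitions)  # 60

# Hypothetical imbalanced layout: half the partitions are tiny (10 elements),
# half are large. Each partition contributes min(its size, quota) samples.
sizes = [10] * 5_000 + [100_000] * 5_000

naive_per_partition = math.ceil(sample_size / num_partitions)  # 20, no oversample
naive_collected = sum(min(s, naive_per_partition) for s in sizes)
collected = sum(min(s, per_partition) for s in sizes)

assert naive_collected < sample_size  # 150,000: falls short of the target
assert collected >= sample_size       # 350,000: the 3x slack absorbs the imbalance
```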
Consider what would happen if you have 10,000 partitions. In this case, sampleSize is 200,000 (20 * partitions), which works out to 20 samples per partition if the partitions were perfectly balanced. If you take 20 random elements from each partition, then any partition with fewer than 20 elements leaves you with fewer samples than sampleSize overall. Taking 60 elements from each partition is aggressive, but ensures that you'll get enough samples in all but the most extreme imbalanced-partition scenarios.
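For concreteness, here is a direct Python transcription of the two Scala lines from the question (including the 1e6 cap, which kicks in above 50,000 partitions):

```python
import math

def sample_sizes(partitions, rdd_num_partitions):
    # This is the sample size we need to have roughly balanced output
    # partitions, capped at 1M (mirrors the Scala snippet in the question).
    sample_size = min(20.0 * partitions, 1e6)
    # Assume the input partitions are roughly balanced and oversample a bit.
    sample_size_per_partition = int(math.ceil(3.0 * sample_size / rdd_num_partitions))
    return sample_size, sample_size_per_partition

# 10,000 partitions: 200,000 samples overall, 60 per partition (3 * 20).
assert sample_sizes(10_000, 10_000) == (200_000.0, 60)
# 100,000 partitions: the 1M cap applies, so only 30 per partition.
assert sample_sizes(100_000, 100_000) == (1e6, 30)
```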
American curl
Updated on May 15, 2022

Comments

American curl
I'm not so clear about how the range partitioner works in Spark. It uses reservoir sampling to take samples, and I was confused by the way the boundaries of the input are computed.
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and oversample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
Why should the calculated sampleSize be multiplied by 3.0? And how are the boundaries computed? Can someone show me some examples of this? Thank you!