How does range partitioner work in Spark?

Background on Range Partitioning

The code you're posting comes from the method used to take an unpartitioned RDD and partition it by a new range partitioner. This involves three steps:

  1. Compute reasonable range boundaries
  2. Construct a partitioner from these range boundaries which gives you a function from key K to partition index
  3. Shuffle the RDD against this new partitioner
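Steps 2 and 3 can be sketched in plain Scala, without Spark. This is an illustration, not Spark's own code: `partitionIndex` is a hypothetical helper, the bounds are made-up values, and a local `groupBy` stands in for the distributed shuffle.

```scala
// Hypothetical helper: map a key to a partition index given sorted upper bounds.
// k bounds define k + 1 partitions; a key goes to the first partition whose
// bound is >= the key, and keys above every bound go to the last partition.
def partitionIndex(key: Int, bounds: Array[Int]): Int = {
  val i = bounds.indexWhere(b => key <= b)
  if (i == -1) bounds.length else i
}

val bounds = Array(10, 20, 30)      // assumed range boundaries, giving 4 partitions
val keys = Seq(3, 10, 11, 25, 99)

// Local stand-in for the shuffle: group each key under its target partition.
val grouped = keys.groupBy(k => partitionIndex(k, bounds))
// partition 0 -> 3, 10; partition 1 -> 11; partition 2 -> 25; partition 3 -> 99
```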

Your question concerns the first of these steps. Ideally you could just collect all the RDD data, sort it, and determine range bounds that divide our sorted collection into nPartitions chunks. Easy!

Not so much. This algorithm is O(n log n) in compute and requires memory proportional to the size of the collection. These facts (the second in particular) make it impractical to execute in the distributed Spark framework. But we don't need our partitions to be exactly balanced, as they would be after my terrible collect-and-sort implementation. As long as our partitions end up reasonably balanced, we're in the clear. If we can use an algorithm that gives us approximate quantile boundaries but is faster to run, this is probably a win.

Okay, so we've got the motivation to have an efficient algorithm that runs quickly and doesn't take too much memory. Reservoir sampling turns out to be a great way to do this. If your collection has 1B elements and you sample 1M, the 10th percentile of your 1M elements is approximately equal to the 10th percentile of your 1B. You can do exactly the same collect-and-sort algorithm to determine range bounds, but on a reduced randomly-sampled subset of the full data.
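As a rough sketch of that idea in plain Scala: the code below draws a fixed-size reservoir sample from a stream and then runs the collect-and-sort step on the sample only. The names `reservoirSample` and `sampleBounds`, the seed, and the sizes are assumptions chosen for illustration. This is a simplified local version, not Spark's actual implementation.

```scala
import scala.util.Random

// Reservoir sampling (Algorithm R): keeps a uniform random sample of k items
// from an iterator of unknown length, using O(k) memory.
def reservoirSample(input: Iterator[Int], k: Int, rng: Random): Array[Int] = {
  val reservoir = new Array[Int](k)
  var seen = 0
  while (input.hasNext) {
    val item = input.next()
    if (seen < k) {
      reservoir(seen) = item           // fill the reservoir first
    } else {
      val j = rng.nextInt(seen + 1)    // uniform in [0, seen]
      if (j < k) reservoir(j) = item   // replace with probability k / (seen + 1)
    }
    seen += 1
  }
  if (seen < k) reservoir.take(seen) else reservoir
}

// Collect-and-sort on the sample: the last element of each of the first
// nPartitions - 1 equal chunks becomes a range bound.
def sampleBounds(sample: Array[Int], nPartitions: Int): Seq[Int] = {
  require(sample.length >= nPartitions, "need at least one sample per partition")
  val sorted = sample.sorted
  (1 until nPartitions).map { i =>
    sorted((i.toLong * sorted.length / nPartitions).toInt - 1)
  }
}

val rng = new Random(42)                         // fixed seed, assumed for repeatability
val sample = reservoirSample(Iterator.range(0, 100000), 5000, rng)
val approx = sampleBounds(sample, 4)
// The true quartile bounds of 0 until 100000 are roughly 25000, 50000, 75000;
// the sampled bounds should land close to those values.
```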

Your specific question on the multiplication by 3

The first line (sampleSize) estimates the number of samples required to represent the true range of values adequately. This is somewhat arbitrary, and probably based on trial-and-error. But since you want to sample in parallel, you need to know how many values to take from each distributed partition, not how many values to take overall. The second line (sampleSizePerPartition) estimates this number.
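To make the arithmetic concrete, here are the same two formulas wrapped in standalone functions. The function and parameter names are hypothetical: `partitions` is the requested number of output partitions and `inputPartitions` stands in for `rdd.partitions.length`.

```scala
// Same formula as the quoted snippet: 20 samples per output partition, capped at 1M.
def sampleSize(partitions: Int): Double =
  math.min(20.0 * partitions, 1e6)

// Same formula as the quoted snippet: spread the target over the input
// partitions and over-sample by a factor of 3.
def sampleSizePerPartition(partitions: Int, inputPartitions: Int): Int =
  math.ceil(3.0 * sampleSize(partitions) / inputPartitions).toInt

val small  = sampleSizePerPartition(1000, 1000)      // 3 * 20000 / 1000 = 60
val capped = sampleSizePerPartition(100000, 100000)  // cap applies: 3 * 1e6 / 100000 = 30
val uneven = sampleSizePerPartition(10, 200)         // 3 * 200 / 200 = 3
```

Note that once 20 * partitions exceeds 1e6 (more than 50,000 output partitions), the cap takes over and the per-partition figure shrinks.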

Earlier I mentioned how we want partitions to be approximately balanced. This is because a huge number of Spark functions rely on this property -- the sampleSizePerPartition code included. We know that partition sizes vary a bit, but assume that they don't vary too much. By sampling 3x more values from each partition than we would need if they were perfectly balanced, we can tolerate more partition imbalance.

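Consider what would happen if you repartition an RDD with 10,000 input partitions into 10,000 output partitions. In this case, sampleSize is 200,000 (20 * partitions), which is below the 1M cap, so a perfectly balanced input would need 20 samples from each input partition.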

If you take 20 random elements from each partition, then if any partition has fewer than 20 elements you're going to end up with fewer samples than sampleSize. Taking 60 elements from each partition is aggressive, but ensures that you'll get enough samples in all but the most extreme imbalanced-partition scenarios.
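A small numeric sketch of that shortfall, using made-up partition sizes. `totalSamples` is a hypothetical helper that assumes each input partition contributes at most the requested number of elements and never more than it holds.

```scala
// Hypothetical helper: total samples collected when every input partition is
// asked for `perPartition` elements but cannot give more than it contains.
def totalSamples(partitionSizes: Seq[Int], perPartition: Int): Int =
  partitionSizes.map(size => math.min(size, perPartition)).sum

val sizes = Seq(5, 10, 1000, 1000)      // assumed sizes, deliberately imbalanced
val target = 20 * sizes.length          // sampleSize for 4 output partitions: 80

val exactShare  = totalSamples(sizes, 20)  // 5 + 10 + 20 + 20 = 55, short of 80
val overSampled = totalSamples(sizes, 60)  // 5 + 10 + 60 + 60 = 135, exceeds 80
```

With the 3x factor, the two small partitions no longer drag the total below the target.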

Author by

American curl

Updated on June 04, 2022

Comments

  • American curl
    American curl almost 2 years

    I'm not clear on how the range partitioner works in Spark. It uses reservoir sampling to take samples, and I'm confused by the way it computes the boundaries of the input.

        // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
        val sampleSize = math.min(20.0 * partitions, 1e6)
        // Assume the input partitions are roughly balanced and over-sample a little bit.
        val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    

    Why is the calculated sampleSize multiplied by 3.0? And how are the boundaries obtained? Can someone show me some examples of this? Thank you!