Spark - repartition() vs coalesce()

305,659

Solution 1

It avoids a full shuffle. If it's known that the number is decreasing then the executor can safely keep data on the minimum number of partitions, only moving the data off the extra nodes, onto the nodes that we kept.

So, it would go something like this:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Then coalesce down to 2 partitions:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Notice that Node 1 and Node 3 did not require its original data to move.

Solution 2

Justin's answer is awesome and this response goes into more depth.

The repartition algorithm does a full shuffle and creates new partitions with data that's distributed evenly. Let's create a DataFrame with the numbers from 1 to 12.

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf contains 4 partitions on my machine.

numbersDf.rdd.partitions.size // => 4

Here is how the data is divided on the partitions:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

Let's do a full-shuffle with the repartition method and get this data on two nodes.

val numbersDfR = numbersDf.repartition(2)

Here is how the numbersDfR data is partitioned on my machine:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).

Difference between coalesce and repartition

coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions that have much different sizes) and repartition results in roughly equal sized partitions.

Is coalesce or repartition faster?

coalesce may run faster than repartition, but unequal sized partitions are generally slower to work with than equal sized partitions. You'll usually need to repartition datasets after filtering a large data set. I've found repartition to be faster overall because Spark is built to work with equal sized partitions.

N.B. I've curiously observed that repartition can increase the size of data on disk. Make sure to run tests when you're using repartition / coalesce on large datasets.

Read this blog post if you'd like even more details.

When you'll use coalesce & repartition in practice

Solution 3

One additional point to note here is that, as the basic principle of Spark RDD is immutability. The repartition or coalesce will create new RDD. The base RDD will continue to have existence with its original number of partitions. In case the use case demands to persist RDD in cache, then the same has to be done for the newly created RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

Solution 4

repartition - it's recommended to use it while increasing the number of partitions, because it involve shuffling of all the data.

coalesce - it's recommended to use it while reducing the number of partitions. For example if you have 3 partitions and you want to reduce it to 2, coalesce will move the 3rd partition data to partition 1 and 2. Partition 1 and 2 will remains in the same container. On the other hand, repartition will shuffle data in all the partitions, therefore the network usage between the executors will be high and it will impacts the performance.

coalesce performs better than repartition while reducing the number of partitions.

Solution 5

What follows from the code and code docs is that coalesce(n) is the same as coalesce(n, shuffle = false) and repartition(n) is the same as coalesce(n, shuffle = true)

Thus, both coalesce and repartition can be used to increase number of partitions

With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large.

Another important note to accentuate is that if you drastically decrease number of partitions you should consider using shuffled version of coalesce (same as repartition in that case). This will allow your computations be performed in parallel on parent partitions (multiple task).

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Please also refer to the related answer here

Share:
305,659
Praveen Sripati
Author by

Praveen Sripati

Very passionate about the intersection of Big Data and Cloud technologies. I am a Cloudera Certified Developer for Apache Hadoop, Hortonworks Certified Apache Hadoop Java Developer, AWS Certified Solutions Architect - Associate and AWS Certified Developer - Associate. If interested Consulting/Projects/Trainings around Cloud and Big Data, please contact at [email protected]. Currently I am conducting a training on AWS Development, more details here. I started blogging for fun and started liking it. So, I regularly blog at thecloudavenue.com around Big Data, K8S and Cloud related technologies. I also Tweet here.

Updated on February 17, 2022

Comments

  • Praveen Sripati
    Praveen Sripati over 2 years

    According to Learning Spark

    Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

    One difference I get is that with repartition() the number of partitions can be increased/decreased, but with coalesce() the number of partitions can only be decreased.

    If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?