Write spark dataframe to single parquet file


Solution 1

I’ll approach the print functions issue first, as it’s fundamental to understanding Spark. Then limit vs sample, then repartition vs coalesce.

The reason the print functions take so long is that coalesce is a lazy transformation. Most transformations in Spark are lazy and do not get evaluated until an action is called.

Actions actually do work and (mostly) don’t return a new dataframe as a result, e.g. count and show. They return a number and some data, whereas coalesce returns a dataframe with 1 partition (sort of, see below).

What is happening is that you are rerunning the SQL query and the coalesce call each time you call an action on the tiny dataframe. That’s why each call uses the 25k mappers.

To save time, add the .cache() method to the first line (for your print code anyway).

Then the dataframe transformations are actually executed on your first line, and the result is persisted in memory on your Spark nodes.

This won’t have any impact on the initial query time for the first line, but at least you’re not running that query 2 more times because the result has been cached, and the actions can then use that cached result.

To remove it from memory, use the .unpersist() method.
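
As a rough sketch of what that looks like (using the table and dataframe names from your question, and keeping your coalesce for the moment):

tiny = spark.table("db.big_table").limit(500).coalesce(1).cache()

print(tiny.count())   # first action: runs the query once and persists the result on the executors
tiny.show(10)         # reuses the cached result instead of rerunning the query
tiny.write.saveAsTable("db.tiny_table")

tiny.unpersist()      # release the cached data when you're done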

Now for the actual query you’re trying to do...

It really depends on how your data is partitioned, i.e. whether it is partitioned on specific fields etc.

You mentioned it in your question, but sample might be the right way to go.

Why is this?

limit has to search for the first 500 rows. Unless your data is partitioned by row number (or some sort of incrementing id), the first 500 rows could be stored in any of the 25k partitions.

So Spark has to search through all of them until it finds all the correct values. Not only that, it has to perform an additional step of sorting the data into the correct order.

sample just grabs 500 random values. Much easier to do as there’s no order/sorting of the data involved and it doesn’t have to search through specific partitions for specific rows.

While limit can be faster, it also has its, erm, limits. I usually only use it for very small subsets like 10/20 rows.
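
A hedged sketch of the sample approach: sample takes a fraction rather than a row count, so the fraction below is just an assumption based on the ~300MM rows mentioned in the question, and it will give you *roughly* 500 rows, not exactly 500.

big = spark.table("db.big_table")

# fraction is approximate: ~500 out of ~300,000,000 rows
tiny = big.sample(withReplacement=False, fraction=500 / 300000000, seed=42)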

Now for partitioning....

The problem with coalesce, I think, is that it virtually changes the partitioning. I’m not sure about this though, so take it with a pinch of salt.

According to the pyspark docs:

this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.

So your 500 rows will actually still sit across your 25k physical partitions, which Spark considers to be 1 virtual partition.

Causing a shuffle (usually bad) and persisting in Spark memory with .repartition(1).cache() is possibly a good idea here, because instead of having 25k mappers looking at the physical partitions when you write, it should result in only 1 mapper looking at what is in Spark memory. Then the write becomes easy. You’re also dealing with a small subset, so any shuffling should (hopefully) be manageable.

Obviously this is usually bad practice, and it doesn’t change the fact that Spark will probably want to run 25k mappers when it performs the original SQL query. Hopefully sample takes care of that.
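
Putting that together, a possible (hedged) version of the whole thing, with sample for the subset and repartition(1).cache() before the write. Table names come from the question; the fraction is an assumption as above.

tiny = spark.table("db.big_table").sample(fraction=500 / 300000000, seed=42)
tiny = tiny.repartition(1).cache()

tiny.count()                             # materialise the cached single partition
tiny.write.saveAsTable("db.tiny_table")  # the write should now be 1 mapper reading from Spark memory
tiny.unpersist()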

Edit: to clarify shuffling, repartition and coalesce

You have 2 datasets in 16 partitions on a 4 node cluster. You want to join them and write as a new dataset in 16 partitions.

Row 1 for data 1 might be on node 1, and row 1 for data 2 on node 4.

In order to join these rows together, Spark has to physically move one or both of them, then write to a new partition.

That’s a shuffle, physically moving data around a cluster.

It doesn’t matter that everything is partitioned by 16, what matters is where the data is sitting on the cluster.
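
If you want to see this for yourself, here’s a small, hedged illustration: disabling the broadcast-join threshold (just for the demo) forces a sort-merge join, and the physical plan from explain() then shows the Exchange steps, i.e. the shuffle that moves rows so matching keys end up together.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # force a shuffle join for the demo

df1 = spark.range(0, 1000).withColumnRenamed("id", "key")
df2 = spark.range(0, 1000).withColumnRenamed("id", "key")

df1.join(df2, on="key").explain()  # look for Exchange hashpartitioning(key, ...) in the plan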

data.repartition(4) will physically move the data from the 4 partitions on each node into 1 partition per node.

Spark might move all 4 partitions from node 1 over to the 3 other nodes, in a new single partition on those nodes, and vice versa.

I wouldn’t think it’d do this, but it’s an extreme case that demonstrates the point.

A coalesce(4) call, though, doesn’t move the data; it’s much more clever. Instead, it recognises: “I already have 4 partitions per node and 4 nodes in total... I’m just going to call all 4 of those partitions per node a single partition, and then I’ll have 4 total partitions!”

So it doesn’t need to move any data because it just combines existing partitions into a joined partition.
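
A throwaway illustration of that difference (the dataframe and numbers are made up for the example): both calls end up with 4 partitions, but repartition triggers a full shuffle while coalesce just merges existing partitions.

df = spark.range(0, 1000000, numPartitions=16)

coalesced = df.coalesce(4)         # narrow dependency: merges existing partitions, no shuffle
repartitioned = df.repartition(4)  # full shuffle: data is physically moved around the cluster

print(coalesced.rdd.getNumPartitions())      # 4
print(repartitioned.rdd.getNumPartitions())  # 4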

Solution 2

Try this; in my experience, repartition works better for this kind of problem:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

Even better, if you are only interested in the parquet file, you don't need to save it as a table:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")

Comments

  • seth127 over 1 year

    I am trying to do something very simple and I'm having some very stupid struggles. I think it must have to do with a fundamental misunderstanding of what spark is doing. I would greatly appreciate any help or explanation.

    I have a very large (~3 TB, ~300MM rows, 25k partitions) table, saved as parquet in s3, and I would like to give someone a tiny sample of it as a single parquet file. Unfortunately, this is taking forever to finish and I don't understand why. I have tried the following:

    tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
    tiny.coalesce(1).write.saveAsTable("db.tiny_table")
    

    and then when that didn't work I tried this, which I thought should be the same, but I wasn't sure. (I added the prints in an effort to debug.)

    tiny = spark.table("db.big_table").limit(500).coalesce(1)
    print(tiny.count())
    print(tiny.show(10))
    tiny.write.saveAsTable("db.tiny_table")
    

    When I watch the Yarn UI, both print statements and the write are using 25k mappers. The count took 3 mins, the show took 25 mins, and the write took ~40 mins, although it finally did write the single file table I was looking for.

    It seems to me like the first line should take the top 500 rows and coalesce them to a single partition, and then the other lines should happen extremely fast (on a single mapper/reducer). Can anyone see what I'm doing wrong here? I've been told maybe I should use sample instead of limit but as I understand it limit should be much faster. Is that right?

    Thanks in advance for any thoughts!

  • seth127 about 5 years
    Hmm, I'm not sure what you mean by "prefers" although I did find another post saying "I think the current documentation covers this pretty well: github.com/apache/spark/blob/… Keep in mind that all repartition does is call coalesce with the shuffle parameter set to true. Let me know if that helps." So that's interesting, but I'm not clear on why that would make it faster. (the other post: stackoverflow.com/questions/31610971/…)
  • Alessandro about 5 years
    You are right, I meant that I empirically found it to work better!
  • seth127 about 5 years
    Thank you, this is very helpful! To your first point: my understanding of the lazy eval was that it didn't actually execute until something like tiny.count() is called (as you said) but then I don't understand why it has to eval again for the subsequent lines. I guess because I didn't cache the df? This leads to your second point: I've never heard this about "virtual" partitioning. Can you point me to any docs/explanation about that. I figured, once the tiny.count() happened, then all 500 rows were on the same partition.
  • seth127 about 5 years
    Just to get on a soapbox for a short minute: I've read literally dozens of explanations of "repartition vs. coalesce" and none of them seem to adequately explain it. The "virtual" partitioning is a perfect example (i.e. that I've never heard of this concept). Maybe "all repartition does is call coalesce with the shuffle parameter set to true" (from stackoverflow.com/questions/31610971) is all I need to know and I just need to better understand the implications of shuffling. I'm not sure. Any good explanation/docs you can point me to would be greatly appreciated.
  • dijksterhuis about 5 years
    @seth127 also just found this.. might be useful edureka.co/blog/demystifying-partitioning-in-spark