Spark: Repartition strategy after reading text file

It looks like you're working with a gzipped file.

Quoting from my answer here:

I think you've hit a fairly typical problem with gzipped files in that they cannot be loaded in parallel. More specifically, a single gzipped file cannot be loaded in parallel by multiple tasks, so Spark will load it with 1 task and thus give you an RDD with 1 partition.

You need to explicitly repartition the RDD after loading it so that more tasks can run on it in parallel.

For example:

// A single .gz file loads as 1 partition; repartition spreads the data across the cluster.
val file = sc.textFile("/path/to/file.txt.gz").repartition(sc.defaultParallelism * 3)
println(file.count())

Regarding the comments on your question, setting minPartitions doesn't help here because a gzipped file is not splittable, so Spark will always use 1 task to read the file.

If you set minPartitions when reading a regular text file, or a file compressed with a splittable compression format like bzip2, you'll see that Spark will actually deploy that number of tasks in parallel (up to the number of cores available in your cluster) to read the file.
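
One way to check this behaviour is to compare partition counts directly. The sketch below is an illustration, not something from the original question. It assumes Spark is on the classpath, runs against a local master (`local[4]`), and writes two small throwaway files into a temporary directory. The object name `PartitionCheck`, the helper `partitionCounts`, and the file names are all made up for the example.

```scala
import java.io.{FileOutputStream, PrintWriter}
import java.nio.file.Files
import java.util.zip.GZIPOutputStream

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; none of these names come from the original post.
object PartitionCheck {

  // Writes the same lines to a plain file and a gzipped file, then returns
  // (plain-text partitions, gzip partitions, gzip partitions after repartition).
  def partitionCounts(sc: SparkContext, minPartitions: Int): (Int, Int, Int) = {
    val dir = Files.createTempDirectory("partition-check")
    val plainPath = dir.resolve("sample.txt")
    val gzPath = dir.resolve("sample.txt.gz")
    val lines = (1 to 100000).map(i => s"line $i")

    val plainOut = new PrintWriter(plainPath.toFile)
    try lines.foreach(l => plainOut.println(l)) finally plainOut.close()

    val gzOut = new PrintWriter(new GZIPOutputStream(new FileOutputStream(gzPath.toFile)))
    try lines.foreach(l => gzOut.println(l)) finally gzOut.close()

    // Same minPartitions hint for both inputs.
    val plain = sc.textFile(plainPath.toUri.toString, minPartitions)
    val gz = sc.textFile(gzPath.toUri.toString, minPartitions)

    (plain.partitions.length,
     gz.partitions.length,
     gz.repartition(minPartitions).partitions.length)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a self-contained run.
    val conf = new SparkConf().setMaster("local[4]").setAppName("partition-check")
    val sc = new SparkContext(conf)
    try {
      val (plain, gz, repartitioned) = partitionCounts(sc, minPartitions = 8)
      println(s"plain text partitions:       $plain")
      println(s"gzip partitions:             $gz")
      println(s"gzip after repartition(8):   $repartitioned")
    } finally sc.stop()
  }
}
```

With a gzipped input I would expect the second number to be 1 regardless of the minPartitions hint, and the third to match the argument passed to repartition.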

Author: Stephane Maarek

Updated on June 04, 2022

Comments

  • Stephane Maarek
    Stephane Maarek almost 2 years

    I have launched my cluster this way:

    /usr/lib/spark/bin/spark-submit --class MyClass --master yarn-cluster --num-executors 3 --driver-memory 10g --executor-memory 10g --executor-cores 4 /path/to/jar.jar
    

    The first thing I do is read a big text file, and count it:

    val file = sc.textFile("/path/to/file.txt.gz")
    println(file.count())
    

    When doing this, I see that only one of my nodes is actually reading the file and executing the count (because I only see one task). Is that expected? Should I repartition my RDD afterwards, or when I use map reduce functions, will Spark do it for me?

  • Stephane Maarek
    Stephane Maarek over 9 years
    Thanks! Would you recommend bzip2 over gzip then? If I load that file frequently, what should be my strategy to optimize every run?
  • Nick Chammas
    Nick Chammas over 9 years
    @Stephane - It depends on how much data is coming in and how much time your cluster spends repartitioning the data. A single gzipped file might be fine. If the one file is too big, you could probably also go with multiple gzipped files (i.e. splitting before compressing) as each gzipped file can be loaded in parallel into the same RDD (one task per file). That's probably the path of least resistance.
  • Stephane Maarek
    Stephane Maarek over 9 years
    Very interesting, thanks! So split .gz.001 files or bzip2... I'll experiment with both! I think that yes, the big bottleneck is the first repartition, so if I manage to split my files as they come in, it might save me a little bit of time.
  • Alexander Aleksandrovič Klimov
    Alexander Aleksandrovič Klimov about 9 years
    @Stephane, do you know why that limitation exists? It doesn't seem any easier to distribute the reading of a non-gzipped file - in both cases, you need to read the file serially to work out where the next record begins?
  • Stephane Maarek
    Stephane Maarek about 9 years
    @Paul, I haven't experimented with bzip2 yet; I'll tell you if parallel reading truly works. I don't know for sure, but if the archive is splittable, I guess you can read it in parallel (blocks 1 to n, n+1 to 2n, etc.) and then send the few missing bytes here and there to make sure every part is correctly formed. I hope that's what Spark does.
  • Stephane Maarek
    Stephane Maarek about 9 years
    (Parallel read does work, but performance isn't much better than with a repartition after a serial read. I guess it really depends on whether the network is a bottleneck.)
  • Nick Chammas
    Nick Chammas about 9 years
    @Stephane - What do your task stats look like? When you say "parallel read does work", how are you verifying that that is happening?
  • Stephane Maarek
    Stephane Maarek about 9 years
    I used to see one task before. Now from what I have seen, with the bzip2 archive, as my file was 8GB, and the reading block on hadoop was 64MB, I saw around 125 tasks being created, and split between my various datanodes. If you click in the spark UI to see the tasks stats, you see the details (input, task time, gc, etc), and I saw I had 125 tasks
  • user2848932
    user2848932 almost 9 years
    Do you know of another method? I think repartition is too heavy for this purpose, because we don't need to shuffle all of the data, only split a large file into several partitions.
  • Nick Chammas
    Nick Chammas almost 9 years
    @user2848932 - If you don't shuffle the data, how will the other workers in your cluster get to share the work of processing it? Without shuffling, only 1 task on 1 worker will be able to process the gzipped file. Shuffling the data sends parts of it to other workers, which is relatively quick compared to how long you'd otherwise be waiting for 1 task to do all the work.
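
Two points from the comments above can be checked with a small sketch. First, splitting the data into several gzipped files before loading should give one partition per file. Second, `coalesce` without a shuffle cannot turn a single gzip partition into several, which is why `repartition` is needed. The sketch assumes Spark is on the classpath and uses a local master and throwaway files in a temporary directory. `GzipPartsCheck` and its helpers are hypothetical names, not part of the original discussion.

```scala
import java.io.{FileOutputStream, PrintWriter}
import java.nio.file.{Files, Path}
import java.util.zip.GZIPOutputStream

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; names and file layout are assumptions.
object GzipPartsCheck {

  // Writes the given lines to a gzipped text file.
  private def writeGz(path: Path, lines: Seq[String]): Unit = {
    val out = new PrintWriter(new GZIPOutputStream(new FileOutputStream(path.toFile)))
    try lines.foreach(l => out.println(l)) finally out.close()
  }

  // Distributes lines round-robin over `numFiles` gzipped files, loads them
  // through a glob, and returns the number of partitions Spark reports.
  def partitionsForSplitGzip(sc: SparkContext, numFiles: Int): Int = {
    val dir = Files.createTempDirectory("gzip-parts")
    val indexed = (1 to 40000).map(i => s"line $i").zipWithIndex
    (0 until numFiles).foreach { i =>
      val chunk = indexed.collect { case (line, idx) if idx % numFiles == i => line }
      writeGz(dir.resolve(f"part-$i%03d.txt.gz"), chunk)
    }
    val glob = dir.toUri.toString.stripSuffix("/") + "/*.gz"
    sc.textFile(glob).partitions.length
  }

  // Loads one gzipped file and asks coalesce (shuffle = false by default)
  // for more partitions than it has; returns the resulting partition count.
  def coalesceWithoutShuffle(sc: SparkContext, target: Int): Int = {
    val dir = Files.createTempDirectory("gzip-single")
    val gzPath = dir.resolve("single.txt.gz")
    writeGz(gzPath, (1 to 40000).map(i => s"line $i"))
    sc.textFile(gzPath.toUri.toString).coalesce(target).partitions.length
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("gzip-parts-check")
    val sc = new SparkContext(conf)
    try {
      println(s"partitions from 4 gzipped files: ${partitionsForSplitGzip(sc, 4)}")
      println(s"single gzip after coalesce(8):   ${coalesceWithoutShuffle(sc, 8)}")
    } finally sc.stop()
  }
}
```

If the pre-split files are roughly equal in size, the initial read is already spread across tasks and the shuffle from `repartition` may not be needed at all.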