Spark + Parquet + Snappy: Overall compression ratio drops after Spark shuffles data

When you call repartition(n) on a DataFrame, Spark does round-robin partitioning. Any data locality that existed prior to repartitioning is gone, and entropy has gone up: similar values that used to sit next to each other are now scattered across all partitions. So run-length and dictionary encoders, as well as compression codecs, don't have much to work with.

So when you repartition, use the repartition(n, col) version and give it a good column that preserves data locality.

Also, since you are probably optimizing your Sqooped tables for downstream jobs, you can use sortWithinPartitions for faster scans.

df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)
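
To get a feel for why locality matters to the encoders, here is a rough sketch in plain Scala, with no Spark involved. It counts how many runs of equal values a run-length encoder would have to store for the same column when it is clustered versus scattered. The column contents, the `RunLengthDemo` object, and the `countRuns` helper are made up for illustration, and the random shuffle is only an approximation of what repartition(n) does to row order.

```scala
import scala.util.Random

object RunLengthDemo {
  // Count runs of consecutive equal values.
  // A run-length encoder stores roughly one (value, length) pair per run.
  def countRuns(values: Seq[Long]): Int =
    if (values.isEmpty) 0
    else 1 + values.sliding(2).count {
      case Seq(a, b) => a != b
      case _         => false // single-element input: no value changes
    }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)

    // Hypothetical column: 100,000 rows drawn from only 50 distinct values.
    val column = Vector.fill(100000)(rng.nextInt(50).toLong)

    // Clustered: equal values sit next to each other (what sorting gives you).
    val clustered = column.sorted

    // Scattered: stand-in for rows dealt out without regard to their values.
    val scattered = rng.shuffle(column)

    println(s"runs when clustered: ${countRuns(clustered)}")
    println(s"runs when scattered: ${countRuns(scattered)}")
  }
}
```

The clustered column can never need more than 50 runs, one per distinct value, while the scattered one needs a new run almost every time the value changes. The data is identical in both cases; only the ordering differs, and that is the effect the repartition(n, col) plus sortWithinPartitions combination is meant to preserve.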

Author by

Mikhail Dubkov

Updated on June 07, 2022

Comments

  • Mikhail Dubkov almost 2 years

    Community!

    Please help me understand how to get a better compression ratio with Spark.

    Let me describe the case:

    1. I have a dataset, let's call it product, on HDFS, which was imported using Sqoop ImportTool with as-parquet-file and the snappy codec. As a result of the import, I have 100 files totaling 46 GB (per du), with different sizes (min 11 MB, max 1.5 GB, avg ~500 MB). The total record count is a little over 8 billion, with 84 columns.

    2. I'm doing a simple read/repartition/write with Spark, using snappy as well, and as a result I'm getting:

    ~100 GB output size with the same file count, same codec, same record count and same columns.

    Code snippet:

    val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
    
    productDF
    .repartition(100)
    .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
    .option("compression", "snappy")
    .parquet("/processed/product/20180215/04-37/read_repartition_write/general")
    
    3. Using parquet-tools, I looked into random files from both ingest and processed, and they look as below:

    ingest:

    creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
    extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
    
    and almost all columns look like
    AVAILABLE: OPTIONAL INT64 R:0 D:1
    
    row group 1:                    RC:3640100 TS:36454739 OFFSET:4 
    
    AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]
    

    processed:

    creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
    extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
    
    AVAILABLE:                      OPTIONAL INT64 R:0 D:1
    ...
    
    row group 1:                    RC:6660100 TS:243047789 OFFSET:4 
    
    AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]
    

    On the other hand, without repartition, or when using coalesce, the size remains close to the ingest data size.

    4. Going forward, I did the following:

      • read the dataset and write it back with

        productDF
          .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
          .option("compression", "none")
          .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
        
      • read the dataset, repartition it, and write it back with

        productDF
          .repartition(500)
          .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
          .option("compression", "none")
          .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
        

    As a result: 80 GB without repartition and 283 GB with it, with the same number of output files.

    80GB parquet meta example:

    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
    

    283 GB parquet meta example:

    AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
    

    It seems that Parquet itself (through encoding?) greatly reduces the size of the data even without compression. How? :)

    I tried to read the uncompressed 80 GB, repartition it, and write it back, and I got my 283 GB again.

    • The first question is: why am I getting a bigger size after Spark repartitioning/shuffle?

    • The second is: how can I efficiently shuffle data in Spark so that it benefits from Parquet encoding/compression, if that is possible at all?

    In general, I don't want my data size to grow after Spark processing, even if I didn't change anything.

    Also, I failed to find out whether there is any configurable compression level for snappy, e.g. -1 ... -9. As far as I know, gzip has this, but how can this level be controlled in the Spark/Parquet writer?

    I appreciate any help!

    Thanks!

  • Blue Clouds over 3 years
    " is gone entropy has gone up" what does this mean?