How to avoid empty files while writing parquet files?


Solution 1

"Is there any way I can stop writing an empty file?"

Yes, but you would rather not do it.

The reason for many empty parquet files is that Spark SQL (the underlying infrastructure for Structured Streaming) tries to guess the number of partitions to load a dataset (with records from Kafka per batch) and does this "poorly", i.e. many partitions have no data.

When you save a partition with no data you will get an empty file.

You can use repartition or coalesce operators to set the proper number of partitions and reduce (or even completely avoid) empty files. See Dataset API.
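As a minimal sketch of this idea in PySpark, assuming a streaming DataFrame such as the filtered frame from the question: the helper name `write_compacted` and the default of a single partition are illustrative assumptions, not part of the original answer. Depending on the Spark version, a micro-batch with no rows at all may still produce one empty file.

```python
def write_compacted(stream_df, output_path, checkpoint_path, num_partitions=1):
    """Start a Parquet streaming query with a reduced number of partitions.

    `stream_df` is assumed to be a streaming DataFrame (for example the
    filtered Kafka stream). The function name and the default value are
    hypothetical and only serve this illustration.
    """
    return (
        stream_df
        .coalesce(num_partitions)   # merge partitions without a full shuffle
        .writeStream
        .format("parquet")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Swapping `coalesce(num_partitions)` for `repartition(num_partitions)` spreads rows more evenly across partitions at the cost of a full shuffle.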

Why would you not do it? repartition incurs the extra step of shuffling the data between partitions (and possibly between nodes in your Spark cluster), and coalesce, although it avoids a full shuffle, reduces parallelism. Either can be expensive and not worth doing (hence I said that you would rather not do it).

You may then be asking yourself how to know the right number of partitions. That's a very good question in any Spark project. The answer is fairly simple (and obvious once you understand how Spark does the processing): "Know your data", so you can calculate how many partitions is exactly right.
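One rough way to turn "know your data" into a number is to divide the expected batch size by a target output file size. The sketch below assumes a 128 MB target, which is a common rule of thumb and not something the answer prescribes; the function name is hypothetical.

```python
import math

def estimate_partitions(batch_bytes, target_file_bytes=128 * 1024 * 1024):
    """Return a partition count so that each output file is near the target size.

    `target_file_bytes` defaults to 128 MB, an assumed rule of thumb.
    """
    if batch_bytes <= 0:
        return 1  # always keep at least one partition
    return max(1, math.ceil(batch_bytes / target_file_bytes))

# About 1 GB of data per batch with 128 MB target files
print(estimate_partitions(1_000_000_000))  # prints 8
```

The result can be passed to repartition or coalesce; how to estimate the batch size itself depends on your data and is left open here.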

Solution 2

I recommend using repartition(partitioningColumns) on the DataFrame (or Dataset) and then partitionBy(partitioningColumns) on the writeStream operation to avoid writing empty files.

Reason: With a lot of data, the bottleneck is often Spark's read performance when there are many small (or even empty) files and no partitioning. So you should definitely make use of file/directory partitioning (which is not the same as RDD partitioning). This is especially a problem when using AWS S3. The partitioningColumns should fit your common queries when reading the data, like timestamp/day, message type/Kafka topic, ...
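The recommendation above could look roughly like this in PySpark. This is a sketch under assumptions: the helper name `write_partitioned` is hypothetical, and the partitioning columns must already exist in the streaming DataFrame.

```python
def write_partitioned(stream_df, partition_cols, output_path, checkpoint_path):
    """Repartition by the given columns, then write Parquet partitioned by them.

    `partition_cols` is a list of column names that are assumed to exist in
    `stream_df`; the function name is hypothetical.
    """
    return (
        stream_df
        .repartition(*partition_cols)    # co-locate rows with equal column values
        .writeStream
        .format("parquet")
        .partitionBy(*partition_cols)    # one directory per distinct value combination
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

A call might look like `write_partitioned(filterDF, ["day", "topic"], outputpath, RawXMLCheckpoint)`, where `day` and `topic` are placeholder column names that would first have to be derived from the Kafka records.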

See also the partitionBy documentation on http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/, year=2016/month=02/

Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.

This is applicable for all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.

Solution 3

You can try repartitionByRange(column).

I used this while writing a DataFrame to HDFS, and it solved my empty-file creation issue.
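A minimal sketch of that approach for a batch DataFrame follows. The helper name, the default partition count, and the append mode are assumptions for illustration. `repartitionByRange` is available from Spark 2.4.0; it samples the data to choose range boundaries, which tends to leave fewer partitions empty.

```python
def write_range_partitioned(df, column, output_path, num_partitions=8):
    """Range-partition a batch DataFrame on `column`, then append it as Parquet.

    The function name and the default of 8 partitions are hypothetical.
    """
    (
        df
        .repartitionByRange(num_partitions, column)  # range boundaries come from sampling
        .write
        .mode("append")
        .parquet(output_path)
    )
```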

Author by

ranjith reddy

Updated on July 11, 2022

Comments

  • ranjith reddy
    ranjith reddy almost 2 years

    I am reading from a Kafka queue using Spark Structured Streaming. After reading from Kafka, I apply a filter to the dataframe and save the filtered dataframe as Parquet. This generates many empty Parquet files. Is there any way I can stop writing an empty file?

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .load()
    
    Transaction_DF = df.selectExpr("CAST(value AS STRING)")
    
    decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
    filterDF = decompDF.filter(.....)
    
    query = filterDF.writeStream \
        .option("path", outputpath) \
        .option("checkpointLocation", RawXMLCheckpoint) \
        .start()
    
    • Rahul Sharma
      Rahul Sharma over 6 years
      I faced a similar challenge and decided to write the data to HBase, so that downstream consumers read from HBase. In your case, you could also write to a NoSQL database to avoid many small files.
  • ranjith reddy
    ranjith reddy over 6 years
    When saving structured streams to a directory, every batch is appended as a new part file in the directory. If I add a filter condition on the stream and a batch does not satisfy it, the batch becomes empty, and this empty batch is written to a separate file.