Merge multiple small files into a few larger files in Spark

Solution 1

You may want to try using the DataFrame.coalesce method; it returns a DataFrame with the specified number of partitions (each of which becomes a file on insertion). So using the number of records you are inserting and the typical size of each record, you can estimate how many partitions to coalesce to if you want files of ~200MB.
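
A minimal sketch of what that might look like (table and column names follow the question; the numFiles value is an illustrative estimate of total input size divided by the desired ~200 MB file size, and the DataFrameWriter.insertInto call assumes Spark 1.4+):

val df = hiveContext.sql("SELECT a, b, c, date FROM partition_json_table")

// Rough estimate: total input size / desired file size (~200 MB).
// For ~200 GB of input that is on the order of 1000 partitions.
val numFiles = 1000

// Each coalesced partition is written out as (at most) one file per table partition.
df.coalesce(numFiles).write.insertInto("partition_table")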

Solution 2

I had the same issue. The solution was to add a DISTRIBUTE BY clause with the partition columns. This ensures that the data for one partition goes to a single reducer. Example in your case:

INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table DISTRIBUTE BY date
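
Issued from Spark, it might look like the sketch below (this assumes the dynamic-partition settings from the question are already applied and that the partition column date is included in the select list):

hiveContext.sql("set hive.exec.dynamic.partition=true")
hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

// DISTRIBUTE BY date sends all rows for a given date to one reducer, so each
// table partition is written by a single task, producing one larger file
// instead of many small ones.
hiveContext.sql(
  """INSERT INTO TABLE partition_table PARTITION (date)
    |SELECT a, b, c, date FROM partition_json_table
    |DISTRIBUTE BY date""".stripMargin)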

Solution 3

The DataFrame repartition(1) method also works in this case.
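
For example, a sketch under the same assumptions as above (note that a single partition means a single writing task, so this is only practical for modest data volumes):

val df = hiveContext.sql("SELECT a, b, c, date FROM partition_json_table")

// One partition means one task writes everything, which yields one file per table partition.
df.repartition(1).write.insertInto("partition_table")

On Spark 1.6+ you can instead repartition by the partition column, e.g. df.repartition(df("date")), which gives the same one-file-per-date consolidation while keeping some parallelism across dates.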

Comments

  • dheee over 1 year

    I am using Hive through Spark. I have an insert-into-partitioned-table query in my Spark code. The input data is 200+ GB. When Spark writes to the partitioned table, it spits out very small files (files in KBs), so the output partitioned table folder now has 5000+ small KB files. I want to merge these into a few larger files, maybe around 200 MB each. I tried using the Hive merge settings, but they don't seem to work.

    val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")
    val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")
    val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")
    val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")
    val result7F = hiveContext.sql("SET hive.merge.sparkfiles=true")
    val result7G = hiveContext.sql("set hive.aux.jars.path=c:\\Applications\\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")
    val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")
    

    The above Hive settings work under MapReduce Hive execution and produce files of the specified size. Is there any option to do this in Spark or Scala?

  • Umesh K almost 9 years
    Hi @zweiterlinde, I tried the hc.sql("bla bla").coalesce(10) method, but it does nothing; I still see 200 small files of around 20 MB each.
  • zweiterlinde almost 9 years
    I'd need a longer code sample to really be able to comment, but in my toy experiments calling df.write.parquetFile(...) resulted in many part files, but df.coalesce(1).write.parquetFile(...) resulted in one.
  • Hemakshi Sachdev almost 6 years
    Hey @Jussi Kujala, thanks a lot, this seems to work for me. But I had one question: what if my table is partitioned by more than one column... will this work in that case also?