Extremely slow S3 write times from EMR/Spark


Solution 1

What you are seeing is a problem with the output committer and S3. The job commit applies fs.rename on the _temporary folder, and since S3 does not support rename, this means a single request is now copying and deleting all the files from _temporary to their final destination.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") only works with hadoop version > 2.7. what it does is to copy each file from _temporary on commit task and not commit job so it is distributed and works pretty fast.

If you use an older version of Hadoop, I would use Spark 1.6 and set:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

*Note that it does not work with speculation turned on or when writing in append mode.

**Also note that it is deprecated in Spark 2.0 (replaced by algorithm.version=2).

BTW, in my team we actually write with Spark to HDFS and use DistCp jobs (specifically s3-dist-cp) in production to copy the files to S3, but this is done for several other reasons (consistency, fault tolerance), so it is not necessary. You can write to S3 pretty fast using what I suggested.
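
For reference, a rough sketch of that write-to-HDFS-then-copy pattern; the paths are placeholders, and s3-dist-cp would normally run as a separate EMR step rather than from inside the job:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("write-hdfs-then-copy").getOrCreate()

    // Write to HDFS first; rename-based commits are cheap there.
    val df = spark.read.parquet("s3a://my-bucket/input/")          // example input
    df.write.mode("overwrite").parquet("hdfs:///tmp/job-output/")  // staging area on HDFS

    // Then copy the staged files to S3 outside of Spark, e.g. as an EMR step:
    //   s3-dist-cp --src hdfs:///tmp/job-output/ --dest s3://my-bucket/output/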

Solution 2

I had a similar use case where I used Spark to write to S3 and ran into performance issues. The primary reason was that Spark was creating a lot of zero-byte part files, and renaming temp files to the actual file names was slowing down the write process. I tried the approaches below as workarounds:

  1. Write the output of Spark to HDFS and use Hive to write to S3 (see the sketch after this list). Performance was much better, as Hive created fewer part files. The problem I had (and also had when using Spark) was that the delete action was not granted in the IAM policy in the prod environment for security reasons. The S3 bucket was KMS-encrypted in my case.

  2. Write the Spark output to HDFS, copy the HDFS files to local disk, and use aws s3 cp to push the data to S3. This gave the second-best results. I created a ticket with Amazon and they suggested going with this approach.

  3. Use s3-dist-cp to copy files from HDFS to S3. This worked with no issues, but was not performant.
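
Here is a minimal sketch of approach 1, using hypothetical table, column, and path names; I used Hive directly, but the same idea can be expressed through Spark's Hive support:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hdfs-then-hive-to-s3")
      .enableHiveSupport()   // needed so spark.sql can reach the Hive metastore
      .getOrCreate()

    // Stage the Spark output on HDFS, then expose it to SQL.
    spark.read.parquet("hdfs:///tmp/stage/").createOrReplaceTempView("staged")

    // Hypothetical table layout; LOCATION points the Hive table at S3.
    spark.sql("""
      CREATE TABLE IF NOT EXISTS s3_output (id BIGINT, payload STRING)
      STORED AS PARQUET
      LOCATION 's3://my-bucket/warehouse/s3_output/'
    """)

    // The INSERT is what performs the final write into S3.
    spark.sql("INSERT OVERWRITE TABLE s3_output SELECT id, payload FROM staged")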

Solution 3

The direct committer was pulled from Spark as it wasn't resilient to failures. I would strongly advise against using it.

There is work ongoing in Hadoop (S3Guard) to add zero-rename committers, which will be O(1) and fault tolerant; keep an eye on HADOOP-13786.

Ignoring "the Magic committer" for now, the Netflix-based staging committer will ship first (hadoop 2.9? 3.0?)

  1. Writes the work to the local FS during task commit.
  2. Issues uncommitted multipart PUT operations to upload the data without materializing it.
  3. Saves the information needed to commit the PUTs to HDFS, using the original "algorithm 1" file output committer.
  4. Implements a job commit that uses the HDFS file output commit to decide which PUTs to complete and which to cancel.

Result: task commit takes data/bandwidth seconds, but job commit takes no longer than the time to do 1-4 GETs on the destination folder and a POST for every pending file, the latter being parallelized.

You can pick up the committer this work is based on from Netflix and probably use it in Spark today. Do set the file commit algorithm to 1 (which should be the default) or it won't actually write the data.
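
As a sketch, that is the same setting shown in Solution 1, just with the value flipped back to 1 (sc is assumed to be an existing SparkContext; the committer class itself should be taken from the s3committer project's documentation rather than guessed at here):

    // Keep the classic "algorithm 1" file output committer so the
    // job-commit bookkeeping the staging committer relies on is written.
    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "1")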

Solution 4

I had the same issue and found a solution: change the S3 protocol. Originally I was using s3a:// to read and write the data; then I changed to just s3:// and it works perfectly. My process actually appends data.
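
As a minimal sketch, assuming an existing DataFrame df and a placeholder bucket, the only change is the URI scheme:

    // Same write, only the scheme differs; on EMR the s3:// scheme is served by EMRFS.
    df.write.mode("append").parquet("s3://my-bucket/output/")
    // instead of:
    // df.write.mode("append").parquet("s3a://my-bucket/output/")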

Solution 5

What do you see in the Spark output? If you see lots of rename operations, read this.


Updated on April 30, 2021

Comments

  • blakkheartt12, almost 3 years ago

    I'm writing to see if anyone knows how to speed up S3 write times from Spark running in EMR?

    My Spark Job takes over 4 hours to complete, however the cluster is only under load during the first 1.5 hours.


    I was curious what Spark was doing all this time. I looked at the logs and found many S3 mv commands, one for each file. Then, taking a look directly at S3, I saw that all my files were in a _temporary directory.

    Secondly, I'm concerned about my cluster cost: it appears I only need about 2 hours of compute for this specific task, but I end up buying up to 5 hours. I'm curious whether EMR AutoScaling can help with cost in this situation.

    Some articles discuss changing the file output committer algorithm, but I've had little success with that.

    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    

    Writing to the local HDFS is quick. I'm curious whether issuing a Hadoop command to copy the data to S3 would be faster?
