Shuffle, merger, and fetcher errors when processing large files in Hadoop
Solution 1
It could be one of four things I know of, the most likely being the point you made in your question about disk space, or a similar problem: inodes.
- Files being deleted by another process (unlikely, unless you remember doing this yourself)
- Disk error (unlikely)
- Not enough disk space
- Not enough inodes (run df -i to check)
Even if you run df -h and df -i before and after the job, you don't know how much space is being consumed and cleaned up during the job. So while your job is running, I suggest watching these numbers, logging them to a file, graphing them, etc. E.g.
watch "df -h && df -i"
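If you want a record you can inspect after the job fails rather than a live view, a minimal sketch is something like the following (the filename disk_usage.log and the three-iteration loop are just for illustration; in practice you would loop for the duration of the job):

```shell
# Append a timestamped disk/inode snapshot to a log file a few times.
# For a real job, run this for its whole duration (e.g. `while true; do ... done`).
for i in 1 2 3; do
    echo "=== $(date) ==="
    df -h
    df -i
    sleep 1
done >> disk_usage.log
```

The peak usage between snapshots tells you whether intermediate data is what fills the disk.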
Solution 2
Check "mapreduce.cluster.local.dir" (old deprecated name: mapred.local.dir) specified in mapred-site.xml.
Solution 3
You need to specify some temp directories to store the intermediate map and reduce output. Maybe you have not specified any temp directories, so Hadoop could not find any valid directory to store the intermediate data. You can fix this by editing mapred-site.xml:
<property>
  <name>mapred.local.dir</name>
  <value>/temp1,/temp2,/temp3</value>
</property>
This is a comma-separated list of paths on the local filesystem where temporary MapReduce data is written. Multiple paths help spread disk I/O.
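Before relying on these paths, it's worth verifying that each one actually exists and is writable, since a missing directory is exactly what produces the "Could not find any valid local directory" errors. A quick sketch (the /temp1../temp3 paths are just the example values above):

```shell
# Print free space for an existing local dir, or flag it as missing.
check_dir() {
    if [ -d "$1" ]; then
        df -h "$1" | tail -1
    else
        echo "$1: MISSING - create it and make it writable by the YARN user"
    fi
}

# Example paths from the mapred-site.xml snippet above
for d in /temp1 /temp2 /temp3; do
    check_dir "$d"
done
```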
After specifying these temp directories, Hadoop will store the intermediate map and reduce output by choosing among them in one of the following ways:
- random: the intermediate data for reduce tasks is stored at a data location chosen at random.
- max: the intermediate data for reduce tasks is stored at the data location with the most available space.
- roundrobin: the mappers and reducers pick disks through round-robin scheduling for storing intermediate data at the job level, cycling through the local disks. The job ID is used to create unique subdirectories on the local disks to store the intermediate data for each job.
You can set this property in mapred-site.xml, for example:
<property>
  <name>mapreduce.job.local.dir.locator</name>
  <value>max</value>
</property>
By default in Hadoop it is roundrobin.
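As a rough illustration of the roundrobin policy (a toy model in shell, not Hadoop's actual implementation), the job ID simply indexes into the list of local dirs, cycling through them:

```shell
# Toy model of the "roundrobin" locator described above (not Hadoop's code):
# the job ID picks a slot among the configured local dirs, cycling through them.
set -- /temp1 /temp2 /temp3     # example dirs from the snippet above
job_id=7
slot=$(( job_id % $# ))         # 7 % 3 = 1 -> second dir
shift "$slot"
picked=$1
echo "job $job_id -> $picked"   # job 7 -> /temp2
```

This is why roundrobin spreads different jobs' intermediate data evenly across disks, while max chases whichever disk currently has the most free space.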
Admin
Updated on June 05, 2022

Comments
- Admin, almost 2 years ago
I am running a word-count-like mapreduce job processing 200 files of 1 GB each. I am running the job on a Hadoop cluster comprising 4 datanodes (2 CPUs each) with 8 GB of memory and about 200 GB of space. I have tried various configuration options, but every time my job fails with either InMemory Shuffle, OnDisk Shuffle, InMemory Merger, OnDisk Merger, or Fetcher errors.
The size of the mapper output is comparable to the size of the input files; therefore, in order to minimise the mapper output size, I am using BZip2 compression for the mapreduce output. However, even with a compressed map output I still get errors in the reducer phase. I use 4 reducers. Thus I have tried various configurations of the Hadoop cluster:
The standard configuration of the cluster was:
Default virtual memory for a job's map-task: 3328 Mb
Default virtual memory for a job's reduce-task: 6656 Mb
Map-side sort buffer memory: 205 Mb
Mapreduce Log Dir Prefix: /var/log/hadoop-mapreduce
Mapreduce PID Dir Prefix: /var/run/hadoop-mapreduce
yarn.app.mapreduce.am.resource.mb 6656
mapreduce.admin.map.child.java.opts -Djava.net.preferIPv4Stack=TRUE -Dhadoop.metrics.log.level=WARN
mapreduce.admin.reduce.child.java.opts -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
mapreduce.admin.user.env LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/`$JAVA_HOME/bin/java -d32 -version &> /dev/null;if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`
mapreduce.am.max-attempts 2
mapreduce.application.classpath $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
mapreduce.cluster.administrators hadoop
mapreduce.framework.name yarn
mapreduce.job.reduce.slowstart.completedmaps 0.05
mapreduce.jobhistory.address ip-XXXX.compute.internal:10020
mapreduce.jobhistory.done-dir /mr-history/done
mapreduce.jobhistory.intermediate-done-dir /mr-history/tmp
mapreduce.jobhistory.webapp.address ip-XXXX.compute.internal:19888
mapreduce.map.java.opts -Xmx2662m
mapreduce.map.log.level INFO
mapreduce.map.output.compress true
mapreduce.map.sort.spill.percent 0.7
mapreduce.map.speculative false
mapreduce.output.fileoutputformat.compress true
mapreduce.output.fileoutputformat.compress.type BLOCK
mapreduce.reduce.input.buffer.percent 0.0
mapreduce.reduce.java.opts -Xmx5325m
mapreduce.reduce.log.level INFO
mapreduce.reduce.shuffle.input.buffer.percent 0.7
mapreduce.reduce.shuffle.merge.percent 0.66
mapreduce.reduce.shuffle.parallelcopies 30
mapreduce.reduce.speculative false
mapreduce.shuffle.port 13562
mapreduce.task.io.sort.factor 100
mapreduce.task.timeout 300000
yarn.app.mapreduce.am.admin-command-opts -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
yarn.app.mapreduce.am.command-opts -Xmx5325m
yarn.app.mapreduce.am.log.level INFO
yarn.app.mapreduce.am.staging-dir /user
mapreduce.map.maxattempts 4
mapreduce.reduce.maxattempts 4
This configuration gave me the following error:
14/05/16 20:20:05 INFO mapreduce.Job: map 20% reduce 3%
14/05/16 20:27:13 INFO mapreduce.Job: map 20% reduce 0%
14/05/16 20:27:13 INFO mapreduce.Job: Task Id : attempt_1399989158376_0049_r_000000_0, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in InMemoryMerger - Thread to merge in-memory shuffled map-outputs
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1399989158376_0049_r_000000_0/map_2038.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$InMemoryMerger.merge(MergeManagerImpl.java:450)
    at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)
Then I tried changing various options, hoping to reduce the load during the shuffle phase, but I got the same error:
mapreduce.reduce.shuffle.parallelcopies 5
mapreduce.task.io.sort.factor 10
or
mapreduce.reduce.shuffle.parallelcopies 10
mapreduce.task.io.sort.factor 20
Then I realised that the tmp directories on my datanodes were non-existent, and therefore all the merging and shuffling was happening in memory, so I manually added them on each datanode. I kept the initial configuration but increased the delay before the reducer starts in order to limit the load on the datanode:
mapreduce.job.reduce.slowstart.completedmaps 0.7
I've also tried increasing the io.sort.mb:
mapreduce.task.io.sort.mb from 205 to 512.
However, now I get the following OnDiskMerger error:
14/05/26 12:17:08 INFO mapreduce.Job: map 62% reduce 21%
14/05/26 12:20:13 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_0, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in OnDiskMerger - Thread to merge on-disk map-outputs
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for hadoop/yarn/local/usercache/eoc21/appcache/application_1400958508328_0021/output/attempt_1400958508328_0021_r_000000_0/map_590.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$OnDiskMerger.merge(MergeManagerImpl.java:536)
    at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)
The reducer dropped down to 0%, and when it got back to 17% I got the following error:
14/05/26 12:32:03 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_1, Status : FAILED
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#22
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
    at org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput.<init>(OnDiskMapOutput.java:61)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:257)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
I read around, and it seems that "Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out" is correlated with not having enough space on the node for the spill. However, I checked the datanode and it seems there is enough space:
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvde1       40G   22G   18G  56% /
none            3.6G     0  3.6G   0% /dev/shm
/dev/xvdj      1008G  758G  199G  80% /hadoop/hdfs/data
So I'm not sure what else to try. Is the cluster too small for processing such jobs? Do I need more space on the datanodes? Is there a way to find an optimal configuration for the job on Hadoop? Any suggestion is highly appreciated!