Shuffle, merger and fetcher errors when processing large files in Hadoop


Solution 1

It could be one of four things I know of, the most likely being the disk-space issue you raise in your question or the closely related problem of running out of inodes:

  • Files being deleted by another process (unlikely, unless you remember doing this yourself)
  • Disk error (unlikely)
  • Not enough disk space
  • Not enough inodes (run df -i)

Even if you run df -h and df -i before and after the job, you don't know how much space is consumed and then cleaned up while the job is running. So while your job is running, I suggest watching these numbers, logging them to a file, graphing them, etc. For example:

watch "df -h && df -i"

Solution 2

"mapreduce.cluster.local.dir" (Old deprecated name: mapred.local.dir) specified in the mapred-site.xml.

Solution 3

You need to specify some temp directories to store the intermediate map and reduce output. Maybe you have not specified any temp directories, so it could not find any valid directory to store the intermediate data. You can do this by editing mapred-site.xml:

<property>
  <name>mapred.local.dir</name>
  <value>/temp1,/temp2,/temp3</value>
</property>

This is a comma-separated list of paths on the local filesystem where temporary MapReduce data is written. Multiple paths help spread disk I/O.

After you specify these temp directories, the intermediate map and reduce output is stored in them, with the directory chosen in one of the following ways:

random: In this case, the intermediate data for reduce tasks is stored at a data location chosen at random.

max: In this case, the intermediate data for reduce tasks is stored at a data location with the most available space.

roundrobin: In this case, the mappers and reducers pick disks through round-robin scheduling across the available local disks for storing intermediate data at the job level. The job ID is used to create unique subdirectories on the local disks to store the intermediate data for each job.

You can set this property in mapred-site.xml, for example:

<property>
  <name>mapreduce.job.local.dir.locator</name>
  <value>max</value>
</property>

By default in Hadoop it is roundrobin.


Comments

  • Admin, almost 2 years ago

    I am running a word-count-like mapreduce job that processes 200 files of 1 GB each. The job runs on a hadoop cluster of 4 datanodes (2 CPUs each) with 8 GB of memory and about 200 GB of space. I have tried various configuration options, but every time my job fails with InMemory Shuffle, OnDisk Shuffle, InMemory Merger, OnDisk Merger, or Fetcher errors.

    The size of the mapper output is comparable to the size of the input files, so in order to minimise the mapper output size I am using BZip2 compression for the mapreduce output. However, even with a compressed map output I still get errors in the reduce phase. I use 4 reducers.
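
    (For reference, the map-output side of that compression is configured roughly as in the sketch below, with BZip2 selected via the codec property:)

    <property>
      <name>mapreduce.map.output.compress</name>
      <value>true</value>
    </property>
    <property>
      <name>mapreduce.map.output.compress.codec</name>
      <value>org.apache.hadoop.io.compress.BZip2Codec</value>
    </property>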

    Thus I have tried various configurations of the hadoop cluster. The standard configuration was:

        Default virtual memory for a job's map-task      3328 Mb
        Default virtual memory for a job's reduce-task  6656 Mb
        Map-side sort buffer memory 205 Mb
        Mapreduce Log Dir Prefix    /var/log/hadoop-mapreduce
        Mapreduce PID Dir Prefix    /var/run/hadoop-mapreduce
        yarn.app.mapreduce.am.resource.mb   6656
        mapreduce.admin.map.child.java.opts -Djava.net.preferIPv4Stack=TRUE -Dhadoop.metrics.log.level=WARN
        mapreduce.admin.reduce.child.java.opts  -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
        mapreduce.admin.user.env LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/`$JAVA_HOME/bin/java -d32 -version &> /dev/null;if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`
        mapreduce.am.max-attempts   2
        mapreduce.application.classpath $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
        mapreduce.cluster.administrators    hadoop
        mapreduce.framework.name    yarn
        mapreduce.job.reduce.slowstart.completedmaps   0.05
        mapreduce.jobhistory.address    ip-XXXX.compute.internal:10020
        mapreduce.jobhistory.done-dir   /mr-history/done
        mapreduce.jobhistory.intermediate-done-dir  /mr-history/tmp
        mapreduce.jobhistory.webapp.address ip-XXXX.compute.internal:19888
        mapreduce.map.java.opts -Xmx2662m
        mapreduce.map.log.level INFO
        mapreduce.map.output.compress   true
        mapreduce.map.sort.spill.percent    0.7
        mapreduce.map.speculative   false
        mapreduce.output.fileoutputformat.compress  true
        mapreduce.output.fileoutputformat.compress.type BLOCK
        mapreduce.reduce.input.buffer.percent   0.0
        mapreduce.reduce.java.opts  -Xmx5325m
        mapreduce.reduce.log.level  INFO
        mapreduce.reduce.shuffle.input.buffer.percent 0.7
        mapreduce.reduce.shuffle.merge.percent  0.66
        mapreduce.reduce.shuffle.parallelcopies 30
        mapreduce.reduce.speculative    false
        mapreduce.shuffle.port  13562
        mapreduce.task.io.sort.factor   100
        mapreduce.task.timeout  300000
        yarn.app.mapreduce.am.admin-command-opts    -Djava.net.preferIPv4Stack=true -Dhadoop.metrics.log.level=WARN
        yarn.app.mapreduce.am.command-opts  -Xmx5325m
        yarn.app.mapreduce.am.log.level INFO
        yarn.app.mapreduce.am.staging-dir   /user
        mapreduce.map.maxattempts       4
        mapreduce.reduce.maxattempts        4
    

    This configuration gave me the following error:

    14/05/16 20:20:05 INFO mapreduce.Job:  map 20% reduce 3%
    14/05/16 20:27:13 INFO mapreduce.Job:  map 20% reduce 0%
    14/05/16 20:27:13 INFO mapreduce.Job: Task Id : attempt_1399989158376_0049_r_000000_0,      Status : FAILED
    Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in InMemoryMerger - Thread to merge in-memory shuffled map-outputs
        at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
     Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1399989158376_0049_r_000000_0/map_2038.out
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
        at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
        at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
        at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$InMemoryMerger.merge(MergeManagerImpl.java:450)
        at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)
    

    Then I've tried changing various options, hoping to reduce the load during the shuffle phase, but I got the same error.

    mapreduce.reduce.shuffle.parallelcopies     5
    mapreduce.task.io.sort.factor   10
    

    or

    mapreduce.reduce.shuffle.parallelcopies     10
    mapreduce.task.io.sort.factor   20
    

    Then I realised that the tmp directories on my datanodes were non-existent, and therefore all the merging and shuffling was happening in memory. Therefore I've manually created them on each datanode (a sketch of the relevant directory setting follows the next snippet). I've kept the initial configuration but increased the time delay before the reducers start in order to limit the load on the datanodes.

    mapreduce.job.reduce.slowstart.completedmaps 0.7
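
    (Under YARN the intermediate shuffle data ends up under the node manager's local directories; on my cluster that setting presumably looks something like the sketch below, with the path inferred from the error output further down:)

    <property>
      <name>yarn.nodemanager.local-dirs</name>
      <!-- illustrative value; the actual path is inferred from the error messages below -->
      <value>/hadoop/yarn/local</value>
    </property>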
    

    I've also tried increasing the io.sort.mb:

    mapreduce.task.io.sort.mb from 205 to 512. 
    

    However, now I get the following OnDiskMerger error:

    14/05/26 12:17:08 INFO mapreduce.Job:  map 62% reduce 21%
    14/05/26 12:20:13 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_0, Status : FAILED
    Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in OnDiskMerger - Thread to merge on-disk map-outputs
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
    Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for hadoop/yarn/local/usercache/eoc21/appcache/application_1400958508328_0021/output/attempt_1400958508328_0021_r_000000_0/map_590.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$OnDiskMerger.merge(MergeManagerImpl.java:536)
    at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)
    

    The reducer dropped down to 0% and when it got back to 17% I got the following error:

    14/05/26 12:32:03 INFO mapreduce.Job: Task Id : attempt_1400958508328_0021_r_000000_1, Status : FAILED
    Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#22
    at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:121)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:380)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
    Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:398)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.mapred.YarnOutputFiles.getInputFileForWrite(YarnOutputFiles.java:213)
    at org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput.<init>(OnDiskMapOutput.java:61)
    at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:257)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
    at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
    

    I read around, and it seems that "Could not find any valid local directory for output/attempt_1400958508328_0021_r_000000_1/map_1015.out" is correlated with not having enough space on the node for the spill. However, I checked the datanode and it seems that there is enough space:

    Filesystem      Size  Used Avail Use% Mounted on
    /dev/xvde1       40G   22G   18G  56% /
    none            3.6G     0  3.6G   0% /dev/shm
    /dev/xvdj      1008G  758G  199G  80% /hadoop/hdfs/data
    

    So I'm not sure what to try anymore. Is the cluster too small for processing such jobs? Do I need more space on the datanodes? Is there a way to find an optimal configuration for the job on hadoop? Any suggestion is highly appreciated!