Hadoop: Reducer writing Mapper output into Output File

Your reduce function arguments should be as follows:

public void reduce(Text key, Iterable<Text> wtfs,
     Context context) throws IOException, InterruptedException {

With the way you've defined the arguments, your reduce method isn't getting the grouped list of values for each key. Because its signature doesn't match the one declared in the Reducer base class, it doesn't override the framework's reduce method at all; Hadoop silently falls back to the default implementation, which is an identity function that writes every input pair straight through to the output. For the same reason, even if your version were called,

sum += val.get()

would only ever go from 0 to 1, because each <key, value> pair in the form <word, one> would arrive at the reducer separately rather than grouped.
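For reference, here is a minimal sketch of a corrected Reduce class, keeping the Text value types from the code in the comments below (the counting logic is only an illustration). Adding @Override is a cheap safeguard, since the compiler will then reject any signature that doesn't actually override Reducer.reduce:

    public static class Reduce
        extends Reducer<Text, Text, Text, Text> {
      @Override  // compile error here if the signature doesn't match Reducer.reduce
      public void reduce(Text key, Iterable<Text> wtfs, Context context)
          throws IOException, InterruptedException {
        int count = 0;
        for (Text val : wtfs) {  // all values for this key, grouped together
          count += 1;
        }
        context.write(key, new Text(Integer.toString(count)));
      }
    }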

Also, the mapper doesn't normally write to the output file; in the usual case it is always the reducer that writes there. Mapper output is intermediate data that Hadoop handles transparently (it is spilled to local disk, shuffled, and merged, but never written to the job's output directory). So if you see something in the output file, it has to be reducer output, not mapper output. If you want to verify this, go to the logs for the job you ran and check what happened in each mapper and reducer individually.
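The one real exception is a map-only job: if you set the number of reduce tasks to zero, the map output is written straight to the output directory, in files named part-m-xxxxx instead of part-r-xxxxx. As a sketch (reusing the Map class and driver style from the code in the comments below), the run() method of such a job could look like this:

    public int run(String[] args) throws Exception {
      Job job = new Job(getConf());
      job.setJarByClass(WordCount.class);
      job.setJobName("wordcount-maponly");

      job.setMapperClass(Map.class);
      job.setNumReduceTasks(0);             // zero reducers: map output is final
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);  // must match the mapper's output types

      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // Output files are named part-m-00000, part-m-00001, ... because the
      // final records come from map tasks rather than reduce tasks.
      return job.waitForCompletion(true) ? 0 : 1;
    }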

Hope this clears some things up for you.

Comments

  • tony marbo
    tony marbo almost 12 years

    I met a very strange problem. The reducers do run, but when I check the output files, I only find the output from the mappers. When I was trying to debug, I found the same problem in the word count sample after I changed the mappers' output value type from LongWritable to Text:

    package org.myorg;
    
    import java.io.IOException;
    import java.util.*;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    import org.apache.hadoop.util.*;
    
    public class WordCount extends Configured implements Tool {
    
       public static class Map
           extends Mapper<LongWritable, Text, Text, Text> {
         private final static IntWritable one = new IntWritable(1);
         private Text word = new Text();
    
         public void map(LongWritable key, Text wtf, Context context)
             throws IOException, InterruptedException {
           String line = wtf.toString();
           StringTokenizer tokenizer = new StringTokenizer(line);
           while (tokenizer.hasMoreTokens()) {
             word.set(tokenizer.nextToken());
             context.write(word, new Text("frommapper"));
           }
         }
       }
    
       public static class Reduce
           extends Reducer<Text, Text, Text, Text> {
         public void reduce(Text key, Text wtfs,
             Context context) throws IOException, InterruptedException {
           /*
           int sum = 0;
           for (IntWritable val : wtfs) {
             sum += val.get();
           }
           context.write(key, new IntWritable(sum));
           */
           context.write(key, new Text("can't output"));
         }
       }
    
       public int run(String [] args) throws Exception {
         Job job = new Job(getConf());
         job.setJarByClass(WordCount.class);
         job.setJobName("wordcount");
    
    
         job.setOutputKeyClass(Text.class);
         job.setMapOutputValueClass(Text.class);
         job.setOutputValueClass(Text.class);
         job.setMapperClass(Map.class);
         //job.setCombinerClass(Reduce.class);
         job.setReducerClass(Reduce.class);
    
         job.setInputFormatClass(TextInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
    
         FileInputFormat.setInputPaths(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
         boolean success = job.waitForCompletion(true);
         return success ? 0 : 1;
       }
    
       public static void main(String[] args) throws Exception {
         int ret = ToolRunner.run(new WordCount(), args);
         System.exit(ret);
       }
    }
    

    Here are the results:

    12/06/13 17:37:46 INFO mapred.JobClient:     Combine output records=0
    12/06/13 17:37:46 INFO mapred.JobClient:     Map input records=7
    12/06/13 17:37:46 INFO mapred.JobClient:     Reduce shuffle bytes=116
    12/06/13 17:37:46 INFO mapred.JobClient:     Reduce output records=7
    12/06/13 17:37:46 INFO mapred.JobClient:     Spilled Records=14
    12/06/13 17:37:46 INFO mapred.JobClient:     Map output bytes=96
    12/06/13 17:37:46 INFO mapred.JobClient:     Combine input records=0
    12/06/13 17:37:46 INFO mapred.JobClient:     Map output records=7
    12/06/13 17:37:46 INFO mapred.JobClient:     Reduce input records=7
    

    Then I found these strange results in the output file. The problem appears after I change the map output value type and the reducer input value type to Text, no matter whether I change the reduce output value type or not. I was also forced to add job.setOutputValueClass(Text.class):

    a   frommapper
    a   frommapper
    a   frommapper
    gg  frommapper
    h   frommapper
    sss frommapper
    sss frommapper
    

    Help!

  • Admin
    Admin almost 12 years
    I think this is right; I don't see any case where the map output is written directly to the output file when a reduce function is explicitly defined in the program.
  • tony marbo
    tony marbo almost 12 years
    Thanks a lot! That is exactly where the problem was. By the way, the mapper can only write to the output files when you use job.setNumReduceTasks(0) in the job configuration. In that case, the output file names contain '-m' instead of '-r'.