We forgot something: the Combiner.

The title of this post may say it clearly enough, but we did miss something rather important. What follows is based on my own understanding, and I haven't had time to test the results, so any help testing it, explaining it further, or clearing up my misconceptions about the class would be greatly appreciated. I'll do another post on the Partitioner soon, as well.

I noticed it on Wednesday and couldn't help but dig a little deeper. I'll drop a few hints so we can see what we missed, but first let's look back at the MapReduce WordCount program from earlier. The one I'm using here is the latest version posted on Blackboard, which, critically, differs by a few lines from the one in the earlier Titanpad instructions (also found in this tutorial). The main difference is in how the classes are written and what they do.

This is the latest one on Blackboard:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

And this is the code from the Hortonworks tutorial:


package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }
}
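Both mappers ignore the key passed to map(), which is why one can declare it as Object and the other as LongWritable. With TextInputFormat (the default input format, and the one the Hortonworks driver sets explicitly), each record handed to map() is one line of the file, keyed by the byte offset where that line starts. The plain-Java sketch below does not use Hadoop at all; it only imitates that (offset, line) split, assuming '\n' line endings, and then runs the same StringTokenizer loop both mappers use. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public class RecordSketch {

  // Hypothetical helper: split text into (byteOffset, line) records, roughly
  // the way TextInputFormat keys lines, assuming '\n' line endings.
  static List<Map.Entry<Long, String>> toRecords(String text) {
    List<Map.Entry<Long, String>> records = new ArrayList<>();
    long offset = 0;
    for (String line : text.split("\n")) {
      records.add(new SimpleEntry<>(offset, line));
      // Advance past the line's bytes plus the one-byte newline.
      offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
    }
    return records;
  }

  // The same tokenizing loop both map() methods use; the key is never read.
  static List<Map.Entry<String, Integer>> map(long key, String value) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    StringTokenizer itr = new StringTokenizer(value);
    while (itr.hasMoreTokens()) {
      out.add(new SimpleEntry<>(itr.nextToken(), 1));
    }
    return out;
  }

  public static void main(String[] args) {
    String text = "hello world\nhello hadoop\n";
    for (Map.Entry<Long, String> rec : toRecords(text)) {
      // Prints "0 -> [hello=1, world=1]" then "12 -> [hello=1, hadoop=1]"
      System.out.println(rec.getKey() + " -> " + map(rec.getKey(), rec.getValue()));
    }
  }
}
```

Since the offset is never used, any key type that matches what the input format produces works, which is all the Object versus LongWritable difference amounts to.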

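If you look at the two side by side there isn't much difference: the map and reduce methods are nearly identical. The differences are small ones: the class and tokenizer names, the Hortonworks mapper storing value.toString() in a local variable called line before tokenizing it, the input key being declared as Object in one and LongWritable in the other, and the Apache reducer reusing a single IntWritable instead of allocating a new one for every key. Despite the similarity, though, there's a big difference in how the <key,value> pairs are handled, and the reason lies in how the jobs are configured: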

In the first:

Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

And in the second:

Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Hmm, notice that IntSumReducer shows up twice in the first job, once in setCombinerClass and once in setReducerClass, while the second job only sets a reducer? Let's find out what's going on there, starting with the documentation for setCombinerClass. So what is it doing, and what's the difference between the two jobs?

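The combiner is a local reduction phase: it runs on the map side and pre-aggregates each map task's output before that output is sent across the network to the reducers. In the Hortonworks tutorial (the second program) we're not combining our <key,value> pairs before sending them to the reducer, and in the Apache one we are! In effect, the combiner is a local call of the reducer, so pairs with the same key are merged before being shipped off. From a practical standpoint, for a word that appears N times in one map task's output, the first program sends the reducer <word,N> from that task, while the second sends <word,1> N times. Reusing IntSumReducer this way works because addition is associative and commutative and the reducer's input and output types match (Text, IntWritable), so summing partial sums gives the same total. Hadoop treats the combiner as an optimization and may run it zero, one, or several times, so the job has to produce correct results either way. This should mean far less data transferred between nodes and better efficiency, so don't forget to include it.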
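To make the <word,N> versus <word,1> difference concrete, here is a small plain-Java simulation with no Hadoop involved. Each string stands for the input split of one map task; sumByKey plays the role of IntSumReducer, applied per map task when simulating the combiner and then once more over everything the reducer receives. The helper names (mapTask, sumByKey, shuffle) and the sample splits are hypothetical. The sketch counts how many records would be shuffled in each case and shows that the final counts come out the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class CombinerSketch {

  // Map phase for one task: one (word, 1) pair per token, like TokenizerMapper.
  static List<Map.Entry<String, Integer>> mapTask(String split) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    StringTokenizer itr = new StringTokenizer(split);
    while (itr.hasMoreTokens()) {
      out.add(Map.entry(itr.nextToken(), 1));
    }
    return out;
  }

  // Sum values per key, the same logic as IntSumReducer. Used both as the
  // combiner (per map task) and as the reducer (over all shuffled pairs).
  static List<Map.Entry<String, Integer>> sumByKey(List<Map.Entry<String, Integer>> pairs) {
    Map<String, Integer> sums = new TreeMap<>();
    for (Map.Entry<String, Integer> p : pairs) {
      sums.merge(p.getKey(), p.getValue(), Integer::sum);
    }
    return new ArrayList<>(sums.entrySet());
  }

  // Collect every pair the map side would send to the reducer.
  static List<Map.Entry<String, Integer>> shuffle(List<String> splits, boolean useCombiner) {
    List<Map.Entry<String, Integer>> sent = new ArrayList<>();
    for (String split : splits) {
      List<Map.Entry<String, Integer>> mapped = mapTask(split);
      sent.addAll(useCombiner ? sumByKey(mapped) : mapped);
    }
    return sent;
  }

  public static void main(String[] args) {
    // Two hypothetical input splits, one per map task.
    List<String> splits = List.of("a rose is a rose is a rose", "a rose is a rose");

    List<Map.Entry<String, Integer>> withoutCombiner = shuffle(splits, false);
    List<Map.Entry<String, Integer>> withCombiner = shuffle(splits, true);

    System.out.println("shuffled without combiner: " + withoutCombiner.size()); // 13
    System.out.println("shuffled with combiner:    " + withCombiner.size());    // 6
    System.out.println(sumByKey(withoutCombiner)); // [a=5, is=3, rose=5]
    System.out.println(sumByKey(withCombiner));    // [a=5, is=3, rose=5]
  }
}
```

Running sumByKey on its own output again leaves it unchanged, which mirrors why it doesn't matter how many times Hadoop decides to run a summing combiner.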

You can learn more about the combiner here.
