Hadoop Real-World Solutions Cookbook(Second Edition)

MapReduce program to find the top X

In this recipe, we are going to learn how to write a MapReduce program to find the top X records from a given set of values.

Getting ready

To perform this recipe, you should have a running Hadoop cluster as well as Eclipse or a similar IDE.

How to do it...

Quite often, we need to find the top X values from a given set. A simple example is finding the top 10 trending topics in a Twitter dataset. In this case, we need two MapReduce jobs: the first finds all the words that start with # and counts how many times each hashtag occurs in the data. This first job is quite simple and closely resembles the word count program, but the second job needs some extra logic. In this recipe, we'll explore how to write a MapReduce program to find the top X values from a given set. First, though, let's try to understand the logic behind this.
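To make the first job's role concrete, here is a minimal in-memory sketch of its logic: counting how often each hashtag (a token starting with #) occurs. The class and method names are invented for illustration; in the recipe itself, this work is done by a MapReduce job almost identical to the stock word count example.

```java
import java.util.HashMap;
import java.util.Map;

public class HashtagCount {

    // Count occurrences of each token that starts with '#'
    public static Map<String, Integer> countHashtags(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String token : text.split("\\s+")) {
            if (token.startsWith("#")) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countHashtags("#hadoop rocks #hadoop #bigdata"));
    }
}
```

The output of this counting phase (hashtag, count) pairs is exactly the kind of input the second, top-X job consumes.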


As shown in the preceding figure, our logic includes finding the top 10 words from each input split and then sending these records to only one reducer. In the reducer, we will again find the top 10 words to get the final result of the top 10 records. Now, let's understand the execution.
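The two-phase logic described above can be simulated in plain Java (the split data and method names here are invented for illustration): each "mapper" keeps only its local top N, and a single "reducer" merges those short lists to produce the global top N. Because the first job already produced one global count per word, each word appears in exactly one split, so merging local top-N lists is safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopNSimulation {

    // Return the n entries with the highest counts, highest first
    public static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> b.getValue() - a.getValue());
        return entries.subList(0, Math.min(n, entries.size()));
    }

    public static void main(String[] args) {
        // Two input splits, each processed independently by a "mapper"
        Map<String, Integer> split1 = Map.of("the", 40, "license", 25, "of", 18);
        Map<String, Integer> split2 = Map.of("apache", 30, "software", 22, "to", 5);

        // Phase 1: local top 2 per split; Phase 2: merge at one "reducer"
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> split : List.of(split1, split2)) {
            for (Map.Entry<String, Integer> e : topN(split, 2)) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        for (Map.Entry<String, Integer> e : topN(merged, 2)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
        // prints "the	40" and then "apache	30"
    }
}
```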

First, let's prepare the input. Here, we will use the word count example shipped with the Hadoop binaries. Put the data to be processed into HDFS, and then run the word count job:

hadoop fs -mkdir /input
hadoop fs -put /usr/local/hadoop/LICENSE.txt /input
hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.0.jar wordcount /input /word_count_output

This will execute the word count program and store the output in the HDFS folder called /word_count_output. This output will be used as the input for our top 10 map reduce program.

Let's take a look at the mapper code:

public static class TopTenMapper extends Mapper&lt;Object, Text, Text, IntWritable&gt; {

        // TreeMap keeps its entries sorted by key (here, the count)
        private TreeMap&lt;Integer, String&gt; countWordMap = new TreeMap&lt;Integer, String&gt;();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // Word count output lines look like: word<TAB>count
            String[] words = value.toString().split("[\t]");
            String word = words[0];
            int count = Integer.parseInt(words[1]);
            // Note: words with the same count overwrite each other here
            countWordMap.put(count, word);
            if (countWordMap.size() > 10) {
                // Evict the entry with the lowest count
                countWordMap.remove(countWordMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this mapper's local top 10 once all records are seen
            for (Entry&lt;Integer, String&gt; entry : countWordMap.entrySet()) {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

In the preceding code, we use a TreeMap to store the words and their counts. A TreeMap keeps its entries sorted by key; here, we use the count as the key and the word as the value. On each map() call, we check whether the map's size has exceeded 10; if it has, we remove the first key, which is the lowest count seen so far. This way, at the end of each mapper, the cleanup() method emits that mapper's top 10 words to the reducer.

You can read more about TreeMap at http://docs.oracle.com/javase/7/docs/api/java/util/TreeMap.html.
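The TreeMap trick the mapper relies on can be demonstrated standalone (the names here are invented): with the count as the key, entries stay sorted by count, so removing firstKey() always evicts the current minimum and the map never holds more than N entries.

```java
import java.util.TreeMap;

public class TreeMapEviction {

    // Keep only the n (count, word) pairs with the highest counts
    public static TreeMap<Integer, String> keepTop(int n, int[] counts, String[] words) {
        TreeMap<Integer, String> top = new TreeMap<>();
        for (int i = 0; i < counts.length; i++) {
            top.put(counts[i], words[i]);   // note: equal counts overwrite
            if (top.size() > n) {
                top.remove(top.firstKey()); // evict the lowest count
            }
        }
        return top;
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> top = keepTop(
                3, new int[]{7, 2, 9, 4, 11}, new String[]{"a", "b", "c", "d", "e"});
        System.out.println(top); // prints {7=a, 9=c, 11=e}
    }
}
```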

Now, let's take a look at the reducer code:

public static class TopTenReducer extends Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {

        // TreeMap keeps its entries sorted by key (here, the count)
        private TreeMap&lt;IntWritable, Text&gt; countWordMap = new TreeMap&lt;IntWritable, Text&gt;();

        public void reduce(Text key, Iterable&lt;IntWritable&gt; values, Context context)
                throws IOException, InterruptedException {

            for (IntWritable value : values) {
                // Copy the objects; Hadoop reuses the key and value instances
                countWordMap.put(new IntWritable(value.get()), new Text(key));
                if (countWordMap.size() > 10) {
                    // Evict the entry with the lowest count
                    countWordMap.remove(countWordMap.firstKey());
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the final top 10 only once, highest count first
            for (Entry&lt;IntWritable, Text&gt; entry : countWordMap.descendingMap().entrySet()) {
                context.write(entry.getValue(), entry.getKey());
            }
        }
    }

In the reducer, we again use a TreeMap to find the top 10 among all the records collected from the mappers, emitting the result in cleanup() so that it is written only once, after every record has been seen. Here, it is very important to use only one reducer for the complete processing; hence, we need to set this in the Driver class, as shown here:

public class TopTenWordsByOccurence {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: toptencounter &lt;in&gt; &lt;out&gt;");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Top Ten Words By Occurence Counter");
        job.setJarByClass(TopTenWordsByOccurence.class);
        job.setMapperClass(TopTenMapper.class);
        job.setCombinerClass(TopTenReducer.class);
        job.setReducerClass(TopTenReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // A single reducer is required for a correct global top 10
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Now, when you execute the preceding code, you will see the top 10 words in the document, ranked by their frequency, as the output.

You can modify the same program to get the top 5, top 20, or any other number.

How it works...

Here, the logic is quite straightforward, as shown in the preceding diagram. The trick is to use a TreeMap, which stores its entries sorted by key. It is also important to use only one reducer; if we don't, we will get a separate set of top records from each reducer, and the combined output will not be correct.