
Implementing a user-defined counter in a MapReduce program
In this recipe, we are going to learn how to add a user-defined counter so that we can keep track of certain events easily.
Getting ready
To perform this recipe, you should have a running Hadoop cluster as well as Eclipse or a similar IDE.
How to do it...
After every MapReduce execution, you will see a set of system-defined counters getting published, such as File System counters, Job counters, and Map-Reduce Framework counters. These counters help us understand the execution in detail. They give detailed information such as the number of bytes read from and written to HDFS, the number of input records given to a map, the number of output records received from a map, and so on. In addition to these, we can add our own user-defined counters, which help us track the execution in a better manner.
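These built-in counters can also be read programmatically once a job finishes. The snippet below is only a minimal sketch of that idea; it assumes a completed Job instance named job and simply picks two framework counters as examples (it is not part of this recipe's code):
// Illustrative only: read two built-in framework counters after completion
// (assumes "job" is a finished org.apache.hadoop.mapreduce.Job instance).
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long mapInputRecords = counters.findCounter(
        org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS).getValue();
long mapOutputRecords = counters.findCounter(
        org.apache.hadoop.mapreduce.TaskCounter.MAP_OUTPUT_RECORDS).getValue();
System.out.println("Map input records: " + mapInputRecords);
System.out.println("Map output records: " + mapOutputRecords);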
In earlier recipes, we considered the use case of log analytics. There is a chance that the input we receive will not always be in the format we expect, so it's very important to track such bad records and avoid task failures caused by them. To achieve this, in this recipe, we are going to add a custom counter that keeps track of bad records without failing the task.
First of all, we have to define the counter as an enum in our program:
private enum COUNTERS { INVALID_RECORD_COUNT }
Next, we will update our mapper code to use the defined counter, as shown here:
public static class PageViewMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);

    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        ApacheAccessLogWritable log = null;
        try {
            // If the record is in the expected format, do the normal processing
            log = ApacheAccessLogWritable.parseFromLogLine(value.toString());
            context.write(log.getUrl(), one);
        } catch (Exception e) {
            // If not, increment the invalid record counter
            System.out.println("Invalid record found");
            context.getCounter(COUNTERS.INVALID_RECORD_COUNT).increment(1L);
        }
    }
}
The reducer code remains as it is, while the driver code is updated to print the final count of invalid records, as shown here:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: PageViewCounter <in> <out>");
        System.exit(2);
    }
    Job job = Job.getInstance(conf, "Page View Counter");
    job.setJarByClass(PageViewCounter.class);
    job.setMapperClass(PageViewMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Wait for the job to finish before reading the counters; calling
    // System.exit() at this point would skip the counter printing below.
    boolean success = job.waitForCompletion(true);
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    System.out.println("No. of Invalid Records : "
            + counters.findCounter(COUNTERS.INVALID_RECORD_COUNT).getValue());
    System.exit(success ? 0 : 1);
}
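If you also want to print every counter (the system-defined groups as well as our custom one), the Counters object can be iterated group by group. The following lines are one possible addition to the driver, not part of the original recipe:
// Possible addition: dump all counter groups and their values once the job
// has finished ("counters" is the object obtained from job.getCounters()).
for (org.apache.hadoop.mapreduce.CounterGroup group : counters) {
    System.out.println("Group: " + group.getDisplayName());
    for (org.apache.hadoop.mapreduce.Counter counter : group) {
        System.out.println("  " + counter.getDisplayName() + " = " + counter.getValue());
    }
}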
Now, to demonstrate this, I've added a few invalid records (records with fewer columns than expected) to the log file and copied it to HDFS. When I execute the program, I can see the invalid record count printed at the end of the execution:
$hadoop jar logAnalyzer.jar com.demo.PageViewCounter /log-input /log-output
15/11/15 08:44:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/15 08:44:37 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/11/15 08:44:37 INFO input.FileInputFormat: Total input paths to process : 2
15/11/15 08:44:37 INFO mapreduce.JobSubmitter: number of splits:2
15/11/15 08:44:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1447554086128_0003
15/11/15 08:44:38 INFO impl.YarnClientImpl: Submitted application application_1447554086128_0003
15/11/15 08:44:38 INFO mapreduce.Job: The url to track the job: http://admin1:8088/proxy/application_1447554086128_0003/
15/11/15 08:44:38 INFO mapreduce.Job: Running job: job_1447554086128_0003
15/11/15 08:44:43 INFO mapreduce.Job: Job job_1447554086128_0003 running in uber mode : false
15/11/15 08:44:43 INFO mapreduce.Job:  map 0% reduce 0%
15/11/15 08:44:50 INFO mapreduce.Job:  map 100% reduce 0%
15/11/15 08:44:55 INFO mapreduce.Job:  map 100% reduce 100%
15/11/15 08:44:55 INFO mapreduce.Job: Job job_1447554086128_0003 completed successfully
15/11/15 08:44:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=580
        FILE: Number of bytes written=345070
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3168
        HDFS: Number of bytes written=271
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=8542
        Total time spent by all reduces in occupied slots (ms)=3046
        Total time spent by all map tasks (ms)=8542
        Total time spent by all reduce tasks (ms)=3046
        Total vcore-seconds taken by all map tasks=8542
        Total vcore-seconds taken by all reduce tasks=3046
        Total megabyte-seconds taken by all map tasks=8747008
        Total megabyte-seconds taken by all reduce tasks=3119104
    Map-Reduce Framework
        Map input records=13
        Map output records=10
        Map output bytes=724
        Map output materialized bytes=586
        Input split bytes=201
        Combine input records=10
        Combine output records=8
        Reduce input groups=4
        Reduce shuffle bytes=586
        Reduce input records=8
        Reduce output records=4
        Spilled Records=16
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=69
        CPU time spent (ms)=1640
        Physical memory (bytes) snapshot=591077376
        Virtual memory (bytes) snapshot=1219731456
        Total committed heap usage (bytes)=467140608
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    com.demo.PageViewCounter$COUNTERS
        INVALID_RECORD_COUNT=3
    File Input Format Counters
        Bytes Read=2967
    File Output Format Counters
        Bytes Written=271
How it works...
Custom counters are helpful in various situations, such as keeping track of bad records, counting outliers in the form of maximum and minimum values, computing summations, and so on. The Hadoop framework imposes an upper limit on the number of counters you can use per job. Counters are global in nature: you increment or decrement them in mappers or reducers, and the framework aggregates the values across all tasks of the job. In either case, they are referred to using their group and counter names. All the counters are managed at the Application Master level; information about each increment or decrement is passed to the Application Master via the heartbeat messages sent by the containers that run the mappers and reducers.
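For illustration, counters need not be declared as an enum; they can also be created on the fly from group and counter name strings. The following sketch shows both sides of such a dynamic counter (the group name LogAnalytics and the counter name MALFORMED_DATE are made-up examples, not part of this recipe):
// In the mapper or reducer: create/increment a dynamic counter by group name
// and counter name (both strings are hypothetical examples).
context.getCounter("LogAnalytics", "MALFORMED_DATE").increment(1L);

// In the driver, after job completion: the same counter can be looked up
// by the same group/counter name pair.
long malformedDates = job.getCounters()
        .findCounter("LogAnalytics", "MALFORMED_DATE").getValue();
System.out.println("Malformed dates: " + malformedDates);
For enum-based counters such as the one used in this recipe, the group name defaults to the fully qualified class name of the enum, which is why the job output above shows the group as com.demo.PageViewCounter$COUNTERS.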
It is better to keep the number of counters small, as each counter adds overhead to the processing framework. A good rule of thumb is to not let the number of counters go beyond 100.
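If a job genuinely needs more counters than the framework allows, many Hadoop 2.x releases expose the limit through the mapreduce.job.counters.max property; treat the property name and its default as version-specific and verify them against your distribution's documentation. A minimal sketch:
// Assumption: mapreduce.job.counters.max controls the per-job counter limit
// on this Hadoop version; it must be set before the job is submitted.
Configuration conf = new Configuration();
conf.setInt("mapreduce.job.counters.max", 200);
Job job = Job.getInstance(conf, "Page View Counter");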