Hadoop Real-World Solutions Cookbook (Second Edition)

Adding support for a new writable data type in Hadoop

In this recipe, we are going to learn how to introduce a new data type in MapReduce programs and then use it.

Getting ready

To perform this recipe, you should have a running Hadoop cluster as well as an IDE such as Eclipse.

How to do it...

Hadoop allows us to add new custom data types, which are made up of one or more primitive data types. In the previous recipe, you may have noticed that when you handled the log data structure, you had to remember the sequence in which each data component was placed. This can get very messy in complex programs. To avoid this, we will introduce a new data type that implements WritableComparable and encapsulates the complete log record.

To add a new data type, we need to implement the WritableComparable interface, which is provided by Hadoop. This interface requires three methods, readFields(DataInput in), write(DataOutput out), and compareTo(T o), which we need to override with our own implementation. Here, we are also going to hide the log parsing and pattern-matching logic from the user of this data type by providing a factory method that returns parsed objects:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class ApacheAccessLogWritable implements WritableComparable<ApacheAccessLogWritable> {

    public static String APACHE_ACCESS_LOGS_PATTERN = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+) (.+?) \"([^\"]+|(.+?))\"";
    public static Pattern pattern = Pattern.compile(APACHE_ACCESS_LOGS_PATTERN);

    private Text clientIp, id, username, dateString, httpMethod, url, httpVersion, referral, browserString;
    private IntWritable httpStatus, requestSize;

    // Hadoop instantiates Writables reflectively, so a public no-argument
    // constructor is required; it also initializes every field so that
    // readFields() does not dereference null references.
    public ApacheAccessLogWritable() {
        clientIp = new Text();
        id = new Text();
        username = new Text();
        dateString = new Text();
        httpMethod = new Text();
        url = new Text();
        httpVersion = new Text();
        referral = new Text();
        browserString = new Text();
        httpStatus = new IntWritable();
        requestSize = new IntWritable();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        clientIp.readFields(in);
        id.readFields(in);
        username.readFields(in);
        dateString.readFields(in);
        httpMethod.readFields(in);
        url.readFields(in);
        httpVersion.readFields(in);
        referral.readFields(in);
        browserString.readFields(in);
        httpStatus.readFields(in);
        requestSize.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        clientIp.write(out);
        id.write(out);
        username.write(out);
        dateString.write(out);
        httpMethod.write(out);
        url.write(out);
        httpVersion.write(out);
        referral.write(out);
        browserString.write(out);
        httpStatus.write(out);
        requestSize.write(out);
    }

    private ApacheAccessLogWritable(Text clientIp, Text id, Text username, Text dateString, Text httpMethod, Text url,
            Text httpVersion, IntWritable httpStatus, IntWritable requestSize, Text referral, Text browserString) {

        this.clientIp = clientIp;
        this.id = id;
        this.username = username;
        this.dateString = dateString;
        this.httpMethod = httpMethod;
        this.url = url;
        this.httpVersion = httpVersion;
        this.referral = referral;
        this.browserString = browserString;
        this.httpStatus = httpStatus;
        this.requestSize = requestSize;
    }

    public static ApacheAccessLogWritable parseFromLogLine(String logline) {
        Matcher m = pattern.matcher(logline);
        if (!m.find()) {
            throw new RuntimeException("Error parsing log line: " + logline);
        }

        return new ApacheAccessLogWritable(new Text(m.group(1)), new Text(m.group(2)), new Text(m.group(3)),
                new Text(m.group(4)), new Text(m.group(5)), new Text(m.group(6)), new Text(m.group(7)),
                new IntWritable(Integer.parseInt(m.group(8))), new IntWritable(Integer.parseInt(m.group(9))),
                new Text(m.group(10)), new Text(m.group(11)));
    }

    @Override
    public int compareTo(ApacheAccessLogWritable o) {
        // Order records by client IP, then by the raw date string (note that
        // this is a lexicographic, not chronological, comparison). Any
        // consistent ordering will do, but always returning 0 would break
        // the sort contract.
        int result = clientIp.compareTo(o.clientIp);
        if (result == 0) {
            result = dateString.compareTo(o.dateString);
        }
        return result;
    }
    // The mapper below uses getUrl(); the remaining getters and setters
    // follow the same pattern and are omitted for brevity.
    public Text getUrl() {
        return url;
    }
}

The following piece of code shows how we can use this data type in our MapReduce program; here, I am going to update the same program that we used in the previous recipe. The mapper code is updated as follows:

public static class PageViewMapper extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        ApacheAccessLogWritable log = ApacheAccessLogWritable.parseFromLogLine(value.toString());
        context.write(log.getUrl(), one);
    }
}

The call to parseFromLogLine() is where we use our custom data type. The reducer and driver code remain as they are; refer to the previous recipe for details on these two.
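For quick reference, a reducer consistent with the mapper's (URL, 1) output is a standard sum reducer along the following lines (a sketch; the actual class is the one from the previous recipe):

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper to get the page view count per URL.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}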

To execute this code, we need to bundle the data type class and the MapReduce program into a single JAR so that, at runtime, the MapReduce code is able to find our newly introduced data type.
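For reference, a minimal driver sketch is shown below; the class name, job name, and argument handling are illustrative, and the real driver is the one from the previous recipe. The call to job.setJarByClass() locates the JAR to ship to the cluster nodes, which is why the data type class must be bundled in the same JAR:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageViewCounter {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "page view counter");
        // Ships the JAR containing this class to the cluster; because
        // ApacheAccessLogWritable lives in the same JAR, every node can
        // load and deserialize the custom data type at runtime.
        job.setJarByClass(PageViewCounter.class);
        job.setMapperClass(PageViewMapper.class);
        // job.setReducerClass(...) stays the same as in the previous recipe.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}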

How it works...

We know that when we execute MapReduce code, a lot of data gets transferred over the network during the shuffle phase. Sometimes, the keys and values being transferred can be huge, which might affect network traffic. It is therefore important to send the data in a compact, serialized format. To hide the pain of serialization and deserialization from the MapReduce programmer, Hadoop provides a set of box/wrapper classes such as IntWritable, LongWritable, Text, FloatWritable, DoubleWritable, and so on. These are wrapper classes on top of primitive data types that can be serialized and deserialized easily. Keys need to be WritableComparable, while values only need to be Writable; in practice, the built-in wrapper classes implement WritableComparable, so they can serve as both.
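To make this concrete, here is a minimal sketch of the round trip that Hadoop performs under the hood: a Writable is turned into bytes with write() and restored with readFields(). The class and variable names are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {

    public static void main(String[] args) throws IOException {
        Text url = new Text("/index.html");
        IntWritable count = new IntWritable(42);

        // Serialize both values into an in-memory buffer, just as the
        // framework does before sending map output over the network.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        url.write(out);
        count.write(out);

        // Deserialize into fresh instances, as happens on the reduce side.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        Text urlCopy = new Text();
        IntWritable countCopy = new IntWritable();
        urlCopy.readFields(in);
        countCopy.readFields(in);

        System.out.println(urlCopy + " -> " + countCopy); // prints: /index.html -> 42
    }
}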

Apart from the set of built-in data types, Hadoop also supports the introduction of custom data types that implement WritableComparable. This makes the handling of complex data easy, while serialization and deserialization are taken care of automatically. A WritableComparable type is one that is easy to serialize and deserialize and whose instances can be compared to decide their order.
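The following snippet (class name illustrative) shows the comparison contract in action; the framework relies on exactly this compareTo() call to sort map output keys during the shuffle:

import org.apache.hadoop.io.IntWritable;

public class KeyOrderingDemo {

    public static void main(String[] args) {
        IntWritable a = new IntWritable(200);
        IntWritable b = new IntWritable(404);
        // A negative result means a sorts before b; this is the standard
        // java.lang.Comparable contract that WritableComparable extends.
        System.out.println(a.compareTo(b)); // prints a negative number
    }
}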