Custom Input Format in Hadoop


In this post, we will look at how to implement a custom input format in Hadoop. To demonstrate this, we have taken the Titanic dataset as an example and implemented the following problem statement.

Problem Statement:

Find out the number of people who died and survived, along with their genders.

Data Set Description:

The dataset description is as follows:

Column 1: PassengerId

Column 2: Survived (0 = died, 1 = survived)

Column 3: Pclass

Column 4: Name

Column 5: Sex

Column 6: Age

Column 7: SibSp

Column 8: Parch

Column 9: Ticket

Column 10: Fare

Column 11: Cabin

Column 12: Embarked

The Titanic dataset used here is publicly available for download.

Here, we need to implement a custom key that is a combination of two columns: the 2nd column, which indicates whether the passenger died or survived, and the 5th column, which contains the passenger's gender. So, let's prepare a custom key by combining these two columns and sort it by the gender column.

To begin with, we need to prepare our custom key. To prepare a custom key, we need to implement the WritableComparable interface. Below is the source code containing the implementation of the custom key.

A WritableComparable is a Writable that is also Comparable, so it can be serialized between nodes and used as a key that Hadoop can sort.
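
For reference, the shape of the interface that the custom key must satisfy is roughly as follows (a simplified sketch of Hadoop's org.apache.hadoop.io.WritableComparable):

// Simplified sketch of the WritableComparable contract our key implements.
public interface WritableComparable<T> extends Writable, Comparable<T> {
    // inherited from Writable:
    //   void write(DataOutput out) throws IOException;
    //   void readFields(DataInput in) throws IOException;
    // inherited from Comparable<T>:
    //   int compareTo(T o);
}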

Custom Key

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

import com.google.common.collect.ComparisonChain;

public class Key_value implements WritableComparable<Key_value> {

    private String x; // survived/died column (column 2 of the dataset)
    private String y; // gender column (column 5 of the dataset)

    public Key_value() {
    }

    public Key_value(String x, String y) {
        this.x = x;
        this.y = y;
    }

    public String getX() {
        return x;
    }

    public void setX(String x) {
        this.x = x;
    }

    public String getY() {
        return y;
    }

    public void setY(String y) {
        this.y = y;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(x);
        out.writeUTF(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readUTF();
        y = in.readUTF();
    }

    @Override
    public int compareTo(Key_value o) {
        // Sort by the gender column first, then by the survived column.
        return ComparisonChain.start().compare(this.y, o.y).compare(this.x, o.x).result();
    }

    @Override
    public boolean equals(Object o1) {
        if (!(o1 instanceof Key_value)) {
            return false;
        }
        Key_value other = (Key_value) o1;
        return this.x.equals(other.x) && this.y.equals(other.y);
    }

    @Override
    public int hashCode() {
        // Needed so that equal keys land in the same partition and group.
        return x.hashCode() * 31 + y.hashCode();
    }

    @Override
    public String toString() {
        return x + "," + y;
    }
}

In the compareTo method, we have written the logic to sort the keys by the gender column. We have used the ComparisonChain class to compare the gender column first and then the survived column. Therefore, this logic will produce the keys sorted by the gender column.

Note: If you compare only one of the fields, keys that differ only in the other field will compare as equal, so their values will be grouped together under a single key.
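
As a hypothetical illustration (not part of the actual code above), a compareTo that looks only at the gender field would behave like this:

// Hypothetical compareTo that compares only the gender field.
// Keys such as 0,male and 1,male would then compare as equal,
// so their counts would be merged under a single key in the reducer.
@Override
public int compareTo(Key_value o) {
    return this.y.compareTo(o.y);
}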


Now that we have written a custom key, we need to write an InputFormat class that extends the default FileInputFormat.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Titanic_input extends FileInputFormat<Key_value, IntWritable> {

    @Override
    public RecordReader<Key_value, IntWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}

Here, we are implementing the custom input format by extending the default FileInputFormat, parameterized with our custom key class as the key type and IntWritable as the value type.

These records are then produced by the RecordReader, which does the actual parsing of the input. The custom RecordReader class is as follows.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class MyRecordReader extends RecordReader<Key_value, IntWritable> {

    private Key_value key;
    private IntWritable value;
    // Reuse Hadoop's LineRecordReader to read the split line by line.
    private final LineRecordReader reader = new LineRecordReader();

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        reader.initialize(is, tac);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean gotNextKeyValue = reader.nextKeyValue();
        if (gotNextKeyValue) {
            if (key == null) {
                key = new Key_value();
            }
            if (value == null) {
                value = new IntWritable();
            }
            Text line = reader.getCurrentValue();
            // Note: this simple split assumes the fields contain no embedded commas.
            String[] tokens = line.toString().split(",");
            key.setX(tokens[1]); // survived/died column
            key.setY(tokens[4]); // gender column
            value.set(1);
        } else {
            key = null;
            value = null;
        }
        return gotNextKeyValue;
    }

    @Override
    public Key_value getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public IntWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

In the RecordReader, the nextKeyValue() method parses the input. For each line of the dataset, it sets the relevant columns into our custom key as follows:

key.setX(tokens[1]);
key.setY(tokens[4]);

As discussed earlier, we need the 2nd and 5th columns to be passed to our custom key.

The value is set to 1, since we need to count the number of people:

value.set(1);

Now, the Mapper class is as follows:

public static class Map extends Mapper<Key_value, IntWritable, Key_value, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    public void map(Key_value key, IntWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, one);
    }
}

The Mapper class simply emits the keys and values as they are sent by the RecordReader.

The Reducer class is as follows:

public static class Reduce extends Reducer<Key_value, IntWritable, Key_value, IntWritable> {

    public void reduce(Key_value key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The Reducer will sum the values for each unique key, giving the number of people in each (survived, gender) group.

The Driver class is as follows:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(Female_survived.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Key_value.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Key_value.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(Titanic_input.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // Delete the output directory if it already exists, so the job can be re-run.
    Path out = new Path(args[1]);
    out.getFileSystem(conf).delete(out, true);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, out);
    job.waitForCompletion(true);
}

We need to set the InputFormatClass to our custom input format class. The output key and value classes also need to be set appropriately; otherwise, the job will throw an error.
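
Once the classes are compiled and packaged into a jar, the job can be launched from the command line. The jar name and HDFS paths below are illustrative; substitute your own:

hadoop jar titanic-custom-input.jar Female_survived /user/hadoop/titanic/input /user/hadoop/titanic/output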

After running this program, the output contains one line per key: the key (in the form survived,gender) followed by a tab and its count, as written by TextOutputFormat.

The keys are sorted by the gender column. We have successfully implemented a custom input format in Hadoop.

We hope this post has been helpful in understanding how to implement a custom input format in Hadoop. In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
