
Implementing Custom Output Format in Hadoop


In this post, we will discuss how to implement a custom output format in Hadoop. Here we will implement an XML output format, which writes all the output keys and values as XML.

Let’s implement a word count program in MapReduce and write a custom output format that stores the keys and values in XML.

The following is the word count program.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (word, 1) for every token in the input line.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all the counts emitted for this word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Plug in our custom output format instead of the default TextOutputFormat.
        job.setOutputFormatClass(XMLOutputFormat.class);
        // Delete the output directory if it already exists.
        Path out = new Path(args[1]);
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Now, to write a custom output format, we need to extend the default FileOutputFormat class. Inside this subclass, we write our own custom RecordWriter. The RecordWriter is what receives all the output keys and values written to the context. A few methods need to be overridden in order to write a custom output format.

First, we need to write a class extending RecordWriter:

protected static class XMLRecordWriter extends RecordWriter<Text, IntWritable> {

Two methods must be overridden when writing a custom RecordWriter: write(key, value) and close(TaskAttemptContext context).

The write method receives each output key and value written to the context and writes them to the underlying output stream.

The close method closes the RecordWriter to future operations; its context parameter is the context of the task attempt.

A DataOutputStream lets an application write primitive Java data types to an output stream in a portable way. An application can then use a DataInputStream to read the data back in.
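
As a quick illustration of that round trip, independent of Hadoop, here is a minimal sketch; the class name DataStreamDemo and the file name demo.bin are just placeholders for this example:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class DataStreamDemo {
    public static void main(String[] args) throws IOException {
        // Write an int and a string in a portable binary format.
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream("demo.bin"))) {
            out.writeInt(42);
            out.writeUTF("hadoop");
        }
        // Read them back in the same order they were written.
        try (DataInputStream in = new DataInputStream(new FileInputStream("demo.bin"))) {
            System.out.println(in.readInt()); // 42
            System.out.println(in.readUTF()); // hadoop
        }
    }
}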

We use the writeBytes() method to write our keys and values, wrapped in XML tags.

Finally, our custom output format class looks as shown below:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class XMLOutputFormat extends FileOutputFormat<Text, IntWritable> {

    protected static class XMLRecordWriter extends RecordWriter<Text, IntWritable> {

        private DataOutputStream out;

        public XMLRecordWriter(DataOutputStream out) throws IOException {
            this.out = out;
            // Open the root element once per output file.
            out.writeBytes("<Output>\n");
        }

        private void writeStyle(String xmlTag, String tagValue) throws IOException {
            out.writeBytes("<" + xmlTag + ">" + tagValue + "</" + xmlTag + ">\n");
        }

        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            // Wrap each key/value pair in a <record> element.
            out.writeBytes("<record>\n");
            this.writeStyle("key", key.toString());
            this.writeStyle("value", value.toString());
            out.writeBytes("</record>\n");
        }

        @Override
        public synchronized void close(TaskAttemptContext job) throws IOException {
            try {
                // Close the root element before closing the stream.
                out.writeBytes("</Output>\n");
            } finally {
                out.close();
            }
        }
    }

    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException {
        // Name the part file with a .xml extension.
        String fileExtension = ".xml";
        Path file = getDefaultWorkFile(job, fileExtension);
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        FSDataOutputStream fileOut = fs.create(file, false);
        return new XMLRecordWriter(fileOut);
    }
}

In the getRecordWriter method, we can modify the output file names. Here we change the extension of the part file: since we are producing XML, we use .xml, so each reducer's output appears as part-r-00000.xml instead of the usual part-r-00000. Note that each reduce task gets its own RecordWriter, so with multiple reducers you get multiple .xml files, each wrapped in its own <Output> root element.

We can set our custom class as the output format in the driver class using job.setOutputFormatClass(XMLOutputFormat.class), as was done in the word count driver above.

Now we will build the jar file and run it as a normal Hadoop program (a sample invocation is shown after the input data below). The input data we are using is as follows:

hello all from acadgild

acadgild teaches Hadoop

I am learning hadoop from acadgild
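
Assuming the classes are packaged into a jar named wordcount.jar (the jar name and HDFS paths below are placeholders for your own), the job can be submitted like any other MapReduce program:

hadoop jar wordcount.jar WordCount /user/hadoop/wordcount/input /user/hadoop/wordcount/output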

 

After the job completes, you can see that a part file with the .xml extension has been created in the output directory.
The output in XML format is as follows:
    <Output>
    <record>
    <key>Hadoop</key>
    <value>1</value>
    </record>
    <record>
    <key>I</key>
    <value>1</value>
    </record>
    <record>
    <key>acadgild</key>
    <value>3</value>
    </record>
    <record>
    <key>all</key>
    <value>1</value>
    </record>
    <record>
    <key>am</key>
    <value>1</value>
    </record>
    <record>
    <key>from</key>
    <value>2</value>
    </record>
    <record>
    <key>hadoop</key>
    <value>1</value>
    </record>
    <record>
    <key>hello</key>
    <value>1</value>
    </record>
    <record>
    <key>learning</key>
    <value>1</value>
    </record>
    <record>
    <key>teaches</key>
    <value>1</value>
    </record>
    </Output>
We hope this post has been helpful in understanding how to write a custom output format in Hadoop. In case of any queries, feel free to comment below and we will get back to you at the earliest.
Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.