Big Data Hadoop & Spark

Hadoop Tutorial: Combiners in Hadoop

In this post, we will look at Combiners and discuss why they are needed and how they work in Hadoop.
We know that Hadoop is an open-source framework used to store and process large data sets in a distributed computing environment. When we work on large data sets in the MapReduce framework, the output of the map tasks is often large, so the data transfer between the map and reduce tasks is high, and moving that data across the network is expensive.
To reduce the volume of data transferred between the map and reduce tasks, a Combiner class can be used to summarize the map output records that share the same key; the combiner's output (a key-value collection) is then sent over the network to the actual reducer task as its input. This cuts down the amount of data shuffled between the mappers and the reducers.
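To picture what a combiner does, here is a small plain-Java sketch (an illustration written for this post's salary example, not Hadoop API code): it summarizes one map task's (Dept_ID, salary) records by key before they would be shuffled, assuming the summary we want is the maximum.

```java
import java.util.HashMap;
import java.util.Map;

public class CombinerSketch {

    // Collapse all records that share a key into a single summary record
    // per key (here: the maximum value), the way a max-combiner would,
    // so fewer records cross the network during the shuffle.
    static Map<String, Integer> combineByMax(String[] keys, int[] values) {
        Map<String, Integer> summary = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            summary.merge(keys[i], values[i], Math::max);
        }
        return summary;
    }

    public static void main(String[] args) {
        // One map task's raw output: three records for department 123.
        String[] keys = {"123", "123", "123"};
        int[] salaries = {100000, 75000, 200000};
        // Three map records collapse to a single combined record.
        System.out.println(combineByMax(keys, salaries));  // {123=200000}
    }
}
```

Only the single summarized record per key would then be sent to the reducer, instead of every raw record.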

Code Example:

Problem statement:
In the below example, we will be writing a MapReduce program to find the highest salary earned by an employee in a particular department.
Dataset:

Dataset Description:
Column 1:  Dept_ID
Column 2:  Salary earned by each employee in Dept_ID (123)
Expected Output:
{123, 3000000 }
Suppose, for the maximum-salary example below, the records for department ID 123 were processed by two maps because they were in different input splits.
Imagine the first map produced the following output.
{ 123, 100000 }
{ 123, 75000 }
{ 123, 200000 }
And the second produced the following output.
{ 123, 1000000 }
{ 123, 890000 }
{ 123, 3000000 }
The reduce function would be called with a list of all the values as shown below.
{ 123, {100000, 75000, 200000, 1000000, 890000, 3000000} }
With output:
{123, 3000000 }
Since 3000000 is the maximum value in the list, we can use a combiner function that is similar to the reduce function and finds the maximum salary per department in each map's output. The reduce function would then be called with the following input and would produce the same output as before.
{ 123, { 200000, 3000000 } }
The combiner function doesn’t replace the reduce function, but it does help in bringing down the amount of data shuffled between the mappers and the reducers. For this reason alone, it is always worth considering the use of a combiner function in the MapReduce job.
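Because taking a maximum is associative and commutative, reducing the per-map maxima gives exactly the same answer as reducing all the raw values, which is why the combiner is safe here. A quick plain-Java check of this (an illustration, not Hadoop code):

```java
public class MaxAssociativity {

    // Reduce a list of salaries to its maximum.
    static int maxOf(int... values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    public static void main(String[] args) {
        // Reducing all raw values from the two map tasks at once.
        int direct = maxOf(100000, 75000, 200000, 1000000, 890000, 3000000);

        // Combining per map first, then reducing the two partial maxima.
        int viaCombiner = maxOf(maxOf(100000, 75000, 200000),
                                maxOf(1000000, 890000, 3000000));

        // Both paths yield 3000000: the combiner changes only the amount
        // of data shuffled, never the final result.
        System.out.println(direct == viaCombiner);  // true
    }
}
```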
To perform the same operation, we can use the program below, which uses a combiner function to reduce the load on the reducer.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombinersHadoop {

    public static class MaxMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text category = new Text();          // Dept_ID, used as the key
        private Text sal = new Text();               // raw salary string
        private IntWritable salary = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] str = line.split("\t");
            // Only emit for rows that have both columns, to avoid an
            // ArrayIndexOutOfBoundsException on malformed lines.
            if (str.length > 1) {
                category.set(str[0]);
                sal.set(str[1]);
                salary.set(Integer.parseInt(str[1]));
                context.write(category, salary);
            }
        }
    }

    public static class MaxReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep the highest salary seen for this department.
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: CombinersHadoop <input path> <output path>");
            System.exit(-1);
        }
        Job job = Job.getInstance();
        job.setJarByClass(CombinersHadoop.class);
        job.setJobName("Working Of Combiners In Hadoop");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxMapper.class);
        job.setCombinerClass(MaxReducer.class);      // same logic reused as the combiner
        job.setReducerClass(MaxReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Code Explanation:
Mapper Code:
The class CombinersHadoop holds the whole program. Inside it, the class MaxMapper extends Hadoop’s default Mapper class, with the arguments KeyIn as LongWritable, ValueIn as Text, KeyOut as Text and ValueOut as IntWritable.
Three fields are declared: a private Text variable ‘category’, which will store the Dept_ID values as the key; a private Text variable ‘sal’, which will store the salary of each employee from the Dept_ID column as a String; and a private IntWritable variable ‘salary’.
The overridden map method runs once for every input line. Each line is stored in a String variable ‘line’ and split on the tab (“\t”) delimiter into a String array, so that all the columns in a row are stored in the array.
We then check whether the string array has a length greater than 1, that is, whether the row has at least two columns; only then does the code inside the if block run, which eliminates the ArrayIndexOutOfBoundsException on malformed rows. Inside the block, we store the value of the department column (position 0) in ‘category’ and the salary column (position 1) in ‘sal’, and convert the String value str[1] to an integer, which is set on ‘salary’.
Finally, the key and value are written to the context, which is the output of the map method.
Reducer Code:
The class MaxReducer extends the default Reducer class with arguments KeyIn as Text and ValueIn as IntWritable, which are the same as the outputs of the mapper class. In addition, KeyOut is declared as Text and ValueOut as IntWritable, which will be the final outputs of our MapReduce program.
The overridden reduce method runs once for every key. An integer variable maxValue is initialized to Integer.MIN_VALUE. A foreach loop then runs over the values inside the “Iterable values”, which come from the shuffle and sort phase after the mapper phase, retrieving and keeping the highest value associated with the key. Finally, the key and the obtained maximum are written as the value to the context.
Combiner Function:
job.setCombinerClass(MaxReducer.class);
In the main method, the combiner function is defined using the Reducer class; in this application, it has the same implementation as the reduce function in MaxReducer. The only change we need to make is to set the combiner class on the job.
The combiner function is performed on the output of each map task, and the sorted and shuffled result is then sent to the reducer. Thus, the load on the reducer is reduced with the help of the combiner.
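One caveat worth noting: reusing the reducer as the combiner, as this job does, is only safe because finding a maximum is associative and commutative and the reducer’s input and output types match. As a plain-Java illustration (not from the original program) of an operation where this reuse would go wrong, consider averaging:

```java
import java.util.Arrays;

public class AverageCombinerPitfall {

    // Reducer-style average over a list of salaries.
    static double average(double[] values) {
        return Arrays.stream(values).average().orElse(0.0);
    }

    public static void main(String[] args) {
        // Two map tasks with different numbers of records.
        double[] map1 = {100000, 75000};
        double[] map2 = {200000, 1000000, 890000, 3000000};

        // Reducing all raw values at once: the true average.
        double direct = average(new double[]{
                100000, 75000, 200000, 1000000, 890000, 3000000});

        // Reusing the reducer as a combiner: average of per-map averages.
        double viaCombiner = average(new double[]{average(map1), average(map2)});

        // direct is 877500.0 but viaCombiner is 680000.0: the results differ.
        System.out.println(direct + " != " + viaCombiner);
    }
}
```

Since the per-map averages weight each split equally regardless of how many records it holds, an averaging job needs a separate combiner (for example, one that emits a (sum, count) pair) rather than reusing its reducer.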
Now, let us execute the above program.

How to Execute:

First, let’s store the input file Max_Sal into the HDFS directory.


As shown in the above image, we can observe that the input file Max_Sal has been successfully copied into the HDFS directory.
After storing the input file in the HDFS directory, let’s now run the jar file Combiner.jar. In our case, we have exported it to our desktop directory; Max_Sal is our input file, and Max_Sal_output is the directory where we are going to save our MapReduce output part file.

View Output:

After successfully running the jar file, we can observe in the below image that the output of the CombinersHadoop program is stored in the Max_Sal_output directory.

Hence, with the above steps, we hope we have clearly explained the concept and working of the combiner function in Hadoop. In the case of any queries, feel free to comment below and we will get back to you at the earliest.
Keep visiting our website for more posts on Big Data and other technologies.
 

2 Comments

  1. In this case, the input key:value pair and the output key:value pair of the reducer are the same. What happens if they are different? Can the reducer class be used as a combiner in that case? As I understood from here, the output from the combiner will be the input for the reducer.

    1. You’re right, Shehansu, but in practice I don’t think we would face this scenario. If you have a scenario in mind, please post it.
