Big Data Hadoop & Spark

MapReduce Custom Partitioner

In this post, we will be looking at how the custom partitioner in MapReduce Hadoop works. This post will give you a good idea of how a user can split reducer into multiple parts (sub-reducers) and store the particular group results in the split reducers via custom partitioner.

Before beginning with the custom partitioner, it is best to have some basic knowledge in the concept of MapReduce program. You can to refer to below blog to brush up on the basics of MapReduce concepts and about the working of MapReduce program.

MapReduce Use Case – Youtube Data Analysis
Map reduce Use case – Titanic Data Analysis
Mapper class in Hadoop
Reducer class in Hadoop
 

What is Custom Partitioner?

Custom Partitioner is a process that allows you to store the results in different reducers, based on the user condition. By setting a partitioner to partition by the key, we can guarantee that, records for the same key will go to the same reducer. A partitioner ensures that only one reducer receives all the records for that particular key.

How Partitioning is done in Hadoop?

HashPartitioner is the default partitioner in Hadoop, which creates one Reduce task for each unique “key”.  All the values with the same key goes to the same instance of your reducer, in a single call to the reduce function.

If user is interested to store a particular group of results in different reducers, then the user can write his own partitioner implementation. It can be general purpose or custom made to the specific data types or values that you expect to use in user application.

Let’s look at the steps to create a custom made partitioner and populate the results in the following example.

Example:

Here, our aim of this example is to subdivide a reducer into multiple manageable parts based on the Emp dataset Department name field and to store the highest paid employee details in the each of these reducers.

Emp Dataset

Data Set Description:

Column 1 : Employee id

Column 2 : Employee name

Column 3 : Department name

Column 4 : Employee designation

Column 5 : Employee salary

Column 6 : Employee mail_id

Data Set:

Source Code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

MapPartitioner:

1. public class Sp extends Configured implements Tool

2. {

3. public static class MapPartitioner extends Mapper<LongWritable, Text, Text, Text>

4. {

5. public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

6. {

7. String[] tokens = value.toString().split(“,”);

8. String emp_dept = tokens[2].toString();

9. String emp_id_n_ds_sal = tokens[0]+”,”+tokens[1]+”,”+tokens[3]+”,”+tokens[4]+”,”+tokens[5];

10. context.write(new Text(emp_dept), new Text(emp_id_n_ds_sal));

11. }

12. }

The following is the explanation of the above mapper code:

  • In Line 1, we are taking class by name Sp, which extends Configured class properties and implements Tool interface.
  • In Line 3, the mapper default class is extended to include the arguments KeyIn as LongWritable, ValueIn as Text, KeyOut as Text and ValueOut as Text.
  • In Line 5, we are overriding the map method, which will run one time for every line.
  • In Line 7, we are converting our input value to string and splitting the line by using comma “,” delimiter and storing the values in a String Array, so that all the columns in a row are stored in the string array.
  • In Line 8, we are storing the employee department name column, which is in the 3rd column in our input file Emp.
  • In Line 9, we are storing rest of the employee information, i.e., employee id, employee name, employee designation and employee salary in a String variable emp_id_n_ds_sal.

Hadoop

DeptPartitioner:

13. public static class DeptPartitioner extends Partitioner<Text, Text>

14. {

15. @Override

16. public int getPartition(Text key, Text value, int numReduceTasks)

17. {

18. String emp_dept = key.toString();

19. if(numReduceTasks == 0)

20. return 0;

21. if(key.equals(new Text(“Program Department”)))

22. {

23. return 0;

24. }

25. else if(key.equals(new Text(“Admin Department”)))

26. {

27. return 1 % numReduceTasks;

28. }

29. else

30. return 2 % numReduceTasks;

31. }

32. }

The following is the explanation of the above mapper code:

  • In Line 13, we are taking class by name DeptPartitioner, which extends Partitioner class.
  • In Line 16, the command overrides the getPartition function, which has three parameters. The key and value are the intermediate key and value produced by the map function. The numReduceTasks is the number of reducers used in the MapReduce program and is specified in the driver program.
  • In Line 17, we are converting key department name from Text type to String data type and storing the key value department name in the String variable emp_dept.
  • In Line 18, we are including a condition, where if the number of reduce task is zero then we exit (Line 20) or else proceed (Line 21).
  • In Line 21, we are including a condition, where if the key value name equals to department name “Program Department”, then we will return to 1st reducer.
  • In Line 25, we are including a condition else, where if the key value name equals to department name “Admin Department”, then we will return to 2nd reducer.
  • In Line 29, we are including a condition else, where if the key value name is not equal to the department name “Program Department” and “Admin Department”, then we must return to the 3rd reducer.

ReducePartition:

33. static class ReduceParitioner extends Reducer<Text, Text, Text, Text>

34. {

35. @Override

36. public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException

37. {

38. int max_sal = Integer.MIN_VALUE;

39. String emp_name = ” “;

40. String emp_dept = ” “;

41. String emp_des = ” “;

42. String emp_id = ” “;

43. int emp_sal = 0;

44. for(Text val: values)

45. {

46. String [] valTokens = val.toString().split(“,”);

47. emp_sal = Integer.parseInt(valTokens[3]);

48. if(emp_sal > max_sal)

49. {

50. emp_id = valTokens[0];

51. emp_name = valTokens[1];

52. emp_des = valTokens[2];

53. emp_dept =key.toString();

54. max_sal = emp_sal;

55. }

56. }

57. context.write(new Text(emp_dept), new Text(“id=>”+emp_id+”,name=>”+emp_name+”,des=>”+emp_des+”,sal=>”+max_sal));

58. }

59. }

  • Line 33 extends the default Reducer class with arguments KeyIn as Text and ValueIn as Text, which are same as the outputs of the mapper class and KeyOut as Text and ValueOut as Text which will be the final outputs of our MapReduce program.
  • In Line 35, we are overriding the Reduce method, which will run each time for every key.
  • In Line 38, we are declaring an integer variable max_sal, which will store the highest employee salary.
  • In Line 39, we are declaring a String variable emp_name, which will store the employee name.
  • In Line 40, we are declaring a String variable emp_dept, which will store the employee department name.
  • In Line 41, we are declaring a String variable emp_des, which will store the employee designation.
  • In Line 42, we are declaring a String variable emp_id, which will store the employee id.
  • In Line 43, we are declaring an integer variable emp_sal and initialized it to zero.
  • In Line 44, a foreach loop is taken and will run each time for the values inside the “Iterable values” coming from the shuffle and sort phase after the mapper phase.
  • In Line 46, we are converting the input value line to string and splitting the line columns by using comma “,” delimiter and storing the values in a String Array so that all the columns in a row are stored in the string array.
  • In Line 47, we are converting the column 4 of the String array valTokens into integer type and storing the each value into the emp_sal variable.
  • In line 48, we are including a condition, where if the employee salary is greater than max salary, then we proceed.
  • In line 50, we are storing String valTokens 0th column employee id value into emp_id variable.
  • In line 51, we are storing String valTokens 1st column employee name value into emp_name variable.
  • In line 52, we are storing String valTokens 2nd column employee designation name value into emp_des variable.
  • In line 53, we are converting the partitioned key value into String type and storing the key value into the variable emp_dept.
  • In line 54, we are storing highest paid employee salary in a particular department name into the variable max_sal.
  • Line 57 writes the respected key (department name) and the obtained rest of the employee details (employee id, employee name, employee designation name and employee salary) as value to the context.

Conf Code:

We are partitioning keys based on the “Department Name” category, and we have 3 different department names which are available in the dataset and hence we have partitioned number of reduce tasks to 3. Below are the details of the Driver method:

60. public int run(String[] arg) throws Exception

61. {

62. Configuration conf = getConf();

63. Job job = new Job(conf, “Maxsal”);

64. job.setJarByClass(Sp.class);

65. FileInputFormat.setInputPaths(job, new Path(arg[0]));

66. FileOutputFormat.setOutputPath(job,new Path(arg[1]));

67. job.setMapperClass(MapPartitioner.class);

68. job.setMapOutputKeyClass(Text.class);

69. job.setMapOutputValueClass(Text.class);

70. job.setPartitionerClass(DeptPartitioner.class);

71. job.setReducerClass(ReduceParitioner.class);

72. job.setNumReduceTasks(3);

73. job.setInputFormatClass(TextInputFormat.class);

74. job.setOutputFormatClass(TextOutputFormat.class);

75. job.setOutputKeyClass(Text.class);

76. job.setOutputValueClass(Text.class);

77. System.exit(job.waitForCompletion(true)? 0 : 1);

78. return 0;

79. }

80. public static void main(String[] args) throws Exception

81. {

82. int res = ToolRunner.run(new Configuration(), new Sp(), args);

83. if (args.length != 2)

84. {

85. System.err.println(“Usage: SamplePartitioner <in> <output>”);

86. System.exit(2);

87. }

88. System.exit(res);

89. }

90. }

Way to execute the Jar file to get the final output

Hadoop jar /home/acadgild/Desktop/sp2.jar /Emp /sp_dir

Here, ‘hadoop’ specifies that we are running a Hadoop command and the jar specifies which type of application we are running. Here, sp2.jar is the jar file which we have created, and consists the above source code and the path of the input file name. In our case, it is Emp, which is a text file and sp_dir is the output file where we will be storing the output.

Ways to View the Output:

Hdfs dfs -cat /sp_dir/part-r-00000

Hdfs dfs -cat /sp_dir/part-r-00001

Hdfs dfs -cat /sp_dir/part-r-00002

Here, ‘hdfs’ specifies that we are running Hadoop command, ‘dfs’ specifies that we are performing an operation related to Hadoop distributed file system. Here, ‘-cat’ command is used to view the contents of a file and ‘sp_dir’ is the output directory where our output part files are present. part-r-00000, part-r-00001, part-r-00002 are the output files which contains the above code partitioned results.

We hope this post has been helpful in understanding the concept of Hadoop Custom Partitioning. Keep visiting our blog for more updates on Big Data and other technologies.

Keep visiting our site www.acadgild.com for more updates on Bigdata and other technologies.

Hadoop

One Comment

  1. Pingback: How to write a custom partitioner in MR | Hadoop Reference Links

Leave a Reply

Your email address will not be published. Required fields are marked *

Related Articles

Close