
MapReduce Design Pattern – Finding Top-K Records


Have you ever wondered how to process huge volumes of data residing on multiple systems? Hadoop’s MapReduce framework offers a simple solution to exactly this problem.

MapReduce is a software framework for easily writing applications that process vast amounts of data residing on multiple systems. Although it is a very powerful framework, it does not provide a solution for every big data problem.

Before discussing MapReduce, let us first understand what a framework is in general. A framework is a set of rules that we follow, or should follow, to obtain the desired result. So whenever we write a MapReduce program, we should fit our solution into the MapReduce framework.

Although MapReduce is very powerful, it has its limitations. Some problems, such as graph algorithms and algorithms that require iterative processing, are tricky and challenging, so implementing them in MapReduce is very difficult. To overcome such problems we can use MapReduce design patterns.

[Note: A Design pattern is a general repeatable solution to a commonly occurring problem in software design. A design pattern isn’t a finished design that can be transformed directly into code. It is a description or template for how to solve a problem that can be used in many different situations.]

We generally use MapReduce for data analysis. One of the most important parts of data analysis is finding outliers. An outlier is any value that is numerically distant from most of the other data points in a set of data. Such records are the most interesting and unique pieces of data in the set.

The point of this blog is to develop a MapReduce design pattern that finds the Top K records for a specific criterion, so that we can take a look at them and perhaps figure out what makes them special.

This can be achieved by defining a ranking function or comparison function between two records that determines whether one is higher than the other. We can apply this pattern to use MapReduce to find the records with the highest values across the entire data set.

Before discussing the MapReduce approach, let’s understand the traditional approach of finding the Top K records in a file located on a single machine.

Steps to find Top K records

Traditional approach: If we are dealing with a file located on a single system, or with an RDBMS, we can follow the steps below to find the Top K records (see the sketch after this list):

  1. Sort the data
  2. Pick Top K records
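
As an illustration, here is a minimal single-machine sketch in Java of these two steps. It assumes the data sits in a local CSV file named salaries.csv with the salary in the fifth column, matching the sample data shown later in this post; the file name and column layout are assumptions for the example, not part of the pattern.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TopKSingleMachine {
    public static void main(String[] args) throws IOException {
        int k = 10;
        // Assumed input: comma-separated lines of yearID,teamID,lgID,playerID,salary
        List<String> topK = Files.lines(Paths.get("salaries.csv"))
                // Skip the header row and any blank lines
                .filter(line -> !line.isEmpty() && !line.startsWith("yearID"))
                // Step 1: sort the data by salary (fifth column), highest first
                .sorted(Comparator.comparingLong(
                        (String line) -> Long.parseLong(line.split(",")[4])).reversed())
                // Step 2: pick the Top K records
                .limit(k)
                .collect(Collectors.toList());
        topK.forEach(System.out::println);
    }
}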

MapReduce approach: Solving the same problem using MapReduce is a bit more complicated because:

  1. Data is not sorted
  2. Data is processed across multiple nodes

Finding Top K records using MapReduce Design Pattern

To find the Top K records in a distributed system like Hadoop using MapReduce, we should follow the steps below:

  1. Each mapper finds its local Top K records and sends them to the reducer
  2. The reducer in turn finds the global Top K across all the mappers

To achieve this we can follow the Top-K MapReduce design pattern, which is explained below with the help of pseudocode:

class mapper:
    map(key, record):
        insert record into the top ten sorted list
        if length of list is greater than 10 then
            truncate list to a length of 10
    cleanup():
        for record in the top ten sorted list:
            emit null, record

class reducer:
    reduce(key, records):
        sort records
        truncate records to top 10
        for record in records:
            emit record

As the algorithm shows, we find the local Top K for each mapper, which is in turn sent to the reducer to compute the global Top K.

Let’s consider the same with the help of some sample data:

yearID,teamID,lgID,playerID,salary

1985,ATL,NL,barkele01,870000
1985,ATL,NL,bedrost01,550000
1985,ATL,NL,benedbr01,545000
1985,ATL,NL,campri01,633333
1985,ATL,NL,ceronri01,625000
1985,ATL,NL,chambch01,800000

The above data set contains 5 columns: yearID, teamID, lgID, playerID, and salary. In this example we are finding the Top K records based on salary.

For sorting the data easily we can use java.util.TreeMap, which sorts its keys automatically. However, by default a TreeMap keeps only one entry per key, so duplicate salary values would be silently dropped and the results would be wrong. To overcome this we create the TreeMap with our own Comparator, which treats equal salaries as distinct keys so that duplicates are included in the sort.

Below is the implementation of a Comparator that sorts the keys and retains the duplicate values:

Comparator code:

import java.util.Comparator;

public class Salary {
    private int sum;

    public Salary(int sum) {
        this.sum = sum;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }
}

class MySalaryComp1 implements Comparator<Salary> {
    @Override
    public int compare(Salary e1, Salary e2) {
        // Never return 0, so the TreeMap treats equal salaries as
        // distinct keys and duplicate values are not lost.
        if (e1.getSum() > e2.getSum()) {
            return 1;
        } else {
            return -1;
        }
    }
}
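
As a quick sanity check, the small snippet below (ComparatorDemo is an illustrative name, not part of the pattern) inserts two players with the same salary into a TreeMap built with MySalaryComp1; both entries survive, whereas a TreeMap keyed on plain integers would keep only one of them.

import java.util.TreeMap;

public class ComparatorDemo {
    public static void main(String[] args) {
        TreeMap<Salary, String> map = new TreeMap<Salary, String>(new MySalaryComp1());
        map.put(new Salary(550000), "bedrost01");
        map.put(new Salary(550000), "benedbr01"); // same salary, different player
        // Prints 2: both records survive because compare() never returns 0
        System.out.println(map.size());
    }
}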

Mapper Code:

import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Top20Mapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // Create the TreeMap with MySalaryComp1 so that duplicate salaries are kept
    public static TreeMap<Salary, Text> ToRecordMap = new TreeMap<Salary, Text>(new MySalaryComp1());

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split the comma-separated record and fetch the salary (fifth column)
        String[] tokens = line.split(",");
        int salary = Integer.parseInt(tokens[4]);

        // Insert the Salary object as key and the entire row as value;
        // the TreeMap sorts the records by salary in ascending order
        ToRecordMap.put(new Salary(salary), new Text(value));

        // If we have more than ten records, remove the one with the lowest salary.
        // As the TreeMap is sorted in ascending order, the record with the
        // lowest salary is the first key, which is where the iterator starts.
        Iterator<Entry<Salary, Text>> iter = ToRecordMap.entrySet().iterator();
        while (ToRecordMap.size() > 10) {
            iter.next();
            iter.remove();
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Output our ten records to the reducers with a null key
        for (Text t : ToRecordMap.values()) {
            context.write(NullWritable.get(), t);
        }
    }
}

With the default text input format, the map() method receives one line per call. In order to get the top 10 records from each input split, we need to see all the records of the split so that we can compare them and find the Top K.

The first step in the Mapper is to extract the column on which we want to rank the records (here, salary) and insert that value as the key in the TreeMap, with the entire row as the value.

If we have more than ten records, we remove the one with the lowest salary. As this TreeMap is sorted in ascending order of salary, the record with the lowest salary is the first key, so it is the first entry removed by the iterator.

cleanup() is an overridden method of the Mapper class. It executes at the end of each map task, i.e. once per split. In the cleanup() method we therefore have the Top K records for that split, and we send this local top 10 of each mapper to the reducer.

Reducer Code:

import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Top20Reducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    // Create the TreeMap with MySalaryComp1 so that duplicate salaries are kept
    public static TreeMap<Salary, Text> ToRecordMap = new TreeMap<Salary, Text>(new MySalaryComp1());

    public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            String line = value.toString();
            if (line.length() > 0) {
                // Split the comma-separated record and fetch the salary (fifth column)
                String[] tokens = line.split(",");
                int salary = Integer.parseInt(tokens[4]);
                // Insert the Salary object as key and the entire row as value;
                // the TreeMap sorts the records by salary in ascending order
                ToRecordMap.put(new Salary(salary), new Text(value));
            }
        }

        // If we have more than ten records, remove the one with the lowest salary.
        // As the TreeMap is sorted in ascending order, the record with the
        // lowest salary is the first key, which is where the iterator starts.
        Iterator<Entry<Salary, Text>> iter = ToRecordMap.entrySet().iterator();
        while (ToRecordMap.size() > 10) {
            iter.next();
            iter.remove();
        }

        // Output our ten records to the file system with a null key, highest salary first
        for (Text t : ToRecordMap.descendingMap().values()) {
            context.write(NullWritable.get(), t);
        }
    }
}

In the Reducer we receive the Top 10 records from each Mapper, and the same steps as in the Mapper are repeated, except for the cleanup phase: because every mapper emits the same key (NullWritable), all the records arrive in a single reduce() call, so we can write the global Top K there directly.

The output of the job is the Top K records.
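
For completeness, a minimal driver for this job might look like the sketch below. The class name TopKDriver, the job name, and the input/output paths taken from the command line are assumptions for illustration. Setting a single reduce task is a design choice: with a NullWritable key all records would reach the same reducer anyway, but one reduce task keeps the output in a single file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopKDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "top-k-salaries");
        job.setJarByClass(TopKDriver.class);

        job.setMapperClass(Top20Mapper.class);
        job.setReducerClass(Top20Reducer.class);

        // A single reducer sees every mapper's local Top 10 and
        // produces the global Top 10 in one output file
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}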

This way we can obtain the Top K records using MapReduce functionality.

I hope this blog was helpful in giving you a better understanding of implementing the Top-K MapReduce design pattern.

Keep visiting our site Acadgild for more updates on Big data and other technologies.
