Big Data Hadoop & Spark

Writing UDAF in Pig

Apache Pig – Introduction

Apache Pig is a high-level platform for creating MapReduce programs that run on Hadoop. It is a platform for analyzing large data sets, and it provides a high-level language for expressing data analysis programs. A key property of Pig programs is that their structure is amenable to substantial parallelization, which is what enables them to handle very large data sets.

The language used by Pig in this platform is called Pig Latin. Pig Latin abstracts the programming from the Java MapReduce idiom into a notation which makes MapReduce programming high-level, similar to that of SQL for RDBMS systems.


Pig Latin is a data-flow language that evaluates one statement at a time. The code in Pig looks like a script, so it is popularly known as a Pig script.
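As a simple illustration of this data-flow style, consider a short script (the file name and field names here are made up for illustration, not from this case study):

```pig
-- load a tab-delimited file and name its fields
students = LOAD 'students.txt' AS (name:chararray, age:int);
-- each statement consumes the output of the previous one
adults = FILTER students BY age >= 18;
DUMP adults;
```

Each relation (`students`, `adults`) is the output of one statement and the input to the next, which is what makes Pig Latin a data-flow language.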

What are UDFs in Pig?

A user-defined function (UDF) is a function provided by the user of a program or environment, in a context where the usual assumption is that functions are built into the program or environment.

There are four types of UDFs in Pig:

  • Eval functions
  • Aggregate functions
  • Filter functions
  • Load functions

Now we will discuss aggregate functions in Pig, which are also called UDAFs (User Defined Aggregate Functions).

Aggregate Function

Aggregate functions are a type of EvalFunc in Pig that perform operations on grouped data. An aggregate function takes one group at a time as input from FOREACH, performs operations on that group, and returns a scalar value as a result. To perform this type of operation efficiently, it uses the Algebraic interface.

An algebraic function is one that takes a set of inputs, performs operations on them, and produces intermediate results. The intermediate results are processed further to produce the final result.

You might wonder how the Algebraic interface is related to aggregate functions. Since we are performing operations on a group, the whole group is taken as input and one or more fields serve as intermediate values. The operation is then performed again on the intermediate values, and a single result is returned.

Example for Aggregate Function

COUNT is an example of an aggregate function in Pig; using COUNT, the number of values in a group can be counted.

The structure of an Aggregate function looks like this:

public interface Algebraic {
    public String getInitial();
    public String getIntermed();
    public String getFinal();
}

The Algebraic interface has three methods which must be overridden in order to write a Pig UDAF:

  • The getInitial() method takes the supplied tuple as input and produces intermediate results, which are then carried to getIntermed().
  • The getIntermed() method takes the output of getInitial(), the intermediate values, which are finally passed on to getFinal().
  • The getFinal() method handles the output of getIntermed() and returns the final result of the function.

We can relate these methods to the MapReduce stages as shown below:

getInitial() is like the Map phase, which takes the raw input,

getIntermed() is like the Combiner, which performs operations on the intermediate results, and

getFinal() is like the Reducer, which gives the final result.
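To make this decomposition concrete, here is a plain-Java sketch (no Pig APIs; class and method names are made up for illustration) of how COUNT splits into these three phases: the initial phase emits a partial count of 1 per tuple, and the intermediate and final phases sum the partial counts.

```java
import java.util.List;

public class AlgebraicCountSketch {
    // getInitial phase: each input tuple contributes a partial count of 1.
    static long initial(Object tuple) {
        return 1L;
    }

    // getIntermed phase: partial counts are summed, combiner-style.
    static long intermed(List<Long> partialCounts) {
        long sum = 0;
        for (long c : partialCounts) sum += c;
        return sum;
    }

    // getFinal phase: intermediate sums are combined into the final count.
    static long finalResult(List<Long> intermediateSums) {
        return intermed(intermediateSums); // same summing logic
    }

    public static void main(String[] args) {
        // Five tuples split across two "map" tasks.
        List<Long> task1 = List.of(initial("a"), initial("b"), initial("c"));
        List<Long> task2 = List.of(initial("d"), initial("e"));
        long result = finalResult(List.of(intermed(task1), intermed(task2)));
        System.out.println(result); // 5
    }
}
```

Because summing is associative, the partial counts can be combined in any order across tasks, which is exactly why COUNT qualifies as algebraic.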

This interface is implemented in the UDAF class, which also extends Pig's EvalFunc class.

Let us implement this UDAF using the Olympic data set. You can download the data set from the link below.

Olympic Data set

The data description for the data set is as follows:

  • Athlete: this field consists of the athlete's name
  • Age: this field consists of the athlete's age
  • Country: this field consists of the names of the countries that participated in the Olympics
  • Year: this field consists of the year of the games
  • Closing Date: this field consists of the closing date of the ceremony
  • Sport: consists of the sport's name
  • Gold Medals: no. of gold medals
  • Silver Medals: no. of silver medals
  • Bronze Medals: no. of bronze medals
  • Total Medals: consists of the total no. of medals

In this case study, we will calculate the number of people in each age category.

Count Aggregate Function – Example

Now let’s see how the Count Aggregate function is implemented:

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class COUNT_UDAF extends EvalFunc<Long> implements Algebraic {
    public Long exec(Tuple input) throws IOException {
        return count(input);
    }
    public String getInitial() {
        return Initial.class.getName();
    }
    public String getIntermed() {
        return Intermed.class.getName();
    }
    public String getFinal() {
        return Final.class.getName();
    }
    static public class Initial extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(count(input));
        }
    }
    static public class Intermed extends EvalFunc<Tuple> {
        public Tuple exec(Tuple input) throws IOException {
            return TupleFactory.getInstance().newTuple(sum(input));
        }
    }
    static public class Final extends EvalFunc<Long> {
        public Long exec(Tuple input) throws IOException {
            return sum(input);
        }
    }
    @SuppressWarnings("rawtypes")
    static protected Long count(Tuple input) throws ExecException {
        Object values = input.get(0);
        if (values instanceof DataBag) return ((DataBag) values).size();
        else if (values instanceof Map) return new Long(((Map) values).size());
        return null;
    }
    static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
        DataBag values = (DataBag) input.get(0);
        long sum = 0;
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            sum += (Long) t.get(0);
        }
        return sum;
    }
}


Walkthrough of the above program

public class COUNT_UDAF extends EvalFunc<Long> implements Algebraic {
    public Long exec(Tuple input) throws IOException { return count(input); }
    public String getInitial() { return Initial.class.getName(); }
    public String getIntermed() { return Intermed.class.getName(); }
    public String getFinal() { return Final.class.getName(); }
  • First, we declare the class as COUNT_UDAF; it extends Pig's default EvalFunc class.
  • We specify the type parameter as 'Long', and the class implements the Algebraic interface required for writing aggregate functions (UDAFs).
  • Next, we override the exec() function, which takes a Tuple as input and returns one output (as previously explained, EvalFunc takes one input and returns one output).
  • The getInitial() method returns a class named 'Initial'. The input to that class is processed, and the result is passed on to the getIntermed() method.
  • The getIntermed() method returns a class named 'Intermed'. The output obtained from getInitial() serves as the input to this class; these are the intermediate results of the program. The output from this class in turn serves as the input to the getFinal() method.
  • The getFinal() method returns a class named 'Final', which processes the intermediate results and returns the final result required by us (i.e., the count).


Now let's look at the operations of the 'Initial' class:

static public class Initial extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        return TupleFactory.getInstance().newTuple(count(input));
    }
}
  • In Pig, for a Tuple to be taken as input, we need to extend EvalFunc and pass Tuple as the type parameter.
  • When extending EvalFunc, the logic must be written inside the exec() function.
  • To construct a new tuple, a TupleFactory must be used. Since we need a new tuple in the above program, TupleFactory is used.
  • getInstance() returns the singleton TupleFactory object that is used to create tuples.
  • The count(input) method computes the value placed in our new tuple; it is discussed later on.
  • We know that aggregate functions are applied on grouped data, and a group consists of several tuples. This method is applied on each tuple.
  • This class returns the constructed tuple as input to the Intermed class.

Now let's look at the operations of the 'Intermed' class:

static public class Intermed extends EvalFunc<Tuple> {
    public Tuple exec(Tuple input) throws IOException {
        return TupleFactory.getInstance().newTuple(sum(input));
    }
}

The Intermed class takes the input from 'Initial' and processes it to create a new Tuple, similar to the Initial class. It returns the newly constructed Tuple to the Final class as input.

The sum(input) method computes the value placed in our new tuple.

Now let's look at the operations of the 'Final' class:

static public class Final extends EvalFunc<Long> {
    public Long exec(Tuple input) throws IOException {
        return sum(input);
    }
}

The Final class takes the input from the Intermed class and processes it. Here we are just returning the sum obtained from Intermed.

Now, let’s look at the count and sum methods:

static protected Long count(Tuple input) throws ExecException {
    Object values = input.get(0);
    if (values instanceof DataBag) return ((DataBag) values).size();
    else if (values instanceof Map) return new Long(((Map) values).size());
    return null;
}
  • This method takes the Tuple as input and grabs the required field using the get() method.
  • The field is stored in the variable named values; if the value is an instance of DataBag, the method returns ((DataBag) values).size(), i.e., the number of tuples present in the bag.
  • Here we have stored only one value inside it, so the size will obviously be one.
  • This is passed as input to the Intermed class.
static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
    DataBag values = (DataBag) input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long) t.get(0);
    }
    return sum;
}
  • The sum() method takes the input from the Initial class. As discussed earlier, count(input) is applied on every tuple in the group, producing several inputs for the Intermed class.
  • Here, an iterator is used to walk the list of inputs (all 1's in this case), which are summed. The result in turn serves as input to the Final class.
  • The Final class just returns the sum.

We have finally implemented the COUNT aggregate function, which gives the count of values.

Note: We need to build a jar file from the above Java code and register the jar file in our Pig script to bring in its functionality.

Refer to the document below to learn how to build a jar file and use it in your Pig script.

 https://drive.google.com/open?id=0ByJLBTmJojjzbDRWaHJuZGNTVlk
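In outline, the build typically amounts to compiling against the Pig jar and packaging the resulting classes; the commands below are an illustrative sketch (the Pig jar path and version are assumptions, not taken from the linked document):

```shell
# compile the UDAF against the Pig jar (path is an assumed example)
javac -cp /usr/local/pig/pig-0.15.0-core-h2.jar COUNT_UDAF.java
# package all generated classes, including the nested Initial/Intermed/Final classes
jar -cf count.jar COUNT_UDAF*.class
```

Note the wildcard: the nested static classes compile to separate .class files (e.g. COUNT_UDAF$Initial.class) and must be included in the jar.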

Now let us see the Pig script for our problem statement:

REGISTER /home/kiran/jars/count.jar;
A = LOAD '/olympix_data.csv';
B = GROUP A BY $1;
C = FOREACH B GENERATE group, COUNT_UDAF(A);

First, we have registered the jar file, which we built with the name count.jar. When you want to use a registered UDAF, you need to give the package name and class name from the built jar. Since this UDAF was written in the default package, we need to specify just the class name.
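For example, had the UDAF been placed in a package, say com.example.udf (a hypothetical package name for illustration), the call in the script would need the fully qualified name:

```pig
C = FOREACH B GENERATE group, com.example.udf.COUNT_UDAF(A);
```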

We have successfully registered our UDAF and executed it with the Pig script. Now let us see the output by using the DUMP command.

Output: 

In the output above, we can see the number of people in each age category.

This shows that we have successfully implemented a UDAF in our Pig script.

We hope the implementation on the Olympic data set in this blog helped you in understanding UDAFs in Pig. Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.

Suggested Reading

Pig Interview Questions
