Avro File Processing Using MapReduce


Hadoop supports a rich set of file formats such as TextFile, SequenceFile, RCFile, ORCFile, Avro, Parquet, and more. In this tutorial, we will show you a demo of Avro file processing using MapReduce: we will take an Avro file as input, process the Avro data, and store the output as a plain text file. First, let us understand what Avro is.

What is Avro?

Avro is one of the preferred data serialization systems because of its language neutrality.
Hadoop's Writable classes lack language portability, so Avro becomes a natural choice: it handles data in a format that can be processed further by multiple languages.

Avro is also widely preferred for serializing data in Hadoop.
It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services.

In short, Avro is a file format introduced with Hadoop to store data in a predefined format. This file format can be used by any of Hadoop's tools, such as Pig and Hive.

Avro provides MapReduce libraries for processing Avro files when they are taken as input to, or stored as output from, Hadoop programs.

You require two extra jar files for processing Avro files in Hadoop. The demo in this tutorial runs on a Hadoop 2 cluster.

Download the required jar files from the link below.

https://drive.google.com/open?id=0ByJLBTmJojjzUFNDdktYRTg2Z2c

From the above link, download avro-mapred-1.7.7.hadoop2.jar and avro-tools-1.7.7.jar, and then add these jars to your build path.

You can download the Avro dataset used in this tutorial from the link below.

https://drive.google.com/open?id=0ByJLBTmJojjzQlhnX1Byb184MDg

Here is the Avro schema of the data we have used (note that the first field is spelled "athelete" in the source schema):

{
  "name": "my_record",
  "type": "record",
  "fields": [
    {"name": "athelete", "type": "string"},
    {"name": "age",      "type": "int"},
    {"name": "country",  "type": "string"},
    {"name": "year",     "type": "string"},
    {"name": "closing",  "type": "string"},
    {"name": "sport",    "type": "string"},
    {"name": "gold",     "type": "int"},
    {"name": "silver",   "type": "int"},
    {"name": "bronze",   "type": "int"},
    {"name": "total",    "type": "int"}
  ]
}

You can also download this schema file from the link below.
https://drive.google.com/open?id=0ByJLBTmJojjzZTJCelJ3Y0gxd1E
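Before moving on, here is a minimal sketch of how a record matching this schema could be written with Avro's generic Java API. The class name, file names, and field values below are illustrative, not part of the tutorial's dataset pipeline:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class WriteSample {
  public static void main(String[] args) throws Exception {
    // Parse the schema shown above (saved locally as my_record.avsc).
    Schema schema = new Schema.Parser().parse(new File("my_record.avsc"));

    // Build one record; the values are illustrative.
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("athelete", "Michael Phelps"); // spelled as in the schema
    rec.put("age", 23);
    rec.put("country", "United States");
    rec.put("year", "2008");
    rec.put("closing", "8/24/2008");
    rec.put("sport", "Swimming");
    rec.put("gold", 8);
    rec.put("silver", 0);
    rec.put("bronze", 0);
    rec.put("total", 8);

    // Serialize the record into Avro's compact binary container format.
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
    writer.create(schema, new File("sample.avro"));
    writer.append(rec);
    writer.close();
  }
}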
After preparing the schema, you need to generate a Java class from the schema file; you can do that with the avro-tools jar.

Among other things, the avro-tools jar can convert an Avro file into a JSON file and vice versa. To compile the schema into a Java file, use the command below.

java -jar /path/to/avro-tools-1.7.7.jar compile schema <schema file_path> <destination_path>
Running this command generates a Java source file, my_record.java, for the schema.
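For example, if the schema is saved as my_record.avsc in the current directory (the file name is illustrative), the two commands below compile it to Java and dump an Avro data file as JSON:

java -jar avro-tools-1.7.7.jar compile schema my_record.avsc .
java -jar avro-tools-1.7.7.jar tojson 000000_0.avro

The first command writes my_record.java into the current directory; the second prints each Avro record as a line of JSON, which is the Avro-to-JSON conversion mentioned above.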

Now copy this generated Java file into the avro_file package. Here is the code to process the Avro file as input.

import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Avro_file_input extends Configured implements Tool {

  // Mapper: each Avro record arrives as the input key (the value is
  // NullWritable); we extract the "country" field and emit (country, 1).
  public static class ColorCountMapper
      extends Mapper<AvroKey<GenericData.Record>, NullWritable, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
      CharSequence country = (CharSequence) key.datum().get("country");
      context.write(new Text(country.toString()), one);
    }
  }

  // Reducer: sums the 1s emitted per country -- standard word-count logic.
  public static class ColorCountReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: Avro_file_input <input path> <output path>");
      return -1;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Color Count");
    job.setJarByClass(Avro_file_input.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Read Avro files: each record becomes the map input key.
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, my_record.getClassSchema());

    job.setMapperClass(ColorCountMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setReducerClass(ColorCountReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Delete any existing output directory so reruns do not fail.
    Path outputPath = new Path(args[1]);
    outputPath.getFileSystem(conf).delete(outputPath, true);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Avro_file_input(), args);
    System.exit(res);
  }
}

The dataset is about the Olympics. The program above takes the Avro file as input and counts the number of records for each country present in the dataset.

Here is the Mapper class implementation for taking an Avro file as input.

public static class ColorCountMapper
    extends Mapper<AvroKey<GenericData.Record>, NullWritable, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    CharSequence country = (CharSequence) key.datum().get("country");
    context.write(new Text(country.toString()), one);
  }
}

For the Mapper class, the type parameters are AvroKey<GenericData.Record>, NullWritable, Text, and IntWritable.

Since Avro is a semi-structured, record-oriented format, the file contains its data as records. The Mapper therefore accepts AvroKey<record type> and NullWritable as its input key and value pair.

Here we have given GenericData.Record as the AvroKey<> parameter, so the generic API fetches each record according to the schema provided and presents it as an ordinary record object.

Instead of GenericData.Record, you can also use the record class generated from your schema file, i.e. the Java class you built above, as shown in the sketch below.
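For example, with the generated my_record class the mapper could look like the following sketch. This is illustrative: it assumes the generated class is on the classpath and resolvable by Avro's specific-record reader (its namespace must match the package it lives in).

public static class ColorCountMapper
    extends Mapper<AvroKey<my_record>, NullWritable, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);

  @Override
  public void map(AvroKey<my_record> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    // The generated class exposes a typed getter per schema field,
    // so no cast from Object is needed.
    CharSequence country = key.datum().getCountry();
    context.write(new Text(country.toString()), one);
  }
}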

Similarly, the map() method takes the AvroKey<> and NullWritable as its key and value.

Because the data inside the Avro file is serialized, we use a CharSequence to fetch string data from each record.

Each record in an Avro file is exposed as a datum, so we call key.datum().get("..."), where the argument of get() is a column name from the schema. We pass "country", so the country column is pulled out of the Avro file.

Through the context, the country name and the integer 1 are emitted to the reducer.

In the reducer we perform standard word-count logic, as follows:

public static class ColorCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

Finally, the output file consists of the count of each country present in the dataset.

To take the input in Avro format, you need to set the input format to AvroKeyInputFormat in your main class.

job.setInputFormatClass(AvroKeyInputFormat.class);

You also need to set the Avro schema of the input, using the method below.

AvroJob.setInputKeySchema(job, my_record.getClassSchema());
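If you prefer not to use the generated class at this point, you can parse the .avsc file directly and pass the resulting Schema object instead. A minimal sketch, assuming the schema file is available locally as my_record.avsc:

// Requires org.apache.avro.Schema and java.io.File.
Schema schema = new Schema.Parser().parse(new File("my_record.avsc"));
AvroJob.setInputKeySchema(job, schema);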

After setting these up, you can build a jar file and run it like any other Hadoop program.
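For example, you might compile and package the program like this. The paths are illustrative and assume the sources (including the generated my_record.java) live in an avro_file/ directory next to the downloaded Avro jar:

mkdir -p classes
javac -cp "$(hadoop classpath):avro-mapred-1.7.7.hadoop2.jar" -d classes avro_file/*.java
jar -cf avro_file.jar -C classes .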

To learn how to develop and execute your first Hadoop program, you can follow our eBook:

https://acadgild.com/blog/develop-execute-first-map-reduce-program-ebook-download/

Before running the job, please make sure that the additional jar files you downloaded are on the Hadoop classpath; otherwise the program will throw a ClassNotFoundException.

Note: you can copy the additional jar files into the $HADOOP_HOME/share/hadoop folder.
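Alternatively, since the driver implements Tool and is launched through ToolRunner, you can ship the extra jar at submission time with the generic -libjars option instead of copying it into the Hadoop install:

hadoop jar jars/avro_file.jar -libjars avro-mapred-1.7.7.hadoop2.jar <input path> <output path>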

Now let us run the program using the normal Hadoop jar command as follows:

hadoop jar jars/avro_file.jar /user/hive/warehouse/olympic_avro/000000_0.avro /avro_file

The jar file we built for this program is avro_file.jar.

Once the program has executed successfully, the result is available in the output directory (/avro_file in the command above).

The output holds the count of each country, stored as normal Hadoop TextOutputFormat (tab-separated text).
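Since the job is configured with a single reduce task, the result sits in one file, which you can print with:

hadoop fs -cat /avro_file/part-r-00000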

We hope this blog helped you understand how to process Avro files using MapReduce. Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
