
Processing data in MongoDB Using Hadoop's MapReduce

MongoDB and Hadoop are a powerful combination that can be used together to deliver complex analytics solutions. Hadoop can be integrated with many data sources, and MongoDB is one of them.

MongoDB provides a Hadoop connector for this integration. Using this connector, we can set the input and output formats of our MapReduce programs to the MongoInputFormat and MongoOutputFormat classes.

In this tutorial, we will show a demo of how to use MongoInputFormat in MapReduce to work on the data stored in a MongoDB collection.

Here is the Data Present in Our MongoDB

Our database is named Acadgild, and inside that database we have a collection called user_details in which we have stored a few user details. Each document carries an _id and a created_at field, which are the two fields this tutorial works with. If you want to build a similar sample collection yourself, a short snippet with the MongoDB Java driver will do, as sketched below.
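The sketch below is a minimal way such a collection could be populated, assuming the legacy MongoDB Java driver (the same driver that supplies BasicDBObject); the class name SeedUserDetails and the created_at value are purely illustrative placeholders, not the actual data used in this demo.

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class SeedUserDetails {
  public static void main(String[] args) {
    // Connect to the local MongoDB instance used in this tutorial.
    MongoClient client = new MongoClient("localhost", 27017);
    DB db = client.getDB("Acadgild");
    DBCollection users = db.getCollection("user_details");
    // Illustrative record only: _id is auto-generated by MongoDB and
    // created_at holds an example timestamp string.
    users.insert(new BasicDBObject("created_at", "Mon Apr 11 10:30:00 IST 2016"));
    client.close();
  }
}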

Here is the MapReduce code to work on the data in the MongoDB collection:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class Mongo_MR {

  // Mapper: receives each MongoDB document as a BasicDBObject and emits
  // the document's _id as the key and its created_at value as the value.
  public static class TokenizerMapper
       extends Mapper<Object, BasicDBObject, Text, Text> {

    public void map(Object key, BasicDBObject value, Context context)
        throws IOException, InterruptedException {
      String id = value.get("_id").toString();
      String createdAt = value.get("created_at").toString();
      context.write(new Text(id), new Text(createdAt));
    }
  }

  // Reducer: performs no aggregation; it simply forwards the mapper output
  // to the context. Every _id is unique, so each key carries a single value.
  public static class IntSumReducer
       extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      Text result = null;
      for (Text val : values) {
        result = val;
      }
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "mongo mapreduce");
    // Point the MongoDB Hadoop connector at the Acadgild.user_details collection.
    MongoConfigUtil.setInputURI(job.getConfiguration(), "mongodb://localhost/Acadgild.user_details");
    job.setJarByClass(Mongo_MR.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // Read the input splits from MongoDB instead of HDFS.
    job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // The output stays in HDFS; only the output path is taken from the command line.
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Let us See What the Mapper Class Contains:


public static class TokenizerMapper
     extends Mapper<Object, BasicDBObject, Text, Text> {

  public void map(Object key, BasicDBObject value, Context context)
      throws IOException, InterruptedException {
    String id = value.get("_id").toString();
    String createdAt = value.get("created_at").toString();
    context.write(new Text(id), new Text(createdAt));
  }
}

MongoDB stores data as BSON documents, so we have used BasicDBObject as the Mapper's input value type. Each document is passed into the map() method, which runs once per document. Here we retrieve the _id and created_at fields; the get() method extracts the required fields from the DB object.

We emit this _id as the key and the created_at field as the value, and this pair is sent to the reducer as its input.

In the reducer class, we just forward the mapper output to the context; we do not perform any aggregation.
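For reference, here is the reducer from the listing above; since every _id is unique, each key arrives with a single created_at value, which is written out unchanged:

public static class IntSumReducer
     extends Reducer<Text, Text, Text, Text> {

  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    Text result = null;
    // Each key has a single value; pass it through as-is.
    for (Text val : values) {
      result = val;
    }
    context.write(key, result);
  }
}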

When it comes to the driver class, we need to pass the MongoDB configuration to Hadoop.

Using the MongoConfigUtil class, we pass the MongoDB URI to Hadoop as shown below.

MongoConfigUtil.setInputURI(job.getConfiguration(), "mongodb://localhost/Acadgild.user_details");

The MongoDB URI consists of the URL of your MongoDB server, followed by the database name and the collection name separated by a dot.

Here our database name is Acadgild and the collection name is user_details.
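If your MongoDB instance runs on a different host or requires authentication, the same call simply takes a fuller connection string; the host, port, and credentials below are placeholders, not values from this demo:

// Placeholder host, port, and credentials; adjust to your environment.
MongoConfigUtil.setInputURI(job.getConfiguration(),
    "mongodb://username:password@mongohost:27017/Acadgild.user_details");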

As we are reading the input from MongoDB, we need to set the InputFormat class of our MapReduce program to MongoInputFormat, as shown below.

job.setInputFormatClass(com.mongodb.hadoop.MongoInputFormat.class);

If the output of your MapReduce job is to be stored in MongoDB, you need to set the OutputFormat class to MongoOutputFormat. In this tutorial, we are storing the output in HDFS itself.
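If you do want the results to land in MongoDB rather than HDFS, the driver could be adapted along these lines; this is a minimal sketch, and the output collection Acadgild.mr_output is only an example name:

// Write the job output back to MongoDB instead of HDFS.
MongoConfigUtil.setOutputURI(job.getConfiguration(), "mongodb://localhost/Acadgild.mr_output");
job.setOutputFormatClass(com.mongodb.hadoop.MongoOutputFormat.class);
// With MongoOutputFormat there is no need to call FileOutputFormat.setOutputPath().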

To execute this program, you need the MongoDB Hadoop connector jar files. You can download them from the link below.

https://drive.google.com/open?id=0ByJLBTmJojjzVUlvRGlBQVV0RDA

Download these jar files and copy them into your $HADOOP_HOME/share/hadoop/mapreduce directory so that your MapReduce job can pick up these dependencies directly at runtime.

We have built a jar file for this program; let us run it as a normal Hadoop jar and check the output.

Here we do not need to provide an input path, because the input is taken directly from the database URI given in the driver program, so we provide only the output path. The output of this program is stored in the /mongo directory of HDFS.

Let us check for the output in the path we provided while running the jar.

In the above screenshot, you can see the object id and the created_at date for each document. We have thus successfully processed the data in MongoDB using Hadoop's MapReduce and the MongoDB Hadoop connector.

In a similar way, you can also perform data migration from MongoDB to HDFS using MapReduce.

We hope this blog helped you in understanding how to process data in MongoDB using MapReduce. Keep visiting our site www.acadgild.com for more updates on Big data and other technologies.
