
MapReduce Use Case – Sentiment Analysis on Twitter Data

This post is about performing Sentiment Analysis on Twitter data using MapReduce. We will use the distributed cache to join the tweets with a sentiment dictionary on the map side.

What does distributed cache do here?

By using the distributed cache, we can perform map-side joins. Here, we will join the tweets with a dictionary dataset that contains a sentiment value for each word. For this Sentiment Analysis, we will be using a dictionary called AFINN.


AFINN is a dictionary of roughly 2,500 English words, each rated on a scale from -5 (very negative) to +5 (very positive), depending on its meaning.
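Each line of the AFINN file is simply a word, a tab character, and its integer rating. The entries below are illustrative of the format only; check your copy of the dictionary for the exact ratings.

abandon	-2
bad	-3
good	3
love	3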

Here is the complete code for performing Sentiment Analysis using MapReduce.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class Sentiment_Analysis extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        private URI[] files;
        private HashMap<String, String> AFINN_map = new HashMap<String, String>();

        @Override
        public void setup(Context context) throws IOException {
            // Load the AFINN dictionary from the distributed cache into memory.
            files = DistributedCache.getCacheFiles(context.getConfiguration());
            Path path = new Path(files[0]);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataInputStream in = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = br.readLine()) != null) {
                // Each dictionary line is "word<TAB>rating".
                String[] splits = line.split("\t");
                AFINN_map.put(splits[0], splits[1]);
            }
            br.close();
            in.close();
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is one tweet in JSON format.
            String line = value.toString();
            JSONParser jsonParser = new JSONParser();
            try {
                JSONObject obj = (JSONObject) jsonParser.parse(line);
                String tweet_id = (String) obj.get("id_str");
                String tweet_text = (String) obj.get("text");
                // Look up every word of the tweet in the dictionary and
                // accumulate its rating into the sentiment score.
                String[] splits = tweet_text.split(" ");
                int sentiment_sum = 0;
                for (String word : splits) {
                    if (AFINN_map.containsKey(word)) {
                        sentiment_sum += Integer.parseInt(AFINN_map.get(word));
                    }
                }
                context.write(new Text(tweet_id),
                        new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reducer: write every mapper record through unchanged.
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Sentiment_Analysis(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: Sentiment_Analysis <in> <out>");
            return 2;
        }
        // Register the AFINN dictionary (already present in HDFS) with the distributed cache.
        DistributedCache.addCacheFile(new URI("/AFINN.txt"), conf);
        Job job = new Job(conf, "SentimentAnalysis");
        job.setJarByClass(Sentiment_Analysis.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

How did we achieve this?

Here are the four simple steps we have followed to perform Sentiment Analysis:

1. Implementing Distributed Caching

2. Writing a mapper class to calculate the sentiments

3. Writing a reducer class to display all the mapper output

4. Writing a Driver class for our MapReduce program

1. Implementing Distributed Caching

In MapReduce, map-side joins are carried out with the help of the distributed cache. The distributed cache is used when we join two datasets and the smaller one is small enough to be copied to every node and held in memory. Here, the dictionary is the smaller dataset, so we ship it through the distributed cache. Here is the implementation of the distributed cache.

private URI[] files;
private HashMap<String, String> AFINN_map = new HashMap<String, String>();

@Override
public void setup(Context context) throws IOException {
    // Load the AFINN dictionary from the distributed cache into memory.
    files = DistributedCache.getCacheFiles(context.getConfiguration());
    Path path = new Path(files[0]);
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataInputStream in = fs.open(path);
    BufferedReader br = new BufferedReader(new InputStreamReader(in));
    String line;
    while ((line = br.readLine()) != null) {
        // Each dictionary line is "word<TAB>rating".
        String[] splits = line.split("\t");
        AFINN_map.put(splits[0], splits[1]);
    }
    br.close();
    in.close();
}

First, we declare a HashMap named AFINN_map to store the key and the value, i.e., each dictionary word and its rating. The AFINN dictionary file itself is freely available online.

The data is tab-separated, containing a word and its rating. We open the cached file with an FSDataInputStream, read it line by line using the readLine() method, split each line on the tab delimiter, and store the word and its rating in the HashMap we created, AFINN_map.

files = DistributedCache.getCacheFiles(context.getConfiguration());

The above line retrieves the URIs of the files registered with the distributed cache; the first of these is our dictionary file, which setup() then opens and loads. With that, our cached file is read and processed successfully, and the distributed cache implementation is complete.
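Note that the DistributedCache class used above is deprecated in Hadoop 2.x. A minimal sketch of the equivalent wiring with the newer API (Job.addCacheFile in the driver and Context.getCacheFiles in the mapper) would look roughly like this:

// In the driver, after creating the org.apache.hadoop.mapreduce.Job:
job.addCacheFile(new URI("/AFINN.txt"));

// In the mapper's setup(), the cached file URIs are available as:
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);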

2. Writing a Map Method to Calculate the Sentiments

public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Each input line is one tweet in JSON format.
    String line = value.toString();
    JSONParser jsonParser = new JSONParser();
    try {
        JSONObject obj = (JSONObject) jsonParser.parse(line);
        String tweet_id = (String) obj.get("id_str");
        String tweet_text = (String) obj.get("text");
        // Look up every word of the tweet in the dictionary and
        // accumulate its rating into the sentiment score.
        String[] splits = tweet_text.split(" ");
        int sentiment_sum = 0;
        for (String word : splits) {
            if (AFINN_map.containsKey(word)) {
                sentiment_sum += Integer.parseInt(AFINN_map.get(word));
            }
        }
        context.write(new Text(tweet_id),
                new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

The map method takes each record as input, and the record is converted into a String using the toString() method. We then create a JSONParser object called jsonParser, which parses each record (every record is a tweet in JSON format).

Next, we extract the tweet_id and the tweet_text fields, which are required for Sentiment Analysis, as shown below.

JSONObject obj = (JSONObject) jsonParser.parse(line);
String tweet_id = (String) obj.get("id_str");
String tweet_text = (String) obj.get("text");
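For context, each input record produced by Flume's Twitter source is a single JSON object per line. A real tweet carries many more fields, but a trimmed, illustrative record containing just the two fields used here would look like:

{"id_str":"123456789012345678","text":"I love Hadoop"}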

Now we split the tweet_text into words and loop over every word in the tweet. Each word is looked up in the AFINN dictionary (effectively an inner join); whenever there is a match, the word's rating is added to a running sum. The same is shown below.

String[] splits = tweet_text.split(" ");
int sentiment_sum = 0;
for (String word : splits) {
    if (AFINN_map.containsKey(word)) {
        sentiment_sum += Integer.parseInt(AFINN_map.get(word));
    }
}
context.write(new Text(tweet_id),
        new Text(tweet_text + "\t----->\t" + Integer.toString(sentiment_sum)));

Finally, we write the tweet_id as the key and the combination of the tweet_text and the sentiment_sum as the value to the context.
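To see the scoring logic in isolation, here is a small self-contained sketch in plain Java, with a couple of hand-picked ratings standing in for the real AFINN file:

import java.util.HashMap;

public class SentimentSumDemo {
    public static void main(String[] args) {
        // Hypothetical ratings standing in for the AFINN dictionary.
        HashMap<String, String> afinn = new HashMap<String, String>();
        afinn.put("love", "3");
        afinn.put("bad", "-3");

        String tweetText = "i love hadoop but bad network";
        int sentimentSum = 0;
        for (String word : tweetText.split(" ")) {
            // Only words present in the dictionary contribute to the score.
            if (afinn.containsKey(word)) {
                sentimentSum += Integer.parseInt(afinn.get(word));
            }
        }
        // 3 + (-3) = 0 for this sample sentence.
        System.out.println(tweetText + "\t----->\t" + sentimentSum);
    }
}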

3. Writing a Reducer Class to Display All the Mapper Output

public static class Reduce extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Identity reducer: write every mapper record through unchanged.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}

In the reducer class, we simply pass every mapper output record through unchanged.

4. Writing the Driver Class

public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Sentiment_Analysis(), args);
    System.exit(exitCode);
}

@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length != 2) {
        System.err.println("Usage: Sentiment_Analysis <in> <out>");
        return 2;
    }
    // Register the AFINN dictionary (already present in HDFS) with the distributed cache.
    DistributedCache.addCacheFile(new URI("/AFINN.txt"), conf);
    Job job = new Job(conf, "SentimentAnalysis");
    job.setJarByClass(Sentiment_Analysis.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}

In the Driver class, we need to provide the path for the cached dataset, using the below line.

DistributedCache.addCacheFile(new URI("/AFINN.txt"),conf);
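Here /AFINN.txt refers to a path in HDFS, so the dictionary file has to be copied into HDFS before the job is submitted, for example (assuming AFINN.txt is in your local working directory):

hadoop fs -put AFINN.txt /AFINN.txt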

In addition, we need to provide the input path (the tweets folder) and the output folder path as command-line arguments.

How do we run our MapReduce program?

To run this program, we need to build a jar file of the above project and run it with the usual hadoop jar command, passing the input and output paths, as shown below.

hadoop jar twitter.jar /user/flume/tweets /user/flume/tweets/output

The output will be created in a part file under the output directory, i.e., /user/flume/tweets/output/part-r-00000, with one line per tweet.
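Each line of the part file pairs a tweet id with the tweet text and its computed score. An illustrative, made-up record would look like:

123456789012345678	I love Hadoop	----->	3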

We hope this post has been helpful in understanding how to perform Sentiment Analysis using MapReduce. In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.

Suggested Reading

Hadoop Commands


11 Comments

  1. I am getting like
    java:117: error: cannot find symbol
    ToolRunner.run(new Parse(),args);
    ^
    symbol: class Parse
    location: class Sentiment_Analysis
    1 error

  2. I think the line 135
    context.write(new Text(tweet_id),new Text(tweet_text+"\t----->\t"+new Text(Integer.toString(sentiment_sum))));
    is not correct as the second argument for write should be of type IntWritable

    1. Hi Ann,
      The data types of the keys and values output by the Mapper class depend on the OutputKey and OutputValue parameters passed while extending the default Mapper class. Here we have passed the Mapper's OutputKey and OutputValue parameters as Text and Text; you can refer to the 55th line of the program for the same: public static class Map extends Mapper<LongWritable, Text, Text, Text>. Depending on your requirement, you can change the output data types of the Mapper and Reducer classes.

  3. Thanks for the post.
    I am getting this error :
    Error: java.lang.ClassNotFoundException: org.json.simple.parser.JSONParser
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    How to resolve this ?
    Thanks

    1. Got the solution.
      Add the json-simple jar to the Hadoop classpath.
      To find the Hadoop classpath, type this on the command line:
      $ hadoop classpath
      Then copy the json jar into any of the paths listed there.
