
MapReduce Program: Removing Redundant Image Files

As the amount of data grows day by day, it is better to keep little or no redundant data, both to save cluster storage and to speed up processing so that the required results are obtained in less time. So today, we will discuss how we can eliminate redundant files from a Hadoop cluster using a MapReduce program.
We have a directory of image files in which replicas of the same images are stored under different names. We need to find and eliminate the redundant images in an HDFS path.
Here, we need to write two programs to perform this task:

  1. A program to convert the image files into a Hadoop sequence file, and
  2. A MapReduce redundant-image remover program to remove the duplicate images from the given path.

As the Hadoop MapReduce framework was designed to store and process large files, we use the sequence file format to pack all the image files (small files) into one single large binary file for the MapReduce computation.
A sequence file is a flat file in which the key-value pairs are stored in binary form.
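As a minimal standalone sketch (separate from the jobs below, using the Hadoop 2 SequenceFile.Writer options API; the output path and file names are only illustrative), this is what one record of such a file looks like when written directly:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // One record per image: the key is the image path (Text) and the
        // value is the raw image bytes (BytesWritable).
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(new Path("/tmp/images.seq")),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class));
        try {
            byte[] imageBytes = {10, 20, 30}; // stand-in for real image bytes
            writer.append(new Text("/sample_images/image1.jpg"), new BytesWritable(imageBytes));
        } finally {
            writer.close();
        }
    }
}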
Once we have the sequence file with all the binary data, we can use the MD5 algorithm to generate a key (hash) for each image and compare these keys to find the redundant files.
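For instance, the following standalone snippet (a minimal sketch, not part of the jobs below) shows why an MD5 hex digest works as the comparison key: identical byte content always yields the same digest, regardless of the file name.

import java.security.MessageDigest;

public class Md5KeyDemo {
    static String md5Hex(byte[] data) throws Exception {
        byte[] hash = MessageDigest.getInstance("MD5").digest(data);
        StringBuilder hex = new StringBuilder();
        for (byte b : hash) {
            hex.append(Integer.toString((b & 0xff) + 0x100, 16).substring(1));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        byte[] original = {1, 2, 3, 4};    // stand-in for image bytes
        byte[] renamedCopy = {1, 2, 3, 4}; // the same bytes saved under another name
        // Both lines print the same 32-character digest, so the copy is detected as redundant.
        System.out.println(md5Hex(original));
        System.out.println(md5Hex(renamedCopy));
    }
}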
So, let us begin with our coding part:

Convert Image Files to Hadoop Sequence File

We can refer to the program below to convert small binary files (images) into one sequence file of a reasonable size.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class BinFilesToHadoopSeqFile {
public static class BinFilesToHadoopSeqFileMapper extends Mapper<Object, Text, Text, BytesWritable> {

Method Map

In the Mapper implementation, TextInputFormat supplies the byte offset as the key and each line of the input file, which holds the full HDFS path of one image, as the value.
The mapper opens that path, reads the whole file into a ByteArrayOutputStream, and emits the image path as the key and the file contents (as a BytesWritable) as the value.

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String uri = value.toString();
Configuration conf = new Configuration();
FileSystem fsys = FileSystem.get(URI.create(uri), conf);
FSDataInputStream fsin = null;
try {
fsin = fsys.open(new Path(uri));
ByteArrayOutputStream bout = new ByteArrayOutputStream();
byte[] buf = new byte[1024 * 1024];
int bytesRead;
// Write only the bytes actually read; writing the whole buffer each time
// would append stale bytes from the previous pass on the final read.
while ((bytesRead = fsin.read(buf, 0, buf.length)) > 0) {
bout.write(buf, 0, bytesRead);
}
context.write(value, new BytesWritable(bout.toByteArray()));
} finally {
IOUtils.closeStream(fsin);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "BinFilesToHadoopSeqFile");
job.setJarByClass(BinFilesToHadoopSeqFile.class);
job.setMapperClass(BinFilesToHadoopSeqFileMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Main Method

In the main method, we set SequenceFileOutputFormat as the output format class; it takes the key-value pairs emitted by the mapper and stores them as a single sequence file in HDFS.
Output:
The output key of this job is the path of each image, and the output value is the binary content of that image; together they form one sequence file containing all the images.
Using the above program, we can convert binary image files into the sequence file format.
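Assuming the class above is saved as BinFilesToHadoopSeqFile.java, one possible way to compile it and package it into the BtoS.jar used later (the directory names and jar name here are illustrative) is:

mkdir -p classes
javac -classpath `hadoop classpath` -d classes BinFilesToHadoopSeqFile.java
jar cvfe BtoS.jar BinFilesToHadoopSeqFile -C classes .

Setting the entry point with the e flag records the main class in the jar manifest, so the hadoop jar command in the execution steps can be run without naming the class explicitly.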
Next, we can refer to the MapReduce program below, which takes the sequence file produced above as input, removes the redundant files, and stores the names of the unique image files in the output directory.

ImgDupMapper

The mapper code reads the binary image data of each file and creates an MD5 string (hash) for it. It passes this to the reducer, where the key is the MD5 string and the value is the file name. Thus, all identical images are grouped together by the Hadoop framework.

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ImgDupMapper extends Mapper<Text, BytesWritable, Text, Text>{
public void map(Text key, BytesWritable value, Context context) throws IOException,InterruptedException {

// Get the MD5 hash for this specific file

String md5St;
try {
// copyBytes() returns only the valid bytes; getBytes() may include trailing padding beyond getLength()
md5St = calculateMd5(value.copyBytes());
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
context.setStatus("Internal error - can't find the algorithm for calculating the md5");
return;
}
Text md5txt = new Text(md5St);

// Emit the MD5 as the key and the file path as the value, so duplicates will be grouped together for the reduce function

context.write(md5txt, key);
}
static String calculateMd5(byte[] imageData) throws NoSuchAlgorithmException {

// Get the MD5 digest for this specific data

MessageDigest md = MessageDigest.getInstance("MD5");
md.update(imageData);
byte[] hash = md.digest();

// The code below converts the byte array to a hex string

String hexStr = "";
for (int i = 0; i < hash.length; i++) {
hexStr += Integer.toString((hash[i] & 0xff) + 0x100, 16).substring(1);
}
return hexStr;
}
}


ImgDupReducer

In the reducer class, we take only the first file name for each MD5 hash. That way, a single file name is kept for each set of identical images and all the redundant image file names are filtered out. In the output, the key is the file name and the value is the MD5 hash.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ImgDupReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {

// The key here is the MD5 hash, while the values are all the image files associated with it. For each MD5 value, we take only one file (the first).

Text imgFilePath = null;
for (Text filePath : values) {
imgFilePath = filePath;
break;//only the first one
}

// In the result file, the key is again the image file path.

context.write(imgFilePath, key);
}
}

ImgDriver

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ImgDriver extends Configured implements Tool
{
public int run(String[] args) throws Exception
{

// Get the configuration object and set the job name

Configuration conf = getConf();
Job job = Job.getInstance(conf, "Eliminate redundant files");

// Set the class names

job.setJarByClass(ImgDriver.class);
job.setMapperClass(ImgDupMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setReducerClass(ImgDupReducer.class);

// Set the output data type classes

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// Accept the HDFS input and output directories at run time

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
 return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ImgDriver(), args);
System.exit(res);
}
}

Execution Procedure:

We have a sample image directory in our HDFS path that contains redundant images with different names. Now, using the above MapReduce jar files, we will see how we can remove the duplicate images.
hadoop fs -ls /sample_images

Expected output: a listing of the files in /sample_images, including the duplicate images stored under different names.

Now, create a new file (here, sequence_input_new) listing the HDFS paths of the image files, one path per line, and upload it to HDFS.
hadoop fs -put sequence_input_new /
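For example, sequence_input_new might contain lines like the following (these file names are only illustrative; list whichever images are present in /sample_images, one path per line — full hdfs:// URIs also work, since the mapper resolves each line with URI.create):

/sample_images/image1.jpg
/sample_images/image1_copy.jpg
/sample_images/image2.jpg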

Run the binary-to-sequence jar file with /sequence_input_new as the input path; the result will be stored in the /convert_to_seq_file output directory.
hadoop jar /home/acadgild/Desktop/BtoS.jar /sequence_input_new /convert_to_seq_file
List the convert_to_seq_file directory to display the result part file.
hadoop fs -ls /convert_to_seq_file

Display the result part file using the cat command.
hadoop fs -cat /convert_to_seq_file/part-r-00000
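Note that the part file is a binary sequence file, so cat prints mostly unreadable bytes. If we only want to verify its contents, hadoop fs -text renders the key-value pairs in a more readable form:
hadoop fs -text /convert_to_seq_file/part-r-00000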

Now, run the duplicate remover jar (Bdr.jar) using the /convert_to_seq_file output directory as input; the result will be stored in /distinct_image_dir.
hadoop jar /home/acadgild/Desktop/Bdr.jar /convert_to_seq_file /distinct_image_dir
List the /distinct_image_dir directory.
hadoop fs -ls /distinct_image_dir

Display the result part file using the cat command.
hadoop fs -cat /distinct_image_dir/part-r-00000

Thus, we can observe from the above steps that we have successfully eliminated the redundant files and stored only the unique image file names in our output directory.
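As an optional sanity check, the number of lines in the result file should equal the number of distinct images, since each line holds one unique image path and its MD5 hash:
hadoop fs -cat /distinct_image_dir/part-r-00000 | wc -l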
Keep visiting our website Acadgild for more updates on Big Data and other technologies.

