
MapReduce Program: Removing Redundant Image Files


As the amount of data grows day by day, it is better to keep little or no redundant data, both to save cluster storage and to speed up data processing so that the required results are produced in less time. So today, we will discuss how to eliminate redundant files from a Hadoop cluster using a MapReduce program.
We have a directory of image files in which replicas of the same images are stored under different names. We therefore need to find and eliminate the redundant images in an HDFS path.
Here, we need to write two programs to perform this task:

  1. A program to convert the image files into a Hadoop sequence file, and
  2. A MapReduce program to remove the duplicate images from the given path.

As the Hadoop MapReduce framework was designed to store and process large files, we use the SequenceFile format to pack all the image files (small files) into one single large binary file for processing in the MapReduce computation.
A SequenceFile is a flat file that stores key-value pairs in a binary form.
Once we have the SequenceFile containing all the binary data, we can use the MD5 algorithm to generate a hash for each image and compare these hashes to find the redundant files.
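The first program expects as its input a plain text file that lists the HDFS paths of the image files, one path per line; that listing is what its mapper reads. One possible way to prepare such a listing is sketched below; /sample_images matches the sample directory used later in this post, while image_list.txt and /image_list are placeholder names of this sketch.

# List the image files in HDFS and keep only the path column (skipping the "Found N items" header line)
hadoop fs -ls /sample_images | grep '^-' | awk '{print $NF}' > image_list.txt
# Upload the listing so it can serve as the input of the first MapReduce job
hadoop fs -mkdir /image_list
hadoop fs -put image_list.txt /image_list/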
So, let us begin with our coding part:

Convert Image Files To Hadoop Sequence File

We can refer to the program below to convert the small binary files (images) into one sequence file of a reasonable size.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class BinFilesToHadoopSeqFile {
public static class BinFilesToHadoopSeqFileMapper extends Mapper<Object, Text, Text, BytesWritable> {

Map Method

In the Mapper implementation, TextInputFormat supplies the byte offset as the key and each line of the input file, which holds the full path of one image, as the value.
The file at that path is read into a buffer and written to a ByteArrayOutputStream, and the mapper emits the image path as the key and the file contents held in the ByteArrayOutputStream as the value.

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String uri = value.toString();
// Reuse the job configuration instead of creating a fresh one
Configuration conf = context.getConfiguration();
FileSystem fsys = FileSystem.get(URI.create(uri), conf);
FSDataInputStream fsin = null;
try {
fsin = fsys.open(new Path(uri));
ByteArrayOutputStream bout = new ByteArrayOutputStream();
byte buf[] = new byte[1024 * 1024];
int bytesRead;
// Copy only the bytes actually read, so the emitted value is an exact copy of the image
while ((bytesRead = fsin.read(buf, 0, buf.length)) != -1) {
bout.write(buf, 0, bytesRead);
}
context.write(value, new BytesWritable(bout.toByteArray()));
} finally {
IOUtils.closeStream(fsin);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "BinFilesToHadoopSeqFile");
job.setJarByClass(BinFilesToHadoopSeqFile.class);
job.setMapperClass(BinFilesToHadoopSeqFileMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Main Method

In the main method, we set SequenceFileOutputFormat as the output format class; it takes the keys and values emitted by the mapper and stores all the key-value pairs as a sequence file in HDFS.
Output:
In the output of the above class, each key is the path of an image and the corresponding value is the complete binary content of that image.
With the above program, we can convert the binary image files into sequence file format.
Next, we can refer to the MapReduce program below, which takes the above sequence file as input, removes the redundant files, and stores the names of the unique image files in the output directory.

ImgDupMapper

The Mapper code will read the binary image data of each image file and will create an MD5 string (hash) for it. It will pass this data to the reducer, where the key will be the MD5 string and the value will be the filename. Thus, all identical images will be grouped together by the Hadoop framework.

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class ImgDupMapper extends Mapper<Text, BytesWritable, Text, Text>{
public void map(Text key, BytesWritable value, Context context) throws IOException,InterruptedException {

Get the md5 for this specific file

String md5St;
try {
// Use only the first getLength() bytes; the backing array of a BytesWritable may be padded
md5St = calculateMd5(Arrays.copyOf(value.getBytes(), value.getLength()));
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
context.setStatus("Internal error - can't find the algorithm for calculating the md5");
return;
}
Text md5txt = new Text(md5St);

Emit the MD5 string as the key and the file path as the value, so duplicates will be grouped together for the reduce function

context.write(md5txt, key);
}
static String calculateMd5(byte[] imageData) throws NoSuchAlgorithmException {

Get the md5 for this specific data

MessageDigest md = MessageDigest.getInstance("MD5");
md.update(imageData);
byte[] hash = md.digest();

The code below converts the byte array to a hex string

StringBuilder hexStr = new StringBuilder();
for (int i = 0; i < hash.length; i++) {
// Mask to an unsigned byte and pad to two hex digits
hexStr.append(Integer.toString((hash[i] & 0xff) + 0x100, 16).substring(1));
}
return hexStr.toString();
}
}


ImgDupReducer

In the reducer class, we take only the first filename for each MD5 hash. In that way, a single filename is selected for each set of identical images and all the redundant image file names are filtered out. In the output file, the key will be the filename and the value will be the MD5 hash.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ImgDupReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {

The key here is the MD5 hash, while the values are all the image files associated with it. For each MD5 value, we need to take only one file (the first).

Text imgFilePath = null;
for (Text filePath : values) {
imgFilePath = filePath;
break;//only the first one
}

In the result file, the key will again be the image file path.

context.write(imgFilePath, key);
}
}

ImgDriver

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ImgDriver extends Configured implements Tool
{
public int run(String[] args) throws Exception
{

Get the configuration object and set the job name

 Configuration conf = getConf();
Job job = new Job(conf, "Eliminate redundant files");

Set the class names

job.setJarByClass(ImgDriver.class);
job.setMapperClass(ImgDupMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setReducerClass(ImgDupReducer.class);

Set the output data type classes

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Accept the hdfs input and output dir at run time

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
 return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ImgDriver(), args);
System.exit(res);
}
}

Execution Procedure:

We have a sample image directory in HDFS that contains redundant images stored under different names. Now, using the jar built from the above MapReduce programs, we will see how we can remove the duplicate images.
hadoop fs -ls /sample_images
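With the image listing prepared as shown earlier, one possible sequence of commands for compiling and running the two jobs is sketched below. The jar name (image-dedup.jar) and the output directories are placeholders of this sketch rather than names from the article, and the commands assume the classes are compiled in the default package as listed above; adapt them to your own build.

# Compile the classes against the Hadoop client libraries and package them into a jar
javac -cp $(hadoop classpath) BinFilesToHadoopSeqFile.java ImgDupMapper.java ImgDupReducer.java ImgDriver.java
jar cf image-dedup.jar *.class

# Job 1: read the listing of image paths and pack the images into one sequence file
hadoop jar image-dedup.jar BinFilesToHadoopSeqFile /image_list /seq_images

# Job 2: compute MD5 hashes over the sequence file and keep one filename per hash
hadoop jar image-dedup.jar ImgDriver /seq_images /unique_images

# Each line of the result holds a unique image path and its MD5 hash
hadoop fs -cat /unique_images/part-r-00000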
