Big Data Hadoop & Spark

Solving the Unstructured Data Dilemma

A web browser generates tons of data every day, and not all of it is stored in a structured format. Without a structured dataset, no analysis team can start its work. The solution is to preprocess the data and convert it into a structured format.
This blog shows how all the columns of a record, which were earlier dispersed across several rows, end up stored in a single row.
First, look at the structure of the file we saved inside HDFS before execution.
The dataset is a text file containing employee details for a company. The details repeat for each employee, with fields such as EmpID, EmpName, EmpSalary, and so on.
Refer to screenshot 1.1 below for the input on which processing will be done; we will end up with the data stored in the format shown in screenshot 1.2.

Screenshot 1.1

Screenshot 1.2
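In case the screenshots do not render, here is a rough sketch of the two layouts, using values consistent with the final output shown later in this post:

```
Before (screenshot 1.1): one field per line
EmpID:2jv0fs1hj7
EmpName:Onkar Singh
EmpSalary:45L
...

After (screenshot 1.2): one tab-separated row per employee
EmpID       EmpName      EmpSalary  ...
2jv0fs1hj7  Onkar Singh  45L        ...
```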
Our approach is to write a Java MapReduce program. All the steps are explained below, along with the execution steps.
Let us start by getting our dataset into HDFS. You can find the dataset link below.
Dataset
Command to put the local file into HDFS (refer to the screenshots below):
hadoop dfs -put /home/acadgild/row  /user/prateek/inp

Below, the parts of the code are explained. As we know, a MapReduce program has three parts:

  • MAPPER
  • REDUCER
  • DRIVER

Let us understand how the Mapper class works.


Mapper Code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UnstrcturedRowToStructuredColumn {

    public static class AllFiveMapper extends Mapper<Object, Text, NullWritable, Text> {

        // One list per output column; each holds the values collected for that column, in order.
        List<String> EmpID = new ArrayList<String>();
        List<String> EmpName = new ArrayList<String>();
        List<String> EmpSalary = new ArrayList<String>();
        List<String> Designation = new ArrayList<String>();
        List<String> Project_description = new ArrayList<String>();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.length() > 0) {
                // Keep only lines that mention one of the five known fields.
                if (line.contains("EmpID") || line.contains("EmpName") || line.contains("EmpSalary")
                        || line.contains("Designation") || line.contains("Project_description")) {
                    // Field name before the colon, field value after it.
                    String[] parts = line.split(":");
                    if (parts[0].contains("EmpID")) {
                        EmpID.add(parts[1]);
                    } else if (parts[0].contains("EmpName")) {
                        EmpName.add(parts[1]);
                    } else if (parts[0].contains("EmpSalary")) {
                        EmpSalary.add(parts[1]);
                    } else if (parts[0].contains("Designation")) {
                        Designation.add(parts[1]);
                    } else if (parts[0].contains("Project_description")) {
                        Project_description.add(parts[1]);
                    }
                }
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Header row: the five column names, tab-separated.
            String patern = "EmpID" + "\t" + "EmpName" + "\t" + "EmpSalary" + "\t"
                    + "Designation" + "\t" + "Project_description";
            context.write(NullWritable.get(), new Text(patern));
            // One tab-separated row per employee, taking the i-th value from each list.
            String data = null;
            for (int i = 0; i < EmpID.size() && i < EmpName.size(); i++) {
                data = EmpID.get(i) + "\t" + EmpName.get(i) + "\t" + EmpSalary.get(i)
                        + "\t" + Designation.get(i) + "\t" + Project_description.get(i);
                if (data.length() > 0)
                    context.write(NullWritable.get(), new Text(data));
            }
        }
    }
}

Explanation of Mapper Code:

Five ArrayLists are declared, one for every column in the final record (EmpID, EmpName, EmpSalary, Designation, Project_description). Each stores, in order, the values collected for its column.
The map() method receives the dataset one line at a time; in the input, each line carries a single field that will become a column value after execution.
value.toString() converts the incoming Text to a String so operations can be performed on it easily.
The line.length() check ensures empty lines are skipped.
The contains() check keeps only lines that mention one of the five known fields; anything else is treated as garbage for this MapReduce operation and discarded.
line.split(":") separates the field name (before the colon) from its value (after it), and the nested if/else conditions append the value to the ArrayList of the matching column.
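The split-and-bucket step can be tried outside Hadoop. Below is a minimal plain-Java sketch of that logic (the class name and input lines are illustrative, with values taken from the output shown later):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    public static void main(String[] args) {
        List<String> empIds = new ArrayList<>();
        List<String> empNames = new ArrayList<>();
        // Hypothetical input lines in the same "Field:value" shape as the dataset.
        String[] lines = { "EmpID:2jv0fs1hj7", "EmpName:Onkar Singh", "Address:ignored" };
        for (String line : lines) {
            String[] parts = line.split(":");      // field name before the colon, value after
            if (parts[0].contains("EmpID")) {
                empIds.add(parts[1]);
            } else if (parts[0].contains("EmpName")) {
                empNames.add(parts[1]);
            }                                       // lines with unknown fields are discarded
        }
        System.out.println(empIds);                 // [2jv0fs1hj7]
        System.out.println(empNames);               // [Onkar Singh]
    }
}
```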

The cleanup() method runs once, after all map() calls of the task have finished, so by then the five lists hold the whole map task's output. Before writing the AllFiveMapper output into the context, we arrange the data into the structured format we want for the final record.
The variable patern defines the record pattern: the five column names separated by tabs.
That pattern is written to the output file first, so the column names appear as a header above the values.
The for loop then puts all the data processed by the mapper into that format: it emits one tab-separated row per employee by picking the i-th element from each list. This is where the unstructured data is finally written to the context as structured data.
The variable data temporarily holds one assembled row per loop iteration, and the data.length() check ensures only non-empty rows are written.
As we don't want to pass anything as the key, NullWritable.get() is used as the key for every write.
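The cleanup step's row assembly (header first, then the i-th element of each list joined with tabs) can likewise be sketched in plain Java without Hadoop; the lists are hard-coded here with values from the output shown later:

```java
import java.util.Arrays;
import java.util.List;

public class CleanupDemo {
    public static void main(String[] args) {
        // In the real mapper these lists are filled during map(); here they are fixed.
        List<String> empIds = Arrays.asList("2jv0fs1hj7", "2mw0ld1nr7");
        List<String> empNames = Arrays.asList("Onkar Singh", "Prateek kumar");
        // Header row first, then one tab-separated row per index.
        System.out.println("EmpID" + "\t" + "EmpName");
        for (int i = 0; i < Math.min(empIds.size(), empNames.size()); i++) {
            System.out.println(empIds.get(i) + "\t" + empNames.get(i));
        }
    }
}
```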
Let us see the driver part of the code, where we have made the main changes to an otherwise simple Driver class.

Driver class:

job.setNumReduceTasks(0);
outputPath.getFileSystem(conf).delete(outputPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);

Explanation of changed code in Driver Class

setNumReduceTasks(0) makes this a map-only job: the mapper output is written straight to HDFS, with no shuffle or reduce phase.
outputPath.getFileSystem(conf).delete(outputPath) removes any previous output directory from HDFS automatically, so we don't have to delete it explicitly before every run.
waitForCompletion(true) returns true only if the job succeeds, so System.exit() exits with code 0 on success and 1 on failure.
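For context, here is how those three lines might sit inside a complete driver. This is a sketch based on standard Hadoop 2.x boilerplate and the mapper above, not the author's exact code; the driver class name and job name are assumptions:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RowToColDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "RowToCol");
        job.setJarByClass(UnstrcturedRowToStructuredColumn.class);
        job.setMapperClass(UnstrcturedRowToStructuredColumn.AllFiveMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);                    // map-only job: no shuffle, no reducer
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath); // clear old output automatically
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Running it needs a Hadoop cluster and the jar on the classpath, as in the execution section below.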
To practice the same, you can download the complete code from the link below.
Here is the link to the raw code: Link
Let us see the execution inside Hadoop.

Running/execution of code

[[email protected] ~]$ hadoop jar RowToCol.jar /user/prateek/inp/row  /user/prateek/out
17/01/26 06:35:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/26 06:35:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/01/26 06:35:13 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/01/26 06:35:14 INFO input.FileInputFormat: Total input paths to process : 1
17/01/26 06:35:14 INFO mapreduce.JobSubmitter: number of splits:1
17/01/26 06:35:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1485350378847_0001
17/01/26 06:35:15 INFO impl.YarnClientImpl: Submitted application application_1485350378847_0001
17/01/26 06:35:15 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1485350378847_0001/
17/01/26 06:35:15 INFO mapreduce.Job: Running job: job_1485350378847_0001
17/01/26 06:35:37 INFO mapreduce.Job: Job job_1485350378847_0001 running in uber mode : false
17/01/26 06:35:37 INFO mapreduce.Job:  map 0% reduce 0%
17/01/26 06:35:54 INFO mapreduce.Job:  map 100% reduce 0%
17/01/26 06:35:57 INFO mapreduce.Job: Job job_1485350378847_0001 completed successfully
17/01/26 06:35:57 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=105800
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=695
		HDFS: Number of bytes written=417
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=16305
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=16305
		Total vcore-seconds taken by all map tasks=16305
		Total megabyte-seconds taken by all map tasks=16696320
	Map-Reduce Framework
		Map input records=23
		Map output records=5
		Input split bytes=107
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=78
		CPU time spent (ms)=640
		Physical memory (bytes) snapshot=90243072
		Virtual memory (bytes) snapshot=2055548928
		Total committed heap usage (bytes)=30474240
	File Input Format Counters
		Bytes Read=588
	File Output Format Counters
		Bytes Written=417

Finally, once the code executes successfully and the counters show some bytes written, we can be sure the output is ready.

Checking Output:

We find the output is now arranged column-wise; the format has changed from the one we started with.
This file can now be used as input for any query, giving you faster, better-optimized results.

[[email protected] ~]$ hadoop dfs -cat  /user/prateek/out/par*
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
17/01/26 06:36:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
EmpID	EmpName	EmpSalary	Designation	Project_description
2jv0fs1hj7	Onkar Singh	45L	System Admininsrator 	Eccommerce-website, Database and Networking Administrtor.
2mw0ld1nr7	Prateek kumar	88L	System Designer	Flow of the system is to be Designed.
2hw0co1re7	Kiran Innamuri	93	System Developer	Complete code with production manager.
2vp0os1mt7	Manjunath N	69L	System Owner	Investment and live report with profit counter.
[[email protected] ~]$


Hope this helps you understand how to convert row-based text into a columnar format, as well as the use of NullWritable and other pieces of the code.
You can find more blogs to build your understanding of Big Data and its ecosystem on ACADGILD.

Suggested Reading

HDFS Commands

prateek

An alumnus of the NIE-Institute Of Technology, Mysore, Prateek is an ardent Data Science enthusiast. He has been working at Acadgild as a Data Engineer for the past 3 years. He is a Subject-matter expert in the field of Big Data, Hadoop ecosystem, and Spark.

