A web browser generates an enormous amount of data every day, and much of it is not stored in a structured format. No analysis team can begin its work without a structured dataset, so the solution is to preprocess the data and convert it into a structured format.
This blog shows how all the columns of a record, which were earlier dispersed across several rows, end up stored in a single row.
First, look at the structure of the file we saved inside HDFS before execution.
The dataset is a text file containing employee details for a company. The details repeat for each employee, with fields such as EmpID, EmpName, EmpSalary, and so on.
Refer to screenshot 1.1 for the input on which the processing will be done; we will end up with our data stored in the format shown in screenshot 1.2.
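In case the screenshots are unavailable, the two layouts can be sketched as follows, using one of the records that appears in the final output and assuming the "keyword: value" line layout that the mapper's split on ":" implies:

```text
Input (screenshot 1.1 layout) -- one "keyword: value" pair per line:
EmpID: 2jv0fs1hj7
EmpName: Onkar Singh
EmpSalary: 45L
Designation: System Admininsrator
Project_description: Eccommerce-website, Database and Networking Administrtor.

Output (screenshot 1.2 layout) -- one complete employee per tab-separated row:
EmpID       EmpName      EmpSalary  Designation           Project_description
2jv0fs1hj7  Onkar Singh  45L        System Admininsrator  Eccommerce-website, Database and Networking Administrtor.
```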
Our approach is to write a Java MapReduce program. All the steps are explained below, along with the steps for execution.
Let us start by getting our dataset into HDFS. You can find the dataset link below.
Dataset
Command to put a local file into HDFS (refer to the screenshots below):
hadoop dfs -put /home/acadgild/row  /user/prateek/inp
Below, the parts of the code are explained. As we know, there are three parts to a MapReduce program:
- MAPPER
- REDUCER
- DRIVER
Let us understand how the Mapper class works.
Mapper Code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UnstructuredRowToStructuredColumn {

    public static class AllFiveMapper extends Mapper<Object, Text, NullWritable, Text> {

        // One list per column; each map() call appends the value for its keyword.
        List<String> EmpID = new ArrayList<String>();
        List<String> EmpName = new ArrayList<String>();
        List<String> EmpSalary = new ArrayList<String>();
        List<String> Designation = new ArrayList<String>();
        List<String> Project_description = new ArrayList<String>();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.length() > 0) {
                // Ignore lines that do not carry one of the expected employee fields.
                if (line.contains("EmpID") || line.contains("EmpName") || line.contains("EmpSalary")
                        || line.contains("Designation") || line.contains("Project_description")) {
                    String[] parts = line.split(":");
                    if (parts[0].contains("EmpID")) {
                        EmpID.add(parts[1]);
                    } else if (parts[0].contains("EmpName")) {
                        EmpName.add(parts[1]);
                    } else if (parts[0].contains("EmpSalary")) {
                        EmpSalary.add(parts[1]);
                    } else if (parts[0].contains("Designation")) {
                        Designation.add(parts[1]);
                    } else if (parts[0].contains("Project_description")) {
                        Project_description.add(parts[1]);
                    }
                }
            }
        }

        // Runs once after all map() calls; writes the header row, then one row per employee.
        protected void cleanup(Context context) throws IOException, InterruptedException {
            String pattern = "EmpID" + "\t" + "EmpName" + "\t" + "EmpSalary" + "\t" + "Designation" + "\t" + "Project_description";
            context.write(NullWritable.get(), new Text(pattern));
            String data = null;
            for (int i = 0; i < EmpID.size() && i < EmpName.size(); i++) {
                data = EmpID.get(i) + "\t" + EmpName.get(i) + "\t" + EmpSalary.get(i) + "\t"
                        + Designation.get(i) + "\t" + Project_description.get(i);
                if (data.length() > 0)
                    context.write(NullWritable.get(), new Text(data));
            }
        }
    }
}
Explanation of Mapper Code:
- Five ArrayLists are created, one for each column in a record of the dataset. As the mapper runs, they store every value that appears after the matching keyword.
- The map method receives one line of the dataset at a time; each such line carries one field that will become a column after execution.
- The Text value is first converted to a String so that we can operate on it easily.
- An empty-line check ensures no blank input is processed.
- The line is then checked for the employee keywords (EmpID, EmpName, and so on); any line without them is treated as a garbage value for this MapReduce operation and discarded.
- Each matching line is split at the colon, separating the keyword (before the colon) from its value (after the colon).
- Nested if conditions identify which column the keyword belongs to and add the value to that column's ArrayList.
- The cleanup method runs once, after all the map calls, so by then the lists hold the complete map output. Before writing the AllFiveMapper output to the context, we arrange the data into the structured column layout we want in the final records.
- The record pattern, that is, the header line of column names, is defined first.
- This pattern is written to the output file so that the column names precede the values.
- A for loop then emits everything the mapper collected, one formatted row at a time; this is where the unstructured data is finally written to the context as structured data.
- A temporary variable holds one row's column values until that row is written to the output file.
- A length check ensures that an empty row is never written.
- Since we do not want to pass anything as the key, the key is NullWritable.
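The parsing and row-assembly logic described above can be exercised outside Hadoop with a small standalone sketch. The input lines and values here are hypothetical, only two of the five columns are shown, and trim() is added to drop the space after the colon (which the original mapper keeps):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the mapper's parsing (map) and row-assembly (cleanup) phases.
public class RowToColumnDemo {
    public static void main(String[] args) {
        List<String> empID = new ArrayList<String>();
        List<String> empName = new ArrayList<String>();

        // Hypothetical input lines, in the "keyword: value" layout the job expects.
        String[] lines = { "EmpID: 101", "EmpName: Alice", "EmpID: 102", "EmpName: Bob" };

        // map() phase: route each value into the list for its keyword.
        for (String line : lines) {
            String[] parts = line.split(":");
            if (parts[0].contains("EmpID")) {
                empID.add(parts[1].trim());
            } else if (parts[0].contains("EmpName")) {
                empName.add(parts[1].trim());
            }
        }

        // cleanup() phase: emit a header, then one tab-separated row per employee.
        System.out.println("EmpID" + "\t" + "EmpName");
        for (int i = 0; i < empID.size() && i < empName.size(); i++) {
            System.out.println(empID.get(i) + "\t" + empName.get(i));
        }
    }
}
```

Running this prints a header row followed by one structured row per employee, mirroring what cleanup() writes to the context in the real job.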
Let us see the driver part of the code, where we have made the major changes to a simple Driver class.
Driver class:
job.setNumReduceTasks(0);
outputPath.getFileSystem(conf).delete(outputPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
Explanation of the changed code in the Driver class:
- Setting the number of reduce tasks to 0 (zero) makes this a map-only job, so the mapper output is written straight to HDFS.
- The output path is deleted from HDFS automatically, so we do not have to delete it explicitly before every run.
- waitForCompletion submits the job and waits for it to finish; the program exits with status 0 if the job succeeds and 1 if it fails.
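For reference, these three changed lines sit inside an otherwise ordinary driver. A minimal sketch of such a driver is shown below; the class name RowToColDriver and the job name are assumptions, and AllFiveMapper is the mapper defined earlier in this post (an import of its enclosing class is assumed):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Minimal driver sketch for the map-only job (class name RowToColDriver is hypothetical).
public class RowToColDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "RowToColumn");
        job.setJarByClass(RowToColDriver.class);
        job.setMapperClass(AllFiveMapper.class);  // the mapper defined earlier in this post
        job.setNumReduceTasks(0);                 // map-only job: no reducer runs
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        outputPath.getFileSystem(conf).delete(outputPath, true);  // clear old output
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This sketch requires the Hadoop libraries on the classpath and a running cluster, so it is meant as a template rather than a runnable snippet.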
To practice the same, you can download the complete code from the link below.
Here is the link to Raw code: Link
Let us see the execution inside Hadoop.
Running/execution of code
[acadgild@localhost ~]$ hadoop jar RowToCol.jar /user/prateek/inp/row /user/prateek/out
17/01/26 06:35:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/01/26 06:35:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/01/26 06:35:13 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/01/26 06:35:14 INFO input.FileInputFormat: Total input paths to process : 1
17/01/26 06:35:14 INFO mapreduce.JobSubmitter: number of splits:1
17/01/26 06:35:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1485350378847_0001
17/01/26 06:35:15 INFO impl.YarnClientImpl: Submitted application application_1485350378847_0001
17/01/26 06:35:15 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1485350378847_0001/
17/01/26 06:35:15 INFO mapreduce.Job: Running job: job_1485350378847_0001
17/01/26 06:35:37 INFO mapreduce.Job: Job job_1485350378847_0001 running in uber mode : false
17/01/26 06:35:37 INFO mapreduce.Job: map 0% reduce 0%
17/01/26 06:35:54 INFO mapreduce.Job: map 100% reduce 0%
17/01/26 06:35:57 INFO mapreduce.Job: Job job_1485350378847_0001 completed successfully
17/01/26 06:35:57 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=105800
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=695
		HDFS: Number of bytes written=417
		HDFS: Number of read operations=5
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=16305
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=16305
		Total vcore-seconds taken by all map tasks=16305
		Total megabyte-seconds taken by all map tasks=16696320
	Map-Reduce Framework
		Map input records=23
		Map output records=5
		Input split bytes=107
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=78
		CPU time spent (ms)=640
		Physical memory (bytes) snapshot=90243072
		Virtual memory (bytes) snapshot=2055548928
		Total committed heap usage (bytes)=30474240
	File Input Format Counters
		Bytes Read=588
	File Output Format Counters
		Bytes Written=417
Finally, once the code executes successfully and the counters show that some bytes were written, we can be sure the output is ready.
Checking Output:
We find that the output is now arranged column-wise, which is exactly the format change we set out to make.
This file can now serve as the input for any further query, so you can get your results more efficiently.
[acadgild@localhost ~]$ hadoop dfs -cat /user/prateek/out/par*
DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it.
17/01/26 06:36:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
EmpID	EmpName	EmpSalary	Designation	Project_description
2jv0fs1hj7	Onkar Singh	45L	System Admininsrator	Eccommerce-website, Database and Networking Administrtor.
2mw0ld1nr7	Prateek kumar	88L	System Designer	Flow of the system is to be Designed.
2hw0co1re7	Kiran Innamuri	93	System Developer	Complete code with production manager.
2vp0os1mt7	Manjunath N	69L	System Owner	Investment and live report with profit counter.
[acadgild@localhost ~]$
We hope you now understand how to convert row-wise text into a columnar format, as well as the use of NullWritable and the other pieces of code involved.
There are more blogs on ACADGILD to build your basic understanding of Big Data and its ecosystem.