Imagine if all your data were aggregated in a single place: sifting through data that has already been categorized would be much easier, right?
In this blog, we will show you how to aggregate data related to a particular column using Hadoop. In big data terms, we will be converting unstructured data into structured data.
First up, let’s understand what unstructured and structured data mean: any dataset that is not organized in a predefined manner is termed unstructured data, while structured data conforms to a fixed schema of rows and columns.
Now, I’ll be using some dummy data as the input file in this demo. This unstructured data file will be processed and converted into structured data as the output. The data used may seem very small, but when working with Hadoop, terabytes or even petabytes of data can be structured in the same way, as demonstrated in the blog below.
You can copy the dataset below for practice.
File description:
Here, the unstructured data is taken as 2 columns.
The 1st column is the keyword trending in the Big Data ecosystem.
The 2nd column is some information given on the respective keyword.
Columns are separated by a TAB (\t). The text in the 2nd column is separated by white spaces. A new record starts after a newline (\n) is found.
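Given this record format, a single line can be parsed with plain Java before we bring Hadoop into the picture. The sketch below is illustrative only (the class name RecordParser and the parse helper are not part of the MapReduce job); it shows the TAB split and keyword filter the mapper will later perform:

```java
import java.util.*;

public class RecordParser {
    // The keywords we aggregate on, taken from the dataset description above.
    static final Set<String> KEYWORDS =
            new HashSet<>(Arrays.asList("bigdata", "hadoop", "pig", "hive"));

    // Splits one line into (keyword, description); returns null for non-matching lines.
    static String[] parse(String line) {
        if (line == null || line.isEmpty()) return null;
        String[] parts = line.split("\t", 2);   // TAB separates the two columns
        if (parts.length < 2 || !KEYWORDS.contains(parts[0].trim())) return null;
        return new String[] { parts[0].trim(), parts[1].trim() };
    }

    public static void main(String[] args) {
        String[] rec = parse("hadoop\tit's scalable.");
        System.out.println(rec[0] + " -> " + rec[1]);
    }
}
```

Lines without a TAB, or whose first column is not one of the four keywords, come back as null and would simply be skipped.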
DATASET:
bigdata big data is a term for information sets that are so massive or complicated that traditional processing applications are inadequate to cope with them. hadoop is an open-source computer code framework used for distributed storage and process of very massive data sets. pig is a high-level platform for making programs that run on Apache Hadoop. The language for this platform is termed Pig Latin. bigdata Challenges embrace analysis, capture, information curation, search, sharing, storage, transfer, visualization, querying, updating and data privacy. hive Apache Hive is a information warehouse infrastructure engineered on top of Hadoop. hadoop It consists of computer clusters engineered from commodity hardware. hive Hive provides the required SQL abstraction to integrate SQL-like Queries (HiveQL) into the underlying Java API without the requirement to implement queries within the low-level Java API. pig Pig Latin is extended using pig Pig Latin is extended using User defined Functions (UDFs) that the user will write in Java, Python, JavaScript, Ruby or Groovy. hadoop it's scalable. bigdata The term usually refers simply to the utilization of predictive analytics, user behavior analytics, or certain different advanced data analytics strategies that extract value from data, and seldom to a particular size of data set. big data there's very little doubt that the quantities of data currently out there are so massive, big data there's very little doubt that the quantities of data currently out there are so massive, bigdata The term usually refers simply to the utilization of predictive analytics, user behavior analytics, or certain different advanced data analytics strategies that extract value from data, and seldom to a particular size of data set. big data there's very little doubt that the quantities of data currently out there are so massive, however that’s not the most relevant characteristic of this new data system.
Java Map-Reduce Code Explained
MAP Code:
public class Unstructured {

    public static class TopTenMapper extends Mapper<LongWritable, Text, Text, Text> {

        List<String> col1 = new ArrayList<String>();
        List<String> col2 = new ArrayList<String>();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line != null && line.length() > 0) {
                if (line.contains("bigdata") || line.contains("hadoop") || line.contains("pig") || line.contains("hive")) {
                    String[] parts = line.split("\t");
                    col1.add(parts[0]);
                    col2.add(parts[1]);
                }
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int i = 0; i < col1.size() && i < col2.size(); i++) {
                context.write(new Text(col1.get(i)), new Text(col2.get(i)));
            }
        }
    }
 Explanation of Map Code:
Two ArrayLists, col1 and col2, are created, one for each column of a record in the dataset.
The map function takes a LongWritable key and a Text value as input, and writes its output for the reducer through the Context object.
The input value received by the map function is converted to a String and stored in the variable line.
The map function is called once for every record until the dataset is exhausted; empty lines are skipped.
Each record is then filtered: it is checked for the keywords bigdata, hadoop, pig, or hive, and records that do not match are discarded as garbage values.
A matching line is split on TAB (\t); the value before the TAB is appended to col1 and the value after it to col2.
The cleanup function, which runs once after all records have been mapped, writes the data out in columnar format: its loop runs while both ArrayLists still hold values and writes each (col1, col2) pair into the context, which sends it to the reducer.
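The map-and-cleanup flow described above can be simulated without a cluster. In this sketch the class name MapSideSketch and the run helper are hypothetical; the filter, split, and emit logic mirror the mapper and cleanup code:

```java
import java.util.*;

public class MapSideSketch {
    // Mirrors the mapper's keyword filter and TAB split, then the cleanup's emit step.
    static List<String> run(String[] input) {
        List<String> col1 = new ArrayList<>();   // keyword column
        List<String> col2 = new ArrayList<>();   // description column
        for (String line : input) {
            if (line != null && line.length() > 0
                    && (line.contains("bigdata") || line.contains("hadoop")
                        || line.contains("pig") || line.contains("hive"))) {
                String[] parts = line.split("\t");
                col1.add(parts[0]);
                col2.add(parts[1]);
            }
        }
        // cleanup(): emit the accumulated (key, value) pairs
        List<String> out = new ArrayList<>();
        for (int i = 0; i < col1.size() && i < col2.size(); i++) {
            out.add(col1.get(i) + "\t" + col2.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] input = {
            "hadoop\tit's scalable.",
            "a line with no keyword",
            "hive\tApache Hive is a data warehouse infrastructure."
        };
        for (String pair : run(input)) {
            System.out.println(pair);
        }
    }
}
```

The non-matching middle line is dropped, and only the two keyword records are emitted as (key, value) pairs.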
REDUCE CODE:
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
            try {
                String names = new String();
                for (Text val : value) {
                    names += val.toString() + "\n\t";
                }
                context.write(key, new Text(names));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
Explanation of Reduce Code:
The reduce function is defined to perform the final aggregation on the mapper's output and to produce the final output of the MapReduce job.
A try/catch block wraps the work done for each key, so we can decide how to handle any failure inside the loop.
A String variable named names is created to accumulate the values.
The for loop runs until every record has been read from value.
Inside the loop, each record is converted from Text to String, and a newline plus a tab is appended after each record.
context.write() sends the data back to the driver class as the main output; the accumulated string is converted back to Text here.
If any error takes place in the process, the catch block takes over and prints an error message using the built-in exception library.
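The string concatenation the reducer performs can be checked in isolation. This sketch is illustrative (ReduceSideSketch and the aggregate helper are names introduced here, not part of the job); it applies the same "value + newline + tab" accumulation to the values of one key:

```java
import java.util.*;

public class ReduceSideSketch {
    // Mirrors the reducer: concatenate all values for one key, each followed by "\n\t".
    static String aggregate(Iterable<String> values) {
        String names = "";
        for (String val : values) {
            names += val + "\n\t";
        }
        return names;
    }

    public static void main(String[] args) {
        String out = aggregate(Arrays.asList(
                "it's scalable.",
                "It consists of computer clusters engineered from commodity hardware."));
        System.out.println("hadoop\t" + out);
    }
}
```

Because every value ends with a newline and a tab, each piece of information for a keyword starts on its own indented line in the output file, which is exactly the columnar layout shown at the end of this blog.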
DRIVER CODE
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "UNSTRUCTURED");
        job.setJarByClass(Unstructured.class);
        job.setMapperClass(TopTenMapper.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);   // the reducer emits a Text key
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);   // remove any stale output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Explanation of Driver Code:
A Configuration object is created, and a Job named UNSTRUCTURED is set up from it.
setJarByClass() tells Hadoop which jar contains the classes to run.
setMapperClass() declares TopTenMapper as the mapper, and setReducerClass() declares Reduce as the reducer.
The next four set*Class() calls declare the output key type and output value type of the map phase and of the job as a whole.
setInputFormatClass() and setOutputFormatClass() define how the program reads its input from HDFS and writes its final output back to it.
A Path object is created from the 2nd command-line argument to act as the output path of the program.
The 1st command-line argument is added as the input path, and the Path object created above is set as the location of the final output file.
The output path is deleted from HDFS automatically before the job runs, so that we don't have to delete it explicitly.
Finally, the program exits with status 0 if the job completes successfully, and 1 otherwise.
How to execute the MapReduce code
Before running the jar file, load the data into HDFS.
Syntax:
hadoop dfs -put  <file path in local to be put inside HDFS> <path in HDFS where file is to be written>
DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it.
17/01/15 01:54:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
Check the Java code in Eclipse for errors and build a JAR file (unstruc.jar) from it.
Run Hadoop with the command below:
Syntax:Â hadoop jar <path to jar file> <path to input file in HDFS> <path to output file in HDFS>
hadoop jar unstruc.jar /user/prateek/inp/unstructured /user/prateek/out
[[email protected] ~]$ hadoop jar unstruc.jar /user/prateek/inp/unstructured /user/prateek/out
17/01/15 01:54:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
17/01/15 01:54:22 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/01/15 01:54:23 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/01/15 01:54:23 INFO input.FileInputFormat: Total input paths to process : 1
17/01/15 01:54:24 INFO mapreduce.JobSubmitter: number of splits:1
17/01/15 01:54:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484151679915_0019
17/01/15 01:54:24 INFO impl.YarnClientImpl: Submitted application application_1484151679915_0019
17/01/15 01:54:24 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1484151679915_0019/
17/01/15 01:54:24 INFO mapreduce.Job: Running job: job_1484151679915_0019
17/01/15 01:54:35 INFO mapreduce.Job: Job job_1484151679915_0019 running in uber mode : false
17/01/15 01:54:35 INFO mapreduce.Job:  map 0% reduce 0%
17/01/15 01:54:40 INFO mapreduce.Job:  map 100% reduce 0%
17/01/15 01:54:48 INFO mapreduce.Job:  map 100% reduce 100%
17/01/15 01:54:48 INFO mapreduce.Job: Job job_1484151679915_0019 completed successfully
17/01/15 01:54:48 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1537
        FILE: Number of bytes written=215845
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1613
        HDFS: Number of bytes written=2540
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=3710
        Total time spent by all reduces in occupied slots (ms)=4900
        Total time spent by all map tasks (ms)=3710
        Total time spent by all reduce tasks (ms)=4900
        Total vcore-seconds taken by all map tasks=3710
        Total vcore-seconds taken by all reduce tasks=4900
        Total megabyte-seconds taken by all map tasks=3799040
        Total megabyte-seconds taken by all reduce tasks=5017600
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=1503
        Map output materialized bytes=1537
        Input split bytes=116
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=1537
        Reduce input records=11
        Reduce output records=4
        Spilled Records=22
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=120
        CPU time spent (ms)=1210
        Physical memory (bytes) snapshot=294400000
        Virtual memory (bytes) snapshot=4113739776
        Total committed heap usage (bytes)=165810176
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1497
    File Output Format Counters
        Bytes Written=254
The above code will write the output file in HDFS.
For convenient reading, you can copy the file from HDFS to a local location and see that the unstructured data has now been structured.
Syntax:
hadoop dfs -get <file path inside HDFS to get> <output path for the file to be saved>
DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it.
17/01/15 01:55:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
[[email protected] ~]$
OUTPUT FILE:
bigdata there's very little doubt that the quantities of data currently out there are so massive, however that’s not the most relevant characteristic of this new data system. The term usually refers simply to the utilization of predictive analytics, user behavior analytics, or certain different advanced data analytics strategies that extract value from data, and seldom to a particular size of data set. Challenges embrace analysis, capture, information curation, search, sharing, storage, transfer, visualization, querying, updating and data privacy. big data is a term for information sets that are so massive or complicated that traditional processing applications are inadequate to cope with them. hadoop it's scalable. It consists of computer clusters engineered from commodity hardware. cated that traditional processing applications are inadequate to cope with them. egies that extract value from data, and seldom to a particular size of data set. is an open-source computer code framework used for distributed storage and process of very massive data sets. hive Hive provides the required SQL abstraction to integrate SQL-like Queries (HiveQL) into the underlying Java API without the requirement to implement queries within the low-level Java API. Apache Hive is a information warehouse infrastructure engineered on top of Hadoop. pig Pig Latin is extended using User defined Functions (UDFs) that the user will write in Java, Python, JavaScript, Ruby or Groovy. is a high-level platform for making programs that run on Apache Hadoop. The language for this platform is termed Pig Latin.
If you want to practice the same on a larger dataset, taken from an R8 router at a particular time with multiple column values, follow the link for the dataset and data dictionary.
Visit ACADGILD for more trending blogs.