
Loading Files into HDFS Using Flume's Spool Directory

In our previous blog, Streaming Twitter Data Using Flume, we covered the basics of Flume and how to use it to fetch data from Twitter.
Let's look at another way to use Flume: fetching data from the local file system into HDFS.

Flume provides three sources suited to this scenario:

Exec :- The Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard output (for example, tail -F on a log file). If the process exits for any reason, the source also exits and will produce no further data.


Spool directory :- This source lets you ingest data by placing files into a "spooling" directory on disk. It watches the specified directory for new files and parses events out of them as they appear. After a given file has been fully read into the channel, it is renamed to indicate completion (or optionally deleted).
Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, uniquely-named files must be dropped into the spooling directory, and they must not be modified after being placed there.

Netcat :- This source listens on a given port, turns each line of text into a Flume event, and sends it through the connected channel.
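As a quick sketch, each of these source types is selected with the `type` property in the agent configuration file. The agent and component names below, and the paths, are illustrative placeholders, not values from this tutorial:

```properties
# Exec source: run a Unix command and read its standard output
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/app.log

# Spooling-directory source: watch a local directory for new files
agent1.sources.src2.type = spooldir
agent1.sources.src2.spoolDir = /home/hadoop/flume_spool

# Netcat source: listen on a TCP port, one event per line of text
agent1.sources.src3.type = netcat
agent1.sources.src3.bind = localhost
agent1.sources.src3.port = 44444
```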

Let's look at the advantages of the spool directory source:

⦁ Acting as the SOURCE, it fetches all the files from the local file system and aggregates them into one single file inside HDFS.
⦁ It helps HDFS work more efficiently, since HDFS performs better with a few large files than with many small ones.
⦁ Because of the aggregation mentioned in the first point, the NameNode needs to keep less metadata, as the number of files is smaller.

NOTE:- The Hadoop daemons should be running; this can be confirmed with the " jps " command.

Flume should also be downloaded, and its path should be added to the .bashrc file.
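For reference, the .bashrc entries typically look like the following. The install path below is an assumption based on the command used later in this post; adjust it to wherever you extracted Flume:

```shell
# Hypothetical Flume install location -- change to match your system
export FLUME_HOME=/home/hadoop/HADOOP/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin
```

After editing, run `source ~/.bashrc` so the current shell picks up the change.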

Step-by-step Demonstration: Data Streaming from Local File System to HDFS

In this case, we will use a spool directory as our source and HDFS as the destination.
We will need a different Flume agent here, since the source data is fetched from the local file system instead of from Twitter.

Go to the link below and download the configuration file, which contains the agent details.

https://drive.google.com/folderview?id=0B1QaXx7tpw3SU3Eyc0pDbmx6SDg&usp=sharing

Save the file and keep it in your Downloads directory.

We need to move the AcadgildLocal.conf file into the flume/conf directory.

We need to make two changes inside AcadgildLocal.conf, as follows.

1) agent1.sources.source1_1.spoolDir is set to the input path in the local file system.
2) agent1.sinks.hdfs-sink1_1.hdfs.path is set to the output path in HDFS.
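In case the download is unavailable, here is a minimal sketch of what an agent configuration like AcadgildLocal.conf could contain. The source and sink names follow the two properties mentioned above, but the channel name, paths, and NameNode address are placeholder assumptions you must change:

```properties
agent1.sources = source1_1
agent1.sinks = hdfs-sink1_1
agent1.channels = fileChannel

# Spool-directory source: local input path (change to your directory)
agent1.sources.source1_1.type = spooldir
agent1.sources.source1_1.spoolDir = /home/hadoop/flume_spool
agent1.sources.source1_1.channels = fileChannel

# HDFS sink: output path inside HDFS (change to your NameNode and directory)
agent1.sinks.hdfs-sink1_1.type = hdfs
agent1.sinks.hdfs-sink1_1.hdfs.path = hdfs://localhost:9000/flume_sink
agent1.sinks.hdfs-sink1_1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1_1.channel = fileChannel

agent1.channels.fileChannel.type = file
```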


Creating the folder specified in the AcadgildLocal.conf file gives us our "spooling" directory.

We also need to create the destination directory inside HDFS, as mentioned in AcadgildLocal.conf.
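Using the placeholder paths from the sketch above (substitute your own), the two directories can be created like this; the second command requires the Hadoop daemons to be running:

```shell
# Local spooling directory (the source)
mkdir -p /home/hadoop/flume_spool

# Destination directory inside HDFS (the sink)
hadoop fs -mkdir -p /flume_sink
```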


We can now open another terminal and start the Flume agent with the following command:
Command: flume-ng agent -n agent1 -f /home/hadoop/HADOOP/apache-flume-1.6.0-bin/conf/AcadgildLocal.conf

(Note that the options are plain hyphens, -n and -f; copying en dashes from a web page causes a "Missing required option: n" error.)

This confirms the agent is running, and we can leave this terminal running in the background.

For our dummy dataset, we will create 3 different test files, which will act as 3 log files created at different times by the same web server.

The sample data inside the files contains the fields webpage, sessionID, sessionIN, and sessionOUT.
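The test files can be generated in one go; the field values below are invented purely for illustration, and any comma-separated lines will do:

```shell
# Create three small test files that mimic web-server logs
# (fields: webpage, sessionID, session-in time, session-out time)
for i in 1 2 3; do
  cat > "test$i.txt" <<EOF
/home.html,SID${i}01,09:0${i},09:1${i}
/cart.html,SID${i}02,10:0${i},10:2${i}
EOF
done
```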


Our next step is to place the test files inside the spooling directory (the source).
We will copy the files we created in the previous step.

One by one, we place all the test files inside the spooling directory. Wait a moment, and you will see each filename change to " filename.COMPLETED ".
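Placing a file is an ordinary copy; the spool path below is a stand-in for your own spooling directory, and the .COMPLETED rename is performed by the running Flume agent, not by us:

```shell
SPOOL_DIR=/tmp/flume_spool          # stand-in for your spooling directory
mkdir -p "$SPOOL_DIR"
echo "/home.html,SID101,09:01,09:11" > test1.txt
cp test1.txt "$SPOOL_DIR"/
ls "$SPOOL_DIR"                     # after ingestion, Flume shows test1.txt.COMPLETED
```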

We can now check the resulting temporary file inside HDFS, at the destination path generated by Flume.
With the listing command, we find only one file inside HDFS at /flume_sink.
Command: hadoop dfs -ls <destination_path>

We cat the temp file to see all the data aggregated inside one file.
Command: hadoop dfs -cat <destination_path>/<file_name>

For any queries regarding this blog, please reply below in the comments section. Keep visiting our website Acadgild for more updates on Big Data and other technologies.



Prateek

An alumnus of the NIE-Institute Of Technology, Mysore, Prateek is an ardent Data Science enthusiast. He has been working at Acadgild as a Data Engineer for the past 3 years. He is a subject-matter expert in Big Data, the Hadoop ecosystem, and Spark.

16 Comments

  1. Nice read , I have a question on the integrated tables. Basically i want to understand the below two scenarios
    1. What if i want to delete a record from HBase ? Can i trigger a HQL query in Hive(1.0 or higher) and get this done ?
    2. Does this support transaction ? if i am inserting 100 records, all 100 should go in if successful or 0 in case of failure

    1. Hi Arjun,
      If you want to delete the record from hbase through hive storage handler , then hive table should support transactions.
      But Hive transactions have very limited support , we can perform transactions only if
      1. Table is stored in ORC file format
      2. Table is Bucketed at least one column
      For more details on hive transaction support refer the below blog
      https://acadgild.com/blog/transactions-in-hive/

  2. when i run the agent its shows the below error
    A fatal error occurred while running. Exception follows.
    org.apache.commons.cli.MissingOptionException: Missing required option: n
    at org.apache.commons.cli.Parser.checkRequiredOptions(Parser.java:299)
    at org.apache.commons.cli.Parser.parse(Parser.java:231)
    at org.apache.commons.cli.Parser.parse(Parser.java:85)
    at org.apache.flume.node.Application.main(Application.java:265

  3. Thanks for sharing this informative article. But there is a problem with the googledrive link. When I open it an warning pops up like ‘no file in directory’. Could you please check the file?
    Kind regards

  4. Hi Prateek,
    Thank you.
    your 3 flume articles very helpful. I got an overview of the Flume and some hands-on experience.
    What are interceptors in flume? can you give me some idea.

  5. Hi Prateek,
    after succesfully getting this message
    16/12/31 04:04:07 INFO node.Application: Starting Sink hdfs_sink
    16/12/31 04:04:07 INFO node.Application: Starting Source local
    16/12/31 04:04:07 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/cloudera/Desktop/flume_sink2
    16/12/31 04:04:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs_sink: Successfully registered new MBean.
    16/12/31 04:04:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs_sink started
    16/12/31 04:04:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: local: Successfully registered new MBean.
    16/12/31 04:04:07 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: local started
    when i copy logfile (say test1.txt) to folder flume_sink2 it creates below error
    java.io.FileNotFoundException: /home/cloudera/Desktop/flume_sink2/.flumespool/.flumespool-main.meta (Permission denied)
    i think my user cloudera need to have all flume services permission .How to do this .??
    Also when i do with user root
    i gets below error
    16/12/31 04:02:30 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    java.lang.IllegalStateException: Channel closed [channel=fileChannel]. Due to java.io.IOException: File /root/.flume/file-channel/data/log-1 has bad version 7f000000

  7. Hi Prateek
    Thanks for the post. Its really good.
    My Question is, I need to transfer files from spool directory to HDFS. Am provided with dataset of size 4gb and when i try to move into HDFS, the files written in HDFS is very small (1mb or 2mb) results in large number of files. I need to write files with larger size ( say 100 to 128mb ), I had attached my conf file can yu help me in achieving this
    agent1.sources = source1 source2
    agent1.channels = channel1 channel2
    agent1.sinks = sink1 sink2
    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /home/datasets/US Hotel Offerings
    agent1.sources.source2.type = spooldir
    agent1.sources.source2.spoolDir = /home/datasets/US Museum Review -Trip Advisor
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = hdfs://emr-header-1:9000/user/ogs/direct/US_Hotel_Offerings
    agent1.sinks.sink1.hdfs.filePrefix = H_Offerings
    agent1.sinks.sink1.hdfs.fileSuffix = .avro
    agent1.sinks.sink1.hdfs.serializer = avro_event
    agent1.sinks.sink1.hdfs.serializer = DataStream
    agent1.sinks.sink1.hdfs.callTimeout = 2000000
    agent1.sinks.sink1.hdfs.rollCount = 0
    agent1.sinks.sink1.hdfs.rollInterval = 0
    #agent1.sinks.sink1.serializer.codeC =
    agent1.sinks.sink2.type = hdfs
    agent1.sinks.sink2.hdfs.path = hdfs://emr-header-1:9000/user/ogs/direct/US_Museum_Reviews
    agent1.sinks.sink2.hdfs.filePrefix = M_Reviews
    agent1.sinks.sink2.hdfs.fileSuffix = .avro
    agent1.sinks.sink2.hdfs.serializer = avro_event
    agent1.sinks.sink2.hdfs.serializer = DataStream
    agent1.sinks.sink2.hdfs.callTimeout = 2000000
    agent1.sinks.sink2.hdfs.rollCount = 0
    agent1.sinks.sink2.hdfs.rollInterval = 0
    #agent1.sinks.sink2.serializer.codeC =
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000000
    agent1.channels.channel1.transactionCapacity = 1000000
    agent1.channels.channel2.type = memory
    agent1.channels.channel2.capacity = 1000000
    agent1.channels.channel2.transactionCapacity = 1000000
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1
    agent1.sources.source2.channels = channel2
    agent1.sources.sink2.channel = channel2
    2. Next Question is, What type of channel is more preferable when the size of source file is said to be larger. ( Memory or File )

  8. Hi, I could not upload the files from spooldir directory. Successfully changes the status to COMPLETED but not reflected in HDFS.
