
Streaming Twitter Data using Spark

In this post, we will be discussing how to stream Twitter data using Spark Streaming. Let’s begin with what Spark Streaming is.

Before diving into Spark Streaming, we recommend getting familiar with Spark Core and RDDs. You can refer to our earlier posts:


Spark RDD’s in Scala part-1

Spark RDD’s in Scala part-2

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. In Spark Streaming, data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join, and window. Finally, the processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's machine learning and graph processing algorithms on data streams.

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from various sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
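To make the micro-batch model concrete, here is a plain-Scala sketch (no Spark required; the collections and values here are illustrative stand-ins for a stream and its batches): a transformation on a DStream is simply the same operation applied to every batch in turn.

```scala
// Stand-in for an incoming stream of records
val liveData = (1 to 10).toList

// Divide the data into micro-batches, as Spark Streaming does once per batch interval
val batches = liveData.grouped(5).toList

// A "map" on a DStream is a map applied to each batch (each underlying RDD)
val doubled = batches.map(batch => batch.map(_ * 2))

println(doubled)
```

On a real DStream, the same per-batch logic runs continuously as new batches arrive.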

Now, we will stream Twitter data using Spark. Spark provides an external package, spark-streaming-twitter, whose TwitterUtils object contains the built-in functions to stream data from Twitter.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization

object TwitterData {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: TwitterData <ConsumerKey> <ConsumerSecret> <accessToken> <accessTokenSecret> [<filters>]")
      System.exit(1)
    }
    val appName = "TwitterData"
    val conf = new SparkConf()
    conf.setAppName(appName).setMaster("local[3]")
    // Batch interval of 5 seconds: a new micro-batch (and a new output file) every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // The first four arguments are the Twitter OAuth credentials; any remaining ones are filter keywords
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4)
    // Build the Twitter OAuth configuration from the four keys
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
    val auth = new OAuthAuthorization(cb.build)
    // Create the DStream of tweets, applying any filter keywords
    val tweets = TwitterUtils.createStream(ssc, Some(auth), filters)
    // Keep only tweets written in English
    val englishTweets = tweets.filter(_.getLang == "en")
    englishTweets.saveAsTextFiles("tweets", "json")
    ssc.start()
    ssc.awaitTermination()
  }
}


First, we need to set up the Spark StreamingContext as follows:

val ssc = new StreamingContext(conf, Seconds(5))

The StreamingContext takes two parameters: the application configuration and the batch interval. Since Spark Streaming processes data in micro-batches, we must set an interval, in seconds or milliseconds, at which each batch is collected. Here, we have set it to 5 seconds, so every 5 seconds it will collect the streamed Twitter data and save it to a new file.

When we run the above program, we can see the tweets collected in files named tweets-&lt;timestamp&gt;.json. Twitter requires 4 keys to authenticate a user, i.e., Consumer Key, Consumer Secret, Access Token, and Access Token Secret; all of these are passed as arguments to the program.
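The argument handling itself is ordinary Scala. Here is a self-contained sketch of the pattern the program uses (the sample values are made up):

```scala
// Sample command-line arguments: four credentials followed by optional filter keywords
val sampleArgs = Array("myConsumerKey", "myConsumerSecret", "myAccessToken", "myAccessTokenSecret", "spark")

// Destructure the first four values into named credentials
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = sampleArgs.take(4)

// Everything after the credentials is treated as a filter keyword
val filters = sampleArgs.drop(4)

println(s"filters = ${filters.mkString(", ")}")
```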

We use the ConfigurationBuilder class to set the keys for Twitter authentication as follows:

val cb = new ConfigurationBuilder
cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
  .setOAuthConsumerSecret(consumerSecret)
  .setOAuthAccessToken(accessToken)
  .setOAuthAccessTokenSecret(accessTokenSecret)

Now the authorization can be done as follows:

val auth = new OAuthAuthorization(cb.build)

Now we will start streaming using the TwitterUtils.createStream method, which takes the StreamingContext, the authorization details, and any filter keywords as parameters, as shown below.

val tweets = TwitterUtils.createStream(ssc, Some(auth), filters)

Now, our streaming application will be able to stream the data and store it in the variable tweets. The streamed tweets arrive in many different languages; to keep only the English ones, we use the line below:

val englishTweets = tweets.filter(_.getLang() == "en")

This line keeps only the tweets written in English. The next line saves the tweets collected in each 5-second batch to a new file.
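Further per-batch transformations follow the same pattern. As a plain-Scala sketch (the sample texts are made up; on the DStream this would be a flatMap over each status's getText), here is how one might pull hashtags out of tweet text:

```scala
// Sample tweet texts standing in for one batch of statuses
val sampleTexts = List("loving #spark streaming", "#bigdata with #spark is fun")

// Split each text on whitespace and keep only the hashtag tokens
val hashtags = sampleTexts.flatMap(_.split("\\s+").filter(_.startsWith("#")))

println(hashtags)
```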

Filtering tweets by language (getLang) was added in the twitter4j-3.0.6 jar, but Spark Streaming's Twitter integration was built against twitter4j-3.0.2, so you need the newer jar. You can download the jar file from here.

After downloading the jar file, add it to the build path and then run it by passing the keys as arguments, as shown below.

Right-click on the class –> Run As –> Run Configurations –> Arguments –> here, pass the 4 keys separated by spaces.
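Alternatively, if you run from a terminal with spark-submit instead of the IDE, the keys are passed as plain space-separated arguments after the application jar. The jar names and paths below are hypothetical; adjust them to your own build:

```shell
# Hypothetical jar names; replace with your actual build artifacts.
spark-submit \
  --class TwitterData \
  --master "local[3]" \
  --jars twitter4j-core-3.0.6.jar,spark-streaming-twitter_2.10.jar \
  twitter-data.jar \
  MY_CONSUMER_KEY MY_CONSUMER_SECRET MY_ACCESS_TOKEN MY_ACCESS_TOKEN_SECRET spark
```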

Now you can run the program as a normal Scala application and watch the console output.

You can see that files have been created in your specified path, and on opening those files, you will find the collected tweets inside them.

We hope this post has been helpful in understanding how to stream Twitter data using Spark Streaming. In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
