Streaming Twitter Data using Spark

In this post, we will be discussing how to stream Twitter data using Spark Streaming. Let’s begin with what Spark Streaming is.

Before getting into Spark Streaming, we recommend that our readers get some familiarity with Spark core and RDDs:

Spark RDDs in Scala Part-1

Spark RDDs in Scala Part-2

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. In Spark Streaming, data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join, and window. Finally, the processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from various sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
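
To make the idea of a DStream concrete before we wire in Twitter, below is a minimal sketch of a streaming word count. The source here is an assumption for illustration: a plain-text server on localhost port 9999, which you can simulate with nc -lk 9999. Each batch of lines becomes an RDD, and the high-level operations run on every batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Each 5-second batch of lines from the socket becomes one RDD in the DStream
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.print() // prints the first few word counts of every batch

    ssc.start()
    ssc.awaitTermination()
  }
}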

Now, we will stream Twitter data using Spark. Spark provides a separate package, spark-streaming-twitter, whose TwitterUtils object contains the built-in functions to stream data from Twitter.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

object TwitterData {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: TwitterData <consumerKey> <consumerSecret> " +
        "<accessToken> <accessTokenSecret> [<filters>]")
      System.exit(1)
    }

    val appName = "TwitterData"
    val conf = new SparkConf().setAppName(appName).setMaster("local[3]")

    // Micro-batch interval: a new RDD (and a new output file) every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4) // optional keywords to track

    // Build the twitter4j OAuth configuration from the four keys
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
    val auth = new OAuthAuthorization(cb.build)

    // DStream of twitter4j.Status objects
    val tweets = TwitterUtils.createStream(ssc, Some(auth), filters)

    // Keep only the tweets whose language is English
    val englishTweets = tweets.filter(_.getLang() == "en")

    // Saves each batch under tweets-<timestamp>.json
    englishTweets.saveAsTextFiles("tweets", "json")

    ssc.start()
    ssc.awaitTermination()
  }
}

First, we need to create the Spark StreamingContext as follows:

val ssc = new StreamingContext(conf, Seconds(5))

StreamingContext takes two parameters: the application configuration and the batch interval. As Spark streams data in micro-batches, we need to set an interval, be it seconds or milliseconds, and a new batch is produced every time that interval elapses. Here, we have set 5 seconds, so every 5 seconds the application will stream the data from Twitter and save it to a new file.
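
Besides Seconds, Spark Streaming also ships Milliseconds and Minutes helpers, so the interval can be tuned in either direction. Note that only one StreamingContext can be active per JVM, so pick a single interval; the commented lines below are alternatives, not additions:

import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(5))           // a new batch (and file) every 5 seconds
// val ssc = new StreamingContext(conf, Milliseconds(500)) // sub-second micro-batches
// val ssc = new StreamingContext(conf, Minutes(1))        // one batch per minute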

When we run the above program, we can see the tweets collected in files named tweets-<timestamp>.json. Twitter needs four keys to authenticate a user, i.e., the Consumer Key, Consumer Secret, Access Token, and Access Token Secret, and all of these are passed as arguments to the program.
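
As an aside, instead of building the configuration explicitly (shown next), twitter4j can also pick up the four keys from JVM system properties, in which case None is passed for the authorization. The lines below are a sketch of that alternative, not something the program above requires:

// Alternative: let twitter4j read the OAuth keys from system properties
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val tweets = TwitterUtils.createStream(ssc, None) // picks up the properties above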

Coming back to the program, we use the ConfigurationBuilder class to set the keys for Twitter authentication as follows:

val cb = new ConfigurationBuilder
cb.setDebugEnabled(true)
  .setOAuthConsumerKey(consumerKey)
  .setOAuthConsumerSecret(consumerSecret)
  .setOAuthAccessToken(accessToken)
  .setOAuthAccessTokenSecret(accessTokenSecret)

Now the authorization can be done as follows:

val auth = new OAuthAuthorization(cb.build)

Now we create the stream using the TwitterUtils.createStream method, which takes the StreamingContext, the authorization, and an optional list of filter keywords as parameters, as shown below.

val tweets = TwitterUtils.createStream(ssc, Some(auth), filters)

Now, our streaming application will be able to stream the data and store it in the variable tweets. The streamed tweets come in many different languages. To keep only one of them, we use the line below:

val englishTweets = tweets.filter(_.getLang() == "en")

This line keeps only the tweets that are in English. In the next line, we save the tweets collected in every 5-second batch to a new file.
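
The same DStream can also be processed further before (or instead of) saving it. As a small sketch, the lines below pull hashtags out of each English tweet and print their counts for every 5-second batch:

// Split each tweet's text into words and keep only the hashtags
val hashtags = englishTweets
  .flatMap(status => status.getText.split(" "))
  .filter(word => word.startsWith("#"))

// Count the occurrences of each hashtag within every batch and print a sample
hashtags.countByValue().print()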

Filtering tweets based on language (the getLang() method) was added in the twitter4j-3.0.6 jar, but Spark Streaming for Twitter was developed against the twitter4j-3.0.2 jar, so the newer jar needs to be added separately. You can download the jar file from here.
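
Alternatively, if you manage dependencies with sbt rather than adding jars to the build path by hand, the equivalent would look roughly like the snippet below. The version numbers are assumptions and should be aligned with the Spark version you are running:

// build.sbt (versions are assumptions; match them to your installation)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % "1.6.3",
  "org.apache.spark" %% "spark-streaming"         % "1.6.3",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3",
  "org.twitter4j"    %  "twitter4j-core"          % "3.0.6"
)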

After downloading the jar file, add it to the build path and then run the program by passing the keys as arguments, as shown below.

Right-click on the class -> Run As -> Run Configurations -> Arguments -> pass the four keys, separated by spaces.

Now you can run the program as a normal Scala application and watch the console output.

You can see that files have been created in your specified path, and on opening them, you will find that parts of the files contain the streamed tweets.

We hope this post has been helpful in understanding how to stream Twitter data using Spark Streaming. In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
