
Streaming Twitter Data using Kafka

In this post, we will discuss how to stream Twitter data using Kafka. Before going further, please ensure that the Kafka and Zookeeper services are installed on your system.

You can refer to this post for installing Kafka and this one for installing Zookeeper.


Streaming Twitter data using Hosebird

Twitter provides Hosebird client (hbc), a robust Java HTTP library for consuming Twitter’s Streaming API.

Hosebird is the server-side implementation of the Twitter Streaming API, and hbc is its client-side counterpart. The Streaming API allows clients to receive Tweets in near real-time, with various resources providing filtered, sampled, or full access to the Tweet stream. Every Twitter account has access to the Streaming API, and any developer can build applications on it. Hosebird also powers the User Streams feature, which streams all events related to a given user to drive desktop Twitter clients.

Let's begin by starting the Zookeeper and Kafka services.
Start the Zookeeper server by moving into the bin folder of the Zookeeper installation directory and running:

./zkServer.sh start

[Screenshot: starting the Zookeeper server]

Next, start the Kafka server by moving into the bin folder of the Kafka installation directory and running:

./kafka-server-start.sh ../config/server.properties

[Screenshot: starting the Kafka server]
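Before moving on, you can verify that both services are up with the JDK's jps utility, which lists running Java processes; the Kafka broker appears as Kafka and the Zookeeper server as QuorumPeerMain:

jps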

In Kafka, there are two kinds of clients – producers, which publish messages to topics, and consumers, which read those messages. You can refer to them in detail here.

Producer class to stream Twitter data

package kafka;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

public class TwitterKafkaProducer {
    // Kafka topic to which the tweets are published
    private static final String topic = "hadoop";

    public static void run() throws InterruptedException {
        // Configuration for the old (0.8.x) Kafka producer API
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id", "camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);

        // Hosebird delivers the raw JSON of each tweet into this queue
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100000);

        // Stream only the tweets that contain the tracked terms
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(Lists.newArrayList("hadoop"));

        // OAuth credentials, kept in a separate constants class
        String consumerKey = TwitterSourceConstant.CONSUMER_KEY_KEY;
        String consumerSecret = TwitterSourceConstant.CONSUMER_SECRET_KEY;
        String accessToken = TwitterSourceConstant.ACCESS_TOKEN_KEY;
        String accessTokenSecret = TwitterSourceConstant.ACCESS_TOKEN_SECRET_KEY;
        Authentication auth = new OAuth1(consumerKey, consumerSecret,
                accessToken, accessTokenSecret);

        Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                .endpoint(endpoint).authentication(auth)
                .processor(new StringDelimitedProcessor(queue)).build();
        client.connect();

        // Forward the first 1000 tweets from the queue into the Kafka topic
        for (int msgRead = 0; msgRead < 1000; msgRead++) {
            try {
                KeyedMessage<String, String> message =
                        new KeyedMessage<String, String>(topic, queue.take());
                producer.send(message);
            } catch (InterruptedException e) {
                System.out.println("Stream ended");
                break; // stop cleanly instead of sending a null message
            }
        }
        producer.close();
        client.stop();
    }

    public static void main(String[] args) {
        try {
            TwitterKafkaProducer.run();
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}

Here, Twitter authorization is done through the consumerKey, consumerSecret, accessToken, and accessTokenSecret values. We pass them in through a class called TwitterSourceConstant.

public class TwitterSourceConstant {
    public static final String CONSUMER_KEY_KEY = "xxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String CONSUMER_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String ACCESS_TOKEN_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String ACCESS_TOKEN_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxx";
}
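Hardcoding credentials is fine for a quick demo, but you may prefer to keep them out of source control. Below is a minimal, hypothetical drop-in replacement that loads the same four values from a twitter.properties file on the classpath (the file name and property keys here are our own choices, mirroring the key names used in the pom.xml later):

package kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical alternative: load the four OAuth values from a
// twitter.properties file on the classpath instead of hardcoding them.
public class TwitterSourceConstant {
    private static final Properties PROPS = new Properties();

    static {
        // Assumes twitter.properties is present on the classpath
        try (InputStream in = TwitterSourceConstant.class
                .getResourceAsStream("/twitter.properties")) {
            PROPS.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static final String CONSUMER_KEY_KEY = PROPS.getProperty("consumer.key");
    public static final String CONSUMER_SECRET_KEY = PROPS.getProperty("consumer.secret");
    public static final String ACCESS_TOKEN_KEY = PROPS.getProperty("access.token");
    public static final String ACCESS_TOKEN_SECRET_KEY = PROPS.getProperty("access.token.secret");
}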

The line private static final String topic = "hadoop"; in the Producer class names the Kafka topic to which the tweets are published, while endpoint.trackTerms(...) controls which tweets Twitter streams to us. So, we need to start this Producer class to start streaming data from Twitter.
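If automatic topic creation is disabled on your broker, create the topic first from the Kafka bin directory (a replication factor and partition count of 1 are reasonable single-node defaults):

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hadoop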
Now, we will write a Consumer class to print the streamed tweets. The consumer class is as follows:

Consumer class to print the streamed Twitter data

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    private ConsumerConnector consumerConnector = null;
    // Must match the topic the producer publishes to
    private final String topic = "hadoop";

    public void initialize() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "100");
        ConsumerConfig conConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }

    public void consume() {
        // Key = topic name, Value = number of threads for the topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, Integer.valueOf(1));

        // ConsumerConnector creates the message stream for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);

        // Get the Kafka streams for our topic
        List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

        // Iterate each stream; hasNext() blocks until a new message arrives,
        // so this loop keeps printing until the process is stopped
        for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message consumed from topic[" + topic + "] : "
                        + new String(consumerIte.next().message()));
        }

        // Shut down the consumer connector
        if (consumerConnector != null)
            consumerConnector.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer kafkaConsumer = new KafkaConsumer();
        // Configure the Kafka consumer
        kafkaConsumer.initialize();
        // Start consuming
        kafkaConsumer.consume();
    }
}

When we run the above Consumer class, it prints the tweets collected up to that moment and keeps printing new ones as they arrive.
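Each consumed message is the raw JSON of one tweet. If you only want the tweet text, a crude, dependency-free sketch such as the following can pull out the text field (TweetTextExtractor is our own illustrative helper, not part of the original project; a real application would use a proper JSON library or the hbc-twitter4j bindings instead):

package kafka;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper: extract the "text" field from a raw tweet JSON string.
public class TweetTextExtractor {
    // Matches "text":"..." while allowing escaped characters inside the value
    private static final Pattern TEXT_FIELD =
            Pattern.compile("\"text\":\"((?:\\\\.|[^\"\\\\])*)\"");

    public static String extractText(String rawJson) {
        Matcher m = TEXT_FIELD.matcher(rawJson);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String sample = "{\"created_at\":\"Mon Jan 01 00:00:00 +0000 2018\","
                + "\"text\":\"Learning Hadoop with Kafka\",\"user\":{}}";
        System.out.println(extractText(sample)); // prints: Learning Hadoop with Kafka
    }
}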
We have built this project using Maven, and the pom.xml file is as follows:

pom.xml

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.twitter</groupId>
  <artifactId>hbc-example</artifactId>
  <version>2.2.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Hosebird Client Examples</name>
  <properties>
    <git.dir>${project.basedir}/../.git</git.dir>
    <!-- this makes maven-tools not bump us to snapshot versions -->
    <stabilized>true</stabilized>
    <!-- Fill these in via https://dev.twitter.com/apps -->
    <consumer.key>TODO</consumer.key>
    <consumer.secret>TODO</consumer.secret>
    <access.token>TODO</access.token>
    <access.token.secret>TODO</access.token.secret>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>hbc-twitter4j</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
      </plugin>
    </plugins>
  </build>
</project>
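Since the pom includes the exec-maven-plugin, the classes can also be run from the command line instead of Eclipse (assuming they live in the kafka package, as above):

mvn compile exec:java -Dexec.mainClass="kafka.TwitterKafkaProducer"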

We can run the Producer and Consumer programs either from Eclipse or from the command line as shown above. First, run the Producer to start streaming tweets from Twitter. The Eclipse console of the Producer is shown in the screenshot below.

[Screenshot: running the Producer class in Eclipse]

Now, let's run the Consumer class of Kafka. The Consumer console with the collected tweets is shown in the screenshot below.

[Screenshot: tweets collected using Kafka]

Here, we have collected the tweets related to Hadoop, as configured in the Producer class.
We can also list the topics currently available in Kafka using the command:

./kafka-topics.sh --zookeeper localhost:2181 --list

We can also watch the collected tweets arrive in real time from a console consumer, using the command below:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic "hadoop" --from-beginning

Below is the screenshot of the Consumer console with the tweets.

[Screenshot: Kafka console consumer output]

So, this is how we collect streaming data from Twitter using Kafka.

We hope this post has been helpful in understanding how to collect streaming data from Twitter using Kafka. In case of any queries, feel free to comment below and we will get back to you at the earliest.

For more updates on Big Data and other technologies, keep visiting our site, www.acadgild.com.

