Streaming Twitter Data Using Kafka


In this post, we will discuss how to stream Twitter data using Kafka. Before going through this post, please ensure that you have installed the Kafka and Zookeeper services on your system.

You can refer to this post for installing Kafka and this one for installing Zookeeper.

Streaming Twitter data using Hosebird

Twitter provides the Hosebird client (hbc), a robust Java HTTP library for consuming Twitter’s Streaming API.

Hosebird is Twitter’s server implementation of the Streaming API, and hbc is the client library for it. The Streaming API allows clients to receive Tweets in near real-time; various resources allow filtered, sampled, or full access to some or all Tweets. Every Twitter account has access to the Streaming API, and any developer can build applications on it today. Hosebird also powers the User Streams feature, which streams all events related to a given user to drive desktop Twitter clients.
Let’s begin by starting the Zookeeper and Kafka services.

Start the Zookeeper server by moving into the bin folder of the Zookeeper installation directory and running the command

./zkServer.sh start

Start the Kafka server by moving into the bin folder of the Kafka installation directory and running the command

./kafka-server-start.sh ../config/server.properties
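Optionally, you can verify that both services are up with jps, which lists the running JVM processes; the Kafka broker and Zookeeper (QuorumPeerMain) should both appear:

jps
# Sample output (process IDs will differ):
# 4321 QuorumPeerMain   (Zookeeper)
# 4322 Kafka            (Kafka broker)
# 4330 Jps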

In Kafka, there are two client roles – Producers and Consumers. You can refer to them in detail here.

Producer class to stream Twitter data

package kafka;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

public class TwitterKafkaProducer {

    private static final String topic = "hadoop";

    public static void run() throws InterruptedException {
        // Configure the Kafka producer (old Scala producer API, Kafka 0.8.x)
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("client.id", "camus");
        ProducerConfig producerConfig = new ProducerConfig(properties);
        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);

        // hbc writes each raw tweet (a JSON string) into this queue
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100000);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        // Terms to track; tweets matching these are delivered by the Streaming API
        endpoint.trackTerms(Lists.newArrayList("twitterapi", "#AAPSweep"));

        // OAuth credentials are kept in a separate constants class
        String consumerKey = TwitterSourceConstant.CONSUMER_KEY_KEY;
        String consumerSecret = TwitterSourceConstant.CONSUMER_SECRET_KEY;
        String accessToken = TwitterSourceConstant.ACCESS_TOKEN_KEY;
        String accessTokenSecret = TwitterSourceConstant.ACCESS_TOKEN_SECRET_KEY;
        Authentication auth = new OAuth1(consumerKey, consumerSecret,
                accessToken, accessTokenSecret);

        Client client = new ClientBuilder().hosts(Constants.STREAM_HOST)
                .endpoint(endpoint).authentication(auth)
                .processor(new StringDelimitedProcessor(queue)).build();
        client.connect();

        // Forward the first 1000 tweets from the queue to the Kafka topic
        for (int msgRead = 0; msgRead < 1000; msgRead++) {
            try {
                producer.send(new KeyedMessage<String, String>(topic, queue.take()));
            } catch (InterruptedException e) {
                System.out.println("Stream ended");
                break; // stop cleanly instead of sending a null message
            }
        }
        producer.close();
        client.stop();
    }

    public static void main(String[] args) {
        try {
            TwitterKafkaProducer.run();
        } catch (InterruptedException e) {
            System.out.println(e);
        }
    }
}

Here, Twitter authorization is done through consumerKey, consumerSecret, accessToken, and accessTokenSecret. Hence, we pass them in through a class called TwitterSourceConstant.

public class TwitterSourceConstant {
    public static final String CONSUMER_KEY_KEY = "xxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String CONSUMER_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String ACCESS_TOKEN_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxx";
    public static final String ACCESS_TOKEN_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxx";
}

In private static final String topic = "hadoop"; of the Producer class, we set the Kafka topic to which the streamed tweets are published. So, we need to run this Producer class to start streaming data from Twitter.
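The broker auto-creates a topic on first use by default; if auto-creation is disabled in your server.properties, create the topic once before starting the Producer, for example:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hadoop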
Now, we will write a Consumer class to print the streamed tweets:

Consumer class to consume the streamed Twitter data

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {

    private ConsumerConnector consumerConnector = null;
    // Must match the topic the Producer publishes to
    private final String topic = "hadoop";

    public void initialize() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "100");
        ConsumerConfig conConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }

    public void consume() {
        // Key = topic name, Value = number of threads for the topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, new Integer(1));

        // The ConsumerConnector creates the message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);

        // Get the Kafka streams for our topic
        List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

        // Iterate over each stream using a ConsumerIterator
        for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message consumed from topic[" + topic + "] : "
                        + new String(consumerIte.next().message()));
        }

        // Shut down the consumer connector
        if (consumerConnector != null)
            consumerConnector.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer kafkaConsumer = new KafkaConsumer();
        // Configure the Kafka consumer
        kafkaConsumer.initialize();
        // Start consumption
        kafkaConsumer.consume();
    }
}

When we run the above Consumer class, it prints all the tweets collected up to that moment.
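Each consumed message is the raw JSON of one tweet. If you only want the tweet text, you can parse the message with a JSON library before printing it; the sketch below uses org.json, which is our own choice and would need to be added to the pom:

// Inside the while loop of consume(), instead of printing the raw JSON:
String json = new String(consumerIte.next().message());
try {
    org.json.JSONObject tweet = new org.json.JSONObject(json);
    System.out.println(tweet.optString("text")); // just the tweet text
} catch (org.json.JSONException e) {
    // hbc can also deliver non-tweet control messages; skip anything unparseable
}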
We have built this project with Maven, and the pom.xml file is as follows:

pom.xml

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.twitter</groupId>
  <artifactId>hbc-example</artifactId>
  <version>2.2.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Hosebird Client Examples</name>
  <properties>
    <git.dir>${project.basedir}/../.git</git.dir>
    <!-- this makes maven-tools not bump us to snapshot versions -->
    <stabilized>true</stabilized>
    <!-- Fill these in via https://dev.twitter.com/apps -->
    <consumer.key>TODO</consumer.key>
    <consumer.secret>TODO</consumer.secret>
    <access.token>TODO</access.token>
    <access.token.secret>TODO</access.token.secret>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>hbc-twitter4j</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-deploy-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
      </plugin>
    </plugins>
  </build>
</project>
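Since the build section already declares the exec-maven-plugin, we can also run either class from the command line instead of Eclipse. A sketch, assuming the package and class names used above:

mvn compile exec:java -Dexec.mainClass="kafka.TwitterKafkaProducer"
mvn compile exec:java -Dexec.mainClass="kafka.KafkaConsumer"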

We can run the Producer and Consumer programs from Eclipse. First, run the Producer to start streaming tweets from Twitter. The Eclipse console of the Producer is shown in the screenshot below.

[Screenshot: Eclipse console of the Producer]

Now, let’s run the Consumer class. The console of the Consumer with the collected tweets is shown in the screenshot below.

[Screenshot: Consumer console with the collected tweets]

Here, we have collected the tweets and published them to the hadoop topic, which was set in the Producer class.
We can also list the topics currently present in Kafka using the command

./kafka-topics.sh --zookeeper localhost:2181 --list

We can also watch a console consumer simultaneously to check the tweets collected in real time, using the command below:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic hadoop --from-beginning

Below is a screenshot of the console consumer with the tweets.

[Screenshot: console consumer output with the tweets]

So, this is how we collect streaming data from Twitter using Kafka.

We hope this post has been helpful in understanding how to collect streaming data from Twitter using Kafka. In case of any queries, feel free to comment below and we will get back to you at the earliest.

For more updates on Big Data and other technologies, keep visiting our site www.acadgild.com.
