
Spark Streaming and Kafka Integration

Spark Streaming and Kafka integration make one of the best combinations for building real-time applications. Spark is an in-memory processing engine that runs on top of the Hadoop ecosystem, and Kafka is a distributed publish-subscribe messaging system. Kafka can stream data continuously from a source, and Spark can process this stream of data instantly with its in-memory processing primitives. By integrating Kafka and Spark, a lot can be done; we can even build a real-time machine learning application.


Before going ahead with Spark Streaming and Kafka integration, let's pick up some basic knowledge of Kafka by going through our previous blog:


Kafka Producers and Consumers

You can install Kafka by going through this blog:

Installing Kafka

Now, let's get started with the integration. First, we need to start the daemons.

Start the ZooKeeper server by navigating to $KAFKA_HOME and running the command given below:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Keep the terminal running, open one new terminal, and start the Kafka broker using the following command:

./bin/kafka-server-start.sh config/server.properties

After starting, leave both the terminals running, open a new terminal, and create a Kafka topic with the following command:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic acadgild-topic

Note down the port number and the topic name here; you will need to pass these as parameters in Spark.
After creating the topic, you will get a message confirming that your topic has been created:
Created topic "acadgild-topic"

You can also check the topic list using the following command:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

Now, to send messages to this topic, you can use the console producer and send messages continuously. Use the following command to start the console producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic acadgild-topic

You can see all four consoles in the screenshot below:

You can now send messages using the console producer terminal.


Now, in Spark, we will develop an application that consumes this data and performs a word count for us. Our Spark application is as follows:

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    // Batch interval of 10 seconds: each micro-batch collects 10 seconds of data
    val ssc = new StreamingContext(conf, Seconds(10))
    // Change the ZooKeeper port number and the topic name accordingly
    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("acadgild-topic" -> 5))
    // Each record is a (key, message) pair; split the message into words
    val words = kafkaStream.flatMap(x => x._2.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    kafkaStream.print() // prints the stream of data received
    wordCounts.print()  // prints the word count result of the stream
    ssc.start()
    ssc.awaitTermination()
  }
}

KafkaUtils provides a method called createStream, in which we need to provide the input stream details: the ZooKeeper host and port, the consumer group id, and the topic name along with the number of receiver threads.

createStream returns a ReceiverInputDStream; its Scala signature is as follows:

createStream(ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

Parameters

ssc – StreamingContext object

zkQuorum – Zookeeper quorum (hostname:port,hostname:port,..)

groupId – The group id for this consumer

topics – Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

storageLevel – Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
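For example, if ZooKeeper runs as a quorum of several nodes, or if you want to set the storage level explicitly, the createStream call in the application above could be written with all five arguments. This is only a sketch: the ZooKeeper host names and the receiver-thread count are placeholders, and it additionally requires importing org.apache.spark.storage.StorageLevel.

// Sketch of an explicit createStream call; host names and thread count are placeholders
val kafkaStream = KafkaUtils.createStream(
  ssc,                                  // StreamingContext created above with Seconds(10)
  "zk1:2181,zk2:2181,zk3:2181",         // zkQuorum: comma-separated host:port list
  "spark-streaming-consumer-group",     // groupId for this consumer
  Map("acadgild-topic" -> 2),           // topic name -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER_2    // explicit storage level (same as the default)
)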

After receiving the stream of data, you can perform any Spark Streaming operations on it.

The above streaming job runs in batches of 10 seconds and performs a word count on the data it has received in each of those 10-second batches.
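If you need counts over a longer, sliding window rather than a single 10-second batch, you can replace the reduceByKey step with a windowed reduction. The sketch below assumes the same words DStream from the application above; the checkpoint directory is a placeholder, and checkpointing is required here because an inverse reduce function is used.

// Checkpointing is required for reduceByKeyAndWindow with an inverse reduce function
ssc.checkpoint("/tmp/spark-streaming-checkpoint")  // placeholder path

// Count words over the last 60 seconds, recomputed every 10 seconds
val windowedCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))

windowedCounts.print()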

Here is an example: we send a message from the console producer, and the Spark job does the word count instantly and returns the results, as shown in the screenshot below:


Here are the Maven dependencies of our project:

Note: In order to convert your Java project into a Maven project, right-click on the project -> Configure -> Convert to Maven Project.

Now, in the generated pom.xml file, add the following dependency configurations. All the required dependencies will then be downloaded automatically.

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
</dependencies>
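If your project uses sbt instead of Maven, the equivalent dependency declarations in build.sbt would look roughly like this (a sketch; the Scala version shown is an assumption and should match the Scala build of your Spark 1.6.3 distribution):

// build.sbt (sketch); %% appends the Scala binary version (_2.11) to the artifact names
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)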

This is how you can perform Spark Streaming and Kafka integration in a simple way: create the producers, topics, and brokers from the command line, and consume the stream in Spark through the KafkaUtils createStream method.

We hope this blog helped you understand how to build an application that integrates Spark Streaming and Kafka.

Enroll for Apache Spark Training conducted by Acadgild for successful career growth.


3 Comments

  1. You can save the resultant DStream to an HDFS location like:
    wordCounts.saveAsTextFiles("/hdfs location")
