
Building a Real Time Application using Kafka and Spark


Accurately analyzing real-time streaming data and storing this lightning-fast data has become one of the biggest challenges in the world of big data. One of the best ways to tackle this problem is to build a real-time streaming application with Kafka and Spark and store the incoming data in HBase using Spark.

In this blog, we will discuss how to build a real-time stateful streaming application using Kafka and Spark and store the results in HBase in real time. Before going through this blog, we recommend that you go through our previous blogs on Kafka and Spark integration, the beginner's guide to HBase, and stateful streaming in Spark.

Here is the source code of our streaming application, which processes a new batch every 10 seconds and stores the results back in HBase.

package WordCount

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{ State, StateSpec }
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor, HColumnDescriptor }
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapreduce.{ TableInputFormat, TableOutputFormat }
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Put, HTable }

object Kafka_HBase {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Kafka_Spark_Hbase")
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batch interval

    /*
     * Defining the Kafka consumer parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("acadgild_topic") // topics list

    // Create a direct stream that subscribes to the topic(s) above
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Split each message value into individual words
    val splits = kafkaStream.map(record => (record.key(), record.value.toString)).flatMap(x => x._2.split(" "))

    // Add the counts from the current batch to the running total kept in state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      val updatedSum = currentCount + previousCount
      Some(updatedSum)
    }

    // Defining a checkpoint directory, required for stateful operations
    ssc.checkpoint("hdfs://localhost:9000/WordCount_checkpoint")

    // Per-batch word counts, accumulated across batches with updateStateByKey
    val cnt = splits.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(updateFunc)

    // Writes one (word, count) pair into the Streaming_wordcount table in HBase.
    // The table and its "Word_count" column family are expected to exist already.
    // Note: this opens a new HTable connection per record, which is fine for a demo
    // but expensive at scale.
    def toHBase(row: (_, _)): Unit = {
      val hConf = new HBaseConfiguration()
      hConf.set("hbase.zookeeper.quorum", "localhost:2182")
      val tableName = "Streaming_wordcount"
      val hTable = new HTable(hConf, tableName)
      val thePut = new Put(Bytes.toBytes(row._1.toString()))
      thePut.add(Bytes.toBytes("Word_count"), Bytes.toBytes("Occurances"), Bytes.toBytes(row._2.toString))
      hTable.put(thePut)
    }

    // For every batch, push the non-empty results to HBase
    cnt.foreachRDD(rdd => if (!rdd.isEmpty()) rdd.foreach(toHBase(_)))

    ssc.start()
    ssc.awaitTermination()
  }
}
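
The job above assumes that the Streaming_wordcount table already exists in HBase with a Word_count column family; it does not create the table itself. You can create it up front from the HBase shell or programmatically. Below is a minimal sketch (not part of the original application) using the same HBase 1.x client API; the object name is ours, and the ZooKeeper quorum is assumed to match the setting used in the job.

import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor, HColumnDescriptor, TableName }
import org.apache.hadoop.hbase.client.HBaseAdmin

// Hypothetical helper: creates the table the streaming job writes to, if it is missing.
object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "localhost:2182") // assumed to match the job's setting
    val admin = new HBaseAdmin(hConf)
    if (!admin.tableExists("Streaming_wordcount")) {
      val descriptor = new HTableDescriptor(TableName.valueOf("Streaming_wordcount"))
      descriptor.addFamily(new HColumnDescriptor("Word_count"))
      admin.createTable(descriptor)
    }
    admin.close()
  }
}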

Given below are the Maven dependencies for the application:

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-protocol -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-protocol</artifactId>
    <version>1.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-hadoop2-compat -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-hadoop2-compat</artifactId>
    <version>1.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-annotations -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-annotations</artifactId>
    <version>1.3.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
  </dependency>
</dependencies>

Technology stack used:

Spark Version 2.1.0

Kafka Version 0.10.2

HBase Version 1.3.1

Note: In this application, we are performing stateful streaming, so the occurrences of the words are accumulated from the very start of the streaming application. You can refer to our blog on stateful streaming with Spark to learn more about it.

This application will thus calculate the accumulated word counts and update the results back in HBase.
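
To make the accumulation concrete, here is a small, hypothetical illustration (not part of the application) of how the update function behaves for a single word across two batches:

val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.foldLeft(0)(_ + _) + state.getOrElse(0))
updateFunc(Seq(1, 1), None)    // batch 1: the word appears twice, no previous state  -> Some(2)
updateFunc(Seq(1, 1), Some(2)) // batch 2: it appears twice again, previous state = 2 -> Some(4)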

Now, let us run this application as a normal Spark Streaming application, produce some data through the Kafka console producer, and check the word count results in HBase.
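
If you would rather produce the test data programmatically than type it into the console producer, a minimal sketch using the Kafka producer API (available through the kafka_2.11 dependency listed above) could look like this; the broker address and topic name match the ones used in the streaming job, and the sample lines are made up.

import java.util.Properties
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

// Hypothetical test producer that pushes a few sample lines to the topic.
object SampleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    Seq("hadoop spark kafka", "spark hbase", "kafka spark").foreach { line =>
      producer.send(new ProducerRecord[String, String]("acadgild_topic", line))
    }
    producer.close()
  }
}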

In the screenshot below, you can see that our streaming application is running.

Let us give some input now.

In the above screenshot, you can see the word count results in HBase. Let's give the same input again and check the accumulated results.
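
If you want to read the accumulated counts back without the HBase shell, a short scan over the table works as well; this is again a sketch, based on the table, column family, and qualifier names used in the job.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ HTable, Scan }
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Hypothetical reader: prints every word (row key) with its accumulated count.
object ScanWordCounts {
  def main(args: Array[String]): Unit = {
    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "localhost:2182")
    val table = new HTable(hConf, "Streaming_wordcount")
    val scanner = table.getScanner(new Scan())
    scanner.asScala.foreach { result =>
      val word = Bytes.toString(result.getRow)
      val count = Bytes.toString(result.getValue(Bytes.toBytes("Word_count"), Bytes.toBytes("Occurances")))
      println(s"$word -> $count")
    }
    scanner.close()
    table.close()
  }
}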

On top of these results in HBase, we can build a Hive external table using the Hive-HBase storage handler and query the results with Hive. You can also read our blog post on HBase Write Using Hive to learn how to build an external table on top of an HBase table.

This is how we can build robust real-time streaming applications using Kafka, Spark, and HBase. Keep visiting our website, www.acadgild.com, for more updates on Big Data and other technologies.

Enroll for Big Data and Hadoop Training conducted by Acadgild and become a successful big data developer.
