
Stateful Streaming in Apache Spark


Apache Spark is a general-purpose processing engine built on top of the Hadoop ecosystem. Spark provides a unified framework to process almost any kind of data: it can do batch processing as well as stream processing, it has a powerful SQL engine for running SQL queries on data, and it ships with an integrated machine learning library called MLlib and a graph processing library called GraphX. Because it integrates so many capabilities, Spark is better described as a unified framework than as just a processing engine.

Now let us look at the stream processing engine of Spark. Strictly speaking, Spark does not process data in real time; it performs near-real-time processing, handling the data in micro-batches that are processed within just a few milliseconds.

When we write a program with Spark's StreamingContext, the data is processed in micro-batches, and by default this processing is stateless. If we define the StreamingContext to run every 10 seconds, it processes only the data that arrived within those 10 seconds. To look back at earlier data there is the windowing concept, but windows cannot give accumulated results from the starting timestamp of the job.
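For reference, a windowed aggregation on a DStream looks roughly like the sketch below (the 30-second window, 10-second slide, and the words DStream are illustrative assumptions, not part of this program); it aggregates only over the most recent batches, not over everything since the job started:

// Word counts over the last 30 seconds, recomputed every 10 seconds.
// Anything older than the window is forgotten.
val windowedCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))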

But what if you need to accumulate results from the start of the streaming job? That means you need to check the previous state of the RDD in order to update its new state. This is what is known as stateful streaming in Spark.

Spark provides two APIs to perform stateful streaming: updateStateByKey and mapWithState.

Now we will see how to perform a stateful streaming word count using updateStateByKey. updateStateByKey is a function on DStreams in Spark that accepts an update function as its parameter. The update function receives the new values for a key (as a Seq of values) and the previous state of that key (as an Option), and returns the key's updated state.
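Concretely, for an Int-valued state such as a running word count, the update function has roughly the following shape (a sketch only; the complete program appears later in this post):

// newValues: counts for this key that arrived in the current batch
// runningState: the previously accumulated count for this key, if any
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (newValues, runningState) => Some(newValues.sum + runningState.getOrElse(0))
// applied on a key-value DStream:
// pairDStream.updateStateByKey(updateFunc)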

Let's take a word count program. Say that in the first 10 seconds we send the line "hello every one from acadgild". The word count result will be

(one,1)

(hello,1)

(from,1)

(acadgild,1)

(every,1)

Now, without updateStateByKey, suppose we send more data in the next 10 seconds, say the same line "hello every one from acadgild" again. We will get exactly the same result for that batch as well, i.e.,

(one,1)

(hello,1)

(from,1)

(acadgild,1)

(every,1)

Now, what if we need an accumulated word count that includes the previous results as well? This is where stateful streaming comes into play. In stateful streaming, the previous state of each key is preserved and updated with the new results.

Note: For performing stateful operations, you need a key-value pair DStream, because the StreamingContext remembers the state of your RDD based on its keys.

In our previous blog on Kafka-Spark Streaming integration, we discussed how to integrate Apache Spark with Kafka and do real-time processing. We recommend going through that blog to learn how to generate input for the Spark streaming job using the Kafka producer console. You can refer to the link below.

https://acadgild.com/blog/spark-streaming-and-kafka-integration/

Below is the Spark Scala program to perform stateful streaming using Kafka and Spark streaming.

Here are the Spark and Kafka versions we used to build this application:

Spark Version: 2.1.0

Kafka Version: 0.10.2

Here is the source code of the application:

 

package WordCount

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object stateFulWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    // Batch interval of 10 seconds: each micro-batch covers 10 seconds of data
    val ssc = new StreamingContext(conf, Seconds(10))
    /*
     * Defining the Kafka consumer parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val topics = Array("acadgild-topic") // list of topics to subscribe to
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // Split each record's value into words
    val splits = kafkaStream.map(record => (record.key(), record.value.toString)).flatMap(x => x._2.split(" "))
    // Update function: add the new counts for a key to its previously accumulated count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    // Defining a checkpoint directory, which is mandatory for stateful operations
    ssc.checkpoint("hdfs://localhost:9000/WordCount_checkpoint")
    val wordCounts = splits.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(updateFunc)
    kafkaStream.print() // prints the stream of data received
    wordCounts.print()  // prints the accumulated word count of the stream
    ssc.start()
    ssc.awaitTermination()
  }
}

Here are the Spark Streaming and Kafka dependencies you need to add if you are building your application with SBT.

name := "StateSpark"
version := "0.1"
scalaVersion := "2.11.8"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.2.0"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.0"

 

Here are the Spark Streaming and Kafka dependencies you need to add if you are building your application with Maven.

<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
  </dependency>
</dependencies>

The major difference here is the addition of the update function and the application of updateStateByKey to the DStream.

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  val updatedSum = currentCount + previousCount
  Some(updatedSum)
}
val wordCounts = splits.map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(updateFunc)

The updateFunc runs for every key in the RDD. For each key, it takes the key's previous state and the new values that arrived in the current batch, performs whatever data operation you want on them, and returns the new state wrapped in a Some().
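As a quick illustration, here is how the update function behaves for a single key (the numbers below are hypothetical sample values, not taken from the program's output):

// Suppose the key "hello" already has an accumulated count of 2 (previous state)
// and appeared twice in the current batch (new values Seq(1, 1)).
val values = Seq(1, 1)            // new counts for "hello" in this batch
val state: Option[Int] = Some(2)  // previously accumulated count
val currentCount = values.foldLeft(0)(_ + _)  // 2
val previousCount = state.getOrElse(0)        // 2
val updated = Some(currentCount + previousCount)
println(updated)                              // prints Some(4)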

To work with this update function, you must provide a checkpoint directory for your StreamingContext, as shown below:

ssc.checkpoint("hdfs://localhost:9000/WordCount_checkpoint")

The intermediate state values are stored in this checkpoint directory for fault tolerance, so it is recommended to place the checkpoint directory in HDFS (or another reliable store) for better fault tolerance.
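If you are only experimenting locally and do not have HDFS running, a local filesystem path also works as the checkpoint directory (the path below is just an example and does not give the fault tolerance that HDFS does):

// For local testing only; use HDFS or another reliable store in production
ssc.checkpoint("file:///tmp/WordCount_checkpoint")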

In the above update function, we receive the new values of a key as a Seq[Int] and the previously computed state of that key as an Option[Int]. Inside the function, we aggregate the new values using foldLeft, fetch the old state value of the key, add the two together, and return the updated sum wrapped in a Some().

Let's check the results now.

First, we enter the line below:

hello every one from acadgild

We get the following result:

(one,1)

(hello,1)

(from,1)

(acadgild,1)

(every,1)

Now let's enter the same text again, 'hello every one from acadgild', and check the accumulated results from the start of our streaming job. We get the result below:

(one,2)

(hello,2)

(from,2)

(acadgild,2)

(every,2)


Now let’s enter the same line again and check for the accumulated results.

hello every one from acadgild

Now we have got the result as follows:

(one,3)

(hello,3)

(from,3)

(acadgild,3)

(every,3)

We have got the accumulated results for our keys from the start of the streaming job.

This is how we can perform stateful streaming using the updateStateByKey function.
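As mentioned at the beginning, Spark also provides a second stateful API, mapWithState, which is generally more efficient because it only processes the keys that appear in the current batch. A minimal sketch of the same running word count using mapWithState (not part of the program above) might look like this:

import org.apache.spark.streaming.{ State, StateSpec }

// Mapping function: add the batch count for a word to the running total kept in State[Int]
// and emit the updated (word, total) pair.
val mappingFunc = (word: String, batchCount: Option[Int], state: State[Int]) => {
  val sum = batchCount.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

// A checkpoint directory is still required for stateful operations.
val statefulCounts = splits.map(x => (x, 1))
  .reduceByKey(_ + _)
  .mapWithState(StateSpec.function(mappingFunc))
statefulCounts.print()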

Enroll in the Apache Spark Training conducted by Acadgild and become a successful developer.
