Building data pipelines using Kafka Connect and Spark

The Kafka Connect framework comes bundled with Apache Kafka and helps integrate Kafka with other systems and data sources. To copy data from a source into Kafka, or from Kafka into a destination, users typically rely on these connectors; many kinds of source and sink connectors are available for Kafka.

Kafka Connect also supports Change Data Capture (CDC), which is important when you want to analyze the data inside a database. A CDC source connector continuously monitors your source database and reports the changes as they happen, and you can analyze that change stream in real time using Spark or another streaming engine.

In this tutorial, we will discuss how to connect Kafka to a file system and how to stream and analyze the continuously accumulating data using Spark.

Before going through this blog, we recommend going through our previous blogs on Kafka (listed below for your convenience) to get a basic understanding of what Kafka is, how it works, and how to integrate it with Apache Spark.

https://acadgild.com/blog/kafka-producer-consumer/

https://acadgild.com/blog/guide-installing-kafka/

https://acadgild.com/blog/spark-streaming-and-kafka-integration/

We hope you have got your basics sorted out. Next, move into your Kafka installation directory, $KAFKA_HOME/config, and look for the file connect-file-source.properties.

In this file, edit the following properties:

# name of your file source connector
name=local-file-source
# connector class - FileStreamSource is the default file connector
connector.class=FileStreamSource
# number of tasks to run in parallel
tasks.max=1
# file location - change this to point to your own file
file=test.txt
# name of the topic the file contents are published to
topic=kafka_connect-test

We modified the above properties as follows:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/kiran/Desktop/kafka_connect_test.txt
topic=kafka_connect_test

Now, check the port number on which your Kafka broker is running.

By default, the port number is 9092. If your broker runs on a different port, update the bootstrap.servers entry in the connect-standalone.properties file accordingly:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

With this, we are all set to build our application.

Now, start the ZooKeeper server, the Kafka broker, and the file source connector; then any data you add to the file will be picked up and can be consumed by a Spark application.

In one of our previous blogs, we had built a stateful streaming application in Spark that helped calculate the accumulated word count of the data that was streamed in. We will implement the same word count application here.

(You can refer to stateful streaming in Spark, here: https://acadgild.com/blog/stateful-streaming-in-spark/)

In the application, you only need to change the topic name to the one you configured in the connect-file-source.properties file.
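
In our case, that means updating the topics array in the Spark code; for example (the value below matches the topic we configured earlier):

val topics = Array("kafka_connect_test") // must match the topic in connect-file-source.properties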

First, start the ZooKeeper server using zookeeper.properties, as shown in the command below:

zookeeper-server-start.sh kafka_2.11-0.10.2.1/config/zookeeper.properties

Keep that terminal running, open another terminal, and start the Kafka server using server.properties, as shown in the command below:

kafka-server-start.sh kafka_2.11-0.10.2.1/config/server.properties

Keep that terminal running too, open another terminal, and start the source connector using the standalone properties, as shown in the command below:

connect-standalone.sh kafka_2.11-0.10.2.1/config/connect-standalone.properties kafka_2.11-0.10.2.1/config/connect-file-source.properties

Keep all three terminals running, as shown in the screenshot below:

Now, whatever data you enter into the file will be converted into a string and stored in the topic on the broker.

You can use the console consumer to check the output as shown in the screenshot below:
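
If you want to check it from the command line, a command along these lines should work (adjust the broker address and topic name if yours differ):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_connect_test --from-beginning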

In the above screenshot, you can see that the data is stored in JSON format. As seen in the connect-standalone.properties file, the key.converter and value.converter parameters convert the key and value into JSON, which is the default converter setup that ships with Kafka Connect.

Now, using Spark, we need to subscribe to the topic to consume this data. Inside the JSON object, the actual data sits in the "payload" field.
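
For instance, if the line "hello world" is appended to the file, the record value on the topic will look roughly like this (an illustrative sample; the exact schema block depends on your converter settings):

{"schema":{"type":"string","optional":false},"payload":"hello world"}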

So, in our Spark application, we need to make a change to our program in order to pull out the actual data. For parsing the JSON string, we can use Scala’s JSON parser present in:

scala.util.parsing.json.JSON.parseFull
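
As a quick standalone sketch of that parsing step (assuming scala.util.parsing.json is on the classpath, i.e. the scala-parser-combinators module on Scala 2.11+; the record string below is the illustrative sample from above):

val record = """{"schema":{"type":"string","optional":false},"payload":"hello world"}"""
// parseFull returns Option[Any]; a top-level JSON object is parsed into a Map[String, Any]
val parsed = scala.util.parsing.json.JSON.parseFull(record)
// pull out the "payload" field, which holds the original line from the file
val payload = parsed.map(_.asInstanceOf[Map[String, Any]]).flatMap(_.get("payload"))
println(payload) // prints Some(hello world)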

And, the final application will be as shown below:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object stateFulWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("KafkaReceiver")
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batch interval
    /*
     * Defining the Kafka consumer parameters
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val topics = Array("kafka_connect_test") // topics list
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // Parse each JSON record and keep only the "payload" field, which holds the actual file line
    val splits = kafkaStream.map(record => (record.key(), record.value.toString)).
      flatMap(x => scala.util.parsing.json.JSON.parseFull(x._2).get.asInstanceOf[Map[String, Any]].get("payload"))
    // Adds the count from the current batch to the running count kept in the state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      val updatedSum = currentCount + previousCount
      Some(updatedSum)
    }
    // Defining a checkpoint directory for performing stateful operations
    ssc.checkpoint("hdfs://localhost:9000/WordCount_checkpoint")
    val wordCounts = splits.flatMap(x => x.toString.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).updateStateByKey(updateFunc)
    wordCounts.print() // prints the word count result of the stream
    ssc.start()
    ssc.awaitTermination()
  }
}

Now, we will run this application, provide some input to the file in real time, and see the word count results displayed in the Eclipse console.

Now, push that data into the file.

Whatever data you enter into the file, Kafka Connect pushes into its topic. This happens whenever an event occurs, that is, whenever a new entry is appended to the file.

The Spark streaming job runs continuously on the subscribed Kafka topics. Here, we have set the batch interval to 10 seconds, so whatever data arrives on the topic within each 10-second window is picked up, processed in real time, and folded into the stateful word count.

In this case, as shown in the screenshot above, you can see the input given by us and the results that our Spark streaming job produced in the Eclipse console. We can also store these results in any Spark-supported data source of our choice.
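
For example, one simple way to persist the running counts (instead of just printing them) is to write each batch out as text files; the HDFS path below is only a placeholder:

wordCounts.saveAsTextFiles("hdfs://localhost:9000/wordcount_output/result")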

And this is how we build data pipelines using Kafka Connect and Spark streaming!

We hope this blog helped you in understanding what Kafka Connect is and how to build data pipelines using Kafka Connect and Spark streaming. Keep visiting our website, www.acadgild.com, for more updates on big data and other technologies.
