Kafka Producer and Consumer

July 14

In our previous post, Guide to Installing Kafka, we discussed what Kafka is and how to install it. In this post, we take an in-depth look at the Kafka Producer and Consumer in Java.

Before proceeding further, let’s make sure we understand some important Kafka terms.

Topics: In Kafka, a Topic is a category or stream name to which messages are published. For every new category of messages, users should define a new topic name.

Kafka Producer:

It is a client program that produces messages and pushes them to a Topic.

Kafka Consumer:

It is a client program that consumes the messages that a Producer has published to a Topic.

Kafka Broker:

Each Kafka cluster consists of one or more servers called Brokers. The message data is replicated and persisted on the Brokers.
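
To make these terms concrete, here is a toy in-memory model of how they relate. It is only a sketch: ToyBroker, publish and fetch are hypothetical names, not Kafka's API, and the model ignores replication, persistence and partitions. A topic is modelled as an append-only list, a producer appends to it, and a consumer reads from the offset it has reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are illustrative and are NOT part of Kafka.
public class ToyBroker {
    // Each topic name maps to an append-only list of messages.
    private final Map<String, List<String>> topics = new HashMap<String, List<String>>();

    // "Producer" side: append a message, creating the topic on first use.
    public void publish(String topic, String message) {
        List<String> log = topics.get(topic);
        if (log == null) {
            log = new ArrayList<String>();
            topics.put(topic, log);
        }
        log.add(message);
    }

    // "Consumer" side: return every message at or after the given offset.
    public List<String> fetch(String topic, int offset) {
        List<String> log = topics.get(topic);
        int from = Math.max(0, offset);
        if (log == null || from >= log.size()) {
            return new ArrayList<String>();
        }
        return new ArrayList<String>(log.subList(from, log.size()));
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("Demo", "hello");
        broker.publish("Demo", "world");
        System.out.println(broker.fetch("Demo", 0)); // prints [hello, world]
        // A consumer that has already read one message resumes at offset 1.
        System.out.println(broker.fetch("Demo", 1)); // prints [world]
    }
}
```

Note that publishing does not remove anything: the messages stay in the topic, and each consumer simply keeps track of how far it has read.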

Let’s create a standalone program that produces messages and sends them to the Kafka broker. After that, we will write another program that consumes the messages published by the Kafka producer.

Prerequisites:

  1. CentOS 6.7
  2. Java 1.7 or above
  3. Eclipse IDE (we used Mars)
  4. Zookeeper should be installed and running on port 2181. If you don’t have Zookeeper, see the installation guide.
  5. Kafka should be installed and running on port 9092. If you don’t have Kafka, see our previous post, Guide to Installing Kafka.

Creating Kafka Producer:

Let’s begin by creating a new Java project in Eclipse and adding all the jars present in KAFKA_HOME/lib (KAFKA_HOME is the Kafka installation directory).

Now let’s look at the requirements to implement the Producer.

To implement the Producer, we need 3 important classes. They are:

    1. ProducerConfig: This class holds the configuration used to connect the client to the Kafka Broker. We pass in all the required information, such as the list of brokers and the serializer class to be used.
    2. KeyedMessage: This class wraps a message together with the name of the topic it should be published to.
    3. Producer: This class is used to send the messages. All the messages are sent in the form of KeyedMessage.
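
As a minimal sketch of the information that goes into ProducerConfig, the helper below builds the Properties object for a list of brokers. The class ProducerPropertiesSketch and the second broker address localhost:9093 are assumptions made for illustration; request.required.acks is an optional setting of the old producer that this post does not otherwise use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: only the property keys belong to Kafka.
public class ProducerPropertiesSketch {
    // Builds the Properties that would be handed to new ProducerConfig(...).
    public static Properties build(List<String> brokers) {
        // metadata.broker.list expects a comma-separated list of host:port pairs.
        StringBuilder brokerList = new StringBuilder();
        for (String broker : brokers) {
            if (brokerList.length() > 0) {
                brokerList.append(',');
            }
            brokerList.append(broker);
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList.toString());
        // Messages are plain strings, so use the string encoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Optional: wait for the partition leader to acknowledge each message.
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // The second broker address is an assumption for this example.
        Properties props = build(Arrays.asList("localhost:9092", "localhost:9093"));
        System.out.println(props.getProperty("metadata.broker.list"));
    }
}
```

With a single local broker, as in this post, the list contains just localhost:9092.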

Let’s look at the code now.

Code:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class DemoProducer {
    final static String TOPIC = "Demo";
    static Producer<String, String> producer;

    // Initialize the producer with the required properties
    public static void init() {
        Properties producerProperties = new Properties();
        producerProperties.put("metadata.broker.list", "localhost:9092");
        producerProperties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(producerProperties);
        producer = new Producer<String, String>(producerConfig);
    }

    public static void send_messages() throws Exception {
        // Create a BufferedReader instance to read the input from the keyboard
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.print("Enter message to send to Kafka broker (press 'T' to terminate producer): ");
            String msg = reader.readLine(); // Read message from console
            // Stop on 'T' or end of input, before anything is published
            if (msg == null || "T".equals(msg)) { break; }
            // Define topic name and message
            KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(TOPIC, msg);
            producer.send(keyedMsg); // This publishes the message on the given topic
            System.out.println("--> Message [" + msg + "] sent to consumer");
        }
    }

    public static void main(String[] argv) throws Exception {
        // Initialize and establish the connection
        DemoProducer.init();
        // Send the messages
        DemoProducer.send_messages();
        // Close the producer after sending
        producer.close();
    }
}
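
The read-and-send loop can also be tried out without a running broker. The sketch below is an assumption-laden variant, not part of the Kafka API: MessageSink is a hypothetical stand-in for the producer, and pump is a hypothetical helper that checks for the terminator before sending, so the T itself is never published.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class SendLoopSketch {
    // Hypothetical stand-in for the Kafka producer, so the loop runs without a broker.
    public interface MessageSink {
        void send(String topic, String message);
    }

    // Reads lines until "T" or end of input and forwards each one to the sink.
    // Returns the number of messages forwarded.
    public static int pump(BufferedReader reader, String topic, MessageSink sink) throws IOException {
        int sent = 0;
        String line;
        while ((line = reader.readLine()) != null && !"T".equals(line)) {
            sink.send(topic, line);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        final List<String> received = new ArrayList<String>();
        MessageSink sink = new MessageSink() {
            @Override
            public void send(String topic, String message) {
                received.add(topic + ":" + message);
            }
        };
        // Simulated keyboard input: two messages, the terminator, then a line that is never read.
        BufferedReader input = new BufferedReader(new StringReader("first\nsecond\nT\nignored\n"));
        int count = pump(input, "Demo", sink);
        System.out.println(count + " messages: " + received); // prints 2 messages: [Demo:first, Demo:second]
    }
}
```

In the real program, the sink's send method would wrap the message in a KeyedMessage and pass it to the producer.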

 

Creating Kafka Consumer:

To implement the Consumer, we need 4 important classes. They are:

  1. ConsumerConfig: This class holds the configuration the Consumer uses to connect to Zookeeper, such as the Zookeeper address and the consumer group id.
  2. ConsumerConnector: This is the Kafka interface used to establish a connection with Zookeeper. It is implemented by the ZookeeperConsumerConnector class, which is responsible for all interactions with Zookeeper.
  3. KafkaStream: This class represents a stream of messages for a topic. ConsumerConnector returns the streams as a Java Map object with the topic name as the key and a list of KafkaStream objects as the value: Map<String, List<KafkaStream<K, V>>>
  4. ConsumerIterator: ConsumerIterator is used to iterate over the messages in a KafkaStream.
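
Code:

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class DemoConsumer extends Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "Demo";
    ConsumerConnector consumerConnector;

    public static void main(String[] argv) {
        DemoConsumer demoConsumer = new DemoConsumer();
        demoConsumer.start();
    }

    // Configure the Zookeeper connection and the consumer group
    public DemoConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        // Ask for one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            System.out.println(new String(it.next().message(), StandardCharsets.UTF_8));
        }
    }

    // Helper that prints a raw message set; it is not called by run() above
    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for (MessageAndOffset messageAndOffset : messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}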
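
Message payloads reach the consumer as raw bytes, so the consumer has to choose a charset when turning them into text. The following is a minimal sketch, assuming the producer wrote UTF-8 text; PayloadDecodeSketch and its decode methods are hypothetical helpers that use only the Java standard library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: assumes message payloads are UTF-8 encoded text.
public class PayloadDecodeSketch {
    // Decodes a byte[] payload as UTF-8 instead of the platform default charset.
    public static String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    // Decodes only the readable part of a buffer (position up to limit).
    // Working on a duplicate leaves the caller's buffer position untouched.
    public static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return decode(bytes);
    }

    public static void main(String[] args) {
        byte[] raw = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw).length()); // prints 4 (the payload itself is 5 bytes)

        ByteBuffer buffer = ByteBuffer.wrap("xxhello".getBytes(StandardCharsets.UTF_8));
        buffer.position(2); // skip the first two bytes
        System.out.println(decode(buffer)); // prints hello
    }
}
```

Sizing the array from remaining() rather than limit() keeps the second method correct even when the buffer's position is not zero.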

 

Let’s start Zookeeper and then the Kafka server.

Starting Zookeeper:

 

Starting Kafka-server:

Verify Kafka and Zookeeper:
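
One way to verify this from Java is to check that something is listening on the two ports listed in the prerequisites. This is only a minimal sketch: PortCheckSketch and isListening are hypothetical names, and an open port shows that a process accepts connections, not that Kafka or Zookeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper built on the standard java.net classes.
public class PortCheckSketch {
    // Returns true if something accepts TCP connections on host:port within the timeout.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing usable on that port.
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing fails.
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Zookeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

If either line reports false, start the corresponding service before running the producer and consumer.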

 

Now, open the Eclipse IDE and run both the producer as well as the consumer.

 

Enter messages in the Producer console. They are sent to the Kafka topic and then received by the Consumer.
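
If the consumer is started after some messages have already been sent, a consumer group with no stored offset begins, by default, at the end of the topic and only sees newer messages. The sketch below shows consumer properties that ask for the earliest available messages instead. ConsumerPropertiesSketch is a hypothetical helper; auto.offset.reset and auto.commit.interval.ms are optional settings of the old Zookeeper-based consumer, and the values chosen here are assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical helper: only the property keys belong to Kafka.
public class ConsumerPropertiesSketch {
    // Builds the Properties that would be handed to new ConsumerConfig(...).
    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Optional: when the group has no stored offset, start from the oldest available message.
        props.put("auto.offset.reset", "smallest");
        // Optional: commit consumed offsets to Zookeeper every second (value is an assumption).
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "test-group");
        System.out.println(props.getProperty("auto.offset.reset")); // prints smallest
    }
}
```

This setting only applies when the group has no committed offset yet; a group that has already consumed messages continues from where it left off.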

 

Hope this post has been helpful in understanding the basics of Producer and Consumer in Kafka. Keep visiting our blog for more posts.

In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
