Recent Tutorials and Articles
    Developing Apache Kafka Consumers in Java
    Published on: 2018-05-25 16:05:46
    Posted By: Amit Kumar

    This tutorial provides the steps to implement a basic Apache Kafka consumer in Java

    Abstract


    Apache Kafka is one of the mature and reliable services when it comes to stream processing. It serves as a buffer for streams of messages (events) consumed by various types of consumers.

    In this tutorial, we will learn about developing consumer programs using Java to pull events from Apache Kafka. We will also spend some time understanding consumer groups and see them in action.

     

    Pre-requisites


    Here are the pre-requisites needed to follow this tutorial effectively - 

    • Java 8 or higher
    • Kafka cluster with version 1.0.0 (other versions may also work if you update the kafka-clients dependency version accordingly)
    • Basic knowledge of Apache Kafka

     

    Implementation Steps


    Let's start by creating a basic Maven-based Java project and adding the following dependency to pom.xml:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>

    We will start with the following SimpleKafkaConsumer class, responsible for pulling messages from Kafka. It takes a list of topic names, a consumer group id and the message-handling code as inputs to its constructor.

    package com.aksain.kafka.consumers;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collection;
    import java.util.Properties;
    import java.util.function.BiConsumer;
    
    /**
     * {@link SimpleKafkaConsumer} pulls messages from provided topics in Apache Kafka and calls specified BiConsumer
     * for every pulled message.
     */
    public class SimpleKafkaConsumer extends Thread {
        private final KafkaConsumer<String, String> kafkaConsumer;
        private final BiConsumer<String, String> messageConsumer;
    
        /**
         *
         * @param topics to pull messages from
         * @param consumerGroupId to define group of this consumer
         * @param messageConsumer to call for each pulled message
         */
        public SimpleKafkaConsumer(
                Collection<String> topics, String consumerGroupId, BiConsumer<String, String> messageConsumer) {
    
            this.messageConsumer = messageConsumer;
    
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // Kafka message brokers in format host1:port1,host2:port2
            props.put("group.id", consumerGroupId); // Consumer group
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(topics);
        }
    
        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    // poll blocks for up to 1 second waiting for new records
                    for (ConsumerRecord<String, String> consumerRecord : kafkaConsumer.poll(1000)) {
                        messageConsumer.accept(consumerRecord.key(), consumerRecord.value());
                    }
                }
            } finally {
                kafkaConsumer.close(); // release connections and leave the consumer group cleanly
            }
        }
    }
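
    The class above relies on auto-commit (enable.auto.commit set to true), so offsets are committed in the background every second; if you need at-least-once guarantees, commit offsets manually after processing instead. Here is a minimal sketch of that alternative configuration — the helper name is illustrative, and in the poll loop you would call kafkaConsumer.commitSync() after each processed batch:

```java
import java.util.Properties;

public class ManualCommitConfig {
    // Build consumer properties with auto-commit disabled (illustrative helper, not part of Kafka)
    static Properties manualCommitProps(String consumerGroupId) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", consumerGroupId);
        props.put("enable.auto.commit", "false"); // we take control of offset commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // In the poll loop, call kafkaConsumer.commitSync() once a batch has been processed,
        // so a crash before the commit replays the batch instead of losing it.
        return props;
    }

    public static void main(String[] args) {
        System.out.println(manualCommitProps("demo-group").getProperty("enable.auto.commit"));
    }
}
```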

     

    And here is the KafkaConsumerDemo class containing the main method; it reads the topic names and group id from command-line arguments, creates a SimpleKafkaConsumer instance and finally starts it.

    package com.aksain.kafka.main;
    
    import com.aksain.kafka.consumers.SimpleKafkaConsumer;
    
    import java.util.Arrays;
    
    /**
     * @author Amit Kumar
     */
    public class KafkaConsumerDemo {
        public static void main(String[] args) {
            if(args.length < 2) {
            System.out.println("Usage: java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar <topicnames> <consumergroupid>");
                System.exit(1);
            }
    
        final String[] topicNames = args[0].split("\\s*,\\s*"); // tolerate optional whitespace around commas
            final String groupId = args[1];
    
            new SimpleKafkaConsumer(
                    Arrays.asList(topicNames)
                    , groupId
                    , (key, value) -> System.out.println("Message Key: " + key + ", Value: " + value)
            ).start();
    
            // Produce messages using following command from Kafka home directory
        // ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
        }
    }
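
    The comma-separated topic list handling can also be sketched in isolation — a minimal example (the class and method names are illustrative), assuming topics may be passed with or without spaces around the commas:

```java
import java.util.Arrays;
import java.util.List;

public class TopicListParsing {
    // Split a comma-separated topic list, tolerating optional whitespace around commas
    static List<String> parseTopics(String arg) {
        return Arrays.asList(arg.trim().split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        System.out.println(parseTopics("test-topic, multi-partition-topic"));
        // prints [test-topic, multi-partition-topic]
    }
}
```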

    You can also clone and set up the complete project from the Github Repository.

     

    Running Message Consumer Demo


    We will take the following steps to execute our Message Consumer Demo -

    Check that your Kafka service is running and accessible by creating a topic named test-topic, executing the following command from the Kafka home directory -

    # Change Zookeeper url, no of partitions and replication factor as applicable to your scenario
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test-topic --partitions 1 --replication-factor 1

    Verify that the topic has been created successfully using the following command - 

    # Change ZooKeeper Url as applicable
    ./bin/kafka-topics.sh --list --zookeeper localhost:2181

    Download the runnable jar file of the demo from the Github dist directory and execute it (in a new terminal window) to pull messages from test-topic using the following command - 

    # It assumes that your Kafka is running locally on port 9092
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar test-topic test

    Go back to the Kafka terminal and execute the following command from the Kafka home directory to send messages that the consumer will pull from topic test-topic -

    # Publish messages on topic: test-topic
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
    
    # Enter a message and press enter to send it to test-topic. Press ctrl+c to exit
    >First message for Java consumer
    >Second message for Java consumer
    >Third message for Java consumer
    >Fourth message for Java consumer
    >Fifth message for Java consumer
    >

    Go to your consumer program terminal and you should see the pulled messages on the console as follows -

    # Key is null as messages produced from console producer always have null keys
    Message Key: null, Value: First message for Java consumer
    Message Key: null, Value: Second message for Java consumer
    Message Key: null, Value: Third message for Java consumer
    Message Key: null, Value: Fourth message for Java consumer
    Message Key: null, Value: Fifth message for Java consumer

     

    Understanding Consumer Groups


    Consumer groups are quite useful and enable us to do parallel processing using multiple consumer instances for topics with multiple partitions. Within a consumer group, each consumer instance pulls messages from different partition(s).

    E.g. if we had a topic with 4 partitions and 2 consumer instances pulling data from that topic, each consumer instance would pull messages from 2 partitions. If we increase the number of consumer instances to 3, one consumer will pull messages from 2 partitions while the remaining consumers will pull from 1 partition each. Likewise, if we further increase consumer instances to 4, each consumer instance will pull messages from exactly one partition. However, if we have more consumer instances than partitions, the additional consumer instances will be idle and only act as failover for the other instances.

    Please note that the above is true only for consumer instances belonging to the same consumer group.
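
    The arithmetic above can be sketched as a small back-of-the-envelope helper (illustrative only, not part of the Kafka API) that mirrors how partitions of a single topic spread across a group, with any remainder going to the first consumers:

```java
import java.util.Arrays;

public class PartitionSpread {
    // Number of partitions each consumer in a group receives for one topic:
    // every consumer gets partitions / consumers, and the first (partitions % consumers)
    // consumers get one extra. Consumers beyond the partition count get zero (idle).
    static int[] partitionsPerConsumer(int partitions, int consumers) {
        final int[] counts = new int[consumers];
        for (int i = 0; i < consumers; i++) {
            counts[i] = partitions / consumers + (i < partitions % consumers ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(partitionsPerConsumer(4, 2))); // [2, 2]
        System.out.println(Arrays.toString(partitionsPerConsumer(4, 3))); // [2, 1, 1]
        System.out.println(Arrays.toString(partitionsPerConsumer(4, 5))); // [1, 1, 1, 1, 0] - one idle
    }
}
```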

    Let's see all this in action by taking following steps - 

    Create a topic named multi-partition-topic (not a good name though) with 4 partitions by executing the following command from the Kafka home directory - 

    # Change Zookeeper url, no of partitions and replication factor as applicable to your scenario
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic multi-partition-topic --partitions 4 --replication-factor 1

    Execute the following command from a terminal window (we will call it the producer terminal from now on) to publish messages to the newly created topic.

    # Publish messages on topic: multi-partition-topic
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic multi-partition-topic

    Start two consumer instances from two terminal windows (consumer 1 terminal & consumer 2 terminal) using the following command -

    # It assumes that your Kafka is running locally on port 9092
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group

    Go to the producer window and publish the following four messages; you should see 2 of these messages picked up by each consumer instance - 

    >message1
    >message2
    >message3
    >message4
    >

    Now let's start another 2 consumer instances with the same consumer command from 2 different terminals (consumer 3 & consumer 4 terminals) - 

    # It assumes that your Kafka is running locally on port 9092
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group

    Let's now go back to the producer window and publish the following four messages; you should see 1 of these messages picked up by each consumer instance - 

    >message5
    >message6
    >message7
    >message8
    >

    It's time to start another (5th) consumer instance from a different terminal (consumer 5 terminal) using the same command -

    # It assumes that your Kafka is running locally on port 9092
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group

    Let's now go back to the producer window and publish the following four messages; you will see that one of the consumer instances does not get any message - 

    >message9
    >message10
    >message11
    >message12
    >

    Finally, terminate any of the consumer instances and send 4 messages again to check that each message is still picked up by the remaining consumer instances. As we can see, adding consumers to a group provides us with distributed and parallel processing of messages.

    On the other hand, if we want all of our consumer instances to receive all messages, we need to start each of them with a different consumer group id. Let's see this in action -

    Close all running consumer instances and restart them using the following commands, one per consumer instance -

    # It assumes that your Kafka is running locally on port 9092
    
    # Command for consumer instance 1
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group1
    
    # Command for consumer instance 2
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group2
    
    # Command for consumer instance 3
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group3
    
    # Command for consumer instance 4
    java -jar simple-kafka-consumer-0.0.1-SNAPSHOT.jar multi-partition-topic test-consumer-group4

    Go to the producer window and publish the following four messages; you should see all of these messages picked up by every consumer instance - 

    >message13
    >message14
    >message15
    >message16
    >

     

    As we saw in this tutorial, Apache Kafka gives us the ability to do distributed/parallel processing within an application by using a consumer group, as well as the ability to deliver every message to multiple different applications by using separate consumer groups.

     

     

    Thank you for reading through the tutorial. In case of any feedback/questions/concerns, you can communicate the same to us through your comments and we shall get back to you as soon as possible.

