This tutorial provides the steps to implement a basic Apache Kafka producer in Java
Abstract
Apache Kafka is one of the most mature and reliable services for stream processing. It serves as a buffer for streams of messages (events) consumed by various types of consumers.
In this tutorial, we will learn how to develop producer programs in Java that publish events to Apache Kafka.
Pre-requisites
Here are the pre-requisites needed to follow this tutorial effectively -
- Java 8 or higher
- A Kafka cluster running version 1.0.0 (other versions may also work if you update the kafka-clients version in pom.xml accordingly)
- Basic knowledge of Apache Kafka
Implementation Steps
Let's start by creating a basic Maven-based Java project and adding the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
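For reference, a minimal pom.xml wrapping this dependency might look like the following sketch. The artifactId and version match the demo jar name used later in this tutorial; the groupId is an assumed placeholder based on the package names:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <!-- groupId is an illustrative assumption; artifactId/version match the demo jar name -->
    <groupId>com.aksain.kafka</groupId>
    <artifactId>simple-kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>
```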
We will start with the following SimpleKafkaProducer class, responsible for sending messages to Kafka -
package com.aksain.kafka.producers;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * <code>{@link SimpleKafkaProducer}</code> exposes methods to send messages to Kafka. It is implemented as a
 * singleton to avoid creating multiple {@link KafkaProducer} instances.
 */
public class SimpleKafkaProducer {
    private static final SimpleKafkaProducer INSTANCE = new SimpleKafkaProducer();

    private final Producer<String, String> producer;

    private SimpleKafkaProducer() {
        // Set some properties; ideally these would come from a properties file
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka brokers in format host1:port1,host2:port2
        props.put("acks", "1"); // 0 for no acknowledgement, 1 for leader acknowledgement, -1 for all in-sync replica acknowledgements
        props.put("linger.ms", "1"); // Time to wait for additional records before sending a batch
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    /**
     * Factory method to get the singleton instance.
     *
     * @return instance of {@link SimpleKafkaProducer}
     */
    public static SimpleKafkaProducer getInstance() {
        return INSTANCE;
    }

    /**
     * Sends a message with the input key and value to the specified topic.
     *
     * @param topicName name of the topic to publish the message to
     * @param key       key of the message
     * @param value     payload of the message
     */
    public void send(String topicName, String key, String value) {
        producer.send(new ProducerRecord<>(topicName, key, value));
    }

    /**
     * Closes the producer, flushing any records that have not yet been transmitted and releasing the buffer pool
     * and background I/O thread that turn buffered records into requests and send them to the cluster.
     */
    public void close() {
        producer.close();
    }
}
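The constructor comment above notes that these properties would ideally come from a properties file. One way to do that is sketched below; the resource name producer.properties is an assumption for illustration, and the defaults mirror the hard-coded values in SimpleKafkaProducer:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ProducerConfigLoader {
    /**
     * Loads producer settings from a classpath resource, falling back to the
     * defaults below when the resource is absent or omits a key.
     */
    public static Properties load(String resourceName) throws IOException {
        final Properties props = new Properties();
        // Defaults mirroring the hard-coded values in SimpleKafkaProducer
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("linger.ms", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (InputStream in = ProducerConfigLoader.class.getClassLoader().getResourceAsStream(resourceName)) {
            if (in != null) {
                props.load(in); // entries in the file override the defaults above
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        final Properties props = load("producer.properties");
        System.out.println("bootstrap.servers=" + props.getProperty("bootstrap.servers"));
    }
}
```

SimpleKafkaProducer's constructor could then call ProducerConfigLoader.load(...) instead of populating the Properties inline, keeping environment-specific settings out of the code.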
And here is the KafkaProducerDemo class, which contains the main method and is responsible for getting the SimpleKafkaProducer instance and sending 10 messages to Kafka.
package com.aksain.kafka.main;

import com.aksain.kafka.producers.SimpleKafkaProducer;

/**
 * @author Amit Kumar
 */
public class KafkaProducerDemo {
    public static void main(String[] args) {
        final String topicName = "test-topic";

        // Get the singleton producer instance
        final SimpleKafkaProducer simpleKafkaProducer = SimpleKafkaProducer.getInstance();

        // Send 10 messages
        System.out.println("Sending 10 messages to Kafka...");
        for (int i = 0; i < 10; i++) {
            simpleKafkaProducer.send(topicName, "Key" + i, "Sample Message " + i);
        }

        // Release producer resources and flush any pending messages. In real applications, this would be
        // done at application shutdown, e.g. in a @PreDestroy method in Spring
        simpleKafkaProducer.close();
        System.out.println("Sent 10 messages to Kafka successfully!!!");

        // Consume the sent messages using the following command from the Kafka home directory:
        // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
    }
}
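A variation worth considering (an assumption, not part of the original code) is having SimpleKafkaProducer implement AutoCloseable, so callers can use try-with-resources instead of calling close() explicitly. A self-contained sketch of the pattern, using a stand-in class so it runs without a Kafka cluster:

```java
// Stand-in for SimpleKafkaProducer to illustrate the pattern without a running Kafka cluster
class CloseableProducer implements AutoCloseable {
    public void send(String topic, String key, String value) {
        System.out.println("send " + topic + " " + key + "=" + value);
    }

    @Override
    public void close() {
        // In the real class this would delegate to producer.close()
        System.out.println("producer closed");
    }
}

public class TryWithResourcesDemo {
    public static void main(String[] args) {
        // close() is invoked automatically when the try block exits, even on exceptions
        try (CloseableProducer producer = new CloseableProducer()) {
            producer.send("test-topic", "Key0", "Sample Message 0");
        }
    }
}
```

Note that this pattern fits a per-use producer better than the singleton above: closing a shared singleton makes it unusable for the rest of the application, so with a singleton close() belongs at application shutdown, as the demo's comment suggests.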
You can also clone and set up the complete project from the GitHub repository.
Running Message Producer Demo
We will take the following steps to run the Message Producer demo -
Check that your Kafka service is running and accessible by creating a topic named test-topic, executing the following command from the Kafka home directory -
# Change the ZooKeeper URL, number of partitions and replication factor as applicable to your scenario
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test-topic --partitions 1 --replication-factor 1
Verify that the topic has been created successfully using the following command -
# Change the ZooKeeper URL as applicable
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Execute the following command from the Kafka home directory to read the messages that the producer will publish on topic test-topic -
# Wait for messages on topic: test-topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
Download the runnable jar file of the demo from the GitHub dist directory and execute it using the following command (in a new terminal, as the console consumer will keep running in the first one) -
# It assumes that your Kafka is running locally on port 9092
java -jar simple-kafka-producer-0.0.1-SNAPSHOT.jar
# Here is the output of the above program
Sending 10 messages to Kafka...
Sent 10 messages to Kafka successfully!!!
After your producer program completes, go back to the terminal running the Kafka console consumer; you should see the following output there -
Sample Message 0
Sample Message 1
Sample Message 2
Sample Message 3
Sample Message 4
Sample Message 5
Sample Message 6
Sample Message 7
Sample Message 8
Sample Message 9
Thank you for reading this tutorial. If you have any feedback, questions or concerns, please share them in the comments and we will get back to you as soon as possible.