    Apache Storm Integration with Apache Kafka
    Published on: 25th May 2018
    Posted By: Amit Kumar

    This tutorial will walk you through a Java program that consumes messages from Apache Kafka using Apache Storm.


    Nowadays, quite a few systems are modeled using the Event Stream Processing paradigm. These applications see client requests or state changes as a set of events (immutable messages). In order to achieve scalability and reliability, these events are published to a messaging system. This messaging system is subscribed to by the different applications responsible for processing the events.
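    As a toy illustration of this paradigm in plain Java (a BlockingQueue stands in for the messaging system here; the Event class and names below are made up for this sketch and are not part of Kafka or Storm):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy event-stream pipeline: immutable events published to a queue, drained by a subscriber. */
public class EventStreamSketch {
    /** An event is an immutable message: once created, it never changes. */
    static final class Event {
        final String type;
        final String payload;
        Event(String type, String payload) { this.type = type; this.payload = payload; }
    }

    /** Publishes events to a queue (stand-in "messaging system") and lets a subscriber thread process them. */
    static List<String> process(List<Event> events) throws InterruptedException {
        final BlockingQueue<Event> bus = new LinkedBlockingQueue<>();
        final List<String> processed = new ArrayList<>();

        // Subscriber application: consumes events independently of the publisher.
        final Thread subscriber = new Thread(() -> {
            try {
                for (Event e; !"SHUTDOWN".equals((e = bus.take()).type); ) {
                    processed.add(e.type + ":" + e.payload);
                }
            } catch (InterruptedException ignored) { }
        });
        subscriber.start();

        // Client requests / state changes become events published onto the bus.
        for (Event e : events) {
            bus.put(e);
        }
        bus.put(new Event("SHUTDOWN", "")); // end-of-stream marker for the toy subscriber

        subscriber.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(List.of(
                new Event("ORDER_CREATED", "order-1"),
                new Event("ORDER_SHIPPED", "order-1"))));
        // prints [ORDER_CREATED:order-1, ORDER_SHIPPED:order-1]
    }
}
```

    In the rest of this tutorial, Kafka plays the role of the queue (at far larger scale, and with persistence), while Storm plays the role of the subscriber.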

    This paradigm requires us to integrate messaging systems with computing applications. In this tutorial, we will look at integrating Apache Kafka (a distributed publish-subscribe messaging system) with Apache Storm (a distributed real-time processing engine).

    Note: Instructions in this tutorial will only work with the old, ZooKeeper-based Kafka consumer API. Since Kafka has drastically changed its consumer API, a new Storm Kafka client module was developed, which we will cover in subsequent tutorials. You can subscribe to our tutorials to get notified through email whenever we publish a new tutorial.



    Here are the prerequisites for following the instructions in this tutorial effectively -

    1. Basic knowledge of Apache Storm
    2. Basic knowledge of Apache Kafka
    3. Running cluster of Apache ZooKeeper (required by Kafka)
    4. Running cluster of Apache Kafka
    5. Eclipse with Maven Plugin


    Setting Up Maven Java Project

    Creating Maven Project:

    We will start by creating a Maven project in the Eclipse IDE, following the steps below - 

    1. Open New Project wizard in Eclipse IDE as shown below:

      New Maven Project
    2. On the next screen, select the option Create a simple project to quickly create the project, as below:

      New Maven Project Type and Location Selection
    3. Enter the Group Id and Artifact Id on the next screen and finally click Finish to create the project, as below:

      Group and Artifact Id Selection

    At this point, you will start seeing your new project (in my case, it is kafka-storm-integration) in Project Explorer.


    Adding Kafka and Storm Dependencies:

    Here is how our pom.xml looks after adding the dependencies for Storm and Kafka -

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.aksain.kafka.storm</groupId>
    	<artifactId>kafka-storm-integration</artifactId>
    	<version>0.0.1-SNAPSHOT</version>

    	<!-- Versions below are representative of the 1.x storm-kafka line; adjust them to match your cluster -->
    	<dependencies>
    		<!-- Storm library -->
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-core</artifactId>
    			<version>1.2.1</version>
    		</dependency>
    		<!-- Storm-Kafka integration library -->
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-kafka</artifactId>
    			<version>1.2.1</version>
    		</dependency>
    		<!-- Kafka client libraries as Storm-Kafka integration library does not include these -->
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka_2.11</artifactId>
    			<version>0.8.2.2</version>
    			<exclusions>
    				<!-- Excluded to avoid version issues between Kafka zookeeper api and 
    					Storm-kafka zookeeper api -->
    				<exclusion>
    					<groupId>org.apache.zookeeper</groupId>
    					<artifactId>zookeeper</artifactId>
    				</exclusion>
    				<!-- Excluded to avoid pre-emptive StackOverflowError due to version/implementation 
    					issues between Kafka slf4j/log4j api and Storm slf4j/log4j api -->
    				<exclusion>
    					<groupId>org.slf4j</groupId>
    					<artifactId>slf4j-log4j12</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    	</dependencies>
    </project>

    After adding these dependencies, Eclipse will automatically start downloading the libraries from the Maven repository. Please be patient, as it may take a while for Eclipse to download the jars and build your project.


    Writing a program for integrating Kafka with Storm

    Since we are ready with our Java project and all the required dependencies, it's time to write some code!

    We will start by writing a class called LoggerBolt that logs the messages consumed from Kafka -

    package com.aksain.kafka.storm.bolt;

    import org.apache.log4j.Logger;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;

    /**
     * @author Amit Kumar
     */
    public class LoggerBolt extends BaseBasicBolt {
    	private static final long serialVersionUID = 1L;
    	private static final Logger LOG = Logger.getLogger(LoggerBolt.class);

    	public void execute(Tuple input, BasicOutputCollector collector) {
    		// Log the message consumed from Kafka
    		LOG.info(input.getString(0));
    	}

    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("message"));
    	}
    }
    We do not, however, need to write a spout for Kafka, as Storm provides an out-of-the-box implementation called KafkaSpout for consuming messages. We only need to pass it a SpoutConfig object built from the following command line arguments -

    1. ZooKeeper host URLs, such as localhost:2181
    2. Topic Name
    3. ZooKeeper root path (path where kafka configurations are managed in ZooKeeper)
    4. Consumer id
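    As a plain-Java sketch of how these four arguments line up (the holder class below is illustrative only, not part of the Storm API; in the real program the values go straight into storm-kafka's SpoutConfig(BrokerHosts, topic, zkRoot, id) constructor, as shown in the Demo class):

```java
import java.util.Objects;

/** Illustrative holder for the four command line arguments fed to KafkaSpout's configuration. */
public class KafkaSpoutArgs {
    final String zkHosts;  // ZooKeeper connect string, e.g. "localhost:2181"
    final String topic;    // Kafka topic to consume messages from
    final String zkRoot;   // ZooKeeper root path where Kafka configurations are managed
    final String clientId; // consumer id identifying this spout's offsets in ZooKeeper

    KafkaSpoutArgs(String[] args) {
        // Mirror the demo program's argument check: fail fast on too few arguments.
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "Required arguments: <zk-hosts> <kafka-topic> <zk-path> <clientid>");
        }
        this.zkHosts = Objects.requireNonNull(args[0]);
        this.topic = Objects.requireNonNull(args[1]);
        this.zkRoot = Objects.requireNonNull(args[2]);
        this.clientId = Objects.requireNonNull(args[3]);
    }
}
```

    With storm-kafka on the classpath, these map one-to-one onto new SpoutConfig(new ZkHosts(zkHosts), topic, zkRoot, clientId).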

    Here is the main Demo class, responsible for creating the topology with KafkaSpout and LoggerBolt and finally submitting it to a LocalCluster (the embedded version of Storm running inside Eclipse) -

    package com.aksain.kafka.storm;

    import java.util.HashMap;

    import org.apache.log4j.Logger;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;

    import com.aksain.kafka.storm.bolt.LoggerBolt;

    /**
     * @author Amit Kumar
     */
    public class KafkaStormIntegrationDemo {
    	private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);

    	public static void main(String[] args) {
    		// Log program usages and exit if there are less than 4 command line arguments
    		if(args.length < 4) {
    			LOG.fatal("Incorrect number of arguments. Required arguments: <zk-hosts> <kafka-topic> <zk-path> <clientid>");
    			System.exit(1);
    		}
    		// Build Spout configuration using input command line parameters
    		final BrokerHosts zkrHosts = new ZkHosts(args[0]);
    		final String kafkaTopic = args[1];
    		final String zkRoot = args[2];
    		final String clientId = args[3];
    		final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
    		// Deserialise raw Kafka message bytes into a String tuple field
    		kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

    		// Build topology to consume messages from kafka and print them on console
    		final TopologyBuilder topologyBuilder = new TopologyBuilder();
    		// Create KafkaSpout instance using Kafka configuration and add it to topology
    		topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
    		// Route the output of KafkaSpout to LoggerBolt to log messages consumed from Kafka
    		topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");

    		// Submit topology to local cluster i.e. embedded storm instance in eclipse
    		final LocalCluster localCluster = new LocalCluster();
    		localCluster.submitTopology("kafka-topology", new HashMap<>(), topologyBuilder.createTopology());
    	}
    }

    Finally, let's put a log4j.properties file with the following contents into src/main/resources to enable logging -

    # Direct log messages to stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
    # Root logger option
    log4j.rootLogger=INFO, stdout


    Executing the program

    It's now time to execute our program. However, we first need to ensure that we have a topic with some messages in our Apache Kafka cluster.

    You can execute the following command from your Kafka home directory to create a topic called 'storm-test-topic' -

    ./bin/kafka-topics.sh --create --topic storm-test-topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

    Execute the following command to verify that the topic has been created successfully -

    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic storm-test-topic

    You can now publish messages to this new topic using the following command. Simply write messages on the console and press Enter to send them.

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic storm-test-topic

    Now that we have a topic with a few messages, let's run our program to consume them.

    We can simply submit this program to the embedded Storm cluster in Eclipse just like any other Java program with command line arguments - right-click on the KafkaStormIntegrationDemo program -> Run As -> Run Configurations...

    In the Run Configurations... dialog box, switch to the Arguments tab and add the following to Program arguments -

    localhost:2181 storm-test-topic /brokers storm-consumer

    Once the program executes, it will take some time to start Storm and then connect to ZooKeeper and Kafka. After it has activated our topology, we will start seeing Kafka messages in the console. Here are some of the message logs that I got printed on my console -

    22377 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - First message sent by Kafka console producer
    22379 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - Second message to be consumed by Storm
    22380 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - Yet another message for Storm to consume and log on console
    22383 [Thread-16-print-messages-executor[3 3]] INFO  c.a.k.s.b.LoggerBolt - This is demo message for demo of kafka and storm integration

    You can in fact send messages while the program is running, and those messages will be consumed by Storm and logged in the Eclipse console.


    Thank you for reading through the tutorial. In case of any feedback, questions, or concerns, you can share them with us through your comments and we shall get back to you as soon as possible.

