This tutorial explains how to implement the request/response paradigm with Apache Kafka using Akka actors in Java.
Apache Kafka, like any messaging system, is typically used for asynchronous processing: a client sends a message to Kafka, and background consumers process it. However, I came across a requirement to implement the request/response paradigm on top of Apache Kafka so that the same platform could support both synchronous and asynchronous processing.
To achieve this, we need a concept like a "correlation id": we send a message to Kafka and then wait for the corresponding response message. While some traditional messaging systems support this concept, Apache Kafka does not provide it out of the box, in order to keep things simple on its end.
If we isolate the problem, we just need a mechanism that lets the Kafka message consumer hand response data to the client request thread that is waiting for it. We could model this as a producer-consumer problem and solve it with Java thread pools and basic synchronization mechanisms, but that would cause problems from a maintenance and performance perspective. Moreover, we would also need to build clustering support ourselves, because a topic can have multiple partitions and its messages can therefore go to consumers running in different JVMs.
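To make the hand-rolled alternative concrete, here is a minimal sketch of a correlation-id registry in plain Java. It is not part of the demo application: PendingRequests and its methods are hypothetical names, and the key format simply mirrors the sample output shown later in this tutorial. It also only works inside a single JVM, which is exactly the clustering limitation described above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not a class from the demo.
class PendingRequests {
    // One future per in-flight request, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the client thread before it publishes the request.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called by the consumer thread when a response arrives.
    // Returns false if no thread in this JVM is waiting for that id.
    boolean complete(String correlationId, String response) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(response);
    }

    // Called by the client when it gives up waiting, so the map does not leak.
    void discard(String correlationId) {
        pending.remove(correlationId);
    }
}

class PendingRequestsDemo {
    public static void main(String[] args) throws Exception {
        PendingRequests requests = new PendingRequests();
        String id = "customer_123_" + UUID.randomUUID();
        CompletableFuture<String> reply = requests.register(id);

        // Stand-in for the consumer thread delivering the response.
        new Thread(() -> requests.complete(id, "response for " + id)).start();

        // The client thread blocks here until the response arrives or the wait times out.
        System.out.println(reply.get(5, TimeUnit.SECONDS));
    }
}
```

Even in this small form, the timeout handling, cleanup of abandoned entries and cross-JVM routing are all left to the application, which is the motivation for moving to actors.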
To keep things scalable and clean, we will use an Akka ActorSystem to map the response messages pulled by message consumers to the corresponding client request threads. Although we discuss the implementation from a Kafka perspective, the same approach could be applied to other asynchronous systems by adapting the consumer and producer parts.
Here are the prerequisites needed to follow this tutorial effectively -
- Java 8 or higher
- Kafka cluster, version 1.0.0 (lower versions may also work if you update the kafka-clients version in pom.xml)
- Basic knowledge of Apache Kafka and Akka Actors
Let's start with the component (actually class) diagram of our demo application below -
Here are the various components (classes) shown in the above diagram -
- KafkaMessageProcessor - This class emulates message processing: it simply pulls messages from request-topic and routes them to response-topic. In real use cases, a different application or framework would be responsible for processing messages.
- KafkaMessageProducer - This class is responsible for publishing messages to the Kafka request-topic. This operation is synchronous and takes place in the client request thread.
- KafkaResponseActor - This class represents Akka actors that have the same names as message keys. It is responsible for matching client request threads with the corresponding response messages.
- KafkaMessageConsumer - This class executes in a background thread and pulls messages from the Kafka response-topic. When it gets a message, it selects the actor with the same name as the message key and sends the Kafka response message to it.
- KafkaRequestResponseDemo - This class is the entry point of the application. It creates a unique key for each message, creates an actor with the same name as the key, sends the message to the Kafka request-topic using KafkaMessageProducer, sends a ResponseWait message to the actor, and waits for the response using Akka's Inbox API.
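The interplay of these components can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions, not the demo's code: it uses neither Kafka nor Akka, the two topics are replaced by BlockingQueues, and each actor named after a message key is approximated by a per-key mailbox. All class and method names here are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RequestResponseFlowSketch {
    // A key/value pair standing in for a Kafka record.
    static final class Message {
        final String key;
        final String value;
        Message(String key, String value) { this.key = key; this.value = value; }
    }

    // In-memory stand-ins for request-topic and response-topic.
    static final BlockingQueue<Message> requestTopic = new LinkedBlockingQueue<>();
    static final BlockingQueue<Message> responseTopic = new LinkedBlockingQueue<>();
    // Stand-in for actors named after message keys: one mailbox per key.
    static final Map<String, BlockingQueue<Message>> mailboxes = new ConcurrentHashMap<>();

    // Starts a daemon thread that passes every message from the source queue to the handler.
    static void startWorker(String name, BlockingQueue<Message> source, Consumer<Message> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(source.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
    }

    static void startBackgroundThreads() {
        // Role of KafkaMessageProcessor: route requests to the response topic.
        startWorker("processor", requestTopic, responseTopic::add);
        // Role of KafkaMessageConsumer: deliver each response to the mailbox named after its key.
        startWorker("consumer", responseTopic, msg -> {
            BlockingQueue<Message> mailbox = mailboxes.get(msg.key);
            if (mailbox != null) {
                mailbox.add(msg);
            }
        });
    }

    // Role of KafkaRequestResponseDemo: returns the response, or null if the wait times out.
    static Message sendAndWait(String payload, long timeoutMillis) throws InterruptedException {
        String key = "customer_123_" + UUID.randomUUID();
        BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>();
        mailboxes.put(key, mailbox); // create the "actor" before sending the request
        try {
            requestTopic.add(new Message(key, payload)); // role of KafkaMessageProducer
            return mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            mailboxes.remove(key);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        startBackgroundThreads();
        Message response = sendAndWait("hello", 5000);
        System.out.println("Message Found: " + (response != null));
    }
}
```

Note that the mailbox is registered before the request is published; otherwise a fast response could arrive before anyone is listening for it. The same ordering applies in the actor-based design, where the actor is created before the message is sent.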
The code for this application can be found here on GitHub.
Running Request/Response Demo
It's now time to run our demo by taking the following steps -
Check that your Kafka service is running and accessible by creating two topics, request-topic and response-topic, with the following commands from the Kafka home directory -
# Change Zookeeper url, no of partitions and replication factor as applicable to your scenario
# Create request-topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic request-topic --partitions 1 --replication-factor 1
# Create response-topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic response-topic --partitions 1 --replication-factor 1
Verify that the topics have been created successfully using the following command -
# Change ZooKeeper Url as applicable
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Download the runnable jar file of the demo from the GitHub dist directory and run it with the following command -
# Pass system property(-Dkafka.bootstrap.servers=<kafkahost:port>) if Kafka is not running on localhost:9092
java -jar kafka-akka-integration-0.0.1-SNAPSHOT.jar
If everything works fine, your program will produce output similar to the following -
[Processor]Received message with key: customer_123_b460f1ed-d22b-46d6-a2a5-e428ddcfc0cc
[Consumer]Received message with key: customer_123_b460f1ed-d22b-46d6-a2a5-e428ddcfc0cc
[INFO] [12/27/2017 08:55:17.698] [main] [akka.actor.ActorSystemImpl(kafka-request-response)] Message Found: true
Note: If your Kafka setup is slow, you may see a "TimeoutException" with the message "deadline passed". In that case, you can increase the message wait timeout by passing a system property such as -Dmessage.wait.timeout=10000 in the command.
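For readers unfamiliar with -D system properties, the following sketch shows one way an application could read the two properties mentioned in this section. It is an illustration rather than the demo's actual code: DemoSettings is a hypothetical class, the 5000 default is an assumption (the tutorial does not state the real default), and treating the timeout as milliseconds is likewise an assumption. Only the localhost:9092 default comes from the command comment above.

```java
// Hypothetical settings holder for illustration; not a class from the demo.
class DemoSettings {
    // Falls back to localhost:9092 when -Dkafka.bootstrap.servers is not given.
    static String bootstrapServers() {
        return System.getProperty("kafka.bootstrap.servers", "localhost:9092");
    }

    // Falls back to an assumed default of 5000 when -Dmessage.wait.timeout
    // is missing or is not a valid number.
    static long messageWaitTimeout() {
        return Long.getLong("message.wait.timeout", 5000L);
    }

    public static void main(String[] args) {
        System.out.println("Bootstrap servers: " + bootstrapServers());
        System.out.println("Message wait timeout: " + messageWaitTimeout());
    }
}
```

System properties must appear before -jar on the command line, for example java -Dmessage.wait.timeout=10000 -jar kafka-akka-integration-0.0.1-SNAPSHOT.jar, otherwise they are passed to the program as ordinary arguments.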
Because Akka provides cluster functionality, this demo could be extended to multiple instances.
Thank you for reading through the tutorial. If you have any feedback, questions, or concerns, please share them in the comments and we will get back to you as soon as possible.