This tutorial explains how to configure message retention in Apache Kafka.
Abstract
Unlike many other messaging systems, Apache Kafka applies a message retention policy that is independent of its consumers/subscribers: messages are removed according to the policy whether or not they have been consumed. Hence, it is very important to configure an appropriate retention policy to avoid data loss.
For example, if we set the retention time too low, we run the risk of losing messages before they have been processed by all of our applications. On the other hand, if the retention time is too high, messages sit unused in Kafka partitions, consuming system resources.
While the optimal retention policy depends heavily on the use case, this tutorial covers the options Apache Kafka provides to configure message retention.
Configuring Message Retention
Apache Kafka uses a Log data structure to manage its messages. A Log is basically an ordered set of Segments, and a Segment is a collection of messages. Apache Kafka applies retention at the Segment level rather than at the Message level. Hence, Kafka keeps removing the oldest Segments from the Log as they violate the retention policies.
Apache Kafka provides us with the following retention policies:
- Time based Retention
- Size based Retention
Time based Retention Policy
Under this policy, we configure the maximum time a Segment (and hence its messages) can live. Once a Segment has outlived the configured retention time, it is marked for deletion or compaction, depending on the configured cleanup policy. The default retention time for Segments is 7 days.
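To make the Segment-level behaviour concrete, here is a minimal Python sketch of a time-based retention pass. It is a simplified model, not Kafka's implementation: the `Segment` class, the `apply_time_retention` function, and the rule that the last (active) Segment is always kept are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    base_offset: int           # offset of the first message in the segment
    largest_timestamp_ms: int  # timestamp of the newest message in the segment


def apply_time_retention(segments: List[Segment], now_ms: int, retention_ms: int) -> List[Segment]:
    """Return the segments that survive one time-based retention pass."""
    kept = list(segments)
    # Walk from the oldest segment and stop at the first one that is still
    # within retention. Assumption: the last (active) segment is never removed.
    while len(kept) > 1 and now_ms - kept[0].largest_timestamp_ms > retention_ms:
        kept.pop(0)
    return kept


HOUR_MS = 60 * 60 * 1000
log = [
    Segment(base_offset=0, largest_timestamp_ms=0 * HOUR_MS),
    Segment(base_offset=100, largest_timestamp_ms=5 * HOUR_MS),
    Segment(base_offset=200, largest_timestamp_ms=9 * HOUR_MS),
]
survivors = apply_time_retention(log, now_ms=10 * HOUR_MS, retention_ms=6 * HOUR_MS)
print([s.base_offset for s in survivors])  # prints [100, 200]
```

In this model a Segment is judged by its newest message, so an old message stays readable for as long as it shares a Segment with a message that is still within the retention time.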
Here are the parameters (in decreasing order of priority) that you can set in your Kafka broker properties file:
# Configures retention time in milliseconds (604800000 ms = 7 days)
log.retention.ms=604800000
# Used if log.retention.ms is not set (10080 minutes = 7 days)
log.retention.minutes=10080
# Used if log.retention.minutes is not set (168 hours = 7 days)
log.retention.hours=168
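The priority order of these three parameters can be sketched as a small lookup. The function name `effective_retention_ms` and the plain dictionary standing in for the broker properties file are assumptions for illustration; the fallback of 168 hours reflects the 7-day default mentioned above.

```python
from typing import Mapping

MS_PER_MINUTE = 60 * 1000
MS_PER_HOUR = 60 * MS_PER_MINUTE


def effective_retention_ms(props: Mapping[str, str]) -> int:
    """Resolve the broker retention time, most specific unit first."""
    if "log.retention.ms" in props:
        return int(props["log.retention.ms"])
    if "log.retention.minutes" in props:
        return int(props["log.retention.minutes"]) * MS_PER_MINUTE
    # Falls back to the default of 168 hours (7 days) if nothing is set.
    return int(props.get("log.retention.hours", "168")) * MS_PER_HOUR


# log.retention.ms wins even though a longer value is given in hours.
print(effective_retention_ms({"log.retention.ms": "60000", "log.retention.hours": "168"}))  # prints 60000
print(effective_retention_ms({"log.retention.minutes": "30"}))  # prints 1800000
print(effective_retention_ms({}))  # prints 604800000
```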
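Apart from Broker level configuration, retention time can also be configured per Topic using the alter command (the commands here use the --zookeeper option, which was removed in Kafka 3.0; newer versions change Topic configuration through kafka-configs.sh with --bootstrap-server). For example, the command below sets the retention time to 1680 seconds (28 minutes) for a Topic named my-topic: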
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000
If required, the Topic level retention time configuration can be removed with the command below:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --delete-config retention.ms
Note: Topic level configuration will always override Broker level configurations.
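The override rule can be pictured as a simple merge of two maps. This is only a sketch: `effective_topic_config` is a hypothetical helper, and the Broker defaults are assumed to be already expressed under the Topic level names (`retention.ms`, `retention.bytes`), with -1 standing for "no size limit".

```python
from typing import Dict


def effective_topic_config(broker_defaults: Dict[str, int], topic_overrides: Dict[str, int]) -> Dict[str, int]:
    """Topic level values win; anything not overridden falls back to the broker."""
    merged = dict(broker_defaults)
    merged.update(topic_overrides)
    return merged


broker_defaults = {"retention.ms": 604_800_000, "retention.bytes": -1}
topic_overrides = {"retention.ms": 1_680_000}

# With the override in place, the Topic level value is used.
print(effective_topic_config(broker_defaults, topic_overrides)["retention.ms"])  # prints 1680000

# Removing the override (the --delete-config case) restores the broker value.
del topic_overrides["retention.ms"]
print(effective_topic_config(broker_defaults, topic_overrides)["retention.ms"])  # prints 604800000
```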
Size based Retention Policy
In this policy, we configure the maximum size of the Log data structure for a Topic partition. Once the Log reaches this size, Kafka starts removing the oldest Segments. This policy is not popular because it gives poor visibility into when messages expire. However, it can come in handy when we need to control the size of a Log due to limited disk space.
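A size-based pass can be sketched in the same spirit. This is a simplified model under a stated assumption, not Kafka's code: the hypothetical `apply_size_retention` removes an old Segment only if the Log would still hold at least the configured number of bytes afterwards, and it always keeps the last (active) Segment.

```python
from typing import List

MB = 1024 * 1024


def apply_size_retention(segment_sizes: List[int], retention_bytes: int) -> List[int]:
    """Return the sizes of the segments that survive one size-based pass."""
    kept = list(segment_sizes)
    excess = sum(kept) - retention_bytes
    # Assumption: drop the oldest segment only while the remaining log
    # would still be at least retention_bytes in size.
    while len(kept) > 1 and excess - kept[0] >= 0:
        excess -= kept[0]
        kept.pop(0)
    return kept


sizes = [40 * MB, 40 * MB, 40 * MB, 40 * MB, 10 * MB]  # oldest first, 170 MB total
survivors = apply_size_retention(sizes, retention_bytes=100 * MB)
print(sum(survivors) // MB)  # prints 130
```

Because whole Segments are removed, the Log in this model can sit above the limit by up to one Segment, which is part of why the policy gives little visibility into when a particular message will expire.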
Here is the parameter that you can set in your Kafka broker properties file:
# Configures maximum size of a Log in bytes (104857600 bytes = 100 MB)
log.retention.bytes=104857600
Apart from Broker level configuration, retention size can also be configured per Topic using the alter command. For example, the command below sets the retention size to 100 MB for a Topic named my-topic:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.bytes=104857600
If required, the Topic level retention size configuration can be removed with the command below:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --delete-config retention.bytes
Note: Topic level configuration will always override Broker level configurations.
Thank you for reading through the tutorial. If you have any feedback, questions, or concerns, please share them in the comments and we will get back to you as soon as possible.