Possible Reasons Why a Kafka Topic Is Not Being Compacted

There are several possible reasons why a topic, topic partition, or topic partition replica is not being compacted by the Kafka log cleaner thread. One common side effect of this problem is that the entity consumes a disproportionate amount of storage space. In this post, I share several possible reasons why topic compaction would have failed.

1) Broker/Topic cleanup.policy is not set to ‘compact’, or log.cleaner.enable is disabled

The two basic configurations that need to be set for compaction to happen are:

  • log.cleaner.enable (broker config)
    • As of Kafka 0.9.0.1, this config defaults to “true”. Most clusters leave the default in place unless they have special requirements.
  • log.cleanup.policy (broker config), or cleanup.policy (topic config)
    • The order of precedence is: topic config > broker config > Kafka default (“delete”)

As a sanity check, ensure that you have these two configs already set to the right values.

Note: Compaction will still occur even if retention.ms is set to “-1”, as retention.ms only applies to topics whose cleanup policy includes “delete”.
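
The precedence rule above can be sketched in a few lines of Python. This is only an illustration: effective_cleanup_policy and is_compaction_possible are hypothetical helper names, not part of any Kafka client, and the config values are assumed to have been looked up already.

```python
from typing import Optional, Set

KAFKA_DEFAULT_POLICY = "delete"  # Kafka's built-in default for cleanup.policy


def effective_cleanup_policy(topic_policy: Optional[str],
                             broker_policy: Optional[str]) -> Set[str]:
    """Resolve the policy using: topic config > broker config > Kafka default."""
    raw = topic_policy or broker_policy or KAFKA_DEFAULT_POLICY
    # The value may be a comma-separated list such as "compact,delete".
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_compaction_possible(topic_policy: Optional[str],
                           broker_policy: Optional[str],
                           cleaner_enabled: bool = True) -> bool:
    """Both conditions from this section must hold for compaction to run."""
    policies = effective_cleanup_policy(topic_policy, broker_policy)
    return cleaner_enabled and "compact" in policies
```

For example, is_compaction_possible(None, "compact,delete") is true, while a topic with no override on a broker that also has no override falls back to “delete” and is never compacted.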

2) There is only one active segment

The official Apache Kafka documentation on log cleaner states that “the active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.”

Assuming your topic partition folder in the broker looks something like this:

$ ls <TOPIC_PARTITION_LOG_FOLDER_PATH>
00000000000000000000.index
00000000000000000000.log
00000000000000000000.snapshot
00000000000000000000.timeindex
00000000000000058102.index
00000000000000058102.log
00000000000000058102.snapshot
00000000000000058102.timeindex

Then you would have two log segments for the topic partition – “00000000000000000000” and “00000000000000058102” (active). In this case, the latter log segment will not be considered for compaction.

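If the partition contains only the active segment, nothing is eligible for compaction until that segment rolls. Once older segments exist, the log cleaner should compact the topic soon, unless one of the situations described in the sections below applies.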
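
As a rough sketch of the check described in this section, the following Python snippet takes the file names from a partition folder and separates the active segment from the older ones. The function name split_segments is hypothetical, and the snippet assumes that the segment with the highest base offset is the active one.

```python
from typing import Iterable, List, Optional, Tuple


def split_segments(filenames: Iterable[str]) -> Tuple[List[int], Optional[int]]:
    """Return (base offsets of older segments, base offset of the active segment)."""
    # Each segment file is named after its base offset, zero-padded to 20 digits.
    base_offsets = sorted(
        int(name[: -len(".log")]) for name in filenames if name.endswith(".log")
    )
    if not base_offsets:
        return [], None
    # Assumption: the segment with the highest base offset is still being written to.
    return base_offsets[:-1], base_offsets[-1]


listing = [
    "00000000000000000000.index", "00000000000000000000.log",
    "00000000000000058102.index", "00000000000000058102.log",
]
older, active = split_segments(listing)
print(older, active)  # [0] 58102
```

If the first list comes back empty, the partition has only its active segment and there is nothing for the log cleaner to work on yet.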

3) Topic is marked as uncleanable

Although this is relatively rare, I have personally encountered a topic partition replica “X” being marked as uncleanable due to a log segment corruption. Replica “X” was occupying 300GB of storage, while the other replicas were only 500MB.

When I looked at the broker logs, I noticed the following error:

[2022-01-01 00:00:00,000] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log LOG(dir=<LOG_DIR_PATH>, topic=<TOPIC_NAME>, partition=<PARTITION>, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=123456789). Marking its partition (<TOPIC>-<PARTITION>) as uncleanable (kafka.log.LogCleaner)

kafka.log.LogCleaningException: Batch size 12345 < 987654321, but not processed for log segment <LOG_DIR_PATH>/00000000000000045678.log at position 5123456789
  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356)
  at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Caused by: java.lang.IllegalStateException: Batch size 12345 < 987654321, but not processed for log segment <LOG_DIR_PATH>/00000000000000045678.log at position 5123456789
  at kafka.log.Cleaner.growBuffersOrFail(LogCleaner.scala:745)
  at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:983)
  ...
  at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:904)
  at kafka.log.Cleaner.doClean(LogCleaner.scala:523)
  at kafka.log.Cleaner.clean(LogCleaner.scala:511)
  at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380)
  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352)
  ... 3 more

To fix this, 1) delete the affected log segment’s .index, .timeindex, and .snapshot files, and 2) restart the broker. A broker restart is required because the topic partition is only evaluated at bootstrap time.

When the broker is restarted, you will see a line like this in the broker logs:

[2022-01-01 00:10:00,000] ERROR [Log partition=<TOPIC_NAME>-<PARTITION>, dir=<LOG_DIR_PATH>] Could not find offset index file corresponding to log file <LOG_DIR_PATH>/00000000000000045678.log, recovering segment and rebuilding index files... (kafka.log.Log)

The time taken to recover the segment and rebuild the index files depends mainly on your broker’s available resources (CPU/memory), the log segment size, and disk I/O. Once the indexes are recreated, the broker continues with the rest of the startup process.
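
To check whether any other partitions have been marked as uncleanable, you could scan the broker logs for the warning shown earlier. The snippet below is a minimal sketch: find_uncleanable is a hypothetical helper, and the regular expression is modelled on the message from my broker, so the wording may differ on other Kafka versions.

```python
import re
from typing import Iterable, List, Tuple

# Pattern modelled on the WARN line above; adjust it if your version words it differently.
UNCLEANABLE = re.compile(r"Marking its partition \((?P<tp>[^)]+)\) as uncleanable")


def find_uncleanable(lines: Iterable[str]) -> List[Tuple[str, int]]:
    """Return (topic, partition) pairs that the log cleaner marked as uncleanable."""
    found = []
    for line in lines:
        match = UNCLEANABLE.search(line)
        if not match:
            continue
        # Topic names may contain dashes, so split on the last one only.
        topic, _, partition = match.group("tp").rpartition("-")
        if topic and partition.isdigit():
            found.append((topic, int(partition)))
    return found
```

You would typically feed it an open log file, for example find_uncleanable(open(path)), where path points at whichever file your broker writes log cleaner messages to.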

4) Log segment is below the min.cleanable.dirty.ratio threshold

The min.cleanable.dirty.ratio config determines the “minimum percentage of the partition log that must be “dirty” before Kafka attempts to compact messages” (source). By default, this is set to 0.5, i.e. 50% of the log must be “dirty” before the Kafka log cleaner considers it a candidate for log compaction. Note that Kafka measures this ratio in bytes across the partition log rather than as a record count; the example in this section counts records within a single segment to keep things simple.

While this default configuration value likely works for a majority of use cases, there are edge cases where it is ineffective:

  • Assume a log segment has only 6 entries, and from T1 to T4, a unique key is published to the topic
  • At time T5 and T6, a new value appears, each using an existing key
  • At time T7, Kafka uses a new log segment for future records, making the original log segment a candidate for log compaction cleanup
  • However, since the percentage of dirty records (33%) is less than the default threshold of 50%, this log segment is never cleaned up

Time    Log segment contents
T1      [k1:v1]
T2      [k1:v1, k2:v2]
T3      [k1:v1, k2:v2, k3:v3]
T4      [k1:v1, k2:v2, k3:v3, k4:v4]
T5      [k1:v1 (superseded), k2:v2, k3:v3, k4:v4, k1:vv1]
T6      [k1:v1 (superseded), k2:v2 (superseded), k3:v3, k4:v4, k1:vv1, k2:vv2]

At time T6, the number of dirty records is 2 out of 6. As 2/6 (about 33%) is less than 50%, the old log segment is not considered eligible for cleanup.
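
The arithmetic in this example can be reproduced with a short Python sketch. It counts records, as the example does, whereas Kafka itself compares byte sizes, so treat it as an approximation for reasoning about the threshold; dirty_ratio is a hypothetical helper name.

```python
from typing import List, Tuple


def dirty_ratio(records: List[Tuple[str, str]]) -> float:
    """Fraction of records that are superseded by a later record with the same key."""
    if not records:
        return 0.0
    # Remember the position of the most recent record for every key.
    last_index = {key: i for i, (key, _) in enumerate(records)}
    superseded = sum(1 for i, (key, _) in enumerate(records) if last_index[key] != i)
    return superseded / len(records)


# The segment contents at time T6 from the example.
segment = [("k1", "v1"), ("k2", "v2"), ("k3", "v3"), ("k4", "v4"),
           ("k1", "vv1"), ("k2", "vv2")]
ratio = dirty_ratio(segment)
print(f"{ratio:.2f}", ratio >= 0.5)  # 0.33 False
```

With a threshold of 0.3 instead of 0.5, the same segment would qualify for cleaning.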

If you have reached this stage and believe that your topic might be suffering from this, you can set a lower min.cleanable.dirty.ratio threshold. Doing so makes the log cleaner thread less tolerant of dirty records and more aggressive in cleaning up log segments. Here is an example Kafka CLI command to set this config:

$ kafka-configs --bootstrap-server <SERVER_LIST> --alter --add-config min.cleanable.dirty.ratio=<VALUE> --entity-type topics --entity-name <TOPIC_NAME>

5) There are many unique keys

Another scenario to consider is that the topic has a lot of unique keys, so compaction legitimately has little to remove. By default, a topic has no size limit and a retention period of one week.

Depending on your application requirements, you can shorten the retention period/size and set the cleanup.policy to “compact,delete”, so that older unique keys are deleted as well. While setting this will not trigger a compaction, it helps prevent runaway topic sizes.

6) Log cleaner thread is overwhelmed

Sometimes, the log cleaner threads are inundated with cleanup work, and it can take a while before they get to the topic you care about. You can wait it out, and use the kafka-log-dirs CLI command to get an update on how much storage the topic partition (and its replicas) is using.
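
To compare replica sizes across brokers, the JSON printed by kafka-log-dirs can be summarised with a small script. The sketch below assumes the JSON layout I have seen from the tool (a “brokers” list containing “logDirs”, each with “partitions” that carry a “partition” name and a “size” in bytes); verify it against the output of your own version. The tool also prints status lines before the JSON, so pass in only the JSON line. replica_sizes is a hypothetical helper, and the sample data is made up.

```python
import json
from collections import defaultdict
from typing import Dict


def replica_sizes(describe_json: str) -> Dict[str, Dict[int, int]]:
    """Map 'topic-partition' -> {broker id: replica size in bytes}."""
    sizes: Dict[str, Dict[int, int]] = defaultdict(dict)
    for broker in json.loads(describe_json)["brokers"]:
        for log_dir in broker["logDirs"]:
            for part in log_dir["partitions"]:
                sizes[part["partition"]][broker["broker"]] = part["size"]
    return dict(sizes)


# Made-up sample in the assumed layout: one replica is far larger than the other.
sample = json.dumps({"version": 1, "brokers": [
    {"broker": 1, "logDirs": [{"logDir": "/data", "error": None, "partitions": [
        {"partition": "orders-0", "size": 500, "offsetLag": 0, "isFuture": False}]}]},
    {"broker": 2, "logDirs": [{"logDir": "/data", "error": None, "partitions": [
        {"partition": "orders-0", "size": 300000, "offsetLag": 0, "isFuture": False}]}]},
]})
print(replica_sizes(sample))
```

A replica whose size is far out of line with its peers, or one that does not shrink over time, is a good candidate for the checks in the earlier sections.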

Alternatively, you can set a custom value for the log.cleaner.threads broker config, which defines the number of background threads used for log cleaning. By default, this value is 1, which might not be a good default for clusters with a lot of topics. While there are no official guidelines on the number to set, a good rule of thumb is to not exceed the number of CPU cores available to the Kafka broker.

Summary

Hopefully you now understand why your Kafka topic is not being compacted, and have found a solution to it. You can find other log cleaner configurations (prefixed with “log.cleaner”) in the official Apache Kafka documentation.

The above list of causes and fixes is not exhaustive – if you have other tips or past encounters with problems like this, feel free to share as a comment in this post. Thanks!
