Monday, July 4, 2016

Deal with corrupted messages in Apache Kafka

Under some strange circumstances it can happen that a message in a Kafka topic is corrupted. This happens often by using 3rd party frameworks together with Kafka. Additionally, Kafka < 0.9 has no lock at Log.read() at the consumer read level, but has a lock on Log.write(). This can cause a rare race condition, as described in KAKFA-2477 [1]. Probably a log entry looks like:

ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$) kafka.message.InvalidMessageException: Message is corrupt (stored crc = xxxxxxxxxx, computed crc = yyyyyyyyyy

Kafka-Tools

Kafka stores the offset of every consumer in Zookeeper. To read out the offsets, Kafka provides handy tools [2]. But also zkCli.sh can be used, at least to display the consumer and the stored offsets. First we need to find the consumer for a topic (> Kafka 0.9):

bin/kafka-consumer-groups.sh --zookeeper management01:2181 --describe --group test

Prior to Kafka 0.9 the only possibility to get this informations was to use zkCli.sh (or similar tools) to find the consumer group. Since the debug with zkCli is a bit frustrating, I personally use kafka-manager from Yahoo [3]. 
Let's assume the consumers are stored in Zookeeper under /consumer, the command to find the offset looks like:

ls /consumer/test/offsets
[1]
get /consumer/test/offsets/1
[15]

With Kafka that command would look like:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-1 --zookeeper zknode1:2181

Group     Topic   Pid   Offset   logSize   Lag   Owner
console-1 test    1     15       337       326   none


After the offset was found, this offset can be incremented to force the consumer to read the next available message. Before doing this, Kafka has to be shutdown. 

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest 16 test

After restart, Kafka should be able read the next message, in the case this message isn’t corrupted, too. And yes, the corrupted message is lost and can’t be restored, so it's always a good idea to implement a CRC check before any message gets to Kafka.

A code based approach is also available [4]. For that a subclass of the ConsumerIterator has to be created, which will catch the message exception, replace it with a dummy message and proceed with the next message. Of course the corrupted message is lost in that case, too.