Java Consumer Rebalance Listener
Advanced Kafka Programming Tutorial for Rebalance Listeners in your Kafka Consumer with Java
Consumer rebalances happen for the following events:
The number of partitions changes for any of the subscribed topics
A subscribed topic is created or deleted
An existing member of the consumer group shuts down or fails
A new member is added to the consumer group
When any of these events are triggered, the provided listener will be invoked twice: first to indicate that the consumer's assignment has been revoked, and then again when the new assignment has been received.
In that case, we have a chance to commit offsets and do some cleanup work before our partitions are revoked, such as closing database connections. If we handle this case gracefully, we will not process duplicate messages across rebalances.
Note that rebalances will only occur during an active call to poll(Duration), so callbacks will also only be invoked during that time. You do not need to worry about threading, because callbacks happen in the same thread that calls poll.
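To make the two-phase invocation concrete, here is a minimal sketch of a listener that only logs both callbacks (it assumes consumer is an already-configured KafkaConsumer<String, String> and that the standard Kafka client imports are in place; the topic name is illustrative):

consumer.subscribe(Arrays.asList("demo_java"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called first: we still own these partitions, so this is the place
        // to commit offsets or flush state before they are taken away
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called second, once the new assignment has been received
        System.out.println("Assigned: " + partitions);
    }
});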
Excerpt from the ConsumerRebalanceListener documentation:
One common use is saving offsets in a custom store. By saving offsets in the onPartitionsRevoked(Collection) call we can ensure that any time partition assignment changes the offset gets saved.
Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example, consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the number of page views per user for each five-minute window. Let's say the topic is partitioned by the user id so that all events for a particular user go to a single consumer instance. The consumer can keep in memory a running tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a partition is reassigned, your consumer may want to automatically trigger a flush of this cache, before the new owner takes over consumption.
Note that callbacks only serve as notification of an assignment change. They cannot be used to express acceptance of the change. Hence throwing an exception from a callback does not affect the assignment in any way, as it will be propagated all the way up to the KafkaConsumer.poll(java.time.Duration) call. If the user captures the exception in the caller, the callback is still assumed successful and no further retries will be attempted.
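As an illustration of the first use case from the excerpt, here is a hedged sketch of a listener that persists positions to a custom store. OffsetStore and its save method are hypothetical stand-ins for your own database- or file-backed store; they are not part of the Kafka API:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// hypothetical custom store, e.g. backed by a database table keyed by partition
interface OffsetStore {
    void save(TopicPartition partition, long position);
}

public class CustomStoreRebalanceListener implements ConsumerRebalanceListener {

    private final OffsetStore store;
    private final Map<TopicPartition, Long> positions = new HashMap<>();

    public CustomStoreRebalanceListener(OffsetStore store) {
        this.store = store;
    }

    public void trackPosition(String topic, int partition, long offset) {
        // the position to resume from is the last processed offset + 1
        positions.put(new TopicPartition(topic, partition), offset + 1);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // persist the position of every partition we are about to lose
        for (TopicPartition partition : partitions) {
            Long position = positions.get(partition);
            if (position != null) {
                store.save(partition, position);
            }
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // on assignment, you would typically read the saved position back
        // from the store and call consumer.seek(partition, position)
    }
}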
In this example, we have created a ConsumerRebalanceListener that keeps track of how far we have consumed in our Kafka topic partitions.
package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ConsumerRebalanceListenerImpl implements ConsumerRebalanceListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerRebalanceListenerImpl.class);

    private KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public ConsumerRebalanceListenerImpl(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void addOffsetToTrack(String topic, int partition, long offset) {
        // remember the position (last processed offset + 1) for this partition
        currentOffsets.put(
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, null));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("onPartitionsRevoked callback triggered");
        log.info("Committing offsets: " + currentOffsets);
        // block until the tracked offsets are committed, before we lose the partitions
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("onPartitionsAssigned callback triggered");
    }

    // this is used when we shut down our consumer gracefully
    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return currentOffsets;
    }
}
Important things to note:
we track internally in the class how far we have consumed, using currentOffsets
in the function addOffsetToTrack, we make sure to increment the offset by 1 in order to commit the position properly (see the short example just below)
we use a synchronous consumer.commitSync call in onPartitionsRevoked to block until the offsets are successfully committed
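A short worked example of the off-by-one rule (the offsets here are illustrative): the committed offset is the position of the next record to read, not the last one processed.

// after processing the record at offset 41 on partition 0 of demo_java...
listener.addOffsetToTrack("demo_java", 0, 41L);
// ...the listener stores new OffsetAndMetadata(42, null) for demo_java-0,
// so a restarted consumer resumes at offset 42 and does not reprocess offset 41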
Here is the implementation:
package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemoRebalanceListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoRebalanceListener.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer with a Rebalance Listener");

        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "my-fifth-application";
        String topic = "demo_java";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // we disable Auto Commit of offsets
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        ConsumerRebalanceListenerImpl listener = new ConsumerRebalanceListenerImpl(consumer);

        // get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // adding the shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
                consumer.wakeup();

                // join the main thread to allow the execution of the code in the main thread
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            // subscribe consumer to our topic(s), attaching the rebalance listener
            consumer.subscribe(Arrays.asList(topic), listener);

            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset: " + record.offset());

                    // we track in the listener the offsets we have processed
                    listener.addOffsetToTrack(record.topic(), record.partition(), record.offset());
                }

                // we commitAsync as we have processed all data and we don't want to block until the next .poll() call
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            log.info("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            try {
                consumer.commitSync(listener.getCurrentOffsets()); // we must commit the offsets synchronously here
            } finally {
                consumer.close();
                log.info("The consumer is now gracefully closed.");
            }
        }
    }
}
Important things to note:
we disable auto commit (otherwise we wouldn't need a rebalance listener)
on every message that is successfully processed, we call listener.addOffsetToTrack(record.topic(), record.partition(), record.offset()); which allows us to track how far we've processed in our consumer
when we're done with a batch, we call consumer.commitAsync(); to commit offsets without blocking our consumer loop
on consumer shutdown, we finally call consumer.commitSync(listener.getCurrentOffsets()); to commit one last time, based on how far we've read, before closing the consumer