Java Consumer Seek and Assign
Learn how to use the seek() and assign() APIs of the Kafka Consumer with Java
If you need to read specific messages from specific partitions, the .seek() and .assign() APIs may help you.
These APIs are also helpful for replaying data from a specific offset.
To use these APIs, make the following changes:
Remove the group.id from the consumer properties (we no longer use a consumer group)
Remove the subscription to the topic
Use the consumer assign() and seek() APIs
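The first change can be sketched with plain java.util.Properties before looking at the full consumer. This is a minimal sketch, not the tutorial's own code: the class AssignSeekConfigSketch, its two methods, and the group name my-java-application are hypothetical, and the string keys are the ones behind the ConsumerConfig constants.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; it is not part of the Kafka client library.
final class AssignSeekConfigSketch {

    /** Returns a copy of the given consumer properties with group.id removed. */
    static Properties withoutGroupId(Properties groupConsumerProps) {
        Properties copy = new Properties();
        copy.putAll(groupConsumerProps);
        copy.remove("group.id");
        return copy;
    }

    /** Example properties for a consumer that previously used subscribe() with a group. */
    static Properties exampleGroupConsumerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("group.id", "my-java-application"); // assumed group name
        p.setProperty("auto.offset.reset", "earliest");
        return p;
    }

    public static void main(String[] args) {
        Properties assignSeekProps = withoutGroupId(exampleGroupConsumerProps());
        System.out.println(assignSeekProps.containsKey("group.id"));          // prints false
        System.out.println(assignSeekProps.getProperty("bootstrap.servers")); // prints 127.0.0.1:9092
    }
}
```

The remaining two changes (dropping subscribe() and calling assign() and seek()) need the Kafka client itself.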
The code with these changes is shown in the snippet below.
package io.conduktor.demos.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerDemoAssignSeek {

    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        String bootstrapServers = "127.0.0.1:9092";
        String topic = "demo_java";

        // create consumer configs (note: no group.id is set)
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign: read from partition 0 of the topic, without subscribing
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 7L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek: the next poll() starts at this offset
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        // poll for new data until the requested number of messages has been read
        while (keepOnReading) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                numberOfMessagesReadSoFar += 1;
                log.info("Key: " + record.key() + ", Value: " + record.value());
                log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                    keepOnReading = false; // to exit the while loop
                    break; // to exit the for loop
                }
            }
        }

        log.info("Exiting the application");

        // release the consumer's network connections and buffers
        consumer.close();
    }
}
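Here we want to read messages starting at offset 7 of partition 0 of the topic demo_java.
Make sure partition 0 of the topic demo_java contains a message at offset 7. Because the application stops only after reading five messages, the partition needs messages up to offset 11 (at least 12 messages, if none have been deleted) for it to exit. Produce messages to the topic until that is the case.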
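The offset requirement above is simple arithmetic, and it can be sketched as a small check. The class ReplayWindowCheck and its methods are hypothetical plain Java with no Kafka dependency. In a real application the two boundary values would likely come from consumer.beginningOffsets(...) and consumer.endOffsets(...), where the end offset is the offset of the next message to be written. The sketch assumes offsets are contiguous, which may not hold for compacted or transactional topics.

```java
// Hypothetical helper for illustration; it is not part of the Kafka client library.
final class ReplayWindowCheck {

    /**
     * Returns true when every offset in [startOffset, startOffset + count)
     * lies inside the partition's readable range [beginningOffset, endOffset).
     * Assumes contiguous offsets (no compaction or transaction markers).
     */
    static boolean isReadable(long beginningOffset, long endOffset, long startOffset, int count) {
        return count > 0
                && startOffset >= beginningOffset
                && startOffset + count <= endOffset;
    }

    /** Number of messages that still have to be produced before the window is complete. */
    static long missingMessages(long endOffset, long startOffset, int count) {
        return Math.max(0L, startOffset + count - endOffset);
    }

    public static void main(String[] args) {
        long startOffset = 7L; // offsetToReadFrom in the demo
        int count = 5;         // numberOfMessagesToRead in the demo

        // Partition holds offsets 0..11: the demo reads five messages and exits.
        System.out.println(isReadable(0L, 12L, startOffset, count)); // prints true

        // Partition holds offsets 0..6: nothing at offset 7 yet, five messages missing.
        System.out.println(isReadable(0L, 7L, startOffset, count));  // prints false
        System.out.println(missingMessages(7L, startOffset, count)); // prints 5
    }
}
```

If the window is not yet readable, the demo keeps polling until enough messages have been produced.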
Run the application. The console of the application will display the selected messages.
[main] INFO Subscribed to partition(s): demo_java-0
[main] INFO Seeking to offset 7 for partition demo_java-0
[main] INFO Cluster ID: 8fresFx9R4Kod8UqUrixrg
[main] INFO ConsumerDemoAssignSeek - Key: id_7, Value: hello world 7
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:7
[main] INFO ConsumerDemoAssignSeek - Key: id_8, Value: hello world 8
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:8
[main] INFO ConsumerDemoAssignSeek - Key: id_9, Value: hello world 9
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:9
[main] INFO ConsumerDemoAssignSeek - Key: id_0, Value: hello world 0
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:10
[main] INFO ConsumerDemoAssignSeek - Key: id_1, Value: hello world 1
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:11
[main] INFO ConsumerDemoAssignSeek - Exiting the application
Conduktor & Kafka Consumer Seek / Assign
You've seen how to perform all these tasks using the Kafka Consumer Java API, but there's no need to keep using such a painful method. Conduktor Platform can consume data from Kafka from any specific topic, partition, or offsets within a partition; you can consume infinitely or just a defined amount; export the data to CSV; filter the result set; and way more! Try it now!