# Java consumer seek and assign

*Read from specific partitions and offsets using seek() and assign()*

When you need to read from specific partitions or replay data from a particular offset, the `seek()` and `assign()` APIs bypass consumer groups entirely.

**What you'll learn:**
- When to use seek() and assign() instead of consumer groups
- How to read from a specific partition and offset
- How to implement a bounded consumer (read N messages)

To use these API, make the following changes:

- Remove the `group.id` from the consumer properties (we don't use consumer groups anymore).
- Remove the subscription to the topic.
- Use consumer `assign()` and `seek()` APIs.

The code with these changes is shown in the snippet below.

```java
package io.conduktor.demos.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerDemoAssignSeek {
    public static void main(String[] args) {

        Logger log = LoggerFactory.getLogger(ConsumerDemoAssignSeek.class.getName());

        String bootstrapServers = "127.0.0.1:9092";
        String topic = "demo_java";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 7L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        // poll for new data
        while(keepOnReading){
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records){
                numberOfMessagesReadSoFar += 1;
                log.info("Key: " + record.key() + ", Value: " + record.value());
                log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead){
                    keepOnReading = false; // to exit the while loop
                    break; // to exit the for loop
                }
            }
        }

        log.info("Exiting the application");

    }
}
```

Here we want to read messages from the offset `7` of partition `0` of the topic `demo_java`.

## Partition offset

Make sure the partition offset of partition `0` of the topic `demo_java` is at least 7. Produce a number of messages to the topic to achieve that.

Run the application. The console of the application will display the selected messages.

```
[main] INFO Subscribed to partition(s): demo_java-0
[main] INFO Seeking to offset 7 for partition demo_java-0
[main] INFO Cluster ID: 8fresFx9R4Kod8UqUrixrg
[main] INFO ConsumerDemoAssignSeek - Key: id_7, Value: hello world 7
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:7
[main] INFO ConsumerDemoAssignSeek - Key: id_8, Value: hello world 8
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:8
[main] INFO ConsumerDemoAssignSeek - Key: id_9, Value: hello world 9
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:9
[main] INFO ConsumerDemoAssignSeek - Key: id_0, Value: hello world 0
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:10
[main] INFO ConsumerDemoAssignSeek - Key: id_1, Value: hello world 1
[main] INFO ConsumerDemoAssignSeek - Partition: 0, Offset:11
[main] INFO ConsumerDemoAssignSeek - Exiting the application
```

## Next steps

- [Consumer in threads](https://www.conduktor.io/kafka/java-consumer-in-threads) for multi-threaded consumers
- [Rebalance listener](https://www.conduktor.io/kafka/java-consumer-rebalance-listener) for manual offset management
- [Consumer concepts](https://www.conduktor.io/kafka/kafka-consumers) for foundational understanding
