Java Consumer in Threads

A quick lesson for advanced users on running Kafka Consumers in a separate thread


Running a Java Consumer in a separate thread allows you to perform other tasks in the main thread.

This is only recommended if you know about multi-threaded programming, so we will keep this page brief. Most use cases will not require you to use threads.

As multi-threading can be complicated for those who have not used it before, we will leave the advanced users the responsibility of analyzing the code below.

Consumer in Threads Sample code

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 package io.conduktor.demos.kafka; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.Date; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class ConsumerDemoThreads { public static void main(String[] args) { ConsumerDemoWorker consumerDemoWorker = new ConsumerDemoWorker(); new Thread(consumerDemoWorker).start(); Runtime.getRuntime().addShutdownHook(new Thread(new ConsumerDemoCloser(consumerDemoWorker))); } private static class ConsumerDemoWorker implements Runnable { private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWorker.class); private CountDownLatch countDownLatch; private Consumer<String, String> consumer; @Override public void run() { countDownLatch = new CountDownLatch(1); final Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-sixth-application"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton("demo_java")); final Duration pollTimeout = Duration.ofMillis(100); try { while (true) { final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout); for (final ConsumerRecord<String, String> consumerRecord : consumerRecords) { log.info("Getting consumer record key: '" + consumerRecord.key() + "', value: '" + consumerRecord.value() + "', partition: " + consumerRecord.partition() + " and offset: " + consumerRecord.offset() + " at " + new Date(consumerRecord.timestamp())); } } } catch (WakeupException e) { log.info("Consumer poll woke up"); } finally { consumer.close(); countDownLatch.countDown(); } } void shutdown() throws InterruptedException { consumer.wakeup(); countDownLatch.await(); log.info("Consumer closed"); } } private static class ConsumerDemoCloser implements Runnable { private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCloser.class); private final ConsumerDemoWorker consumerDemoWorker; ConsumerDemoCloser(final ConsumerDemoWorker consumerDemoWorker) { this.consumerDemoWorker = consumerDemoWorker; } @Override public void run() { try { consumerDemoWorker.shutdown(); } catch (InterruptedException e) { log.error("Error shutting down consumer", e); } } } }

Output

1 2 3 4 5 6 7 8 9 10 [Thread-0] Consumer poll woke up [Thread-0] Revoke previously assigned partitions demo_java-0 [Thread-0] Member consumer-my-sixth-application-1-fecba584-5838-418d-83b8-c88a528bc0e5 sending LeaveGroup request to coordinator 127.0.0.1:9094 (id: 2147483644 rack: null) due to the consumer is being closed [Thread-0] Resetting generation due to: consumer pro-actively leaving the group [Thread-0] Request joining group due to: consumer pro-actively leaving the group [Thread-0] Metrics scheduler closed [Thread-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter [Thread-0] Metrics reporters closed [Thread-0] App info kafka.consumer for consumer-my-sixth-application-1 unregistered [Thread-1] Consumer closed

Was this content helpful?
0
0
PreviousJava Consumer Seek and Assign
NextKafka Advanced Concepts
Share this page.

Apache, Apache Kafka, Kafka and the Kafka logo are trademarks of the Apache Software Foundation. All other trademarks, servicemarks, and copyrights are the property of their respective owners.