Kafka Streams with Spring Boot
Kafka Streams with Spring Boot, two ways: spring-kafka with @EnableKafkaStreams, and the Spring Cloud Stream functional binder, plus which to pick and why.
Wire Kafka Streams into Spring Boot the right way.
There are two completely different ways to run Kafka Streams under Spring Boot, and people constantly conflate them. One is Spring for Apache Kafka (spring-kafka): you write a real StreamsBuilder topology and Spring manages its lifecycle. The other is Spring Cloud Stream: you write java.util.function.Function beans and a binder wires them to topics. Same engine underneath, very different code and config.
This page covers both, side by side, so you can tell which one a Stack Overflow answer is even talking about, and pick the right one for your service.
What you'll learn:
- The spring-kafka approach: @EnableKafkaStreams, StreamsBuilderFactoryBean, and spring.kafka.streams.*
- The Spring Cloud Stream functional binder: Function beans and declarative bindings
- When to pick which, and why mixing them up causes half the confusion
- The lifecycle, error-handling, and testing details Spring hides from you
Two libraries, one engine
Both options run the exact same org.apache.kafka:kafka-streams library underneath: the topology, state stores, changelog topics, and rebalancing all behave identically. What differs is how much Spring writes for you.
| | Spring for Apache Kafka (spring-kafka) | Spring Cloud Stream (KStream binder) |
|---|---|---|
| You write | A KStream<...> topology with the DSL | A Function bean |
| Topics wired by | You, in the topology (stream(...), to(...)) | The binder, via spring.cloud.stream.bindings.* |
| Config namespace | spring.kafka.streams.* | spring.cloud.stream.kafka.streams.* |
| Lifecycle | StreamsBuilderFactoryBean | The binder |
| Control you keep | Full, it's a normal topology | Less, bindings are declarative |
| Good fit | One app, full control, interactive queries | Many small functions, event-driven microservices, multi-binder |
spring-kafka is closer to the metal and the better default when you want to see your topology. Spring Cloud Stream trades that visibility for declarative bindings and a uniform programming model across messaging systems.

Versions. Spring for Apache Kafka 4.0 (GA Nov 2025) and Spring Cloud Stream 5.x track the Kafka 4.x clients. If you're on Spring Boot 3.x, you're on the earlier spring-kafka 3.x line, and Spring Cloud Stream 4.x stays there too (kafka-clients 3.8.x). The APIs below are stable across both lines, but check the Spring Kafka Streams reference for your exact version.
Approach 1: spring-kafka and @EnableKafkaStreams
Add the dependency, annotate a config class with @EnableKafkaStreams, and Spring auto-configures a StreamsBuilderFactoryBean for you. You then declare one or more @Bean public KStream<...> methods that take the auto-configured StreamsBuilder and define the topology. Spring builds it and manages start()/close() for you.
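spring-kafka does not pull in kafka-streams transitively, so declare both. Spring Boot's dependency management supplies the versions:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
```

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class StreamsConfig { // your own class, not org.apache.kafka.streams.StreamsConfig

    @Bean
    public KStream<String, String> pipeline(StreamsBuilder builder) {
        // key/value serdes come from default.key.serde / default.value.serde below
        KStream<String, String> stream = builder.stream("words-input");
        stream
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .filter((k, word) -> !word.isEmpty())
            .groupBy((k, word) -> word)            // re-key by word (adds a repartition topic)
            .count(Materialized.as("word-counts")) // named, queryable state store
            .toStream()
            .to("words-output", Produced.with(Serdes.String(), Serdes.Long()));
        return stream;
    }
}
```

You do not create a KafkaStreams instance or call start() yourself: the factory bean does that as part of starting the application context. Config goes in application.properties (or the equivalent application.yml) under spring.kafka.streams:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=wordcount-app
# default serdes have no default since Kafka 3.0 (KIP-741); without these the app fails at startup
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
# anything not exposed as a typed property goes under .properties.*
spring.kafka.streams.properties[processing.guarantee]=exactly_once_v2
spring.kafka.streams.properties[num.stream.threads]=2
```

Raw Kafka Streams requires both application.id (the identity that names your consumer group, internal topics, and state directory) and bootstrap.servers. Boot relaxes both, but in different ways. application-id falls back to spring.application.name, and the context fails (with InvalidConfigurationPropertyValueException) only when both are unset. bootstrap-servers, by contrast, silently defaults to localhost:9092, so a missing broker config never fails fast: it just points your app at localhost, which is a production footgun of its own.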
Lifecycle and error handling
The StreamsBuilderFactoryBean is where you hook the things a raw KafkaStreams exposes. Spring lets you customize it after auto-configuration:
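```java
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

private final Logger log = LoggerFactory.getLogger(getClass()); // field of your @Configuration class

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return factoryBean -> {
        factoryBean.setStateListener((newState, oldState) ->
                log.info("State {} -> {}", oldState, newState));
        factoryBean.setStreamsUncaughtExceptionHandler(ex -> {
            log.error("Stream thread died; replacing it", ex);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    };
}
```

The StateListener is your signal for REBALANCING → RUNNING transitions and for the ERROR state. The uncaught-exception handler decides what happens when a stream thread dies (REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION), which is the same decision covered in dead letter queues. Don't leave it on the default: since Kafka 2.8 (KIP-671), the default with no handler is SHUTDOWN_CLIENT, so one poison record halts the entire client (it transitions through PENDING_ERROR into ERROR and every thread stops). Set the handler deliberately, for example REPLACE_THREAD if you want to keep running, keeping in mind that a record that fails every time will also kill the replacement thread. Then alert on ERROR from the state listener.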
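The state listener is also a natural place to keep a health signal. A minimal sketch, assuming you want one object that remembers the latest state for health checks and query endpoints: StreamsStateTracker is a hypothetical class name, not a Spring or Kafka API. You would register it with factoryBean.setStateListener(tracker) inside a StreamsBuilderFactoryBeanConfigurer. The factory bean holds a single listener, so this would replace the logging lambda rather than sit next to it.

```java
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical helper (not part of Spring or Kafka): remembers the most recent
 * KafkaStreams state so health checks and query endpoints can gate on it.
 */
public class StreamsStateTracker implements KafkaStreams.StateListener {

    private static final Logger log = LoggerFactory.getLogger(StreamsStateTracker.class);

    // CREATED is the state a new KafkaStreams instance starts in
    private final AtomicReference<KafkaStreams.State> current =
            new AtomicReference<>(KafkaStreams.State.CREATED);

    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        current.set(newState);
        log.info("Kafka Streams state {} -> {}", oldState, newState);
        if (newState == KafkaStreams.State.ERROR) {
            // Replace with your real alerting (metric, health indicator, pager).
            log.error("Kafka Streams entered ERROR; all stream threads have stopped");
        }
    }

    /** Stores are only safely queryable while RUNNING. */
    public boolean isRunning() {
        return current.get() == KafkaStreams.State.RUNNING;
    }

    public boolean hasFailed() {
        return current.get() == KafkaStreams.State.ERROR;
    }
}
```

A Spring Boot Actuator HealthIndicator could then report DOWN whenever hasFailed() returns true.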
Reaching the KafkaStreams instance
For interactive queries you need the live KafkaStreams object. Spring exposes it through the factory bean:
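```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

// inside a Spring-managed bean, e.g. a @Service or @RestController
@Autowired
private StreamsBuilderFactoryBean factoryBean;

public Long countFor(String word) {
    KafkaStreams streams = factoryBean.getKafkaStreams(); // null until the factory bean has started
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));
    return store.get(word); // null if the word has never been counted
}
```

getKafkaStreams() returns null until the factory bean has started during context refresh, and null again after stop(), so endpoints can NPE during graceful shutdown too. A null check alone isn't the right gate either: a non-null instance still rejects store(...) with InvalidStateStoreException until the state reaches RUNNING. Guard on the state or the state listener; see the interactive queries page for the full pattern.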
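One way to apply that gate, sketched under assumptions: GuardedWordCountQuery and its NotQueryableException are hypothetical names, the store is the word-counts store from the topology above, and the live instance is read through a Supplier, so in Spring you can pass factoryBean::getKafkaStreams and in a test you can pass anything. It checks for null, then for RUNNING, and still catches InvalidStateStoreException, because a rebalance can start between the check and the call.

```java
import java.util.Optional;
import java.util.function.Supplier;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

/** Hypothetical helper: queries a key-value store only when Kafka Streams can serve it. */
public class GuardedWordCountQuery {

    /** Hypothetical exception meaning "not ready, retry later" (e.g. map it to HTTP 503). */
    public static class NotQueryableException extends RuntimeException {
        public NotQueryableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final Supplier<KafkaStreams> streamsSupplier;
    private final String storeName;

    public GuardedWordCountQuery(Supplier<KafkaStreams> streamsSupplier, String storeName) {
        this.streamsSupplier = streamsSupplier;
        this.storeName = storeName;
    }

    /** Empty means no count for this word in the local store; an exception means "not ready". */
    public Optional<Long> countFor(String word) {
        // Re-read on every call: the instance is null before start and after stop.
        KafkaStreams streams = streamsSupplier.get();
        if (streams == null) {
            throw new NotQueryableException("Kafka Streams is not started", null);
        }
        KafkaStreams.State state = streams.state();
        if (state != KafkaStreams.State.RUNNING) {
            throw new NotQueryableException("Kafka Streams is " + state, null);
        }
        try {
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            return Optional.ofNullable(store.get(word));
        } catch (InvalidStateStoreException e) {
            // The state can change between the check above and this call, e.g. when a rebalance starts.
            throw new NotQueryableException("store " + storeName + " is not queryable yet", e);
        }
    }
}
```

Wired in Spring, this could be a @Bean returning new GuardedWordCountQuery(factoryBean::getKafkaStreams, "word-counts"). It only reads partitions hosted by the local instance; routing a key to the instance that owns it is the multi-instance part covered on the interactive queries page.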
Approach 2: Spring Cloud Stream functional binder
Spring Cloud Stream inverts the model. You don't touch StreamsBuilder. You write a bean of type Function (or Consumer for a sink), and the binder connects its input and output to topics declared in config.
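```xml
<!-- version comes from the Spring Cloud BOM (spring-cloud-dependencies), not from Spring Boot -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
```

```java
import java.util.Arrays;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.context.annotation.Bean;

@Bean
public Function<KStream<String, String>, KStream<String, Long>> pipeline() {
    return input -> input
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .filter((k, word) -> !word.isEmpty())
            // explicit serdes for the repartition topic; count() reuses this key serde
            .groupBy((k, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.as("word-counts"))
            .toStream();
}
```

```properties
spring.cloud.function.definition=pipeline
spring.cloud.stream.bindings.pipeline-in-0.destination=words-input
spring.cloud.stream.bindings.pipeline-out-0.destination=words-output
spring.cloud.stream.kafka.streams.binder.application-id=wordcount-app
spring.cloud.stream.kafka.streams.binder.brokers=localhost:9092
```

The binding names follow the convention `<functionName>-in-<index>` / `<functionName>-out-<index>`, where the function name comes from spring.cloud.function.definition and the index is the zero-based position of the input or output. This is the model Spring documents as current: the old annotation-based @EnableBinding / @StreamListener API was removed in Spring Cloud Stream 4.0, so any tutorial using those annotations is stale. Use the functional Function/Consumer style.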
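The Consumer variant is the same idea with no output binding. A minimal sketch, assuming you want to log the counts that pipeline writes to words-output: the class name CountSinkFunctions and the function name countsSink are hypothetical, and under the naming convention above its only binding is countsSink-in-0. The binder can infer serdes for common types such as String and Long from the function's generic signature; for other types, configure serdes per binding.

```java
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CountSinkFunctions {

    private static final Logger log = LoggerFactory.getLogger(CountSinkFunctions.class);

    // A Consumer is a terminal function: one input binding (countsSink-in-0), no output binding.
    @Bean
    public Consumer<KStream<String, Long>> countsSink() {
        return counts -> counts.foreach((word, count) -> log.info("{} = {}", word, count));
    }
}
```

Wire it with spring.cloud.function.definition=countsSink and spring.cloud.stream.bindings.countsSink-in-0.destination=words-output.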
🚫 "I'll just add @EnableKafkaStreams to my Spring Cloud Stream app to make the state store work."
That line is the conflation in a nutshell. @EnableKafkaStreams is a spring-kafka annotation; it has no role in a Spring Cloud Stream binder app, where the binder owns the lifecycle. Mixing the two gives you two competing attempts to start Streams. Pick one stack and stay in it.
Testing, same as plain Kafka Streams
Spring changes how the topology is wired, not what it is. So the unit-testing story is unchanged: extract the topology and drive it with TopologyTestDriver, no broker and no Spring context required. That's the fast, deterministic test, covered in testing Kafka Streams.
For an @EnableKafkaStreams app, refactor the topology into a plain method or @Bean that takes a StreamsBuilder so a test can build it in isolation:
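```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import static org.junit.jupiter.api.Assertions.assertEquals;

StreamsBuilder builder = new StreamsBuilder();
new StreamsConfig().pipeline(builder); // your topology method (your class, not Kafka's StreamsConfig)
Properties props = new Properties();
props.put("default.key.serde", Serdes.StringSerde.class);
props.put("default.value.serde", Serdes.StringSerde.class);
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
            "words-input", new StringSerializer(), new StringSerializer());
    in.pipeInput("the quick brown the");
    // the running count per word lives in the "word-counts" store
    assertEquals(2L, (long) driver.<String, Long>getKeyValueStore("word-counts").get("the"));
}
```

No application.id or bootstrap config is needed since Kafka 3.2. The default serde props are still required because the topology above relies on them, and default serdes have had no default since Kafka 3.0. Use explicit serdes (Consumed.with, Grouped.with) in the topology and you can drop the props entirely.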
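Here is what the explicit-serdes version of the word count could look like. A minimal sketch: ExplicitSerdesWordCount is a hypothetical class, and the topic and store names match the examples above. Every step that touches a topic or a store names its serdes, so no default.*.serde config is needed and the test can use the TopologyTestDriver(Topology) constructor with no Properties at all.

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public final class ExplicitSerdesWordCount {

    private ExplicitSerdesWordCount() {}

    /** Adds the word-count topology to the given builder without relying on default serdes. */
    public static void define(StreamsBuilder builder) {
        builder.stream("words-input", Consumed.with(Serdes.String(), Serdes.String()))
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
                .filter((k, word) -> !word.isEmpty())
                // Grouped sets the repartition-topic serdes; count() reuses the key serde
                // and uses a Long serde for the count itself.
                .groupBy((k, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.as("word-counts"))
                .toStream()
                .to("words-output", Produced.with(Serdes.String(), Serdes.Long()));
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        define(builder);
        return builder.build();
    }
}
```

The same chain can live inside your @Bean method; only the serde arguments change compared with the version that relies on spring.kafka.streams.properties.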
For an end-to-end test that exercises the real binder and Spring config, use EmbeddedKafka (from spring-kafka-test) or Testcontainers. Both are slower, but they catch binding and serde wiring that TopologyTestDriver can't see.
Which one should you pick?
- Reach for spring-kafka + @EnableKafkaStreams when you have one app with one (or a few) topologies, you want the topology explicit in your code, and you need interactive queries or fine control over the StreamsBuilderFactoryBean lifecycle. It's the smaller abstraction and the easier one to debug.
- Reach for Spring Cloud Stream when you're building many small event-driven functions, you already use the framework for other binders (RabbitMQ, plain Kafka), and you value declarative bindings and a uniform model over seeing the topology spelled out.
If you're unsure, start with spring-kafka. It's closer to the getting-started mental model, and you can always move to the binder later.
What's the difference between @EnableKafkaStreams and Spring Cloud Stream?
@EnableKafkaStreams is part of spring-kafka: you write a real StreamsBuilder topology and Spring manages its lifecycle via StreamsBuilderFactoryBean. Spring Cloud Stream is a separate library where you write Function beans and a binder wires them to topics declaratively. They run the same engine but are not meant to be combined.
Why does my Spring Boot Kafka Streams app fail to start?
The most common cause since Kafka 3.0 is missing default serdes: either set default.key.serde and default.value.serde under spring.kafka.streams.properties or use explicit serdes everywhere; otherwise startup throws a ConfigException. A missing application-id falls back to spring.application.name; Boot only throws InvalidConfigurationPropertyValueException when both are unset. bootstrap-servers never fails fast: it defaults to localhost:9092.
How do I access interactive queries in a Spring Boot Kafka Streams app?
With spring-kafka, autowire the StreamsBuilderFactoryBean and call getKafkaStreams() to get the live KafkaStreams, then streams.store(...). It returns null until the factory bean has started, and even a non-null instance throws InvalidStateStoreException from store(...) until the state reaches RUNNING, so wait on the state listener. See the interactive queries page for multi-instance routing.
How do I test a Spring Boot Kafka Streams app?
Unit-test the topology with TopologyTestDriver: extract it into a method that takes a StreamsBuilder so no broker or Spring context is needed. For full binder and wiring coverage, use EmbeddedKafka or Testcontainers, which start a real broker.
Can I put my whole topology in one @Bean?
Yes. A single @Bean public KStream<...> pipeline(StreamsBuilder builder) can hold the entire topology; the auto-configured StreamsBuilderFactoryBean builds and starts it. You can also split across multiple @Bean methods sharing the same injected StreamsBuilder.
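A sketch of the split layout, assuming two unrelated flows: the topic names (orders, large-orders, audit-events, audit-clean), the threshold, and the class name are all hypothetical. Both methods receive the same StreamsBuilder (the object the auto-configured factory bean produces), so they end up in one topology and one KafkaStreams instance, as two independent sub-topologies.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class SplitTopologyConfig {

    // Hypothetical flow 1: route orders above a threshold to their own topic.
    @Bean
    public KStream<String, Long> largeOrders(StreamsBuilder builder) {
        KStream<String, Long> orders =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()));
        orders.filter((orderId, amount) -> amount != null && amount > 1_000L)
              .to("large-orders", Produced.with(Serdes.String(), Serdes.Long()));
        return orders;
    }

    // Hypothetical flow 2: clean up audit events. Same builder, separate sub-topology.
    @Bean
    public KStream<String, String> auditEvents(StreamsBuilder builder) {
        KStream<String, String> audit =
                builder.stream("audit-events", Consumed.with(Serdes.String(), Serdes.String()));
        audit.filter((k, v) -> v != null)
             .mapValues(v -> v.trim())
             .to("audit-clean", Produced.with(Serdes.String(), Serdes.String()));
        return audit;
    }
}
```

Each method still needs its own source topic: registering the same topic as a source twice on one builder fails when the topology is built.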
See it in practice with Conduktor
Spring hides the wiring, not the runtime: under the hood your app is still a consumer group plus the changelog and repartition topics it creates. Conduktor Console shows the internal topics, the consumer-group lag that tells you whether a restore is still running after a deploy, and the partition assignment across instances. Those are the signals Spring's logs don't surface on their own.
Next steps
- Build your first Kafka Streams app: the plain-Java baseline Spring wraps, and why application.id matters
- Testing Kafka Streams: TopologyTestDriver and when to reach for Testcontainers
- Interactive queries: serve state-store data over REST from a Spring app