# Kafka Streams with Spring Boot

*Wire Kafka Streams into Spring Boot the right way.*

There are two completely different ways to run Kafka Streams under Spring Boot, and people constantly conflate them. One is **Spring for Apache Kafka** (`spring-kafka`): you write a real `StreamsBuilder` topology and Spring manages its lifecycle. The other is **Spring Cloud Stream**: you write `java.util.function.Function` beans and a binder wires them to topics. Same engine underneath, very different code and config.

This page covers both, side by side, so you can tell which one a Stack Overflow answer is even talking about, and pick the right one for your service.

**What you'll learn:**
- The `spring-kafka` approach: `@EnableKafkaStreams`, `StreamsBuilderFactoryBean`, and `spring.kafka.streams.*`
- The Spring Cloud Stream functional binder: `Function<KStream, KStream>` beans and declarative bindings
- When to pick which, and why mixing them up causes half the confusion
- The lifecycle, error-handling, and testing details Spring hides from you

## Two libraries, one engine

Both options run the exact same `org.apache.kafka:kafka-streams` library underneath, the topology, [state stores](https://www.conduktor.io/kafka-streams/state-store), changelog topics, and rebalancing all behave identically. What differs is how much Spring writes for you.

| | Spring for Apache Kafka (`spring-kafka`) | Spring Cloud Stream (KStream binder) |
|---|---|---|
| You write | A `KStream<...>` topology with the DSL | A `Function<KStream, KStream>` bean |
| Topics wired by | You, in the topology (`stream(...)`, `to(...)`) | The binder, via `spring.cloud.stream.bindings.*` |
| Config namespace | `spring.kafka.streams.*` | `spring.cloud.stream.kafka.streams.*` |
| Lifecycle | `StreamsBuilderFactoryBean` | The binder |
| Control you keep | Full, it's a normal topology | Less, bindings are declarative |
| Good fit | One app, full control, interactive queries | Many small functions, event-driven microservices, multi-binder |

Neither is "more correct." `spring-kafka` is closer to the metal and the better default when you want to *see* your topology. Spring Cloud Stream trades that visibility for declarative bindings and a uniform programming model across messaging systems.

> **Versions.** Spring for Apache Kafka 4.0 (GA Nov 2025) and Spring Cloud Stream 5.x track the Kafka 4.x clients. If you're on Spring Boot 3.x you're on the earlier `spring-kafka` 3.x line, and Spring Cloud Stream 4.x stays there too (kafka-clients 3.8.x). The APIs below are stable across both, but check the [Spring Kafka Streams reference](https://docs.spring.io/spring-kafka/reference/streams.html) for your exact version.

## Approach 1: spring-kafka and @EnableKafkaStreams

Add the dependency, annotate a config class with `@EnableKafkaStreams`, and Spring auto-configures a `StreamsBuilderFactoryBean` for you. You then declare one or more `@Bean public KStream<...>` methods that take the auto-configured `StreamsBuilder` and define the topology. Spring builds it and manages `start()`/`close()` for you.

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

```java
@Configuration
@EnableKafkaStreams
public class StreamsConfig {

    @Bean
    public KStream<String, String> pipeline(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("words-input");
        stream
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .filter((k, word) -> !word.isEmpty())
            .groupBy((k, word) -> word)
            .count(Materialized.as("word-counts"))
            .toStream()
            .to("words-output", Produced.with(Serdes.String(), Serdes.Long()));
        return stream;
    }
}
```

You do **not** create a `KafkaStreams` instance or call `start()` yourself, the factory bean does it after the context refreshes. Config goes in `application.yml` under `spring.kafka.streams`:

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=wordcount-app
# default serdes have no default since Kafka 3.0 (KIP-741); without these the app fails at startup
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
# anything not exposed as a typed property goes under .properties.*
spring.kafka.streams.properties[processing.guarantee]=exactly_once_v2
spring.kafka.streams.properties[num.stream.threads]=2
```

Raw Kafka Streams requires both `application.id` (the [identity that names your consumer group, internal topics, and state directory](https://www.conduktor.io/kafka-streams/getting-started)) and `bootstrap.servers`. Boot softens both, in opposite directions. `application-id` falls back to `spring.application.name`; the context only fails (with `InvalidConfigurationPropertyValueException`) when both are unset. And `bootstrap-servers` silently defaults to `localhost:9092`, so a missing broker config never fails fast: it just points your app at localhost, which is its own production footgun.

### Lifecycle and error handling

The `StreamsBuilderFactoryBean` is where you hook the things a raw `KafkaStreams` exposes. Spring lets you customize it after auto-configuration:

```java
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return factoryBean -> {
        factoryBean.setStateListener((newState, oldState) ->
            log.info("State {} -> {}", oldState, newState));
        factoryBean.setStreamsUncaughtExceptionHandler(ex ->
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
    };
}
```

The `StateListener` is your signal for `REBALANCING → RUNNING` transitions and for the `ERROR` state. The uncaught-exception handler decides what happens when a stream thread dies, `REPLACE_THREAD`, `SHUTDOWN_CLIENT`, or `SHUTDOWN_APPLICATION`, the same decision covered in [dead letter queues](https://www.conduktor.io/kafka-streams/dead-letter-queue). Don't leave it on the default: since Kafka 2.8 (KIP-671) the default with no handler is `SHUTDOWN_CLIENT`, so one poison record halts the entire client (it transitions through `PENDING_ERROR` into `ERROR` and every thread stops). Set the handler deliberately, `REPLACE_THREAD` if you want to keep running, and alert on `ERROR` from the state listener.

### Reaching the KafkaStreams instance

For [interactive queries](https://www.conduktor.io/kafka-streams/interactive-queries) you need the live `KafkaStreams` object. Spring exposes it through the factory bean:

```java
@Autowired
private StreamsBuilderFactoryBean factoryBean;

public Long countFor(String word) {
    KafkaStreams streams = factoryBean.getKafkaStreams(); // null until the factory bean has started
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));
    return store.get(word);
}
```

`getKafkaStreams()` returns `null` until the factory bean has started during context refresh, and `null` again after `stop()`, so endpoints can NPE during graceful shutdown too. The null check alone isn't the right gate either: a non-null instance still rejects `store(...)` with `InvalidStateStoreException` until the state reaches `RUNNING`. Guard on the state or the state listener; see the [interactive queries](https://www.conduktor.io/kafka-streams/interactive-queries) page for the full pattern.

## Approach 2: Spring Cloud Stream functional binder

Spring Cloud Stream inverts the model. You don't touch `StreamsBuilder`. You write a bean of type `Function<KStream<K,V>, KStream<K2,V2>>` (or `Consumer<KStream<...>>` for a sink), and the binder connects its input and output to topics declared in config.

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
```

```java
@Bean
public Function<KStream<String, String>, KStream<String, Long>> pipeline() {
    return input -> input
        .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
        .filter((k, word) -> !word.isEmpty())
        .groupBy((k, word) -> word)
        .count(Materialized.as("word-counts"))
        .toStream();
}
```

```properties
spring.cloud.function.definition=pipeline
spring.cloud.stream.bindings.pipeline-in-0.destination=words-input
spring.cloud.stream.bindings.pipeline-out-0.destination=words-output
spring.cloud.stream.kafka.streams.binder.application-id=wordcount-app
spring.cloud.stream.kafka.streams.binder.brokers=localhost:9092
```

The binding names follow the convention `<function-name>-in-<index>` / `-out-<index>`. The function name comes from `spring.cloud.function.definition`. This is the model Spring documents as current, the old annotation-based `@EnableBinding` / `@StreamListener` API was **removed in Spring Cloud Stream 4.0**, so any tutorial using those annotations is stale. Use the functional `Function`/`Consumer` style.

> 🚫 *"I'll just add `@EnableKafkaStreams` to my Spring Cloud Stream app to make the state store work."*

That line is the conflation in a nutshell. `@EnableKafkaStreams` is a `spring-kafka` annotation; it has no role in a Spring Cloud Stream binder app, where the binder owns the lifecycle. Mixing the two gives you two competing attempts to start Streams. Pick one stack and stay in it.

## Testing, same as plain Kafka Streams

Spring changes how the topology is *wired*, not what it *is*. So the unit-testing story is unchanged: extract the topology and drive it with `TopologyTestDriver`, no broker and no Spring context required. That's the fast, deterministic test, covered in [testing Kafka Streams](https://www.conduktor.io/kafka-streams/testing).

For an `@EnableKafkaStreams` app, refactor the topology into a plain method or `@Bean` that takes a `StreamsBuilder` so a test can build it in isolation:

```java
StreamsBuilder builder = new StreamsBuilder();
new StreamsConfig().pipeline(builder);            // your topology method
Properties props = new Properties();
props.put("default.key.serde", Serdes.StringSerde.class);
props.put("default.value.serde", Serdes.StringSerde.class);
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> in = driver.createInputTopic(
        "words-input", new StringSerializer(), new StringSerializer());
    in.pipeInput("the quick brown the");
    // assert on the output topic / state store
}
```

No `application.id` or bootstrap config needed since Kafka 3.2. The default serde props are still required because the topology above relies on them, and default serdes have had no default since Kafka 3.0; use explicit serdes (`Consumed.with`, `Grouped.with`) in the topology and you can drop the props entirely.

For an end-to-end test that exercises the real binder and Spring config, use `EmbeddedKafka` (from `spring-kafka-test`) or [Testcontainers](https://www.conduktor.io/kafka-streams/testing), slower, but it catches binding and serde wiring that `TopologyTestDriver` can't see.

## Which one should you pick?

- **Reach for `spring-kafka` + `@EnableKafkaStreams`** when you have one app with one (or a few) topologies, you want the topology explicit in your code, and you need [interactive queries](https://www.conduktor.io/kafka-streams/interactive-queries) or fine control over the `StreamsBuilderFactoryBean` lifecycle. It's the smaller abstraction and the easier one to debug.
- **Reach for Spring Cloud Stream** when you're building many small event-driven functions, you already use the framework for other binders (RabbitMQ, plain Kafka), and you value declarative bindings and a uniform model over seeing the topology spelled out.

If you're unsure, start with `spring-kafka`. It's closer to the [getting-started](https://www.conduktor.io/kafka-streams/getting-started) mental model, and you can always move to the binder later.

**What's the difference between @EnableKafkaStreams and Spring Cloud Stream?**

`@EnableKafkaStreams` is part of `spring-kafka`: you write a real `StreamsBuilder` topology and Spring manages its lifecycle via `StreamsBuilderFactoryBean`. Spring Cloud Stream is a separate library where you write `Function<KStream, KStream>` beans and a binder wires them to topics declaratively. They run the same engine but are not meant to be combined.

**Why does my Spring Boot Kafka Streams app fail to start?**

The most common cause since Kafka 3.0 is missing default serdes: set `default.key.serde` and `default.value.serde` under `spring.kafka.streams.properties`, or use explicit serdes everywhere, or startup throws a `ConfigException`. A missing `application-id` falls back to `spring.application.name`; Boot only throws `InvalidConfigurationPropertyValueException` when both are unset. `bootstrap-servers` never fails fast: it defaults to `localhost:9092`.

**How do I access interactive queries in a Spring Boot Kafka Streams app?**

With `spring-kafka`, autowire the `StreamsBuilderFactoryBean` and call `getKafkaStreams()` to get the live `KafkaStreams`, then `streams.store(...)`. It returns `null` until the factory bean has started, and even a non-null instance throws `InvalidStateStoreException` from `store(...)` until the state reaches `RUNNING`, so wait on the state listener. See the interactive queries page for multi-instance routing.

**How do I test a Spring Boot Kafka Streams app?**

Unit-test the topology with `TopologyTestDriver`, extract it into a method that takes a `StreamsBuilder` so no broker or Spring context is needed. For full binder/wiring coverage use `EmbeddedKafka` or Testcontainers, which start a real broker.

**Can I put my whole topology in one @Bean?**

Yes. A single `@Bean public KStream<...> pipeline(StreamsBuilder builder)` can hold the entire topology; the auto-configured `StreamsBuilderFactoryBean` builds and starts it. You can also split across multiple `@Bean` methods sharing the same injected `StreamsBuilder`.

> **See it in practice with Conduktor**
> Spring hides the wiring, not the runtime: under the hood your app is still a consumer group plus the changelog and repartition topics it creates. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) shows the `<application-id>-...` internal topics, the consumer-group lag that tells you whether a restore is still running after a deploy, and the partition assignment across instances, the signals Spring's logs don't surface on their own.

## Next steps

- [Build your first Kafka Streams app](https://www.conduktor.io/kafka-streams/getting-started), the plain-Java baseline Spring wraps, and why `application.id` matters
- [Testing Kafka Streams](https://www.conduktor.io/kafka-streams/testing), `TopologyTestDriver` and when to reach for Testcontainers
- [Interactive queries](https://www.conduktor.io/kafka-streams/interactive-queries), serve state-store data over REST from a Spring app
