# Evolving a Kafka Streams topology without losing state

*Learn why a topology change breaks your app, and how to ship one safely.*

You add one `filter` in the middle of a working Kafka Streams app, deploy it, and the app either refuses to start with a restore error or comes up with an aggregation that has silently reset to zero. Nothing in your diff touched state. The cause is almost never your code: it's that Kafka Streams names its internal topics by an operator's *position* in the topology, and you just shifted every position downstream of that `filter`.

This is the single most under-documented way to break a stateful Streams app, and it's entirely preventable, but only if you do the right thing from the first commit, not after the incident.

**What you'll learn:**
- Why internal store, changelog, and repartition topic names are positional, and what shifts them
- How to name every stateful operator so a refactor stops renaming your state
- Which topology changes are safe rolling deploys, and which require a full reset
- How to snapshot `Topology#describe()` and diff it in CI before a deploy ships

## Internal names are positional

When you build a topology, Kafka Streams assigns every processor, store, and internal topic an autogenerated name. Call `topology.describe()` on a simple count-and-output app and you get the output below. The node names are exactly what the DSL generates; only the `Topologies:` header and the `-->`/`<--` edge lines are trimmed for readability:

```
Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [in])
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
    Sink: KSTREAM-SINK-0000000004 (topic: out)
```

Look at the numbers: `KSTREAM-SOURCE-0000000000`, then `-0000000001` for the store, `-0000000002` for the aggregate processor, and so on. They come from a single counter that the builder increments as it walks your topology in declaration order. The store name `KSTREAM-AGGREGATE-STATE-STORE-0000000001` is what gets baked into the changelog topic name your cluster actually holds:

```
<application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
```

The same positional scheme names repartition topics (`...-repartition`) and the internal stores behind joins. None of these names come from anything you wrote. They come from *where* in the graph the operator sits.
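
The scheme is simple enough to model in a few lines of plain Java. This is a deliberately simplified sketch, not Kafka's internal builder: the `PositionalNamingModel` class is hypothetical, and it encodes only what the output above shows (one counter, a ten-digit zero-padded index, the store's index taken just before its aggregate processor's) plus the changelog pattern. The `withFilter` flag previews the change the next example makes: one extra operator upstream of the aggregate.

```java
public class PositionalNamingModel {
    // One counter for the whole builder; every generated name consumes the next index.
    private int counter = 0;

    private String next() {
        return String.format("%010d", counter++);   // ten digits, zero-padded
    }

    String processor(String prefix) {
        return prefix + "-" + next();
    }

    String store(String prefix) {
        return prefix + "-STATE-STORE-" + next();
    }

    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Models source -> [filter] -> count and returns the count's changelog topic name.
    static String countChangelog(String applicationId, boolean withFilter) {
        PositionalNamingModel model = new PositionalNamingModel();
        model.processor("KSTREAM-SOURCE");
        if (withFilter) {
            model.processor("KSTREAM-FILTER");               // the inserted stateless operator
        }
        String store = model.store("KSTREAM-AGGREGATE");     // the store's index is taken first...
        model.processor("KSTREAM-AGGREGATE");                // ...then the aggregate processor's
        return changelog(applicationId, store);
    }

    public static void main(String[] args) {
        System.out.println(countChangelog("orders-aggregator", false));
        // orders-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
        System.out.println(countChangelog("orders-aggregator", true));
        // orders-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
    }
}
```

Nothing in `countChangelog` looks at what the aggregate computes; the name depends on position alone, which is exactly why the real names are fragile.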

Now insert one stateless operator upstream of the aggregate:

```java
builder.<String, String>stream("in")
    .filter((k, v) -> v != null)     // <-- new operator, takes position 0000000001
    .groupByKey()
    .count()                         // store is now 0000000002, was 0000000001
    .toStream()
    .to("out");
```

Every index after the `filter` shifts by one. The aggregate's store becomes `KSTREAM-AGGREGATE-STATE-STORE-0000000002`, and its changelog becomes `...-STATE-STORE-0000000002-changelog`. On the next start, the app looks for a store named `...0000000002`, gets a newly created, empty changelog (the old `...0000000001-changelog` still holds all your data, now orphaned), and either restores nothing or fails consistency checks. Your running count is gone, sitting in a topic no operator references anymore. Reorder two branches and you get the same effect: positions move, names move, state is stranded.
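
You can reproduce the shift without a broker, because `StreamsBuilder#build()` and `Topology#describe()` run entirely in memory. A sketch, assuming the `kafka-streams` library is on the classpath; `PositionalShiftDemo` and its helpers are illustrative names, while `TopologyDescription.Subtopology#nodes()` and `TopologyDescription.Processor#stores()` are the library's own accessors:

```java
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.KStream;

public class PositionalShiftDemo {

    // Builds the count-and-output app, optionally with the extra filter upstream.
    static Topology build(boolean withFilter) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("in");
        if (withFilter) {
            stream = stream.filter((k, v) -> v != null);
        }
        stream.groupByKey().count().toStream().to("out");
        return builder.build();
    }

    // Collects the names of all stores attached to processor nodes.
    static Set<String> storeNames(Topology topology) {
        Set<String> names = new TreeSet<>();
        for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    names.addAll(((TopologyDescription.Processor) node).stores());
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(storeNames(build(false)));   // [KSTREAM-AGGREGATE-STATE-STORE-0000000001]
        System.out.println(storeNames(build(true)));    // [KSTREAM-AGGREGATE-STATE-STORE-0000000002]
    }
}
```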

> 🚫 *"It's an internal refactor: I can rearrange my topology freely between deploys."*

That assumption is exactly what strands state. A refactor that touches the *order or count* of operators rewrites the names Kafka Streams uses to find its own data. The DSL gives you no warning at compile time and, with stateless inserts, often none at startup either. The app just restarts its aggregations from scratch. It doesn't even reprocess old input to rebuild them: the consumer group's committed offsets survive the change (the group id is your unchanged `application.id`), so the new store only counts records that arrive after the redeploy.

## Name everything, from day one

The fix is not to freeze your topology; it's to stop letting the position be the identity. Give every stateful piece an explicit, stable name, so its changelog, repartition, and store names no longer depend on where it sits in the graph. Do this on the very first version, because retrofitting names onto an app that already has state in production is itself a breaking change: every positional name is replaced by an explicit one, which orphans the old internal topics (see the reset section below).

| What | How to name it | What it stabilizes |
|---|---|---|
| Materialized store (count/aggregate/reduce) | `Materialized.as("orders-per-customer")` | The store name and its `-changelog` topic |
| Repartition (after a re-key) | `Grouped.as("by-customer")` or `Repartitioned.as("by-customer")` | The `-repartition` topic name |
| Stream-stream join | `StreamJoined.with(...).withName("o-p").withStoreName("o-p-store")` | The join's internal stores and changelogs |
| Any operator (for `describe()` readability) | `Named.as("dedupe-step")` | The processor node name in the topology graph |

Concretely:

```java
builder.<String, String>stream("in")
    .filter((k, v) -> v != null, Named.as("drop-nulls"))
    .groupByKey(Grouped.as("by-key"))
    .count(Materialized.as("event-count"))   // changelog: <app-id>-event-count-changelog
    .toStream()
    .to("out");
```

Now the store is `event-count` and its changelog is `<application.id>-event-count-changelog`, regardless of how many operators you add before or after it. You can insert a `filter`, add a branch, or reorder steps, and the count keeps finding its state. This is what [naming a state store](https://www.conduktor.io/kafka-streams/state-store) buys you beyond tidiness, and it's the foundation everything below depends on.
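
The other rows of the table work the same way. A sketch, assuming the `kafka-streams` library on the classpath and two hypothetical topics, `orders` (key: order ID, value: customer ID) and `payments` (key: order ID, value: amount): `Grouped.with(...)` fixes the repartition topic created by the re-key, `Materialized.as(...)` fixes the count's store, and `StreamJoined` fixes the join's processor and store names.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.StreamJoined;

public class NamedOrdersTopology {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");
        KStream<String, String> payments = builder.stream("payments");

        // Re-key by customer: the Grouped name fixes the topic as "by-customer-repartition".
        orders.groupBy((orderId, customerId) -> customerId,
                       Grouped.with("by-customer", Serdes.String(), Serdes.String()))
              .count(Materialized.as("orders-per-customer"));   // store + changelog name

        // Stream-stream join: withName fixes the processor names, withStoreName the stores.
        orders.join(payments,
                    (customerId, amount) -> customerId + ":" + amount,
                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                                .withName("o-p")
                                .withStoreName("o-p-store"))
              .to("paid-orders");

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

In the printed description, `by-customer-repartition` should appear as both a sink and a source topic, and the join's stores should be derived from `o-p-store`; none of those names carry a positional index. Stateless processors such as the key selector keep their generated names unless you also pass `Named.as(...)`, which only affects readability.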

Since Kafka 4.1 you can make this a hard rule: set `ensure.explicit.internal.resource.naming` to `true` ([KIP-1111](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1111%3A+Enforcing+Explicit+Naming+for+Kafka+Streams+Internal+Topics)) and building a topology with any auto-named internal topic or store throws a `TopologyException` listing the offenders. With the config off, recent versions still log a build-time WARN naming every unnamed changelog, so the signal is in your logs either way.

The capability is old and stable: explicit operator naming landed in [KIP-307](https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL), and the [DSL topology naming guide](https://kafka.apache.org/documentation/streams/developer-guide/dsl-topology-naming.html) in the Apache docs is the canonical reference. Bill Bejeck's [Optimizing Kafka Streams Applications](https://www.confluent.io/blog/optimizing-apache-kafka-streams-applications/) walks through how naming interacts with topology optimization. Treat all three as required reading before your first stateful deploy.

> **Naming alone doesn't let you move a store between operators.** A stable name keeps the changelog name fixed when *positions* shift around it. It does not migrate data if you change the operator's semantics: switching `count()` to `aggregate()`, or changing a join's window, produces state that is incompatible regardless of the name. Naming protects against accidental renames, not against genuinely different computations.

## Optimization can reshape the topology too

Naming protects you from accidental position shifts in *your* code. One more thing can restructure the topology underneath you: the `topology.optimization` config. It defaults to `NO_OPTIMIZATION`, but when enabled it rewrites the logical plan before it becomes a physical topology: for example reusing a source topic directly as a `KTable`'s changelog (skipping a separate changelog topic), merging repartition topics that feed multiple downstream operators, and collapsing a stream-stream self-join onto a single store. Accepted values are `none`, `all`, or a comma-separated list of granular flags: `reuse.ktable.source.topics`, `merge.repartition.topics`, and `single.store.self.join`.

The catch for *this* page: turning optimization on or off (or upgrading to a version that adds a new optimization) can change which internal topics exist and how they're wired, which is its own kind of topology change. Pin the specific flags you want in production rather than `all`, so a library upgrade doesn't silently restructure your internal topics out from under your running state. As always, verify the new plan with `topology.describe()` (see below) before you deploy.
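
A sketch of pinning the flags, assuming the `kafka-streams` library on the classpath; the application ID, topics, and store name are hypothetical. One detail that is easy to miss: the optimization config only takes effect if the same properties are passed to `StreamsBuilder#build(Properties)`, because the rewrite happens when the topology is built, not when `KafkaStreams` starts.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;

public class PinnedOptimizations {

    // Hypothetical app settings; the point is the explicit optimization list.
    static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Name the rewrites you have reviewed instead of "all", so an upgrade that adds
        // a new optimization does not switch it on for this app.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                  "reuse.ktable.source.topics,merge.repartition.topics");
        return props;
    }

    static Topology buildOptimized(Properties props) {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical table: with reuse.ktable.source.topics, "customers" itself can
        // serve as this store's changelog instead of a separate -changelog topic.
        builder.table("customers", Materialized.as("customers-store"))
               .toStream()
               .to("customers-out");
        // The optimization config is read here: build(props) applies it, while a
        // plain builder.build() produces the unoptimized topology.
        return builder.build(props);
    }

    public static void main(String[] args) {
        System.out.println(buildOptimized(props()).describe());
    }
}
```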

## Which changes are safe, and which need a reset

Not every topology change is dangerous. Group the changes you make into three buckets.

**Safe as a rolling deploy** (names stable, state semantics unchanged):
- Adding, removing, or editing a *stateless* operator (`map`, `filter`, `mapValues`, `peek`) when stores and repartitions are explicitly named.
- Changing the logic *inside* a stateless lambda.
- Adding a brand-new, independently named stateful sub-topology that reads existing topics: the old state is untouched; the new store restores from its own (initially empty) changelog.

**Needs care (same name, different computation):**
- Changing what a named stateful operator computes (count → sum, new window size, new join type). The changelog name is stable but its *contents* no longer mean what the new code expects. You usually need a new store name and a controlled rebuild.
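
A sketch of the "new store name" half of that advice, assuming the `kafka-streams` library on the classpath; the `in`/`out` topics, the `event-total-v2` name, and the numeric string values are hypothetical. The new name gives the new computation its own, initially empty changelog, so it never misreads the old counts as sums; rebuilding its history is still a separate step, for example reprocessing input with the reset tool.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class VersionedStoreName {

    // Version 1 (for comparison) was:
    //   .groupByKey().count(Materialized.as("event-count"))
    // Version 2 sums a numeric value instead, so it gets a new, versioned store name
    // (and therefore a new changelog) rather than reusing "event-count".
    static Topology buildV2() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in")
            .filter((k, v) -> v != null)
            .mapValues(v -> Long.parseLong(v))                          // hypothetical: values are numeric strings
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))   // value type changed, so the serde does too
            .reduce((a, b) -> a + b, Materialized.as("event-total-v2"))
            .toStream()
            .to("out");
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(buildV2().describe());
    }
}
```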

**Requires the reset tool** (the operator identity or input semantics changed and old state must be discarded):
- You shipped without explicit names and now need to add them: the positional names are abandoned, so the old changelogs are orphaned and you must start clean.
- You changed the meaning of the computation enough that replaying from the existing changelog would be wrong.
- You want to reprocess input topics from the beginning under a new topology.

For that last bucket, Kafka ships [`kafka-streams-application-reset`](https://kafka.apache.org/documentation/streams/developer-guide/app-reset-tool.html). A minimal invocation:

```bash
kafka-streams-application-reset \
  --application-id orders-aggregator \
  --input-topics in \
  --bootstrap-server broker:9092
```

It does three different things to three kinds of topic:

| Topic kind | What the tool does |
|---|---|
| Input topics (you pass `--input-topics`) | Resets the app's consumer offsets to **earliest** so it reprocesses from the start |
| Intermediate topics (user-managed topics the app both writes to and reads from: the old `through()` pattern, removed in Kafka 4.0; pass them via `--intermediate-topics`, itself deprecated in 4.x) | Skips to the end: sets the app's committed offsets to each partition's end offset, so old in-flight data isn't reprocessed |
| Internal topics (changelog, repartition, including topics created by `.repartition()`) | **Deletes** them so they're recreated cleanly on next start |

Two operational rules that the tool will not protect you from:

1. **Stop every instance first.** The tool resets offsets and deletes internal topics; if any instance is still running and committing, you get a corrupted, half-reset state. Scale the deployment to zero, run the tool, then scale back up.
2. **Triple-check the `--application-id`.** The application ID is the prefix for *all* of an app's internal topics and its consumer group. A typo that happens to match another app's ID will delete *that* app's changelogs and reset *its* offsets. There is no undo. This is the same footgun that makes a careless `application.id` dangerous when [building your first app](https://www.conduktor.io/kafka-streams/getting-started).

After a reset, also wipe the local state directory (`state.dir`) on every instance, or each one will try to reuse stale local RocksDB data that no longer matches the deleted changelogs.
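
If wiping `state.dir` by hand is awkward in your deployment, `KafkaStreams#cleanUp()` deletes this instance's local state for its `application.id`; it may only be called before `start()` or after `close()`. A sketch, assuming the `kafka-streams` library on the classpath; the `RESET_LOCAL_STATE` environment variable is a hypothetical switch you would set only for the first start after running the reset tool:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class StartAfterReset {

    // Creates the app and, when asked, wipes this instance's local state directory
    // for the application.id. cleanUp() is only legal before start() or after close().
    static KafkaStreams create(Topology topology, Properties props, boolean wipeLocalState) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        if (wipeLocalState) {
            streams.cleanUp();
        }
        return streams;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in").to("out");

        // Hypothetical switch: true only for the first start after the reset tool ran.
        boolean wipe = Boolean.parseBoolean(System.getenv("RESET_LOCAL_STATE"));
        KafkaStreams streams = create(builder.build(), props, wipe);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Every instance runs this on its own first start, which covers the "on every instance" part of the rule without anyone logging into hosts.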

## Catch the break in CI, not in production

You don't want to discover a positional shift from a 3am restore failure. The topology is fully describable before it ever runs: `topology.describe()` returns a `TopologyDescription` whose `toString()` is the untrimmed version of the block at the top of this page, `Topologies:` header and edge lines included. Snapshot it and diff it.

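```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.kafka.streams.Topology;
import org.junit.jupiter.api.Test;

class TopologySnapshotTest {
    @Test
    void topologyMatchesSnapshot() throws IOException {   // Files.readString throws IOException
        Topology topology = OrdersApp.buildTopology();
        String actual = topology.describe().toString();
        String expected = Files.readString(Path.of("src/test/resources/topology.txt"));
        assertEquals(expected, actual);   // fails if any internal name or wiring changed
    }
}
```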

Commit `topology.txt` alongside the code. When a pull request changes the topology, this test fails, and the assertion failure carries both the expected and the actual description (many IDEs render them as a diff), including any shifted `...0000000002` names. A reviewer then makes a deliberate call: is this a safe rolling change, or does it need the reset tool and a maintenance window? The point is that the decision happens in review, with the diff in front of you, instead of after the deploy when the state is already orphaned. This snapshot lives naturally inside your [Kafka Streams test suite](https://www.conduktor.io/kafka-streams/testing), which already runs the topology through `TopologyTestDriver`.
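
When a reviewer accepts a change, the snapshot has to be regenerated, and a message naming the first differing line makes the moved node easier to spot than two multi-line strings. A sketch of a small helper for both, assuming the `kafka-streams` library on the classpath; the `TopologySnapshot` class and the `updateTopologySnapshot` system property are hypothetical names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.kafka.streams.Topology;

public final class TopologySnapshot {

    // Hypothetical opt-in switch for accepting a reviewed change.
    static final String UPDATE_PROPERTY = "updateTopologySnapshot";

    // Compares the topology with the committed snapshot. Returns a description of the
    // first differing line, or empty when they match (or after an explicit update).
    static Optional<String> check(Topology topology, Path snapshot) throws IOException {
        String actual = topology.describe().toString();
        if (Boolean.getBoolean(UPDATE_PROPERTY)) {
            Files.writeString(snapshot, actual);
            return Optional.empty();
        }
        return firstDifference(Files.readString(snapshot), actual);
    }

    // Line-by-line comparison (tolerating \n or \r\n) so the failure names the exact node that moved.
    static Optional<String> firstDifference(String expected, String actual) {
        String[] exp = expected.split("\\R", -1);
        String[] act = actual.split("\\R", -1);
        for (int i = 0; i < Math.max(exp.length, act.length); i++) {
            String e = i < exp.length ? exp[i] : "<missing>";
            String a = i < act.length ? act[i] : "<missing>";
            if (!e.equals(a)) {
                return Optional.of("line " + (i + 1) + ": expected [" + e.strip() + "] but was [" + a.strip() + "]");
            }
        }
        return Optional.empty();
    }
}
```

In the test above, `check(OrdersApp.buildTopology(), Path.of("src/test/resources/topology.txt"))` could replace the `assertEquals`, failing with the returned message when one is present. Running the tests once with the `updateTopologySnapshot` system property set to `true` in the test JVM rewrites the file, and the regenerated `topology.txt` then shows up in the pull request diff for review.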

The same discipline pays off when you scale. Repartition topic names and partition counts are part of the topology, so a change that alters them interacts with how tasks are assigned across instances. See [scaling and parallelism](https://www.conduktor.io/kafka-streams/scaling).

**How do I change a Kafka Streams topology without losing state?**

Give every stateful operator an explicit, stable name with `Materialized.as(...)`, `Grouped.as(...)`, or `StreamJoined.with(...).withName(...).withStoreName(...)` so its changelog and repartition topic names stop depending on the operator's position. With names fixed, you can insert, remove, or reorder operators around it and the store keeps finding its state.

**Why does my Kafka Streams app fail to start or reset state after a code change?**

Kafka Streams names internal stores, changelogs, and repartition topics by an operator's *position* in the topology, assigned by a global counter in declaration order. Insert or reorder one operator and every downstream name shifts, so the app looks for a renamed changelog, finds it empty, and either restores nothing or fails consistency checks: your old data is orphaned in a topic nothing references.

**Which Kafka Streams topology changes are safe versus need a reset?**

Safe rolling deploys: adding or editing a *stateless* operator (once stores and repartition topics are explicitly named), changing logic inside a stateless lambda, or adding a new independently named stateful sub-topology. A reset is required when you add explicit names to an app that shipped without them, change what a named operator computes so much that its existing state would be wrong, or want to reprocess input topics from the beginning.

**When do I need the kafka-streams-application-reset tool?**

Use it when an operator's identity or input semantics changed and the old state must be discarded: for example after retrofitting names onto a positional app, or to reprocess inputs from the start. It resets input-topic offsets to earliest, skips intermediate topics to the end, and deletes internal changelog/repartition topics; stop every instance first and triple-check the `--application-id`, as there is no undo.

**How do I detect an incompatible topology change before deploying?**

Snapshot `topology.describe().toString()` to a committed file and assert against it in a test, so any shifted internal name fails the build with a diff. A reviewer then decides in code review whether the change is a safe rolling deploy or needs the reset tool, instead of discovering it from a restore failure in production.

> **See it in practice with Conduktor**
> When a topology change orphans state, the evidence is in the topics. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) shows the changelog and repartition topics your app created, so you can spot a stale `...-STATE-STORE-0000000001-changelog` that nothing reads anymore, confirm the new changelog is filling, and watch the consumer group lag that tells you whether a restore is still running after a deploy. Conduktor doesn't run your Streams app; it gives you the view of the internal topics and offsets the reset tool acts on.

## Next steps

- [Test your Kafka Streams topology](https://www.conduktor.io/kafka-streams/testing): snapshot `describe()` and diff it in CI
- [Kafka Streams state stores](https://www.conduktor.io/kafka-streams/state-store): why naming a store keeps its changelog stable
- [Build your first Kafka Streams app](https://www.conduktor.io/kafka-streams/getting-started): where `application.id` becomes your topic prefix and group ID
