Kafka Streams state store TTL
Learn why Kafka Streams state grows forever, and how to expire it safely.
Here is a fact that surprises people in production: Kafka Streams has no built-in TTL for key-value state stores. Every key you write to a plain KeyValueStore stays there forever, across restarts, across rebalances, until you delete it. There is no expireAfter setter, no eviction thread, no config flag. An app that aggregates by an unbounded key space (session IDs, request IDs, customer-then-churned IDs) grows its state store without limit until the disk fills or the container is OOMKilled.
The two obvious fixes, turning on RocksDB's own TTL and calling store.delete() in a punctuator, each have a sharp edge: the first brings the "deleted" data back on restore, and the second can stall processing if the scan is unbounded. This page covers why, and the patterns that actually bound the store.
What you'll learn:
- Why windowed stores are bounded but plain key-value stores are not
- The RocksDB-TTL trap, and how "expired" data resurrects on restore
- Three real patterns to expire state, and why a tombstone is the load-bearing step
- Why deleting from the local store is never enough on its own
Why windowed stores are bounded and KV stores aren't
If you've used windowed aggregations, you may assume state expires automatically, and for windowed stores it does. A WindowStore (the store behind TimeWindows aggregations) has a retention period; segments older than retention are dropped wholesale, and the matching changelog topic uses compact,delete so old segments age out of Kafka too. (The changelog's retention.ms is the store retention plus windowstore.changelog.additional.retention.ms, 24 hours by default: a 2-hour store gets a 26-hour changelog.) Windowed state is self-limiting by design.
A plain KeyValueStore (what you get from a non-windowed count(), aggregate(), or reduce(), or any store you build by hand) has no such mechanism. Its changelog is purely log-compacted: compaction keeps the latest value per key forever and only ever removes a key when it sees a tombstone (a record with a null value) for it. No tombstone, no removal. So both halves of the store, the local RocksDB copy and the changelog in Kafka, grow with the cardinality of your key space, not with anything time-bounded.
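The compaction rule above can be sketched as a toy model. This is not the Kafka API: `CompactionModel`, `Rec`, and `compact` are hypothetical names, and real compaction runs asynchronously on log segments rather than in one pass. It only illustrates that a key survives until a tombstone for it appears.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a compacted changelog (hypothetical names, not the Kafka API).
class CompactionModel {
    // One changelog record; a null value stands for a tombstone.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Keep the latest value per key; drop a key only when its latest record is a tombstone.
    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                latest.remove(r.key);        // tombstone: the key can finally go away
            } else {
                latest.put(r.key, r.value);  // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("session-1", "count=1"));
        log.add(new Rec("session-2", "count=1"));
        log.add(new Rec("session-1", "count=2"));
        System.out.println(compact(log).keySet()); // prints [session-1, session-2]

        log.add(new Rec("session-2", null));       // tombstone for session-2
        System.out.println(compact(log).keySet()); // prints [session-1]
    }
}
```

However old `session-1` gets, nothing in this model removes it; only the explicit null record for `session-2` shrinks the compacted view.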
That single asymmetry is the whole problem. The rest of this page is about how to retrofit expiry onto the unbounded case.
The RocksDB TTL trap
RocksDB, the engine behind the default store, has its own native TTL. It looks like exactly the knob you want. The first trap: you can't even reach the real one. RocksDB's per-value TTL lives in TtlDB, a wrapper database that Kafka Streams never opens (RocksDBStore calls plain RocksDB.open, with zero TTL handling as of Streams 4.3). What a RocksDBConfigSetter can set is options.setTtl, a column-family option that under Streams' default leveled compaction expires nothing: it only schedules old files for cascading compaction to the bottom level. Making that knob drop data at all means switching the store to FIFO compaction, which deletes whole files wholesale. And even real TtlDB semantics would still be a trap, for four reasons that compound:
- It's insertion-time, not access- or update-time. TtlDB expires a value a fixed duration after it was written, regardless of how recently you read or updated the logical key. "Keep keys idle for 30 days" is not expressible.
- It's non-strict. Expired data is only physically removed during compaction, so a get() or range scan can still return a value that is technically past its TTL. Your expiry logic becomes nondeterministic.
- It gives Streams no eviction callback. When RocksDB drops an expired entry on compaction, Kafka Streams is never told. So Streams never writes a tombstone for that key.
- The config setter is global. A RocksDBConfigSetter applies to every store on the instance; you can't TTL one store and not another without branching on storeName.
The third point is the one that bites. Because Streams isn't notified, the changelog still contains the key: RocksDB quietly dropped it locally, but the source of truth in Kafka was never tombstoned. On the next restore (a rebalance, a failover, a fresh instance), Streams replays that changelog and faithfully re-inserts every "expired" key into the rebuilt store. The data resurrects.
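A minimal sketch of that resurrection, as a toy model rather than the Kafka Streams implementation. `RestoreModel`, `engineDrop`, and `restore` are hypothetical names: `engineDrop` stands in for a storage-engine TTL that removes a local entry silently, and `delete` stands in for a removal that also records a tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a changelogged store (hypothetical names, not the Kafka Streams API).
class RestoreModel {
    final Map<String, String> local = new HashMap<>();
    // Each entry is {key, value}; a null value stands for a tombstone.
    final List<String[]> changelog = new ArrayList<>();

    void put(String key, String value) {
        local.put(key, value);
        changelog.add(new String[] {key, value});
    }

    // Engine-level expiry: the local entry disappears, the changelog is never told.
    void engineDrop(String key) {
        local.remove(key);
    }

    // Delete through the store: local removal plus a tombstone in the changelog.
    void delete(String key) {
        local.remove(key);
        changelog.add(new String[] {key, null});
    }

    // Rebuild local state from scratch by replaying the changelog in order.
    void restore() {
        local.clear();
        for (String[] rec : changelog) {
            if (rec[1] == null) {
                local.remove(rec[0]);
            } else {
                local.put(rec[0], rec[1]);
            }
        }
    }

    public static void main(String[] args) {
        RestoreModel store = new RestoreModel();
        store.put("a", "1");
        store.put("b", "2");
        store.engineDrop("a"); // "expired" locally only
        store.delete("b");     // expired with a tombstone
        System.out.println(store.local.keySet()); // prints []
        store.restore();
        System.out.println(store.local.keySet()); // prints [a]
    }
}
```

Before the restore both keys look gone; after it, only the key that was tombstoned stays gone.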
🚫 "I set a RocksDB TTL, so my Kafka Streams state is bounded now."
It isn't. At best, RocksDB's TTL trims the local copy on compaction but never tells Streams, so the changelog keeps every key, and the first restore replays them all straight back into the store. In Streams' default configuration it's worse: the reachable setTtl knob frees nothing locally either. You've added nondeterministic local reads without bounding anything durable. To bound the state you must remove keys in a way the changelog learns about: a tombstone. That's what the patterns below do.
The patterns that actually work
The throughline of every working pattern: expiry must reach the changelog as a tombstone. Deleting only the local RocksDB entry is cosmetic: the changelog still has the key, and restore brings it back. There are three solid approaches, in rough order of preference.
Pattern A: re-key into a windowed store
If your access pattern allows it, the cleanest answer is to not use an unbounded KV store at all. Re-key your data by entity into a WindowStore (or session store) with a retention equal to your desired TTL, and let Streams' built-in windowed-retention machinery age old windows out for free, both locally and in the changelog. No punctuator, no manual tombstones, no resurrection risk. This is the right default whenever "expire after N of inactivity/age" maps onto a window. When it doesn't (you need a true per-key idle timeout, or random-access lookups by key), fall through to Pattern B.
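To make "segments are dropped wholesale" concrete, here is a simplified model of a segmented store. It is a sketch under assumptions, not the Kafka Streams implementation: `SegmentedStoreModel` is a hypothetical class, the one-hour segment size is chosen for the example, and real stores pick their own segment interval.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of a segmented window store (hypothetical, not the Streams implementation).
class SegmentedStoreModel {
    final long retentionMs;
    final long segmentMs;
    // segment id (timestamp / segmentMs) -> entries that fall in that segment
    final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    long streamTime = 0L; // highest record timestamp seen so far

    SegmentedStoreModel(long retentionMs, long segmentMs) {
        this.retentionMs = retentionMs;
        this.segmentMs = segmentMs;
    }

    void put(String key, String value, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        segments.computeIfAbsent(Math.floorDiv(timestampMs, segmentMs), id -> new HashMap<>())
                .put(key, value);
        // Drop whole segments that end at or before (streamTime - retention).
        // No per-key scan and no per-key delete is needed.
        long oldestLiveSegment = Math.floorDiv(streamTime - retentionMs, segmentMs);
        segments.headMap(oldestLiveSegment).clear();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        SegmentedStoreModel store = new SegmentedStoreModel(2 * hour, hour);
        store.put("a", "1", 0);
        store.put("b", "1", 1 * hour);
        store.put("c", "1", 4 * hour); // advances stream time past the retention of a and b
        System.out.println(store.segments.keySet()); // prints [4]
    }
}
```

Expiry here is a side effect of time advancing, which is why the windowed case needs no punctuator: the cost of dropping old data does not depend on how many keys the old segments held.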
Pattern B: a punctuator that scans, deletes, and tombstones
The general-purpose pattern uses the Processor API: store a last-seen timestamp alongside each value, and schedule a wall-clock punctuator that finds stale keys and removes them. The critical detail is how you remove them.
```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class TtlProcessor implements Processor<String, Event, String, Event> {
    private static final Duration TTL = Duration.ofDays(30);
    private KeyValueStore<String, ValueAndTimestamp<Event>> store;

    @Override
    public void init(ProcessorContext<String, Event> context) {
        // The store must be registered with Stores.timestampedKeyValueStoreBuilder,
        // otherwise this assignment to ValueAndTimestamp values fails at runtime.
        this.store = context.getStateStore("entity-store");
        // WALL_CLOCK_TIME: fires on the poll cadence even when no records arrive.
        // STREAM_TIME would never fire on an idle partition, and idle is exactly
        // when you most need to expire stale state.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::expire);
    }

    @Override
    public void process(Record<String, Event> record) {
        // Stamp with wall-clock time so it is comparable with the punctuator's `now`.
        store.put(record.key(), ValueAndTimestamp.make(record.value(), System.currentTimeMillis()));
    }

    private void expire(long now) {
        try (KeyValueIterator<String, ValueAndTimestamp<Event>> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, ValueAndTimestamp<Event>> entry = it.next();
                if (now - entry.value.timestamp() > TTL.toMillis()) {
                    // store.delete() writes a NULL (tombstone) to the changelog.
                    // That tombstone is what makes the deletion survive a restore.
                    store.delete(entry.key);
                }
            }
        }
    }
}
```

store.delete(key) on a changelogged store writes a null-valued record to the changelog. That null is the tombstone, and it is the entire point: on the next restore, replaying the changelog applies the tombstone and the key stays gone. A bare local removal that skipped the changelog would resurrect.
Two operational landmines, both real:
- The punctuator runs on the stream thread and pauses processing while it runs. A store.all() scan over millions of keys can block consumption for seconds and risk a poll-timeout-driven rebalance, and its cost grows linearly with state size. Mitigate by storing keys with a timestamp prefix and using store.range(...) to scan only the segment that could be expiring, or by time-boxing the punctuator (process for N milliseconds, remember the last key visited, resume on the next tick). Don't store.all() over a large store on a tight schedule.
- This is state expiry, unrelated to error handling. Routing a bad record elsewhere is a dead letter queue; routing an old key out of the store is TTL. They share no machinery; don't conflate the two.
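The time-boxing idea from the first point can be sketched with a plain sorted map standing in for the store. Everything here is hypothetical (`BoundedExpiry`, `expireTick`, `resumeAfter`), and the budget is a number of entries visited rather than milliseconds so the example stays deterministic; a real punctuator could check elapsed time in the same place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a budgeted, resumable expiry scan (hypothetical names, not the Streams API).
class BoundedExpiry {
    final TreeMap<String, Long> lastSeen = new TreeMap<>(); // key -> last-seen timestamp (ms)
    final List<String> tombstones = new ArrayList<>();      // stands in for changelog tombstones
    private String resumeAfter = null;                      // last key visited by the previous tick

    // Visit at most maxVisits entries, continuing after where the previous tick stopped.
    void expireTick(long now, long ttlMs, int maxVisits) {
        Map<String, Long> view =
                (resumeAfter == null) ? lastSeen : lastSeen.tailMap(resumeAfter, false);
        List<String> stale = new ArrayList<>();
        int visited = 0;
        String last = null;
        for (Map.Entry<String, Long> e : view.entrySet()) {
            if (visited == maxVisits) break; // budget spent: stop and resume on the next tick
            visited++;
            last = e.getKey();
            if (now - e.getValue() > ttlMs) stale.add(e.getKey());
        }
        // Delete after iterating so the map is not modified mid-scan.
        for (String key : stale) {
            lastSeen.remove(key);
            tombstones.add(key);
        }
        // Reaching the end of the key space means the next tick starts over from the beginning.
        resumeAfter = (visited < maxVisits) ? null : last;
    }

    public static void main(String[] args) {
        BoundedExpiry store = new BoundedExpiry();
        store.lastSeen.put("k1", 0L);
        store.lastSeen.put("k2", 0L);
        store.lastSeen.put("k3", 1000L);
        store.lastSeen.put("k4", 0L);
        store.lastSeen.put("k5", 1000L);
        for (int tick = 0; tick < 3; tick++) {
            store.expireTick(1000L, 500L, 2); // at most two entries per tick
        }
        System.out.println(store.tombstones);        // prints [k1, k2, k4]
        System.out.println(store.lastSeen.keySet()); // prints [k3, k5]
    }
}
```

Each tick does a bounded amount of work regardless of store size, at the price of stale keys lingering for up to one full pass over the key space.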
Pattern C: tombstone the source topic for a KTable
If the unbounded state is a KTable materialized from a source topic, the cleanest eviction is to emit a tombstone to the source topic (from an external job, or a punctuator producing to it). The KTable processes the null, deletes the key from its store, and, because the store's changelog mirrors those deletes and is compacted, the changelog compacts the key away too. One tombstone reclaims space in the local store, the changelog, and (eventually) the source topic. This is the only approach where the deletion propagates all the way back to the source of truth, so nothing downstream that re-reads the topic resurrects it either. (A related primitive for collapsing duplicate keys is covered in deduplication.)
Why local deletion is never enough: restating the trap
It's worth saying once more because it's the mistake everyone makes first. Three of the "obvious" ways to free state (RocksDB native TTL, a Stores-level in-memory eviction, or any code that drops the entry from the local map without going through the store's delete path) all leave the changelog untouched. The local copy shrinks; the durable copy doesn't. Then a restore replays the changelog and the state you "expired" comes straight back, often days later when an unrelated rebalance triggers the restore, which is the worst possible time to discover the bug.
| Approach | Frees local store? | Tombstones changelog? | Survives restore? |
|---|---|---|---|
| RocksDB native TTL | Not under default compaction (FIFO only) | No | No, data resurrects |
| store.delete() in a punctuator | Yes | Yes | Yes |
| Tombstone to source topic (KTable) | Yes | Yes (compacts away) | Yes |
| Windowed store + retention (Pattern A) | Yes (segment drop) | Yes (compact,delete) | Yes |
Does Kafka Streams have a built-in TTL?
Not for key-value state stores. There is no expireAfter setting or eviction thread for a plain KeyValueStore; keys live forever until you delete them. Only windowed stores expire automatically, via their retention period. For non-windowed state you implement expiry yourself with a punctuator or by tombstoning.
Why does my Kafka Streams state store grow forever?
Because a plain key-value store has no time bound: its changelog is log-compacted, which keeps the latest value per key indefinitely and only removes a key when it sees a tombstone. If your keys come from an unbounded space (session IDs, request IDs) and you never tombstone them, both the local RocksDB store and the changelog grow with key cardinality until the disk fills.
How do I expire old keys in a Kafka Streams state store?
Three patterns: re-key into a windowed store and let retention drop old windows; or run a wall-clock punctuator that finds stale keys and calls store.delete() (which writes a tombstone to the changelog); or, for a KTable, emit a tombstone to the source topic. In every case the deletion must reach the changelog as a tombstone, or a restore brings the key back.
Does RocksDB's TTL work for Kafka Streams?
No, it's a trap twice over. RocksDB's real per-value TTL (TtlDB) isn't reachable from Streams at all, and the options.setTtl you can set via a RocksDBConfigSetter expires nothing under the default leveled compaction. Even where RocksDB TTL does apply, it's insertion-time-based, only evicts on compaction (so reads can still return expired data), and gives Streams no eviction callback: Streams never tombstones the changelog, so the next state restore replays every "expired" key and the data resurrects. Don't rely on it to bound state.
Why does deleted state come back after a restart?
Because the deletion only touched the local RocksDB copy and never wrote a tombstone to the changelog. The changelog is the source of truth: on restart or rebalance, Streams rebuilds the store by replaying it, re-inserting any key it still contains. Always delete through the store's delete path (store.delete()) so a tombstone is written.
See it in practice with Conduktor
Unbounded state is visible from outside the app: the changelog topic behind a store grows with it. Conduktor Console lets you inspect that changelog topic's size and compaction settings, and watch the consumer-group lag that tells you whether a restore (the moment "expired" keys resurrect) is in progress, so you can confirm your TTL pattern is actually shrinking the durable state, not just the local copy.
Next steps
- State stores: how the changelog makes local state durable, and why that's what TTL has to reach
- Processor API: punctuators, context.schedule(), and time-boxing the expiry scan
- RocksDB tuning: bounding the off-heap memory and disk that unbounded state consumes