Kafka Streams deduplication
Kafka Streams has no dedup operator. Build one: a Processor backed by a windowed state store that emits each business key once and purges old keys.
Build a deduplication operator Kafka Streams doesn't give you.
There is no dedup() in the Kafka Streams DSL. No distinct(), no .deduplicate(). This surprises people, because de-duplicating a stream feels like a primitive operation, and it's a question that comes up constantly. The reason there's no operator is that "duplicate" is a business decision: same key? same payload? same value within five minutes? Kafka can't guess, so it gives you the building blocks and you assemble the operator yourself.
This page is that assembly. We'll cover why exactly-once doesn't do this for you, the canonical pattern (a Processor plus a windowed state store), a complete code sketch you can adapt, and the layered defense that's usually the right production answer.
What you'll learn:
- Why exactly-once and the idempotent producer don't de-duplicate business records
- The canonical pattern: a stateful Processor keyed by a dedup id
- Why the dedup store must be windowed so its state stays bounded
- The layered defense: idempotent producer + dedup processor + idempotent sink
Why exactly-once doesn't solve this
The first reflex is "I'll turn on exactly-once and the duplicates go away." They won't, and it's worth being precise about why before writing any code.
Exactly-once makes the consume → process → produce cycle inside Kafka atomic. It guarantees that the effects of processing each input record (output records, state updates, committed offsets) are applied exactly once, even across retries and crashes. What it cannot do is recognize that two different input records (two distinct offsets) represent the same logical event. To Kafka, those are simply two records, and EOS faithfully processes both.
The idempotent producer has the same blind spot from the other direction. It de-duplicates retries from a single producer session, using a producer id and sequence numbers. It does nothing about the same event sent by two different producers, or the same event sent again after the producer restarted (new producer id, sequence reset), or a batch job that ran twice.
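To make that blind spot concrete, here is a toy model in plain Java of sequence-based retry detection. It is a sketch of the idea only, not Kafka's broker logic: `ToyBrokerPartition`, `append`, and the producer ids used with it are hypothetical names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of sequence-based retry dedup; NOT Kafka's real broker logic. */
final class ToyBrokerPartition {
    // Highest sequence number accepted so far, per producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private int appended = 0;

    /**
     * Returns true if the record is appended, false if it is rejected as a retry.
     * Note that the payload is never inspected: only (producerId, sequence) matters.
     */
    boolean append(long producerId, int sequence, String payload) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // same producer session resent an old sequence: a retry
        }
        lastSequence.put(producerId, sequence);
        appended++;
        return true;
    }

    int appendedCount() {
        return appended;
    }
}
```

In this model, calling `append(7L, 0, "order-42 created")` twice accepts the first call and rejects the second. Calling `append(8L, 0, "order-42 created")` afterwards is accepted, because a restarted producer arrives with a new id and a reset sequence, so the same business event lands in the log twice.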
So the duplicates that need this pattern are the ones that arrive as genuinely separate records:
- An upstream service retried at the application layer and published the event twice.
- A producer crashed and replayed from its own source, re-emitting events.
- Two source systems feed the same topic and overlap.
- A backfill or reprocessing job re-published historical events.
None of these are "the cycle ran twice." They're "the data contains two copies." That's a stateful problem: to know a record is a duplicate, you have to remember the ones you've already seen.
The canonical pattern: remember keys, emit on first sight
The pattern is the same one a maintainer will point you to on the mailing list, because it's the only one that works without a remote lookup:
1. Pick a dedup key: the field that defines "same event." Often an event id or idempotency key carried in the payload, not the Kafka record key.
2. Keep a state store of keys you've already emitted.
3. On each record, check the store. Not seen → forward it and record the key. Already seen → drop it.
4. Bound the store in time so it doesn't grow forever.
Steps 1–3 are straightforward. Step 4 is the one that separates a pattern that works in production from one that quietly fills your disk, and it's why this needs the Processor API, not the DSL.
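Before bringing Kafka into it, the four steps can be sketched in plain Java with an in-memory map standing in for the state store. This is a minimal illustration, not the production implementation: `FirstSightFilter` and its methods are hypothetical names, and the full-scan eviction is chosen for clarity rather than efficiency.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the dedup state store; illustrative only. */
final class FirstSightFilter {
    private final long windowMs;
    // Dedup key -> timestamp of the sighting that was forwarded (step 2).
    private final Map<String, Long> seen = new LinkedHashMap<>();

    FirstSightFilter(Duration window) {
        this.windowMs = window.toMillis();
    }

    /** Returns true if the record should be forwarded, false if it is a duplicate. */
    boolean firstSight(String dedupKey, long timestampMs) {
        evictOlderThan(timestampMs - windowMs); // step 4: bound the state in time
        if (seen.containsKey(dedupKey)) {
            return false;                       // step 3: already seen, drop
        }
        seen.put(dedupKey, timestampMs);        // step 3: remember the key and forward
        return true;
    }

    // Full scan for clarity; a real store would purge by time segment instead.
    private void evictOlderThan(long cutoffMs) {
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoffMs) {
                it.remove();
            }
        }
    }

    int size() {
        return seen.size();
    }
}
```

With a one-hour window, a second `firstSight("evt-1", ...)` one minute after the first returns false, while the same key two hours later returns true again because its earlier entry has been evicted.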
Why the store must be windowed
If you back dedup with a plain KeyValueStore, you remember every key you've ever seen, forever. For a high-cardinality stream (order ids, click events, payment references) that store grows without bound. The backing changelog topic grows with it. Eventually the disk fills or the state restore after a rebalance takes minutes because there's so much to replay.
Deduplication is almost always time-bounded in practice: "the same event won't legitimately reappear after N minutes/hours." So you only need to remember keys for that window. The right tool is a WindowStore, which retains entries only for a configured retention period and drops the rest:
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import java.time.Duration;

Duration dedupWindow = Duration.ofHours(1); // how long a key is remembered

WindowBytesStoreSupplier supplier = Stores.persistentWindowStore(
    "dedup-store",
    dedupWindow,  // retention period: entries older than this are purged
    dedupWindow,  // window size
    false);       // retainDuplicates = false: one entry per key per window

A window store gives you time-bounded state with purging handled by the store itself, so you don't have to track expiry manually. The bound extends to the changelog: Kafka Streams creates a window store's changelog with cleanup.policy=compact,delete and retention.ms set to the store retention plus a 24-hour buffer (windowstore.changelog.additional.retention.ms), so the 1-hour store above gets a 25-hour changelog. A KeyValueStore changelog is compact-only and never expires by time. (You can purge a KeyValueStore by hand with a punctuator instead; the window store just makes it the store's job.)
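The changelog arithmetic is easy to check by hand. The sketch below works it out in plain Java and shows one way the buffer could be overridden through ordinary Streams properties. `ChangelogRetention` and its methods are hypothetical helpers; only the config key comes from the text above.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper illustrating the changelog retention arithmetic. */
final class ChangelogRetention {
    // Config key quoted in the text; its default buffer is 24 hours.
    static final String ADDITIONAL_RETENTION_KEY =
        "windowstore.changelog.additional.retention.ms";

    /** Changelog retention.ms = store retention + additional buffer. */
    static Duration changelogRetention(Duration storeRetention, Duration additional) {
        return storeRetention.plus(additional);
    }

    /** Streams properties carrying a non-default buffer (the value is the caller's choice). */
    static Properties withBuffer(Duration additional) {
        Properties props = new Properties();
        props.put(ADDITIONAL_RETENTION_KEY, additional.toMillis());
        return props;
    }
}
```

For the 1-hour store, `changelogRetention(Duration.ofHours(1), Duration.ofHours(24))` gives 25 hours. Whether shrinking the buffer is safe depends on how far clocks and event timestamps can drift in your environment, so treat any override as something to test rather than a default.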
The Processor
Here's a complete dedup processor using the modern process() API. It forwards a record the first time it sees the dedup key within the window, and drops it on any subsequent sighting:
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Duration;
import java.util.function.Function;

public class DeduplicationProcessor<V> implements Processor<String, V, String, V> {

    private final Duration window;
    private final Function<V, String> dedupKeyExtractor;
    private ProcessorContext<String, V> context;
    private WindowStore<String, Long> seen;

    public DeduplicationProcessor(Duration window,
                                  Function<V, String> dedupKeyExtractor) {
        this.window = window;
        this.dedupKeyExtractor = dedupKeyExtractor;
    }

    @Override
    public void init(ProcessorContext<String, V> context) {
        this.context = context;
        this.seen = context.getStateStore("dedup-store");
    }

    @Override
    public void process(Record<String, V> record) {
        String dedupKey = dedupKeyExtractor.apply(record.value());
        if (dedupKey == null) {
            context.forward(record); // no dedup id: pass through rather than drop
            return;
        }

        long now = record.timestamp();
        long from = now - window.toMillis();

        // Look back one window from this record's event time. The lookup is
        // backward-only, so an out-of-order duplicate whose timestamp is earlier
        // than the stored sighting is not detected.
        boolean alreadySeen = false;
        try (WindowStoreIterator<Long> it = seen.fetch(dedupKey, from, now)) {
            alreadySeen = it.hasNext();
        }

        if (!alreadySeen) {
            seen.put(dedupKey, now, now); // record this sighting at its event time
            context.forward(record);      // first time within the window → emit
        }
        // else: duplicate within the window → drop silently
    }
}

Wiring it into a topology, registering the store so Kafka Streams creates its changelog:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;

// `supplier` is the WindowBytesStoreSupplier created for "dedup-store";
// Order is the application's value type.
StoreBuilder<WindowStore<String, Long>> storeBuilder =
    Stores.windowStoreBuilder(supplier, Serdes.String(), Serdes.Long());

StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(storeBuilder);

builder.<String, Order>stream("orders")
    // Explicit <Order> keeps generic type inference simple inside the supplier lambda.
    .process(() -> new DeduplicationProcessor<Order>(
            Duration.ofHours(1), order -> order.eventId()),
        "dedup-store")
    .to("orders-deduplicated");

Two details that matter:
- fetch(key, from, now) scans the window for a prior sighting. If the iterator has any entry, you've seen this key inside the window: drop. Using event time (record.timestamp()) rather than wall-clock means the dedup window tracks when events happened, consistent with how the rest of Kafka Streams reasons about time.
- The WindowStore purges entries older than its retention period on its own. If you'd rather use a plain KeyValueStore, schedule a punctuator to evict old keys yourself:
// Alternative eviction if you back dedup with a KeyValueStore instead of a WindowStore.
// Runs inside init(); kvStore is a KeyValueStore<String, Long> that maps each
// dedup key to the timestamp stored at its sighting.
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;

context.schedule(Duration.ofMinutes(5), PunctuationType.STREAM_TIME, timestamp -> {
    try (KeyValueIterator<String, Long> all = kvStore.all()) {
        while (all.hasNext()) {
            KeyValue<String, Long> entry = all.next();
            if (timestamp - entry.value > window.toMillis()) {
                kvStore.delete(entry.key); // older than the dedup window: forget it
            }
        }
    }
});

Note PunctuationType.STREAM_TIME: it advances with record timestamps, so it won't fire on an idle partition, the same stream-time behavior that trips up windowing and suppress. It also fires immediately on the first record processed, then on each interval boundary, so don't write eviction logic that assumes the first run comes only after five minutes. If you need eviction to run on a wall-clock cadence regardless of traffic, use PunctuationType.WALL_CLOCK_TIME instead, and accept that it evicts by processing time rather than event time.
Layered defense: where dedup actually belongs
A single dedup processor in the middle of a topology is rarely the whole answer. The robust production posture is layers, each catching what the previous one can't:
| Layer | Catches | Doesn't catch |
|---|---|---|
| Idempotent producer (enable.idempotence=true) | Retry duplicates within one producer session | Cross-producer, cross-restart, re-published events |
| Dedup processor (this page) | Genuinely-duplicate records by business key, within a time window | Duplicates older than the window; non-Kafka side effects |
| Idempotent sink (upsert keyed by business id) | Whatever slips past the above, at the final write | Nothing further: it's the last line |
Do you actually need a dedup processor? Before adding stateful dedup, check whether a cheaper layer already covers you. If duplicates only come from producer retries, enable.idempotence=true is enough: no state store, no operator. If your only concern is the final write, an idempotent sink (an upsert keyed by a business id) is simpler and more robust than a mid-pipeline processor, and it survives duplicates the dedup window would have missed. Reach for the stateful dedup processor specifically when you need a de-duplicated topic that downstream consumers read, or when the same event genuinely arrives as separate records and the sink can't absorb them. A state store is real operational weight (disk, restore time, a changelog), so add it deliberately, not reflexively.
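The idempotent sink layer is the simplest of the three to picture. The sketch below uses an in-memory map as a stand-in for a table with a unique key on the business id. `IdempotentOrderSink` and its methods are hypothetical; a real sink would issue a database upsert rather than a map write.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a table with a unique constraint on the business id. */
final class IdempotentOrderSink {
    private final Map<String, String> rowsByEventId = new HashMap<>();

    /** Upsert: writing the same event id twice leaves exactly one row. */
    void upsert(String eventId, String payload) {
        rowsByEventId.put(eventId, payload);
    }

    int rowCount() {
        return rowsByEventId.size();
    }

    String row(String eventId) {
        return rowsByEventId.get(eventId);
    }
}
```

Writing "evt-1" twice and "evt-2" once leaves two rows, no matter how far apart the two copies of "evt-1" arrive. That independence from any time window is what makes the sink the last line of defense.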
Choosing the window and the key
Two decisions determine whether this works:
- The dedup key. It must be stable across the duplicate copies: an event id or idempotency key assigned at the source, carried in the payload. A key generated inside your pipeline (a fresh UUID, a timestamp) defeats the whole thing, because each copy gets a different one. If the upstream doesn't provide a stable id, that's the first thing to fix; dedup can't work without one.
- The window length. Long enough to cover the realistic gap between duplicate arrivals (a producer retry storm, a backfill that overlaps live traffic), short enough to keep the store bounded. If duplicates can legitimately arrive days apart, a windowed in-stream dedup is the wrong tool: push the guarantee to an idempotent sink with a permanent unique constraint instead.
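The difference between a stable key and a pipeline-generated one can be shown in a few lines. This is an illustrative sketch: `OrderEvent`, `DedupKeys`, and both methods are hypothetical, and it assumes the source system assigns `eventId` once per logical event.

```java
import java.util.UUID;

/** Hypothetical payload type; eventId is assumed to be assigned by the source system. */
final class OrderEvent {
    private final String eventId;
    private final String orderId;

    OrderEvent(String eventId, String orderId) {
        this.eventId = eventId;
        this.orderId = orderId;
    }

    String eventId() { return eventId; }
    String orderId() { return orderId; }
}

final class DedupKeys {
    /** Stable: every copy of the same logical event carries the same eventId. */
    static String stableKey(OrderEvent event) {
        return event.eventId();
    }

    /** Anti-pattern: generated per record inside the pipeline, so copies never match. */
    static String generatedKey(OrderEvent event) {
        return UUID.randomUUID().toString();
    }
}
```

Two copies of the same event produce equal values from `stableKey` and different values from `generatedKey`, so a dedup store keyed by the latter would never find a match.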
How do I deduplicate records in Kafka Streams?
There's no dedup() operator, so you build one with the Processor API: pick a stable dedup key from the payload, keep already-seen keys in a state store, and on each record forward it on first sight and drop it on any later sighting within a time window. Back the store with a WindowStore so old keys are purged and the state stays bounded.
Does exactly-once already deduplicate records for me?
No. Exactly-once makes the consume-process-produce cycle atomic so each input record is processed once, but it cannot tell that two distinct records at two offsets represent the same logical event. The idempotent producer has the same blind spot: it only dedupes retries within one producer session, not the same event from two producers or after a restart.
Why must the Kafka Streams dedup store be windowed?
A plain KeyValueStore remembers every key forever, so for a high-cardinality stream it grows without bound, its changelog grows with it, and state restore after a rebalance slows down. A WindowStore retains keys only for a configured retention period and drops the rest, which matches how dedup works in practice: the same event won't legitimately reappear after N minutes or hours.
How do I choose the dedup key for Kafka Streams deduplication?
Use a key that is stable across the duplicate copies: an event id or idempotency key assigned at the source and carried in the payload, not the Kafka record key. A key generated inside your pipeline (a fresh UUID or timestamp) defeats dedup because each copy gets a different one; if the upstream doesn't provide a stable id, fix that first.
Where should deduplication live in a Kafka Streams pipeline?
Use layers: the idempotent producer catches retry duplicates within one session, the dedup processor catches genuinely-duplicate records by business key within a window, and an idempotent sink (an upsert keyed by business id) is the last line that protects the system of record. Reach for the stateful dedup processor specifically when you need a de-duplicated topic that other consumers read, or when the sink can't be made idempotent.
See it in practice with Conduktor
A dedup processor is backed by a real state store with a real changelog topic, and the duplicates you're chasing live in real topics. Conduktor Console lets you inspect the input and output topics to confirm the processor is actually collapsing duplicates, watch the size and compaction of the dedup store's changelog, and monitor consumer group lag and restore progress after a rebalance, so you can tell whether the dedup window is sized right or quietly growing without bound.
Next steps
- Kafka Streams Processor API: process(), punctuators, and manual state store access
- Why you still see duplicates: when EOS is on but duplicates persist
- State stores: RocksDB, changelogs, and bounding the state this pattern creates