# Kafka Streams Processor API and punctuators

*Learn when to drop below the DSL, and how.*

The Kafka Streams DSL (`map`, `filter`, `groupBy`, `join`) covers most stream processing cleanly. Then you hit a problem it can't express: emit a record only if a follow-up event *doesn't* arrive within 30 seconds, deduplicate on a business key, route different records to different topics by content, or flush an aggregate on a wall clock instead of on input. For those, you drop one layer down to the **Processor API**: the imperative core that the DSL itself is built on.

This is not a lower-level dialect of the same thing. The Processor API hands you three capabilities the DSL deliberately hides: direct read/write access to a state store, explicit control over where each record goes (`context.forward()`), and a scheduled callback that fires on a timer (`context.schedule()` with a `Punctuator`). The cost is that you now own correctness: there is no operator doing the right thing for you.

**What you'll learn:**
- What `process()`, `Processor`, and `ProcessorContext` give you over the DSL
- How to attach a state store and read and write it directly
- How `context.forward()` routes records and how punctuators fire on a schedule
- The `STREAM_TIME` vs `WALL_CLOCK_TIME` distinction that decides whether your timer ever fires

## Where the DSL ends

The DSL is a closed set of operators. Each one is stateless or wires up its own state store and decides for you when to emit. That is exactly what you want until your requirement doesn't map onto any operator. Reach for the Processor API when you need:

- **Custom windowing or timeouts**: "alert if a session hasn't closed in N minutes", "expire a pending order after a deadline". The DSL's windows close on stream-time; a timeout on a *quiet* key needs a wall-clock timer the DSL doesn't expose.
- **Deduplication on a business key**: there's no `dedup()` operator. You keep seen-keys in a store and drop repeats yourself (see [deduplication](https://www.conduktor.io/kafka-streams/deduplication)).
- **Content-based routing to many outputs**: `split()`/`branch()` handles fixed predicates; dynamic "send to the topic named in this field" wants `context.forward()` to a chosen child.
- **Manual emit timing**: buffer updates and flush them on a timer rather than per-record or on cache eviction.

If your logic *does* fit the DSL, stay there. The DSL gets co-partitioning, repartition topics, store naming, and exactly-once wiring right for free. The Processor API makes all of that your job.

> **DSL or Processor API: the decision.** Default to the DSL. Drop to the Processor API for a *specific* operator that the DSL can't express, and use `process()` / `processValues()` to splice it into an otherwise-DSL topology; you do not have to rewrite the whole app in one style. Reserve a fully hand-built `Topology` for the rare case where the entire graph is custom. Mixing is the norm, not a compromise.

## process(), Processor, and ProcessorContext

A `Processor<KIn, VIn, KOut, VOut>` is a small object with three methods: `init`, `process`, and `close`. You splice it into a DSL pipeline with `process()` (or `processValues()` when you only touch the value and want the key left unchanged, so that no downstream repartition is triggered):

```java
KStream<String, Order> orders = builder.stream("orders");

orders.process(
    () -> new DedupProcessor(),   // ProcessorSupplier: a fresh instance on every get()
    "dedup-store"                 // names of state stores this processor connects to
);
```

The `ProcessorContext` you receive in `init` is the seam to the runtime. Through it you reach the connected state stores, forward records downstream, schedule punctuators, read record metadata (`recordMetadata()` for topic/partition/offset), and commit (`context.commit()` *requests* a commit; the runtime decides when).

Since Kafka 3.3 (KIP-820), `process()` returns a `KStream` and chains like any other DSL operator: whatever you `context.forward()` flows straight into the next step, so `.to(...)` works right after it. The real difference between the two: `process()` may change the key, so a downstream stateful operation triggers a repartition; `processValues()` fixes the key and avoids it.

## Connecting and using a state store

A processor's store is not implicit. You build and register it, then connect it by name. The store is local, partitioned, and (if persistent) changelog-backed exactly like any DSL store; see [state stores](https://www.conduktor.io/kafka-streams/state-store).

```java
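import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// 1. Build the store and add it to the topology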
StoreBuilder<KeyValueStore<String, Long>> dedupStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("dedup-store"),
        Serdes.String(),
        Serdes.Long());
builder.addStateStore(dedupStore);

// 2. In the processor, fetch it by name in init()
public class DedupProcessor implements Processor<String, Order, String, Order> {
    private ProcessorContext<String, Order> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Order> context) {
        this.context = context;
        this.store = context.getStateStore("dedup-store");
    }

    @Override
    public void process(Record<String, Order> record) {
        Long seenAt = store.get(record.key());
        if (seenAt != null) {
            return;                          // duplicate: drop it, emit nothing
        }
        store.put(record.key(), record.timestamp());
        context.forward(record);             // novel key: pass it on
    }

    @Override public void close() {}
}
```

This store is unbounded and will grow forever. The missing piece is a punctuator that purges old keys. That comes in the section on `context.schedule()` below, and it's also the difference between a toy and a production processor.

## context.forward() for custom routing

In the DSL, where a record goes is implicit in the operator chain. In a processor you say it. `context.forward(record)` sends to all downstream children; you can also forward different records to different named children for content-based routing:

```java
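// "to-review" and "to-auto-approve" must be the names of downstream child nodes in the topology.
Order order = record.value();
if (order.amount() > LARGE_THRESHOLD) {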
    context.forward(record, "to-review");
} else {
    context.forward(record, "to-auto-approve");
}
```

You can forward zero records (a filter), one (a map), or many (a flatMap) per input, and you can forward from inside a punctuator, not only from `process()`. That last point is what makes timed emission possible. One caveat: `recordMetadata()` is empty during a punctuation, because there is no current record; guard it before use.

## context.schedule() and punctuators

A `Punctuator` is a callback you register with `context.schedule(interval, type, punctuator)`. It fires on a schedule independent of input records, which is the whole reason to use it: it lets a processor act when *nothing* is arriving. The `PunctuationType` decides what "schedule" means, and getting this wrong is the most common Processor API mistake.

| `PunctuationType` | Fires when | Fires on idle input? | Use it for |
|---|---|---|---|
| `STREAM_TIME` | Stream-time advances past the interval; stream-time is the highest timestamp of records the task has processed | **No.** It freezes the moment input stops | Logic that must track event time (event-time windowing, watermark-like flushes) |
| `WALL_CLOCK_TIME` | The system clock passes the interval, evaluated on each `poll()` loop, regardless of input | **Yes** | Timeouts, periodic flush, purging state on quiet streams |

The trap: `STREAM_TIME` is driven only by records the task actually processes. Stream-time is the *maximum* timestamp seen so far, and it freezes entirely when input stops, so on an idle stream your `STREAM_TIME` punctuator **never fires**. This is the same mechanism that makes [suppress() emit nothing on idle input](https://www.conduktor.io/kafka-streams/suppress-not-emitting). (A single quiet partition does not stall it under default config: `max.task.idle.ms` defaults to 0, and a positive value only makes the task wait up to that bound.) If your job is "flush or time out even when the stream is silent", you need `WALL_CLOCK_TIME`.
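To make the "time out even when the stream is silent" case concrete, here is a minimal sketch in plain Java with no Kafka dependency. The names `PendingTimeouts`, `onStart`, `onFollowUp`, and `onWallClock` are hypothetical: the map stands in for a state store, and `onWallClock` stands in for the body of a `WALL_CLOCK_TIME` punctuator. It models the logic only and is not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model: the map plays the state store, onWallClock plays the punctuator body. */
final class PendingTimeouts {
    private final long timeoutMs;
    // key -> wall-clock millis at which the first event for that key was seen
    private final Map<String, Long> pendingSince = new HashMap<>();

    PendingTimeouts(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Would be called from process(): a first event opens a pending entry (an existing one is kept). */
    void onStart(String key, long nowMs) {
        pendingSince.putIfAbsent(key, nowMs);
    }

    /** Would be called from process(): the follow-up arrived, so nothing should be emitted later. */
    void onFollowUp(String key) {
        pendingSince.remove(key);
    }

    /** Would be called from the wall-clock punctuator: returns keys whose follow-up never arrived. */
    List<String> onWallClock(long nowMs) {
        List<String> timedOut = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pendingSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= timeoutMs) {
                timedOut.add(entry.getKey());   // a real processor would forward an alert here
                it.remove();                    // and delete the entry from the store
            }
        }
        return timedOut;
    }
}
```

Because `onWallClock` needs no input record to run, a key that was opened and then went quiet is still reported once the timeout passes. Driven from a `STREAM_TIME` schedule, the same method would simply never be called on a silent stream.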

Here's the dedup processor completed with a wall-clock punctuator that expires keys older than a TTL, so the store stays bounded even if some keys never reappear:

```java
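// TTL_MS (retention for seen keys, in milliseconds) is assumed to be a constant on the class.
private Cancellable cancellable;   // handle returned by schedule(); cancelled in close()

@Override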
public void init(ProcessorContext<String, Order> context) {
    this.context = context;
    this.store = context.getStateStore("dedup-store");

    // Fire on the wall clock so purging happens even when input is quiet.
    this.cancellable = context.schedule(
        Duration.ofMinutes(1),
        PunctuationType.WALL_CLOCK_TIME,
        this::purgeExpired);
}

private void purgeExpired(long now) {
    long cutoff = now - TTL_MS;
    try (KeyValueIterator<String, Long> all = store.all()) {
        while (all.hasNext()) {
            KeyValue<String, Long> entry = all.next();
            if (entry.value < cutoff) {
                store.delete(entry.key);
            }
        }
    }
}
```

Note that the `now` passed to the punctuator is wall-clock millis here; under `STREAM_TIME` it would be the current stream-time instead. The purge compares stored record timestamps (event time) against that wall-clock value, which assumes event timestamps track the wall clock closely. Two scheduling surprises: the first `STREAM_TIME` fire happens on the first record processed, without waiting out the initial interval, and a stream-time jump across several intervals produces a single punctuation at the new time, not one call per missed boundary. The `schedule` call returns a `Cancellable`; hold onto it.
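The two scheduling surprises are easier to see in a small model. The sketch below is plain Java with no Kafka dependency, and `PunctuationScheduleModel` and its methods are hypothetical names. It assumes a stream-time schedule is aligned to time 0 and a wall-clock schedule is first due one interval after it was registered; it is a simplified model of the behavior described above, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a single punctuation schedule. */
final class PunctuationScheduleModel {
    private final long intervalMs;
    private long nextFireMs;                      // next time the punctuator is due
    final List<Long> fires = new ArrayList<>();   // timestamps that were passed to the punctuator

    private PunctuationScheduleModel(long firstFireMs, long intervalMs) {
        this.nextFireMs = firstFireMs;
        this.intervalMs = intervalMs;
    }

    /** Stream-time: modelled as aligned to 0, so the first record processed triggers a fire. */
    static PunctuationScheduleModel streamTime(long intervalMs) {
        return new PunctuationScheduleModel(0L, intervalMs);
    }

    /** Wall-clock: modelled as first due one interval after the schedule was registered. */
    static PunctuationScheduleModel wallClock(long scheduledAtMs, long intervalMs) {
        return new PunctuationScheduleModel(scheduledAtMs + intervalMs, intervalMs);
    }

    /** Call with the current stream-time (after a record) or the wall clock (each loop iteration). */
    void advanceTo(long nowMs) {
        if (nowMs < nextFireMs) {
            return;                                // not due yet
        }
        fires.add(nowMs);                          // one call, even if several intervals have passed
        long missed = (nowMs - nextFireMs) / intervalMs;
        nextFireMs += (missed + 1) * intervalMs;   // skip boundaries that are already in the past
    }
}
```

With a 10-second stream-time schedule, advancing to 5 000, then 7 000, then 45 000 records fires at 5 000 and 45 000 only: the first record fires immediately, and the jump across three missed boundaries yields one call. If `advanceTo` is never called, which is what an idle stream amounts to under `STREAM_TIME`, nothing fires at all.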

> 🚫 *"The punctuator runs on its own timer thread, so a slow one is harmless."*

A punctuator runs **inline on the stream thread**, in the same loop as `process()`; the two are never concurrent. So a slow punctuator doesn't just delay itself: it blocks record processing, holds up the consumer `poll()`, and delays commits. Iterate too long over a large store and you can blow `max.poll.interval.ms` and trigger a [rebalance](https://www.conduktor.io/kafka-streams/rebalancing). Keep punctuator work cheap, evict in bounded batches, and push heavy work off-thread rather than doing it in the callback. (This is also why wall-clock punctuation is "best effort": its granularity is bounded by how long a processing iteration takes.)
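One way to picture "evict in bounded batches" is a purge that examines a fixed number of entries per call and remembers where it stopped. The sketch below is plain Java with no Kafka dependency: a `TreeMap` stands in for a key-ordered state store, and `BoundedPurger`, `purgeBatch`, and `maxScanPerCall` are hypothetical names. Mapping the cursor onto a real store's iteration API is left out, so treat it as an illustration of the idea rather than a drop-in punctuator.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a bounded purge; the TreeMap stands in for a key-ordered state store. */
final class BoundedPurger {
    private final TreeMap<String, Long> seenAt = new TreeMap<>();   // key -> first-seen millis
    private final long ttlMs;
    private final int maxScanPerCall;
    private String resumeAfter = null;   // last key examined; null means "start from the beginning"

    BoundedPurger(long ttlMs, int maxScanPerCall) {
        this.ttlMs = ttlMs;
        this.maxScanPerCall = maxScanPerCall;
    }

    void put(String key, long seenAtMs) {
        seenAt.put(key, seenAtMs);
    }

    int size() {
        return seenAt.size();
    }

    /** Examines at most maxScanPerCall entries, deletes the expired ones, returns the delete count. */
    int purgeBatch(long nowMs) {
        long cutoff = nowMs - ttlMs;
        Map<String, Long> view =
            (resumeAfter == null) ? seenAt : seenAt.tailMap(resumeAfter, false);
        Iterator<Map.Entry<String, Long>> it = view.entrySet().iterator();
        int scanned = 0;
        int deleted = 0;
        while (it.hasNext() && scanned < maxScanPerCall) {
            Map.Entry<String, Long> entry = it.next();
            scanned++;
            resumeAfter = entry.getKey();   // remember the position before any removal
            if (entry.getValue() < cutoff) {
                it.remove();
                deleted++;
            }
        }
        if (!it.hasNext()) {
            resumeAfter = null;             // reached the end: the next call starts over
        }
        return deleted;
    }
}
```

The trade-off is latency for predictability: an expired key may survive a few extra punctuations until the cursor reaches it, but each call does a fixed amount of work. The cursor lives only in memory here, so after a restart the scan would simply begin again from the first key.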

## Cancel punctuators in close()

A scheduled punctuator is tied to the task: its punctuation queue is a per-task field, dropped with the task when it closes, so an uncancelled punctuator cannot outlive a [rebalance](https://www.conduktor.io/kafka-streams/rebalancing) or fire against a store the task no longer owns. Cancel in `close()` anyway: it is the mirror of `init()`, and it is the only way to stop a conditional or re-scheduled punctuator inside a live task:

```java
@Override
public void close() {
    if (cancellable != null) {
        cancellable.cancel();
    }
}
```

`close()` runs when the task is revoked or the app shuts down. Treat it as the mirror of `init()`: release schedules and any resources you opened, and do *not* close the state store yourself, because the runtime owns its lifecycle.

## A note on manual commit

Under the default at-least-once guarantee, `context.commit()` lets a processor *request* an offset commit at a logical boundary, for example after a punctuator has flushed a batch downstream. It is a request, not an immediate commit; the runtime still commits on its own interval. Under exactly-once it still works: `commit()` requests closing the current transaction earlier than `commit.interval.ms` (which defaults to 100 ms under EOS), but the transaction boundary remains the runtime's, so there is even less reason to call it (see [exactly-once](https://www.conduktor.io/kafka-streams/exactly-once)). Don't reach for the Processor API *just* to control commits; reach for it for state, routing, or timers, and let the processing guarantee handle atomicity.

**When should I use the Processor API instead of the DSL in Kafka Streams?**

Default to the DSL and drop to the Processor API for a *specific* requirement it can't express: custom windowing or timeouts on quiet keys, deduplication on a business key, content-based routing to dynamic outputs, or buffering and flushing on a timer. The DSL gets co-partitioning, repartition topics, store naming, and exactly-once wiring right for free, so only take on that responsibility where you need the control.

**Do I have to rewrite my whole Kafka Streams app to use the Processor API?**

No. Splice a single processor into an otherwise-DSL topology with `process()` or `processValues()` and keep the rest in the DSL. Mixing the two is the norm; reserve a fully hand-built `Topology` for the rare case where the entire graph is custom.

**How do I access a state store directly from a processor?**

Build the store with a `StoreBuilder` and register it via `builder.addStateStore(...)`, then in the processor's `init()` fetch it by name with `context.getStateStore("store-name")`. The store is local, partitioned, and changelog-backed exactly like any DSL store; connect its name when you call `process(...)`.

**How do I schedule a punctuator in Kafka Streams?**

Call `context.schedule(interval, type, punctuator)` in `init()` and hold the returned `Cancellable` so you can cancel it in `close()`. A punctuator fires on a schedule independent of input records, which is what lets a processor act when nothing is arriving, for example purging expired keys from a state store.

**What is the difference between STREAM_TIME and WALL_CLOCK_TIME punctuation?**

`STREAM_TIME` advances only with records the task actually processes: it is the maximum timestamp seen so far, so it freezes when input stops and the punctuator never fires on an idle stream. `WALL_CLOCK_TIME` fires off the system clock on each `poll()` regardless of input, so use it for timeouts, periodic flushes, and purging state on quiet streams. Both run inline on the stream thread, so keep the work cheap.

> **See it in practice with Conduktor**
> A Processor API app is still a consumer group writing to internal topics. When you add a manual state store, Kafka Streams creates its changelog topic behind the scenes. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you find that changelog, watch its size and compaction, and track the consumer group lag that tells you whether a punctuator-driven flush is keeping up or falling behind.

## Next steps

- [Deduplication in Kafka Streams](https://www.conduktor.io/kafka-streams/deduplication): the canonical Processor API use case, end to end
- [State stores](https://www.conduktor.io/kafka-streams/state-store): RocksDB, changelogs, and the memory that bites
- [When suppress() emits nothing](https://www.conduktor.io/kafka-streams/suppress-not-emitting): the stream-time stall, the DSL version of the same trap
