# Kafka Streams windowing

*Learn how windows work, and why they close at times you don't expect.*

A plain aggregation runs forever: the count of orders per customer has no end. Windowing carves that endless stream into bounded buckets of time: orders per customer *per hour*, errors per service *per five minutes*. You group as usual, then add a window, and Kafka Streams keeps a separate running result per key per bucket.

The window *types* are the easy part: four of them, each a one-liner. The hard part, and the source of most windowing surprises in Kafka Streams, is *when a window closes*. It is not the wall clock. It is **stream-time**, and stream-time only moves when new records arrive. That single fact explains the idle-stream-never-emits problem, the flood of warnings when you reprocess, and why Kafka Streams refuses to implement watermarks at all.

**What you'll learn:**
- The four window types (tumbling, hopping, sliding, session) and when to use each
- Grace periods, and why the current factory methods make you choose grace explicitly, including choosing *no* grace
- Why windows close on stream-time, not wall-clock time: the root of most windowing confusion
- Why Kafka Streams rejects watermarks in favour of continuous refinement

## The four window types

You window a grouped stream with `.windowedBy(...)`, then aggregate as normal. The result is keyed by a `Windowed<K>`: the original key *plus* the window it belongs to.

### Tumbling: fixed, non-overlapping

Adjacent, equal-size, gap-free buckets. Every record falls into exactly one window. The default choice for "per hour" / "per minute" rollups.

```java
KTable<Windowed<String>, Long> hourly = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)))
    .count();
```

### Hopping: fixed size, overlapping

A window of size *S* that advances by a smaller step (the "hop" or "advance"). Windows overlap, so a record can land in several. Use it for sliding-style metrics on a fixed grid, such as a "5-minute count, updated every minute."

```java
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
    .advanceBy(Duration.ofMinutes(1)); // hop = 1 min → each record in up to 5 windows
```

A smaller hop means more overlapping windows, more keys, and more state. A hop equal to the size *is* a tumbling window.
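
The grid arithmetic behind tumbling and hopping windows can be sketched in plain Java. This is a simplified model, assuming windows are aligned to the epoch with an inclusive start and an exclusive end; the names `WindowAssignment` and `windowStartsFor` are illustrative and not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of epoch-aligned window assignment; not the Kafka Streams source. */
final class WindowAssignment {

    /** Returns the start (ms) of every window of the given size and advance that contains the timestamp. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window that can still contain the timestamp, snapped to the advance grid.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 12 * minute + 30_000L; // 12 minutes 30 seconds after the epoch

        // Hopping: size 5 min, advance 1 min. Five overlapping windows contain the record.
        System.out.println(windowStartsFor(ts, 5 * minute, minute));
        // Tumbling: advance equals size. Exactly one window contains the record.
        System.out.println(windowStartsFor(ts, 5 * minute, 5 * minute));
    }
}
```

With a 1-minute hop the record lands in the windows starting at minutes 8 through 12; with the hop equal to the size it lands only in the window starting at minute 10, which is the tumbling case.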

### Sliding: windows defined by the data, not a grid

`SlidingWindows` creates windows of a fixed size, but anchored on actual record timestamps rather than a fixed grid, so two records are aggregated together exactly when they fall within the window size of each other. Fewer, more meaningful windows than hopping for "events within *N* minutes of each other," at the cost of more complex semantics.

```java
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
    Duration.ofMinutes(5), Duration.ofMinutes(1)))
```

### Session: windows defined by gaps of inactivity

A session window grows to include records while they keep arriving, and closes after an *inactivity gap* with no new records for that key. Sessions for the same key that come close enough together merge. The natural fit for user activity: a "browsing session" that ends when the user goes quiet.

```java
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
    Duration.ofMinutes(30), Duration.ofMinutes(5)))
.count();
```
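
How an inactivity gap splits one key's records into sessions can be sketched without Kafka at all. The model below is deliberately simplified: it assumes timestamps arrive already sorted, treats a gap exactly equal to the limit as still inside the session (an assumption of this sketch), and does not model the merging that out-of-order records can trigger. `SessionSketch` and `sessions` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of session windowing for a single key; input must be sorted ascending. */
final class SessionSketch {

    /** Groups timestamps into {start, end} sessions; a new session opens when the gap exceeds gapMs. */
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        long[] current = null; // {start, end} of the session being built
        for (long ts : sortedTimestampsMs) {
            if (current != null && ts - current[1] <= gapMs) {
                current[1] = ts;               // within the gap: extend the session
            } else {
                current = new long[] {ts, ts}; // gap exceeded: open a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Activity at minutes 0, 10, 25, then silence until minutes 70 and 80.
        long[] clicks = {0, 10 * minute, 25 * minute, 70 * minute, 80 * minute};
        for (long[] s : sessions(clicks, 30 * minute)) {
            System.out.println(s[0] / minute + " to " + s[1] / minute);
        }
    }
}
```

With a 30-minute gap, the 45 minutes of silence between minute 25 and minute 70 splits the activity into two sessions.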

| Window | Shape | Overlap | Use when |
|---|---|---|---|
| Tumbling | Fixed grid, size = step | No | Per-hour / per-minute rollups |
| Hopping | Fixed grid, size > step | Yes | Sliding metric on a fixed grid |
| Sliding | Anchored on record times | Yes | "Events within N of each other" |
| Session | Grows until an inactivity gap | No | User sessions, bursty activity |

## Grace: how late is too late

Records arrive out of order. A record stamped 10:59 can show up at 11:02, after its 10:00–11:00 window looks finished. The **grace period** is how long Kafka Streams keeps a window open for stragglers after its end time. Within grace, a late record still updates its window; past grace, the record is dropped as expired.

```java
TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10));
// window 10:00–11:00 accepts late records until stream-time reaches 11:10
```
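
The acceptance rule in that comment reduces to a single comparison. The sketch below is a hedged model rather than Kafka Streams code: `GraceCheck` and `acceptsUpdates` are made-up names, and the date is arbitrary.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the grace rule: a window takes updates while stream-time < end + grace. */
final class GraceCheck {

    static boolean acceptsUpdates(Instant windowEnd, Duration grace, Instant streamTime) {
        return streamTime.isBefore(windowEnd.plus(grace));
    }

    public static void main(String[] args) {
        Instant end = Instant.parse("2024-01-01T11:00:00Z"); // window 10:00 to 11:00
        Duration grace = Duration.ofMinutes(10);

        // Stream-time 11:02: a record stamped 10:59 still updates the window.
        System.out.println(acceptsUpdates(end, grace, Instant.parse("2024-01-01T11:02:00Z")));
        // Stream-time 11:10: the window is closed and the same record would be dropped.
        System.out.println(acceptsUpdates(end, grace, Instant.parse("2024-01-01T11:10:00Z")));
    }
}
```

Note that the comparison is against stream-time, not the record's own timestamp and not the wall clock.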

> **Set grace deliberately; don't rely on remembered defaults.** The current factory methods (`ofSizeAndGrace`, `ofTimeDifferenceAndGrace`, `ofInactivityGapAndGrace`) make grace an explicit, required argument, so you choose it. Choosing none is explicit too, through the `...WithNoGrace` variants such as `ofSizeWithNoGrace`. The default shifted across versions: the original `TimeWindows.of(size)` derived a large, retention-based grace (on the order of a day), while the newer explicit factories force the decision on you. Match grace to how late your data realistically arrives, and don't carry an old version's remembered default into a new one.

## Where the record timestamp comes from

Stream-time is built from *record timestamps*, so it's worth knowing which timestamp that is. By default Kafka Streams uses the `FailOnInvalidTimestamp` extractor, which reads the timestamp Kafka embedded in the record, and that means the topic's timestamp type decides your time semantics:

- `CreateTime` (the default) is the **producer's** timestamp → event-time semantics, with all the out-of-order and lateness concerns this page is about.
- `LogAppendTime` is the **broker's** ingestion time. It's monotonic per partition, so records effectively *cannot* be late under `LogAppendTime`: there's no out-of-order to be late against. It also means you've thrown away true event time.

You can override the extractor globally via `default.timestamp.extractor`, or per stream through a `Consumed` instance and its `withTimestampExtractor(...)` method:

- `WallclockTimestampExtractor` ignores the record and returns `System.currentTimeMillis()`. That is pure processing-time, so windows close on the wall clock no matter what the data says.
- A **custom `TimestampExtractor`** pulls event-time out of the message *payload*. This is the right move when the meaningful time is a field inside the event, not when it was produced or appended.
- For invalid (negative) timestamps the built-ins differ: `FailOnInvalidTimestamp` throws, `LogAndSkipOnInvalidTimestamp` drops the record, and `UsePartitionTimeOnInvalidTimestamp` substitutes the last good timestamp from that partition.

Pick the wrong extractor and every window below is bucketed by the wrong clock, so settle this before you reason about grace and stream-time.
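
As a configuration sketch, the global override is just one entry in the properties you hand to the Streams application. The application id and bootstrap address below are placeholders, and `ExtractorConfig` is an illustrative name; the config key and the extractor's fully qualified class name are the real ones.

```java
import java.util.Properties;

/** Builds Streams properties that swap the default extractor for one that skips invalid timestamps. */
final class ExtractorConfig {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "windowing-demo");    // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Keep the record's embedded timestamp, but log and drop records whose timestamp is invalid
        // instead of failing the application.
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("default.timestamp.extractor"));
    }
}
```

A per-stream extractor set on `Consumed` takes precedence over this global setting for that source only.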

## The thing that actually confuses everyone: stream-time

Windows do not close because your clock reached 11:05. They close because **stream-time** passed the window's end plus its grace. Understanding stream-time is the whole game.

Stream-time is Kafka Streams' internal notion of "how far through the data we are." It is defined as the **maximum record timestamp the task has seen so far**. It is *monotonic*: it only moves forward, and only when a new record arrives with a timestamp later than the current maximum. It is driven entirely by the data. It has nothing to do with wall-clock time.

So a window for 10:00–11:00 with 5 minutes of grace closes at the moment a record arrives whose timestamp is ≥ 11:05. Not when your server's clock hits 11:05, but when a *record* carrying that timestamp flows through the task.

Stream-time is a property of the **task**, shared across every key that task processes; it is *not* tracked per key. So one busy key advancing stream-time also closes a quiet key's window; only when the task's whole input goes quiet does nothing close. (Per-key time tracking has been proposed upstream but never adopted, so per-task is the model you design around.)
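
A few lines of plain Java are enough to model this behaviour. The sketch is a toy, not the Kafka Streams implementation: `StreamTimeSketch`, `observe`, `isClosed`, and the `at` helper are all hypothetical names, and timestamps are expressed as milliseconds since midnight for readability.

```java
/** Toy model of per-task stream-time: one shared maximum across every key. */
final class StreamTimeSketch {
    private long streamTimeMs = Long.MIN_VALUE; // no record seen yet

    /** Feeds one record timestamp (from any key) and returns the resulting stream-time. */
    long observe(long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    /** True once stream-time has reached windowEnd + grace. */
    boolean isClosed(long windowEndMs, long graceMs) {
        return streamTimeMs >= windowEndMs + graceMs;
    }

    /** Helper: hour and minute as milliseconds since midnight. */
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    public static void main(String[] args) {
        StreamTimeSketch task = new StreamTimeSketch();
        long windowEnd = at(11, 0);
        long grace = at(0, 5);

        task.observe(at(10, 58));                                  // record for key "quiet"
        System.out.println(task.isClosed(windowEnd, grace));       // false
        task.observe(at(11, 6));                                   // record for key "busy"
        System.out.println(task.isClosed(windowEnd, grace));       // true, for every key in the task
        System.out.println(task.observe(at(10, 30)) == at(11, 6)); // true: an older record never rewinds
    }
}
```

The tracker has no notion of keys at all, which is the point: the busy key's 11:06 record closes the quiet key's 10:00–11:00 window.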

### Why an idle stream never emits its final window

This is the single most-reported windowing surprise, and it follows directly from the definition. If no new records arrive, stream-time does not advance. If stream-time does not advance, the window never reaches end + grace. If the window never closes, any logic waiting for it to close, most importantly [`suppress(untilWindowCloses(...))`](https://www.conduktor.io/kafka-streams/suppress-not-emitting), emits **nothing**.

Picture a low-traffic partition: the last event of the hour arrives at 10:58, then the stream goes quiet. Stream-time is stuck at 10:58. The 10:00–11:00 window will not close until a record timestamped *at or after 11:05* (window end plus 5 minutes of grace) shows up, which on a quiet partition could be minutes or hours later, or never. The final result you were waiting for simply never fires. Teams hit this, conclude `suppress` is broken, and reach for the notorious workaround of injecting a dummy future-timestamped event just to shove stream-time forward.

It is not broken. It is doing exactly what stream-time semantics require. The detailed symptom-to-cause checklist (sparse partitions, one lagging partition holding back a task, test harnesses that never advance time, buffer config) lives in [the suppress() trap](https://www.conduktor.io/kafka-streams/suppress-not-emitting).
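
The quiet-partition scenario can be replayed as a small simulation. This is a sketch under the same assumptions as the prose (a 10:00–11:00 window with 5 minutes of grace); `IdleStreamSketch` and `closingRecordIndex` are invented for illustration.

```java
/** Simulates which record, if any, pushes stream-time far enough to close a window. */
final class IdleStreamSketch {

    /** Returns the index of the first record whose arrival closes the window, or -1 if none does. */
    static int closingRecordIndex(long[] recordTimestampsMs, long windowEndMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < recordTimestampsMs.length; i++) {
            streamTime = Math.max(streamTime, recordTimestampsMs[i]);
            if (streamTime >= windowEndMs + graceMs) {
                return i;
            }
        }
        return -1; // the input ran dry before stream-time reached end + grace
    }

    /** Helper: hour and minute as milliseconds since midnight. */
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    public static void main(String[] args) {
        long end = at(11, 0);
        long grace = at(0, 5);

        // Last event at 10:58, then silence: nothing ever closes the window.
        System.out.println(closingRecordIndex(new long[] {at(10, 15), at(10, 40), at(10, 58)}, end, grace));
        // The same stream, with one more record hours later at 13:20: that record closes it.
        System.out.println(closingRecordIndex(new long[] {at(10, 15), at(10, 40), at(10, 58), at(13, 20)}, end, grace));
    }
}
```

No amount of waiting changes the first result; only another record does.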

> 🚫 *"My windowed aggregation stopped emitting, so the app is stuck."*
> More likely the stream went quiet and stream-time stopped advancing. Windows close on stream-time, and stream-time is the max record timestamp seen: no new records, no progress, no window close.

### Why reprocessing floods you with "skipping expired" warnings

The same mechanism explains the opposite failure. When you reset an app and reprocess a topic from the beginning, the first records you read are *old*: hours or days in the past. The very first record sets stream-time to its timestamp, and as you race through history, stream-time leaps forward fast. A record from Tuesday read just after a record from Friday is now far older than `stream-time − grace`, so it is dropped as expired, and Kafka Streams logs a warning for each one. Replaying a large topic can therefore produce a torrent of "skipping record for expired window" messages. They are expected during a full reprocess, not a sign of corruption; the records being skipped are genuinely outside any open window given how quickly stream-time advanced.

The warning itself is highly diagnostic: it includes the topic, partition, offset, record timestamp, window range, and current stream-time. Each drop also increments the task-level `dropped-records-total` metric, which is worth alerting on in normal operation, since expired drops are otherwise silent data loss.
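
The Tuesday-after-Friday case can be reproduced with a short simulation. It is a simplified model that assumes tumbling, epoch-aligned windows and a single task; `ReprocessSketch`, `countExpired`, and the `ms` helper are hypothetical names, and days are numbered from an arbitrary Monday.

```java
/** Counts how many records a replay would drop as expired, given the order they are read in. */
final class ReprocessSketch {

    static int countExpired(long[] timestampsInReadOrderMs, long sizeMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestampsInReadOrderMs) {
            streamTime = Math.max(streamTime, ts);
            long windowEnd = (ts / sizeMs) * sizeMs + sizeMs; // end of the tumbling window owning ts
            if (windowEnd + graceMs <= streamTime) {
                dropped++; // window already closed: this is the "skipping record" case
            }
        }
        return dropped;
    }

    /** Helper: day index (0 = Monday), hour, and minute as milliseconds. */
    static long ms(int day, int hour, int minute) {
        return ((day * 24L + hour) * 60 + minute) * 60_000L;
    }

    public static void main(String[] args) {
        long[] readOrder = {
            ms(0, 9, 10),  // Monday 09:10
            ms(4, 14, 0),  // Friday 14:00, stream-time jumps to Friday
            ms(1, 8, 30),  // Tuesday 08:30, its window closed long ago
            ms(4, 14, 20)  // Friday 14:20
        };
        System.out.println(countExpired(readOrder, ms(0, 1, 0), ms(0, 0, 5))); // prints 1
    }
}
```

Only the Tuesday record is dropped: by the time it is read, stream-time already sits at Friday, days past the end of its window plus grace.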

## Why Kafka Streams doesn't use watermarks

If you come from Flink, you expect *watermarks*: a special marker the system propagates to declare "no more events older than T will arrive," which triggers windows to fire. Kafka Streams deliberately does **not** do this. It uses stream-time plus **continuous refinement** instead.

The reasoning, laid out in the Apache Kafka design (KIP-328) and Confluent's writing on the topic, is that watermarks force a single, up-front decision about completeness and a one-shot trigger: the window fires once, late data after the watermark is awkward to handle, and you must guess the watermark delay correctly. Kafka Streams takes the opposite stance: a windowed aggregation emits an *updated result every time the window changes* (modulated by the record cache, as covered in [aggregations](https://www.conduktor.io/kafka-streams/aggregations)), and the grace period defines exactly when it stops accepting changes. There is no watermark to tune and no single firing moment; the result is continuously correct-so-far and is finalized when grace expires. `suppress` is the opt-in layer you add *on top* when you want one emission per window instead of the continuous stream.

The trade-off is real and worth stating plainly: continuous refinement means downstream consumers must tolerate seeing intermediate results, and getting a single clean result per window requires `suppress`, which reintroduces the stream-time dependency above. Neither model is free; they move the difficulty to different places.
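
The difference in output shape between the two modes can be illustrated with a toy counter over a single window. This is only a model: it ignores the record cache (which may coalesce updates in a real application), works in whole minutes, and uses invented names (`RefinementSketch`, `emissions`).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one window [0, 60) minutes with 5 minutes of grace, counting records. */
final class RefinementSketch {
    static final long WINDOW_END = 60;
    static final long GRACE = 5;

    /** Returns the counts emitted downstream, either on every update or once at window close. */
    static List<Long> emissions(long[] timestampsMin, boolean finalOnly) {
        List<Long> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        long count = 0;
        boolean closed = false;
        for (long ts : timestampsMin) {
            streamTime = Math.max(streamTime, ts);
            boolean windowOpen = streamTime < WINDOW_END + GRACE;
            if (ts >= 0 && ts < WINDOW_END && windowOpen) {
                count++;                        // record belongs to the window and arrived in time
                if (!finalOnly) out.add(count); // continuous refinement: emit every update
            }
            if (!windowOpen && !closed) {
                closed = true;
                if (finalOnly) out.add(count);  // final-only: emit once, when the window closes
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 59 arrives late but within grace; 70 closes the window; 30 arrives after close and is dropped.
        long[] records = {10, 40, 58, 62, 59, 70, 30};
        System.out.println(emissions(records, false)); // [1, 2, 3, 4]
        System.out.println(emissions(records, true));  // [4]
    }
}
```

In final-only mode nothing is emitted until the record at minute 70 arrives, which is the stream-time dependency described above.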

## A complete windowed example

Putting it together: count events per key per hour, with 5 minutes of grace, and write the per-window results to a topic. The trailing comment shows how to map the windowed key back to the plain key when that is all you need downstream:

```java
events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)))
    .count(Materialized.as("events-per-hour"))            // KTable<Windowed<String>, Long>
    .toStream()
    .to("hourly-counts",
        Produced.with(
            WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1).toMillis()),
            Serdes.Long()));

// If you only want the original key downstream, map the Windowed<K> back to K:
// .toStream((Windowed<String> wk, Long count) -> wk.key())
```

Two practicalities. First, the result key is a `Windowed<String>`, so serializing it needs a windowed serde, a common source of [serde errors](https://www.conduktor.io/kafka-streams/serdes) if you forget. The serde takes the window size because the serialized key only carries the window *start*; the end is reconstructed from the size at read time (the size-less `timeWindowedSerdeFrom(Class)` overload was deprecated by KIP-659 and removed in Kafka 4.0). Second, without `suppress`, this emits the continuous stream of per-window updates described above, not one row per window.
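
To see why the reader needs the window size, consider a toy encoding that stores only the key and the window start. The layout used here (UTF-8 key bytes followed by an 8-byte start) is an assumption made for illustration and should not be read as the exact Kafka Streams wire format; `WindowedKeySketch` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Toy windowed-key encoding: key bytes plus the window start, with no end stored. */
final class WindowedKeySketch {

    static byte[] encode(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    static String decodeKey(byte[] encoded) {
        return new String(encoded, 0, encoded.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    /** Returns {start, end}; the end is not in the bytes and must be derived from the size. */
    static long[] decodeWindow(byte[] encoded, long windowSizeMs) {
        long start = ByteBuffer.wrap(encoded).getLong(encoded.length - Long.BYTES);
        return new long[] {start, start + windowSizeMs};
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        byte[] bytes = encode("customer-42", 10 * hour);
        long[] window = decodeWindow(bytes, hour);
        System.out.println(decodeKey(bytes) + " " + window[0] / hour + " to " + window[1] / hour);
    }
}
```

Decoding the same bytes with a different size yields a different window end, so the size given to the serde on the reading side has to match the one used when windowing.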

**What are the four window types in Kafka Streams?**

Tumbling (fixed, non-overlapping buckets on a grid), hopping (fixed size that advances by a smaller step, so windows overlap), sliding (fixed size anchored on actual record timestamps), and session (grows while records keep arriving and closes after an inactivity gap). You apply one with `.windowedBy(...)` after grouping.

**What is the difference between a tumbling and a hopping window?**

A tumbling window has its size equal to its advance step, so every record lands in exactly one window with no overlap. A hopping window advances by a smaller step than its size, so windows overlap and a record can land in several. A hop equal to the size is just a tumbling window.

**What is a grace period and how does it handle late records?**

The grace period is how long Kafka Streams keeps a window open for out-of-order stragglers after its end time. Within grace a late record still updates its window; past grace the record is dropped as expired. Newer factory methods like `ofSizeAndGrace` make grace an explicit, required choice.

**Why does my windowed aggregation stop emitting on a quiet stream?**

Windows close on stream-time, not the wall clock, and stream-time is the maximum record timestamp the task has seen. If no new records arrive, stream-time does not advance, the window never reaches end-plus-grace, and anything waiting on the close, like `suppress(untilWindowCloses(...))`, emits nothing.

**Why does reprocessing flood my logs with "skipping expired" warnings?**

When you reset and replay a topic from the beginning, stream-time leaps forward as you race through old records. A record read just after a much newer one is now older than `stream-time − grace`, so it is dropped as expired and logged. This is expected during a full reprocess, not corruption.

> **See it in practice with Conduktor**
> A windowed aggregation creates a changelog topic whose keys carry the window boundaries, and it can grow large when windows or grace are wide. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you inspect that changelog topic, see its size and compaction, and watch the consumer group lag, which is useful for telling whether a window simply hasn't closed yet or there is a real processing backlog.

## Next steps

- [The suppress() trap](https://www.conduktor.io/kafka-streams/suppress-not-emitting): get one result per window, and why it emits nothing on quiet streams
- [Aggregations](https://www.conduktor.io/kafka-streams/aggregations): count, reduce, and aggregate, and the continuous-update model
- [Joins](https://www.conduktor.io/kafka-streams/joins): windowed stream-stream joins and the co-partitioning requirement
