Testing Kafka Streams applications

How to test Kafka Streams: TopologyTestDriver basics, the caching gap that makes tests differ from prod, windowing tests, and CI topology diffs.

Learn how to test a Kafka Streams topology, and what your tests will lie to you about.

Kafka Streams is testable without a broker. TopologyTestDriver runs your real topology in-process and synchronously, in milliseconds: you pipe records in and read records out, with no cluster, no rebalancing, and no waiting. It is the right default for almost every Streams test. It also behaves differently from production in one specific, well-documented way. If you don't know about it, you'll write tests that pass on output prod will never produce.

This page covers the fast path, the one trap that catches everyone, and how to turn describe() into a CI guardrail against the positional-name shifts that break stateful deploys.

What you'll learn:

  • How TopologyTestDriver with TestInputTopic/TestOutputTopic works, and why it's synchronous
  • The caching gap that makes the test driver emit more updates than prod
  • How to test windowed and suppress() logic by advancing time deliberately
  • What the test driver can't catch, and when to reach for Testcontainers

The fast path: TopologyTestDriver

TopologyTestDriver takes a built Topology and the same Properties your app uses, then exposes typed input and output topics. The ergonomic TestInputTopic/TestOutputTopic API arrived in KIP-470; use it rather than the older pipeInput/readOutput calls.

Topology topology = WordCountApp.buildTopology();

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
    TestInputTopic<String, String> in =
        driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
    TestOutputTopic<String, Long> out =
        driver.createOutputTopic("out", new StringDeserializer(), new LongDeserializer());

    in.pipeInput("a", "hello world");
    in.pipeInput("b", "hello kafka");

    assertEquals(Map.of("hello", 2L, "world", 1L, "kafka", 1L), out.readKeyValuesToMap());
}

The bootstrap.servers value is never dialed, because the driver simulates the broker. Since Kafka 2.8 (KIP-680) you can omit it and application.id entirely; the driver fills in dummy values, so passing your app's real Properties is just a convenience. Everything runs on the calling thread: pipeInput processes the record through the entire topology before it returns, so there's nothing to await and no flakiness. For Avro, point the value serde at a MockSchemaRegistryClient via a mock:// URL so SpecificAvroSerde resolves schemas in memory instead of calling a real registry. See serde errors for why the real Schema Registry wire format trips up tests.
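It can help to see what the driver actually emitted before readKeyValuesToMap() collapsed it. The sketch below is a plain-Java model, not Kafka code, and its names (WordCountModel, pipe, toMap) are hypothetical. Each pipe call updates the counts and forwards every new count before returning, the way pipeInput runs a record through the whole topology. toMap keeps the last value per key, which is what readKeyValuesToMap() does; the real method also treats a null value as a delete, which this model ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a word-count topology under TopologyTestDriver semantics.
// Not Kafka code: it only mimics "process the whole record before returning".
final class WordCountModel {
    private final Map<String, Long> counts = new LinkedHashMap<>();
    // Every update is forwarded immediately, as if caching were disabled.
    private final List<Map.Entry<String, Long>> output = new ArrayList<>();

    // Analogue of in.pipeInput(key, line): splits the line and updates counts synchronously.
    void pipe(String line) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue;
            }
            long updated = counts.merge(word, 1L, Long::sum);
            output.add(Map.entry(word, updated));
        }
    }

    List<Map.Entry<String, Long>> emitted() {
        return List.copyOf(output);
    }

    // Analogue of readKeyValuesToMap(): the last value per key wins.
    static Map<String, Long> toMap(List<Map.Entry<String, Long>> records) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            latest.put(r.getKey(), r.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        WordCountModel model = new WordCountModel();
        model.pipe("hello world");
        model.pipe("hello kafka");
        System.out.println(model.emitted());        // [hello=1, world=1, hello=2, kafka=1]
        System.out.println(toMap(model.emitted())); // {hello=2, world=1, kafka=1}
    }
}
```

Four emissions against a three-entry map is the caching gap in miniature. The map assertion holds whether or not intermediate updates get coalesced, and an assertion on the emitted list would not.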

The caching gap: why tests see what prod won't

Here is the difference that turns a green test suite into a false sense of safety.

In production, a stateful operation does not emit a downstream record for every input. Kafka Streams has a record cache, sized by statestore.cache.max.bytes (named cache.max.bytes.buffering before Kafka 3.4), and a commit interval (commit.interval.ms). Updates to the same key are coalesced in the cache and forwarded downstream only when the cache is flushed. That happens on every commit and whenever the cache fills up and evicts entries. A key updated 100 times between flushes emits one downstream update with the latest value.

TopologyTestDriver has no record cache, and it commits after every input record. So the same aggregation that emits one coalesced update in prod emits one update per input record under the test driver.

Your test sees every intermediate update; prod sees only the ones that survive coalescing. A windowed count fed three records for the same key emits three running values (1, 2, 3) in TopologyTestDriver, but in production the cache may collapse those into a single emitted 3. If you assert on that exact sequence, your test pins down interim records a production consumer may never see, and it passes on behavior the cache hides in prod. Assert on final, converged values (read the whole output to a map and check the last value per key), not on every intermediate emission.

This is not a bug in the test driver; it's the cost of being deterministic and synchronous. But it means a test is not a faithful simulation of emission timing. A test tells you the topology computes the right final answer. It does not tell you how many times prod will emit along the way, and only a real broker can answer that second question.
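A toy model makes the difference concrete. The sketch below is plain Java and deliberately not Kafka Streams' actual cache implementation. CoalescingCache and flush() are hypothetical names, with flush() standing in for a commit or an eviction. The model feeds 100 updates for one key down two paths. The driver-like list records all 100 updates, the cache forwards one, and both end on the same final value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of record-cache coalescing. Not Kafka Streams' real cache:
// it only shows why fewer, later updates reach downstream in prod.
final class CoalescingCache<K, V> {
    // Latest value per key since the last flush; a new put overwrites the older update.
    private final Map<K, V> dirty = new LinkedHashMap<>();
    // Everything forwarded downstream so far.
    private final List<Map.Entry<K, V>> forwarded = new ArrayList<>();

    void put(K key, V value) {
        dirty.put(key, value);
    }

    // Stand-in for a commit (or an eviction when the cache is full):
    // forward the latest value per dirty key, then start over.
    void flush() {
        for (Map.Entry<K, V> e : dirty.entrySet()) {
            forwarded.add(Map.entry(e.getKey(), e.getValue()));
        }
        dirty.clear();
    }

    List<Map.Entry<K, V>> forwarded() {
        return List.copyOf(forwarded);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> driverOutput = new ArrayList<>();
        CoalescingCache<String, Long> cache = new CoalescingCache<>();

        long count = 0;
        for (int i = 0; i < 100; i++) {
            count++;
            driverOutput.add(Map.entry("user-1", count)); // driver: every update forwarded
            cache.put("user-1", count);                   // prod-like: overwritten in the cache
        }
        cache.flush(); // one commit interval elapses

        System.out.println(driverOutput.size());                       // 100
        System.out.println(cache.forwarded());                         // [user-1=100]
        System.out.println(driverOutput.get(driverOutput.size() - 1)); // user-1=100
    }
}
```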

Testing windows and suppress: advance time on purpose

The test driver gives you full control of record timestamps, and for windowed logic you need it. A window closes when stream time, the maximum record timestamp the task has seen, passes the window end plus the grace period. The driver never advances stream time on its own; it only moves when you pipe a record with a later timestamp. So a window stays open until you push it shut.
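When you write these tests, it helps to compute which timestamp shuts a given window instead of guessing. The helper below is a hypothetical plain-Java test utility, not a Kafka Streams API. It assumes epoch-aligned tumbling windows (which is how TimeWindows are aligned) and non-negative timestamps. It treats a window as closed once stream time reaches window end plus grace. Piping a timestamp strictly later, as the test below does with t0+120s, leaves a safe margin.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical test helper, not a Kafka Streams API. Assumes epoch-aligned
// tumbling windows and non-negative epoch-millisecond timestamps.
final class WindowMath {
    // Start of the tumbling window containing ts.
    static Instant windowStart(Instant ts, Duration size) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - (ms % size.toMillis()));
    }

    // Exclusive end of that window.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    // Earliest timestamp that, once piped, moves stream time far enough to close the window.
    static Instant closingTimestamp(Instant ts, Duration size, Duration grace) {
        return windowEnd(ts, size).plus(grace);
    }

    // Stream time is the max timestamp seen: an out-of-order record never moves it back.
    static long advance(long streamTimeMs, Instant recordTs) {
        return Math.max(streamTimeMs, recordTs.toEpochMilli());
    }

    static boolean isClosed(long streamTimeMs, Instant windowEnd, Duration grace) {
        return streamTimeMs >= windowEnd.plus(grace).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2026-01-01T00:00:00Z");
        Duration size = Duration.ofMinutes(1);
        // Window containing t0+20s, with zero grace and with 24 hours of grace.
        System.out.println(closingTimestamp(t0.plusSeconds(20), size, Duration.ZERO));        // 2026-01-01T00:01:00Z
        System.out.println(closingTimestamp(t0.plusSeconds(20), size, Duration.ofHours(24))); // 2026-01-02T00:01:00Z
    }
}
```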

Tests of suppress(untilWindowCloses(...)) go wrong exactly here. The suppress buffer emits the final result only after the window closes, and the window closes only when a later-timestamped record arrives. Pipe three records into a window and read the output, and you get nothing. Suppress isn't broken; stream time simply never moved past the window. The fix in a test is to pipe one more record with a timestamp beyond the window end plus grace:

// topology (assumed), with plain String keys at the sink:
//   .groupByKey()
//   .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
//   .count()
//   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
//   .toStream((windowedKey, count) -> windowedKey.key())
Instant t0 = Instant.parse("2026-01-01T00:00:00Z");
// three events inside one 1-minute tumbling window [t0, t0+60s)
in.pipeInput("user-1", "a", t0);
in.pipeInput("user-1", "b", t0.plusSeconds(10));
in.pipeInput("user-1", "c", t0.plusSeconds(20));

assertTrue(out.isEmpty(), "suppress holds the result until the window closes");

// advance stream time past window end (+grace) to flush the suppressed result
in.pipeInput("user-1", "d", t0.plusSeconds(120));

KeyValue<String, Long> finalCount = out.readKeyValue();
assertEquals(3L, finalCount.value); // a, b, c; d opened a new window [t0+120s, t0+180s) that is still open

The arithmetic only works because grace is zero. With ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofHours(24)), the record at t0+120s would not close the window, and readKeyValue() would fail on an empty topic. (The legacy TimeWindows.of(), with its hidden 24-hour default grace, has been removed in Kafka 4.x, so modern code always states grace explicitly.) The same trick, explicitly stepping stream time forward, is how you test grace periods, session-window merges, and anything that fires on window close. Suppress can also emit nothing in production, not just in a test. The mechanism is the same, but the trigger differs: a quiet partition that never advances stream time. That's its own deep dive (see why suppress() emits nothing), and windowing covers stream time versus wall-clock time in full.
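To see the grace arithmetic play out, here is a small plain-Java model of a windowed count behind suppress(untilWindowCloses(...)) for a single key. It is a sketch under stated assumptions, not Kafka's implementation:

- Windows are epoch-aligned, one-minute tumbling windows.
- Stream time is the maximum timestamp seen.
- Records for an already-closed window are dropped.
- A buffered count is released once stream time reaches window end plus grace.

The class name and its methods are hypothetical. Replaying the four records from the test above with zero grace releases the count of 3; with 24 hours of grace, nothing is released.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of count() + suppress(untilWindowCloses(...)) for one key. Not Kafka code.
final class SuppressModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // no records seen yet
    // window start (epoch ms) -> running count; TreeMap keeps the oldest window first
    private final TreeMap<Long, Long> buffer = new TreeMap<>();
    // final count per closed window, in the order the windows closed
    private final List<Long> emitted = new ArrayList<>();

    SuppressModel(Duration size, Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Analogue of pipeInput(key, value, timestamp) for the single key.
    void pipe(Instant ts) {
        long t = ts.toEpochMilli();
        streamTime = Math.max(streamTime, t);
        long start = t - (t % sizeMs); // assumes non-negative timestamps
        // Count the record only if its window is still open; late records are dropped.
        if (start + sizeMs + graceMs > streamTime) {
            buffer.merge(start, 1L, Long::sum);
        }
        releaseClosedWindows();
    }

    // Emit and forget every buffered window whose end + grace stream time has reached.
    private void releaseClosedWindows() {
        Iterator<Map.Entry<Long, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> window = it.next();
            if (streamTime < window.getKey() + sizeMs + graceMs) {
                break; // this window and every later one are still open
            }
            emitted.add(window.getValue());
            it.remove();
        }
    }

    List<Long> emitted() {
        return List.copyOf(emitted);
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2026-01-01T00:00:00Z");
        for (Duration grace : List.of(Duration.ZERO, Duration.ofHours(24))) {
            SuppressModel m = new SuppressModel(Duration.ofMinutes(1), grace);
            m.pipe(t0);
            m.pipe(t0.plusSeconds(10));
            m.pipe(t0.plusSeconds(20));
            m.pipe(t0.plusSeconds(120)); // the "d" record that advances stream time
            System.out.println("grace " + grace + " -> " + m.emitted());
        }
        // grace PT0S -> [3]
        // grace PT24H -> []
    }
}
```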

What the test driver won't catch

TopologyTestDriver validates your topology's logic. It does not validate the things that depend on a real cluster:

  • Missing source topics. The driver has no broker and no topic metadata, so it never checks that your source topics exist. In production, reading from a topic that doesn't exist is a startup failure the driver never reproduces.
  • Real serde and Schema Registry behavior. A mock:// registry isn't the real wire format, real subject-compatibility rules, or a registry that's momentarily down. A mismatched default serde (a ClassCastException waiting to happen) also fails in the driver only on the paths and record shapes your test actually pipes, while production traffic exercises all of them.
  • Rebalancing, restore, and standby replicas. There's no consumer group, so slow rebalances and state restore time, the failures that actually page you, are invisible. The driver restores no changelog because there's no broker to hold one.
  • The caching/commit timing described above.

For those, you need a real broker. Testcontainers spins up a throwaway Kafka in Docker for the test, so you can run the full app, produce to real topics, and assert on real consumed output, at the cost of seconds per test instead of milliseconds. Use it for a handful of integration tests (serde round-trips against a real registry, a real restart-and-restore) and keep the bulk of your coverage in the fast TopologyTestDriver tier.

Snapshot the topology to guard deploys

The most valuable test in a stateful Streams project isn't about output at all. It's a compatibility check. Internal store, changelog, and repartition topic names are positional: insert or reorder one operator and every downstream name shifts, orphaning the old state. (The full mechanism, and the naming discipline that prevents it, is in topology evolution.)

You can catch that shift before it ships. topology.describe() renders the entire wiring, including the autogenerated names:

Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [in])
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
    Sink: KSTREAM-SINK-0000000004 (topic: out)

(Trimmed for readability: the real output starts with a Topologies: header and adds -->/<-- wiring lines after every node. Your committed snapshot will contain all of it.)

Snapshot that string to a committed file and assert against it:

@Test
void topologyMatchesSnapshot() throws Exception {
    String actual = MyApp.buildTopology().describe().toString();
    String expected = Files.readString(Path.of("src/test/resources/topology.txt"));
    assertEquals(expected, actual); // fails the build if any internal name moved
}
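Comparing raw strings is brittle if a checkout rewrites line endings (git's autocrlf on Windows is the usual culprit) or if an editor strips trailing whitespace from the snapshot file. One option is to normalize both sides and report the first differing line, so the failure message points at the node that moved. It is sketched below as a hypothetical plain-Java helper, TopologySnapshot, which is not part of Kafka or JUnit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for the snapshot test; plain Java, no Kafka APIs.
final class TopologySnapshot {
    // lines() splits on \n, \r and \r\n, so CRLF checkouts compare equal to LF ones.
    static List<String> normalize(String text) {
        List<String> lines = text.lines()
                .map(String::stripTrailing) // ignore trailing spaces an editor may strip
                .collect(Collectors.toCollection(ArrayList::new));
        // ignore trailing blank lines, e.g. an extra final newline
        while (!lines.isEmpty() && lines.get(lines.size() - 1).isEmpty()) {
            lines.remove(lines.size() - 1);
        }
        return lines;
    }

    // 1-based number of the first line that differs, or -1 if the snapshots match.
    static int firstDifference(String expected, String actual) {
        List<String> e = normalize(expected);
        List<String> a = normalize(actual);
        int common = Math.min(e.size(), a.size());
        for (int i = 0; i < common; i++) {
            if (!e.get(i).equals(a.get(i))) {
                return i + 1;
            }
        }
        return e.size() == a.size() ? -1 : common + 1;
    }

    public static void main(String[] args) {
        String committed = "Sub-topology: 0\r\n    Source: KSTREAM-SOURCE-0000000000 (topics: [in])\r\n";
        String current = "Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [in])  \n\n";
        String shifted = "Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [in])\n";
        System.out.println(firstDifference(committed, current)); // -1
        System.out.println(firstDifference(committed, shifted)); // 2
    }
}
```

Inside topologyMatchesSnapshot(), the assertion could then become assertEquals(-1, TopologySnapshot.firstDifference(expected, actual), "describe() output differs from topology.txt"). Whitespace-only edits to the snapshot no longer fail the build, but a moved internal name still does.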

When a pull request shifts a ...0000000001 to ...0000000002, the snapshot test fails with a diff, and a reviewer decides whether the change is a safe rolling deploy or needs the application reset tool. The decision moves into code review, where it's cheap, instead of into a restore failure at 3am, where it isn't. Kafka 4.3 adds a built-in, stricter version of this guardrail: set ensure.explicit.internal.resource.naming=true and the topology refuses to build at all if any internal store, changelog, or repartition topic is unnamed.
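A failing assertEquals on the whole describe() string can bury the interesting part in a long diff. The question the reviewer has to answer is narrower: which generated names disappeared, and which appeared? The sketch below answers it with a small plain-Java helper. It assumes generated names follow the UPPER-CASE-0000000001 pattern shown above. InternalNameDiff and its methods are hypothetical. Parsing text is only a convenience here; TopologyDescription exposes the same nodes as objects if you'd rather walk them.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch for a reviewer-facing report; plain Java, parses describe() text.
final class InternalNameDiff {
    // Assumption: generated names are upper-case words joined by '-' plus a
    // 10-digit index, e.g. KSTREAM-AGGREGATE-STATE-STORE-0000000001.
    private static final Pattern GENERATED =
            Pattern.compile("\\b[A-Z]+(?:-[A-Z]+)*-\\d{10}\\b");

    // Distinct generated names in order of first appearance.
    static Set<String> generatedNames(String description) {
        Set<String> names = new LinkedHashSet<>();
        Matcher m = GENERATED.matcher(description);
        while (m.find()) {
            names.add(m.group());
        }
        return names;
    }

    // In the snapshot but gone now: state behind any store or repartition topic here is orphaned.
    static Set<String> removed(String snapshot, String current) {
        Set<String> gone = generatedNames(snapshot);
        gone.removeAll(generatedNames(current));
        return gone;
    }

    // New in the current topology: any store listed here starts empty after the deploy.
    static Set<String> added(String snapshot, String current) {
        Set<String> fresh = generatedNames(current);
        fresh.removeAll(generatedNames(snapshot));
        return fresh;
    }

    public static void main(String[] args) {
        // Hand-written, trimmed describe() excerpts: a filter inserted before the aggregation.
        String snapshot = String.join("\n",
                "Source: KSTREAM-SOURCE-0000000000 (topics: [in])",
                "Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])");
        String current = String.join("\n",
                "Source: KSTREAM-SOURCE-0000000000 (topics: [in])",
                "Processor: KSTREAM-FILTER-0000000001 (stores: [])",
                "Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])");
        System.out.println("removed: " + removed(snapshot, current));
        // removed: [KSTREAM-AGGREGATE-0000000002, KSTREAM-AGGREGATE-STATE-STORE-0000000001]
        System.out.println("added:   " + added(snapshot, current));
        // added:   [KSTREAM-FILTER-0000000001, KSTREAM-AGGREGATE-0000000003, KSTREAM-AGGREGATE-STATE-STORE-0000000002]
    }
}
```

In this example, a single inserted filter renames the store, so the old store's changelog is orphaned. The removed STATE-STORE entry is the line a reviewer should stop on.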

How do I unit test a Kafka Streams topology?

Use TopologyTestDriver, which runs your real topology in-process and synchronously with no broker. Feed records through a TestInputTopic and read results from a TestOutputTopic; pipeInput processes each record through the whole topology before it returns, so there's nothing to await and no flakiness.

What is TopologyTestDriver and how do I use it?

TopologyTestDriver takes a built Topology plus the same Properties your app uses and exposes typed input and output topics via createInputTopic/createOutputTopic. The bootstrap.servers value is never dialed, because the driver simulates the broker. Since Kafka 2.8 you can omit it and application.id entirely. Prefer the TestInputTopic/TestOutputTopic API from KIP-470 over the older pipeInput/readOutput calls.

Why does my Kafka Streams topology behave differently in TopologyTestDriver than in production?

The test driver has no record cache and commits after every input record, so a stateful operation emits one update per input, while production coalesces updates in the cache and emits fewer. Assert on final, converged values (read the whole output to a map, check the last value per key), not on every intermediate emission, or your test will pass on behavior the cache hides in prod.

How do I test windowed operations and suppress in Kafka Streams?

The driver only advances stream time when you pipe a record with a later timestamp, and a window closes when stream time passes the window end plus grace. So pipe your in-window events, then pipe one more record with a timestamp beyond window end plus grace. That pushes the window shut and flushes a suppress(untilWindowCloses(...)) result. Otherwise the output stays empty, because stream time never moved.

When should I use Testcontainers instead of TopologyTestDriver?

Use Testcontainers (or EmbeddedKafka) for the things that need a real cluster: missing source topics, real Schema Registry wire format and compatibility, rebalancing, state restore, and the real caching/commit timing the driver can't reproduce. Keep those to a handful of integration tests at seconds each, and keep the bulk of coverage in the fast TopologyTestDriver tier.

See it in practice with Conduktor

Once a tested topology is deployed, Conduktor Console shows the real changelog and repartition topics it created and the consumer group lag behind them. These are the production signals your test driver can't simulate. The Console is how you confirm that the final, converged results your tests asserted actually hold against a real broker, and that a restore finished after a deploy. Conduktor observes the Kafka your Streams app runs on; it doesn't replace your test suite.

Next steps