Build your first Kafka Streams app
A runnable Kafka Streams Java example: the WordCount topology, the dependency and config you need, and the shutdown hook everyone forgets.
Get a real Kafka Streams app running.
The fastest way to understand Kafka Streams is to run one. This walks through WordCount, the canonical first app, as a complete Java program: the dependency, the minimal config, the topology, and how to start and (cleanly) stop it. Every piece here is the real thing, not pseudocode.
By the end you'll have an app that reads a topic of lines, counts words continuously, and writes the running counts to an output topic, and you'll understand the few config choices that decide whether it behaves in production.
What you'll learn:
- The one dependency you need, in Maven and Gradle
- The minimal configuration, and why application.id is the most important line
- The WordCount topology, operator by operator
- How to start the app, shut it down cleanly, and read its output
The dependency
Kafka Streams ships as a single JAR. Add it to a Java project; if you don't have one yet, start from Maven or Gradle.
Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.9.0</version>
</dependency>
Gradle:
implementation 'org.apache.kafka:kafka-streams:3.9.0'
kafka-streams pulls in kafka-clients transitively, so you don't add it separately. Add an SLF4J binding (such as slf4j-simple) so you see the library's logs. Kafka Streams is quiet by default, and during a rebalance or a state restore those logs are how you know what it's doing.
The minimal configuration
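Four settings get a Streams app running, written as five lines because the serdes come as a key/value pair. Two of them, the serdes and the offset reset, are convenience defaults; the other two, application.id and bootstrap.servers, are not optional.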
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- APPLICATION_ID_CONFIG: identifies your app. More on this below; it's the line that matters most.
- BOOTSTRAP_SERVERS_CONFIG: your Kafka cluster.
- The default serdes: how records are serialized and deserialized. We default both to String here; the moment a value isn't a string you override it per operator (a frequent source of serde errors).
- AUTO_OFFSET_RESET_CONFIG=earliest: on first run there's no committed offset, and a plain consumer would default to latest and wait for new records. Kafka Streams already overrides that default to earliest for you (one of StreamsConfig's built-in consumer overrides, in 3.9 and 4.x alike), so this line is documentation of intent rather than a behavior change. Still worth writing: it makes the first-run behavior explicit instead of buried in the library.
application.id is doing three jobs at once
application.id looks like a label. It isn't: it's an identity that Kafka Streams reuses for three different things:
- The consumer group id. All instances sharing an application.id form one consumer group and split the partitions between them. This is how the app scales.
- The prefix for every internal topic it creates. The changelog and repartition topics behind your state stores all start with the application.id followed by a hyphen (for this app, wordcount-app-...).
- The local state directory name on disk, where RocksDB keeps its files.
Because one string drives all three, choosing it carelessly has real consequences:
Treat application.id as a namespace, and never reuse one by accident. Two different apps pointed at the same application.id will join the same consumer group, fight over partitions, and write to each other's changelog and state directories, corrupting both. A copy-paste typo that collides with another app's id has caused exactly this in production. Make it unique and descriptive per app, and change it deliberately, knowing it resets the consumer group offsets and orphans the old internal topics.
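To make the three jobs concrete, here is a small stdlib-only sketch that derives each name from one application.id. It does not call Kafka Streams; the class and method names (AppIdNames, groupId, internalTopics, stateDir) are hypothetical, and /tmp/kafka-streams is only an illustrative value for the state.dir root.

```java
import java.nio.file.Path;
import java.util.List;

/** Illustration only: the names that follow from a single application.id. */
final class AppIdNames {

    // Job 1: the consumer group id is the application.id itself.
    static String groupId(String appId) {
        return appId;
    }

    // Job 2: internal topics are prefixed with the application.id.
    // 'name' stands for the store (or operator) name the topic belongs to.
    static List<String> internalTopics(String appId, String name) {
        return List.of(
                appId + "-" + name + "-repartition",
                appId + "-" + name + "-changelog");
    }

    // Job 3: local state lives in a directory named after the application.id,
    // under the configured state.dir root (passed in here as an assumption).
    static Path stateDir(String stateDirRoot, String appId) {
        return Path.of(stateDirRoot, appId);
    }

    public static void main(String[] args) {
        String appId = "wordcount-app";
        System.out.println("group id:  " + groupId(appId));
        internalTopics(appId, "word-counts")
                .forEach(topic -> System.out.println("topic:     " + topic));
        System.out.println("state dir: " + stateDir("/tmp/kafka-streams", appId));
    }
}
```

Run it with two apps that share an id and every derived name comes out identical, which is exactly the collision described above.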
The topology
Now the actual logic. WordCount reads lines, splits them into words, counts each word, and writes the counts out. Here's the whole topology; it runs as-is on Apache Kafka 3.9.0:
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("words-input")
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.filter((key, word) -> !word.isEmpty())
.groupBy((key, word) -> word)
.count(Materialized.as("word-counts"))
.toStream()
.to("words-output", Produced.with(Serdes.String(), Serdes.Long()));
Topology topology = builder.build();
Reading it operator by operator:
| Operator | What it does |
|---|---|
| stream("words-input") | Reads the input topic as a KStream of lines |
| flatMapValues(... split ...) | Splits each line into words; one input line becomes many word records |
| filter(word not empty) | Drops the empty strings the regex split can produce |
| groupBy((k, word) -> word) | Re-keys by the word itself, so all occurrences of a word group together |
| count(Materialized.as("word-counts")) | Counts per word into a named state store, producing a KTable |
| toStream() | Converts that running count table back into a stream of updates |
| to("words-output", Produced.with(...)) | Writes out, telling Kafka the value is now a Long, not a String |
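The filter row is easier to trust once you see the empty strings appear. The sketch below runs the same split expression the topology uses in plain Java, with no Kafka involved; the class and method names (SplitWords, split, words) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustration only: what the split in flatMapValues returns for awkward lines. */
final class SplitWords {

    // The same expression the topology passes to flatMapValues.
    static List<String> split(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    // split followed by the topology's filter step.
    static List<String> words(String line) {
        return split(line).stream()
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A clean line splits into words only.
        System.out.println(split("The quick brown fox"));
        // A line that starts with a non-word character yields a leading empty string.
        System.out.println(split("  leading spaces"));
        // An empty line yields a single empty string.
        System.out.println(split(""));
        // After the filter, only real words remain.
        System.out.println(words("  leading spaces"));
    }
}
```

Without the filter, those empty strings would be grouped and counted like any other key.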
groupBy changes the key, which forces Kafka Streams to repartition the data through an internal topic so that every occurrence of a given word ends up on the same task. That's automatic, but it's why a stateful app creates topics behind your back.
The final Produced.with(Serdes.String(), Serdes.Long()) is the explicit, safe choice rather than strictly mandatory here: count() sets a Long serde on its materialization and Kafka Streams propagates it through toStream() into to(), so this exact topology happens to work without it. But the moment an operator that loses type information sits between the aggregation and to() (a map or mapValues, say), propagation breaks, the default String serde kicks back in, and you get a ClassCastException wrapped in a StreamsException at runtime. Always state the serde at the sink.
Naming the store (Materialized.as("word-counts")) is not cosmetic. An unnamed store gets a positional name derived from its place in the topology, which shifts the moment you insert an operator upstream, orphaning the state. The name also anchors the internal topics: this topology creates wordcount-app-word-counts-repartition and wordcount-app-word-counts-changelog, all from that one string. Name your stores from day one. See evolving a topology.
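Why re-keying forces a repartition is easier to see with a toy partitioner. The sketch below is a simplification, not Kafka's code: Kafka's default partitioner hashes the serialized key with murmur2, while this stand-in uses Arrays.hashCode so it stays stdlib-only. RepartitionSketch and partitionFor are hypothetical names, and the partition count of 4 is arbitrary.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Illustration only: records with the same key always map to the same partition. */
final class RepartitionSketch {

    // Stand-in for key-based partitioning. ASSUMPTION: Arrays.hashCode replaces
    // the murmur2 hash Kafka really uses; only the "same key, same partition"
    // property carries over, not the actual partition numbers.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // arbitrary for the demo
        // After groupBy, the word is the key, so its partition depends on the word alone.
        for (String word : List.of("the", "quick", "the", "lazy", "the")) {
            System.out.println(word + " -> partition " + partitionFor(word, numPartitions));
        }
    }
}
```

Every "the" prints the same partition number, whichever input line it came from. That is the guarantee the repartition topic provides, and it is what lets a single task own the count for a word.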
Start it, and shut it down cleanly
A Topology is just a description. To run it, wrap it in a KafkaStreams instance and start(). The part people forget is the other half: stopping cleanly.
KafkaStreams streams = new KafkaStreams(topology, props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
start() kicks off the stream threads and returns immediately; the processing runs in the background. The shutdown hook is what makes it behave on Ctrl+C, a SIGTERM, or a Kubernetes pod eviction: streams.close() flushes state and commits offsets. It deliberately does not leave the consumer group: the member lingers until session.timeout.ms expires (45 seconds by default), so a quick restart doesn't trigger a rebalance. When an instance is going away for good, call close(CloseOptions) with leaveGroup(true) to remove it immediately.
Skip the hook and every stop is effectively a hard kill: offsets stay uncommitted and state unflushed, so the next start re-restores from the changelog and may reprocess records. The shutdown hook is one line, and it saves you a slow, confusing restart later.
Run it
With a local Kafka cluster on localhost:9092, create the topics, start the app, and feed it some lines:
# create the input and output topics
kafka-topics.sh --create --topic words-input --bootstrap-server localhost:9092
kafka-topics.sh --create --topic words-output --bootstrap-server localhost:9092
# produce a few lines (type, then Ctrl+D)
kafka-console-producer.sh --topic words-input --bootstrap-server localhost:9092
> the quick brown fox
> the lazy dog
Then read the output. The values are Long, so the consumer needs the right deserializer and the keys printed:
kafka-console-consumer.sh --topic words-output \
--bootstrap-server localhost:9092 \
--from-beginning \
--property print.key=true \
--value-deserializer org.apache.kafka.common.serialization.LongDeserializer
Don't panic when nothing appears for the first half-minute. With the default config the record cache holds updates and flushes them on commit, and commit.interval.ms defaults to 30 seconds, so the output topic stays empty until the first commit. Then everything arrives at once:
quick 1
the 2
brown 1
fox 1
lazy 1
dog 1
Note that "the" shows up once, as 2. Both lines landed inside one commit interval, so the cache merged the intermediate "the 1" into the final count before emitting. That's still the KTable doing its job: each output record is the current count for that word, not a one-time final answer. Send more lines after the next commit and the counts keep climbing as new records. To watch every single update flow through (the "the 1" then "the 2" sequence most tutorials show, which reflects TopologyTestDriver semantics, not real-broker defaults), set statestore.cache.max.bytes to 0. (If you want a single final emission per window instead of running updates, that's windowing plus suppression; see aggregations.)
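The difference between cached and uncached output can be modeled in a few lines of plain Java. This is a toy model under stated assumptions, not how Kafka Streams implements its cache: CountCacheSketch and its methods are invented for illustration, commit() stands in for the commit interval elapsing, and the order of the emitted records is an artifact of the sketch, not something to expect from a real cluster.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of count() with and without a record cache. Not Kafka Streams code. */
final class CountCacheSketch {
    private final Map<String, Long> store = new LinkedHashMap<>(); // running counts
    private final Map<String, Long> dirty = new LinkedHashMap<>(); // updates waiting for a commit
    private final List<String> output = new ArrayList<>();         // stands in for words-output
    private final boolean cacheEnabled;

    CountCacheSketch(boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    // Split a line into words and update the running count for each one.
    void process(String line) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue;
            }
            long count = store.merge(word, 1L, Long::sum);
            if (cacheEnabled) {
                dirty.put(word, count); // a later update overwrites the earlier one
            } else {
                output.add(word + " " + count); // every update is emitted immediately
            }
        }
    }

    // Flush the cached updates, as a commit would.
    void commit() {
        dirty.forEach((word, count) -> output.add(word + " " + count));
        dirty.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CountCacheSketch cached = new CountCacheSketch(true);
        cached.process("the quick brown fox");
        cached.process("the lazy dog");
        System.out.println("before commit: " + cached.output());
        cached.commit();
        System.out.println("after commit:  " + cached.output());

        CountCacheSketch uncached = new CountCacheSketch(false);
        uncached.process("the quick brown fox");
        uncached.process("the lazy dog");
        System.out.println("no cache:      " + uncached.output());
    }
}
```

The cached run emits nothing until commit() and then a single "the 2"; the uncached run emits "the 1" and later "the 2", which is the sequence you get on a real broker once the cache size is set to 0.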
That's a complete Kafka Streams app: a dependency, five config lines, a topology, and a clean shutdown.
What Maven dependency do I need for Kafka Streams?
Add org.apache.kafka:kafka-streams (this page uses version 3.9.0). It pulls in kafka-clients transitively, so you don't add that separately; also add an SLF4J binding so you can see the library's logs during rebalances and state restores.
What is the application.id config and why does it matter?
application.id is an identity Kafka Streams reuses for three things: the consumer group id, the prefix for every internal topic it creates, and the local state directory on disk. Because one string drives all three, two apps sharing an application.id will fight over partitions and corrupt each other's state, so keep it unique per app.
Why does my Kafka Streams app read nothing on first run?
Usually not the offset reset: Kafka Streams overrides the consumer's latest default to earliest, so a fresh app reads the input topic from the beginning (a plain KafkaConsumer does not get this override). The likelier cause is the record cache plus the 30-second default commit.interval.ms: nothing reaches the output topic until the first commit. Wait out the interval, or set statestore.cache.max.bytes to 0 to see every update immediately.
Why do I need a shutdown hook in a Kafka Streams app?
streams.start() returns immediately and runs in the background, so a clean stop needs streams.close() wired to a shutdown hook. Without it, a hard kill leaves uncommitted offsets and unflushed state, forcing the next start to re-restore from the changelog and possibly reprocess records.
Why does my WordCount output show the same word with increasing counts?
An aggregation returns a KTable, so each output record is the current running count for that key, not a one-time final answer: "the" will appear as 1, then 2, and so on as more lines arrive. To collapse those updates into one result per time window instead of every update, you reach for windowing plus suppress(untilWindowCloses); the record cache only reduces how many updates you see, it never yields a single final value.
See it in practice with Conduktor
Once your first app is running, it shows up on the cluster as a consumer group plus a set of internal topics. Conduktor Console lets you watch words-output fill up, find the wordcount-app-... repartition and changelog topics it created, and check the consumer group's lag, which is the fastest way to confirm the app is actually keeping up.
Next steps
- Kafka Streams architecture, how a topology becomes tasks and threads
- Aggregations, go beyond count with reduce and aggregate
- State stores, what Materialized.as(...) actually created