# Build your first Kafka Streams app

*Get a real Kafka Streams app running.*

The fastest way to understand Kafka Streams is to run an app. This page walks through WordCount, the canonical first app, as a complete Java program: the dependency, the minimal config, the topology, and how to start and (cleanly) stop it. Every piece here is the real thing, not pseudocode.

By the end you'll have an app that reads a topic of lines, counts words continuously, and writes the running counts to an output topic, and you'll understand the few config choices that decide whether it behaves in production.

**What you'll learn:**
- The one dependency you need, in Maven and Gradle
- The minimal configuration, and why `application.id` is the most important line
- The WordCount topology, operator by operator
- How to start the app, shut it down cleanly, and read its output

## The dependency

Kafka Streams ships as a single JAR. Add it to a Java project; if you don't have one yet, start from [Maven](https://www.conduktor.io/kafka/creating-a-kafka-java-project-using-maven-pom-xml) or [Gradle](https://www.conduktor.io/kafka/creating-a-kafka-java-project-using-gradle-build-gradle).

Maven:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.9.0</version>
</dependency>
```

Gradle:

```groovy
implementation 'org.apache.kafka:kafka-streams:3.9.0'
```

`kafka-streams` pulls in `kafka-clients` transitively, so you don't add it separately. Add an SLF4J binding (such as `slf4j-simple`) so you see the library's logs. Without one, Kafka Streams is quiet by default, and during a [rebalance](https://www.conduktor.io/kafka-streams/rebalancing) or a state restore those logs are how you know what it's doing.

## The minimal configuration

Five lines get a Streams app running. Three of them, the two default serdes and the offset reset, are convenience defaults; the other two are not optional.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```

- **`APPLICATION_ID_CONFIG`**: identifies your app. More on this below; it's the line that matters most.
- **`BOOTSTRAP_SERVERS_CONFIG`**: your Kafka cluster.
- **The default serdes**: how records are serialized and deserialized. We default both to `String` here; the moment a value isn't a string you override it per operator (a frequent source of [serde errors](https://www.conduktor.io/kafka-streams/serdes)).
- **`AUTO_OFFSET_RESET_CONFIG=earliest`**: on first run there's no committed offset, and a plain consumer would default to `latest` and wait for new records. Kafka Streams already overrides that default to `earliest` for you (one of `StreamsConfig`'s built-in consumer overrides, in 3.9 and 4.x alike), so this line documents intent rather than changing behavior. It's still worth writing: it makes the first-run behavior explicit instead of leaving it buried in the library.
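
The constants in that snippet are only names for plain string keys, which matters once the configuration lives in a file or in environment variables rather than in code. Here is a minimal sketch of the same five settings built from raw keys. The `WordCountConfig` helper and the `KAFKA_BOOTSTRAP` variable are assumptions made for illustration, not Kafka conventions; the key strings are the values behind the `StreamsConfig` and `ConsumerConfig` constants.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: builds the same five settings from raw string keys.
final class WordCountConfig {

    // Binary name of the nested class that Serdes.String().getClass() returns.
    static final String STRING_SERDE =
        "org.apache.kafka.common.serialization.Serdes$StringSerde";

    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.put("application.id", "wordcount-app");
        // KAFKA_BOOTSTRAP is an assumed variable name; fall back to a local broker.
        props.put("bootstrap.servers", env.getOrDefault("KAFKA_BOOTSTRAP", "localhost:9092"));
        props.put("default.key.serde", STRING_SERDE);
        props.put("default.value.serde", STRING_SERDE);
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the resolved settings for the current environment.
        fromEnv(System.getenv()).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object can be passed to the `KafkaStreams` constructor in place of the one built from constants.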

### application.id is doing three jobs at once

`application.id` looks like a label. It isn't: it's an identity that Kafka Streams reuses for three different things:

1. The **consumer group id**. All instances sharing an `application.id` form one [consumer group](https://www.conduktor.io/kafka/kafka-consumer-groups-and-consumer-offsets) and split the partitions between them. This is how the app scales.
2. The **prefix for every internal topic** it creates: the changelog and repartition topics behind your [state stores](https://www.conduktor.io/kafka-streams/state-store) are all named `<application.id>-...`.
3. The **local state directory** name on disk: a subdirectory named after the id under `state.dir`, where RocksDB keeps its files.

Because one string drives all three, choosing it carelessly has real consequences:

> **Treat `application.id` as a namespace, and never reuse one by accident.** Two different apps pointed at the same `application.id` will join the same consumer group, fight over partitions, and write to each other's changelog topics and state directories, corrupting both. A copy-paste typo that collides with another app's id has caused exactly this in production. Make it unique and descriptive per app, and change it only deliberately, knowing that a new id starts from fresh consumer group offsets and orphans the old internal topics.
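
To make the three roles concrete, here is a toy model of the names that follow from one id. `AppIdentity` is a hypothetical class written for this page, not a Kafka Streams API, and `stateDirRoot` stands for whatever `state.dir` is set to.

```java
import java.nio.file.Path;

// Toy model of the names derived from application.id; not a Kafka Streams API.
final class AppIdentity {
    private final String applicationId;

    AppIdentity(String applicationId) {
        this.applicationId = applicationId;
    }

    // Job 1: the consumer group id is the application id itself.
    String consumerGroupId() {
        return applicationId;
    }

    // Job 2: internal topics carry the id as a prefix.
    String changelogTopic(String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    String repartitionTopic(String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Job 3: local state lives in a subdirectory named after the id.
    Path stateDirectory(Path stateDirRoot) {
        return stateDirRoot.resolve(applicationId);
    }

    public static void main(String[] args) {
        AppIdentity id = new AppIdentity("wordcount-app");
        System.out.println(id.consumerGroupId());
        System.out.println(id.changelogTopic("word-counts"));
        System.out.println(id.repartitionTopic("word-counts"));
        // The root path here is only an example value for state.dir.
        System.out.println(id.stateDirectory(Path.of("/tmp/kafka-streams")));
    }
}
```

Two apps constructed with the same id produce identical values for all four, which is exactly the collision the warning describes.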

## The topology

Now the actual logic. WordCount reads lines, splits them into words, counts each word, and writes the counts out. Here's the whole topology, which runs as-is on Apache Kafka 3.9.0:

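```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("words-input")
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .filter((key, word) -> !word.isEmpty())
    .groupBy((key, word) -> word)
    .count(Materialized.as("word-counts"))
    .toStream()
    .to("words-output", Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();
```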

Reading it operator by operator:

| Operator | What it does |
|---|---|
| `stream("words-input")` | Reads the input topic as a [KStream](https://www.conduktor.io/kafka-streams/kstream-ktable-globalktable) of lines |
| `flatMapValues(... split ...)` | Splits each line into words; one input line becomes many word records |
| `filter(word not empty)` | Drops the empty strings the regex split can produce |
| `groupBy((k, word) -> word)` | Re-keys by the word itself, so all occurrences of a word group together |
| `count(Materialized.as("word-counts"))` | Counts per word into a **named** state store, producing a `KTable<String, Long>` |
| `toStream()` | Converts that running count table back into a stream of updates |
| `to("words-output", Produced.with(...))` | Writes out, telling Kafka the value is now a `Long`, not a `String` |
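
If the operator chain is easier to read without Kafka in the way, the same logic can be sketched with plain `java.util.stream` over a finite list. This is a stand-in for illustration only: `WordCountBatch` is a hypothetical class, and unlike the real topology it computes one final answer instead of a continuous stream of updates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java stand-in for the operator chain; it processes a finite list,
// whereas the real topology runs continuously over an unbounded topic.
final class WordCountBatch {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
            // flatMapValues: one line becomes many words
            .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
            // filter: drop the empty strings the split can produce
            .filter(word -> !word.isEmpty())
            // groupBy + count: key by the word, count occurrences (sorted for readability)
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {brown=1, dog=1, fox=1, lazy=1, quick=1, the=2}
        System.out.println(count(List.of("the quick brown fox", "the lazy dog")));
    }
}
```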

Two details are worth pausing on. First, `groupBy` changes the key, which forces Kafka Streams to **repartition** the data through an internal topic so that every occurrence of a given word ends up on the same task. That happens automatically, but it's why a stateful app creates topics behind your back.

Second, the final `Produced.with(Serdes.String(), Serdes.Long())` is the explicit, safe choice rather than strictly mandatory here: `count()` sets a `Long` serde on its materialization and Kafka Streams propagates it through `toStream()` into `to()`, so this exact topology happens to work without it. But the moment an operator that loses type information sits between the aggregation and `to()` (a `map` or `mapValues`, say), propagation breaks, the default `String` serde kicks back in, and you get a `ClassCastException` wrapped in a `StreamsException` at runtime. Always state the serde at the sink.
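
The repartitioning point comes down to one property: once the word is the key, the partition is a function of the word alone. A rough sketch of that property follows. `PartitionSketch` is hypothetical and its hash is a stand-in (Kafka's default partitioner applies murmur2 to the serialized key bytes), so the partition numbers it prints will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Illustrates "same key, same partition"; the hash is NOT Kafka's murmur2.
final class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4; // assumed partition count for the repartition topic
        for (String word : List.of("the", "quick", "the", "lazy", "the")) {
            // Every "the" maps to the same partition, so one task sees all of them.
            System.out.println(word + " -> partition " + partitionFor(word, partitions));
        }
    }
}
```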

> **Naming the store (`Materialized.as("word-counts")`) is not cosmetic.** An unnamed store gets a *positional* name derived from its place in the topology, which shifts the moment you insert an operator upstream, orphaning the state. The name also anchors the internal topics: this topology creates `<application.id>-word-counts-repartition` and `<application.id>-word-counts-changelog`, all from that one string. Name your stores from day one. See [evolving a topology](https://www.conduktor.io/kafka-streams/topology-evolution).

## Start it, and shut it down cleanly

A `Topology` is just a description. To run it, wrap it in a `KafkaStreams` instance and `start()`. The part people forget is the other half: stopping cleanly.

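```java
import org.apache.kafka.streams.KafkaStreams;

KafkaStreams streams = new KafkaStreams(topology, props);

// Register the hook before start() so an early SIGTERM still closes the app cleanly.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

streams.start();
```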

`start()` kicks off the stream threads and returns immediately; the processing runs in the background. The shutdown hook is what makes the app behave on `Ctrl+C`, a `SIGTERM`, or a Kubernetes pod eviction: `streams.close()` flushes state and commits offsets. It deliberately does *not* leave the consumer group: the member lingers until `session.timeout.ms` expires (45 seconds by default), so a quick restart doesn't trigger a rebalance. When an instance is going away for good, call `close(CloseOptions)` with `leaveGroup(true)` to remove it immediately.

Skip the hook and every stop is a hard kill: offsets stay uncommitted and state unflushed, so the next start re-restores from the changelog and may reprocess records. The shutdown hook is one line, and it saves you a slow, confusing restart later.
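
If you want the hook to have a recognizable thread name in logs and to report a failed close explicitly, the one-liner can be wrapped in a small helper. The sketch below is one possible shape, not a required pattern: `GracefulShutdown` is a hypothetical class, and it accepts any `AutoCloseable` (which `KafkaStreams` implements) so it can be run here with a stand-in instead of a live cluster.

```java
// Hypothetical helper around the shutdown hook; works with any AutoCloseable.
final class GracefulShutdown {

    // Builds a named hook thread that closes the app and reports failures.
    static Thread hookFor(AutoCloseable app) {
        return new Thread(() -> {
            try {
                app.close();
            } catch (Exception e) {
                System.err.println("close failed: " + e);
            }
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for a KafkaStreams instance, so the sketch runs without a broker.
        AutoCloseable app = () -> System.out.println("closing: flush state, commit offsets");
        Runtime.getRuntime().addShutdownHook(hookFor(app));
        System.out.println("started");
        // main returns here; the JVM runs the hook on its way out.
    }
}
```

With a real app you would pass the `KafkaStreams` instance to `hookFor` before calling `start()`.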

## Run it

With a local Kafka cluster on `localhost:9092`, create the topics, start the app, and feed it some lines:

```bash
# create the input and output topics
kafka-topics.sh --create --topic words-input  --bootstrap-server localhost:9092
kafka-topics.sh --create --topic words-output --bootstrap-server localhost:9092

# produce a few lines (type, then Ctrl+D)
kafka-console-producer.sh --topic words-input --bootstrap-server localhost:9092
> the quick brown fox
> the lazy dog
```

Then read the output. The values are serialized as `Long`, so tell the console consumer to use `LongDeserializer`, and have it print the keys so you can see which word each count belongs to:

```bash
kafka-console-consumer.sh --topic words-output \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --property print.key=true \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer
```

Don't panic when nothing appears for the first half-minute. With the default config the record cache holds updates and flushes them on commit, and `commit.interval.ms` defaults to 30 seconds, so the output topic stays empty until the first commit. Then everything arrives at once:

```
quick   1
the     2
brown   1
fox     1
lazy    1
dog     1
```

Note that `the` shows up once, as `2`. Both lines landed inside one commit interval, so the cache merged the intermediate `the 1` into the final count before emitting. That's still the `KTable` doing its job: each output record is the *current* count for that word, not a one-time final answer. Send more lines after the next commit and the counts keep climbing as new records. Most tutorials show `the 1` followed by `the 2`; that reflects `TopologyTestDriver` semantics, not real-broker defaults. To watch every single update flow through on a real broker, set `statestore.cache.max.bytes` to `0`. (If you want a single final emission per window instead of running updates, that's windowing plus suppression; see [aggregations](https://www.conduktor.io/kafka-streams/aggregations).)
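
The difference between the cached and uncached output can be reproduced with a toy model. The sketch below is not Kafka Streams code: `CacheSketch` is a hypothetical class that keeps only the idea (updates to the same key overwrite each other until a commit flushes them) and ignores the real cache's size limit and eviction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the record cache: not Kafka Streams code, just the idea.
final class CacheSketch {
    private final boolean cacheEnabled;
    private final Map<String, Long> counts = new LinkedHashMap<>(); // the "state store"
    private final Map<String, Long> dirty = new LinkedHashMap<>();  // updates held until commit
    private final List<String> output = new ArrayList<>();          // the "output topic"

    CacheSketch(boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    void process(String line) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) continue;
            long updated = counts.merge(word, 1L, Long::sum);
            if (cacheEnabled) {
                dirty.put(word, updated);         // overwrite: only the latest count survives
            } else {
                output.add(word + " " + updated); // emit every update immediately
            }
        }
    }

    // Flushes the held updates, one record per dirty key.
    void commit() {
        dirty.forEach((word, count) -> output.add(word + " " + count));
        dirty.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        for (boolean cached : new boolean[] {true, false}) {
            CacheSketch app = new CacheSketch(cached);
            app.process("the quick brown fox");
            app.process("the lazy dog");
            app.commit();
            System.out.println((cached ? "cache on:  " : "cache off: ") + app.output());
        }
    }
}
```

With the cache on, `the` appears once with its latest count; with it off, both `the 1` and `the 2` reach the output.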

That's a complete Kafka Streams app: a dependency, five config lines, a topology, and a clean shutdown.

**What Maven dependency do I need for Kafka Streams?**

Add `org.apache.kafka:kafka-streams` (this page uses version `3.9.0`). It pulls in `kafka-clients` transitively, so you don't add that separately; also add an SLF4J binding so you can see the library's logs during rebalances and state restores.

**What is the application.id config and why does it matter?**

`application.id` is an identity Kafka Streams reuses for three things: the consumer group id, the prefix for every internal topic it creates, and the local state directory on disk. Because one string drives all three, two apps sharing an `application.id` will fight over partitions and corrupt each other's state, so keep it unique per app.

**Why does my Kafka Streams app read nothing on first run?**

Usually not the offset reset: Kafka Streams overrides the consumer's `latest` default to `earliest`, so a fresh app reads the input topic from the beginning (a plain `KafkaConsumer` does not get this override). The likelier cause is the record cache plus the 30-second default `commit.interval.ms`: nothing reaches the output topic until the first commit. Wait out the interval, or set `statestore.cache.max.bytes` to `0` to see every update immediately.

**Why do I need a shutdown hook in a Kafka Streams app?**

`streams.start()` returns immediately and runs in the background, so a clean stop needs `streams.close()` wired to a shutdown hook. Without it, a hard kill leaves uncommitted offsets and unflushed state, forcing the next start to re-restore from the changelog and possibly reprocess records.

**Why does my WordCount output show the same word with increasing counts?**

An aggregation returns a `KTable`, so each output record is the current running count for that key, not a one-time final answer: `the` will appear as `1`, then `2`, and so on as more lines arrive. To collapse those updates into one result per time window, you reach for windowing plus `suppress(Suppressed.untilWindowCloses(...))`; the record cache only reduces how many updates you see, it never yields a single final value.

> **See it in practice with Conduktor**
> Once your first app is running, it shows up on the cluster as a consumer group plus a set of internal topics. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you watch `words-output` fill up, find the `wordcount-app-...` repartition and changelog topics it created, and check the consumer group's lag, the fastest way to confirm the app is actually keeping up.

## Next steps

- [Kafka Streams architecture](https://www.conduktor.io/kafka-streams/architecture): how a topology becomes tasks and threads
- [Aggregations](https://www.conduktor.io/kafka-streams/aggregations): go beyond count with reduce and aggregate
- [State stores](https://www.conduktor.io/kafka-streams/state-store): what `Materialized.as(...)` actually created
