# Kafka Streams for AI agents

*Learn how Kafka Streams becomes the memory layer for a multi-agent AI system.*

Multi-agent AI is a distributed-systems problem. Once you stop wiring agents together with direct HTTP calls and put Kafka between them, the conversation becomes a durable, ordered log, and that log is exactly the kind of thing Kafka Streams was built to turn into queryable state. This page shows the pattern that keeps surfacing in production agent stacks: **Kafka Streams as the agents' context store**, materialized in the same data plane the agents already run on, with no external database and no Flink.

**What you'll learn:**
- Why an agent needs a context store, and why bolting on a separate database is a tax
- How to materialize conversation memory into a `KTable` keyed by conversation
- How to track turn-rate with a window store for quotas and "user is stuck" detection
- How agents read that state in single-digit milliseconds via interactive queries

## Why an agent needs a context store

An LLM's context window is finite; a conversation grows without bound. Something has to summarize, truncate, and re-inject the relevant history into each prompt, and that something needs the full conversation, fast, on every turn.

The reflex is to bolt on a database: Postgres, DynamoDB, Redis. But in a Kafka-based agent system the conversation *already exists*, ordered and durable, on topics like `user.messages`, `subagent.responses`, and `agent.responses`. Re-ingesting it into a second store is a pipeline nobody budgets for and everybody ends up operating. Kafka Streams collapses that loop: it reads the conversation topics and materializes the memory in place, next to the bus, where it sees every turn the moment it lands.

> **This is a real architecture, not a thought experiment.** It's the design behind Conduktor's multi-agent reference implementation: an orchestrator, specialized sub-agents, share-group tool workers, an audit tap, and a Kafka Streams context store with HTTP interactive queries. The wider argument is in [Kafka and Flink as the infrastructure for AI agents](https://www.conduktor.io/blog/ai-agents-at-scale-the-critical-role-of-kafka-and-flink).

## The conversation is already a log

When agents communicate over Kafka, every message (a user turn, a sub-agent handoff, a final response) is an event on a topic. Key those events by `conversationId` and Kafka does two useful things for free: all of a conversation's turns on a topic land on the **same partition** (so they stay ordered), and a Streams app can [group them by key](https://www.conduktor.io/kafka-streams/aggregations) and fold them into a running view.
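
To see why keying matters, here is a minimal, dependency-free sketch of key-to-partition mapping. It is not Kafka's partitioner: the default partitioner hashes the serialized key bytes with murmur2, whereas this stand-in uses `String.hashCode()`, and the class and method names are hypothetical. The property it illustrates is the same: for a fixed partition count, a given key always maps to the same partition.

```java
// Illustrative only: a simplified stand-in for key-based partitioning.
// Kafka's default partitioner hashes the serialized key with murmur2;
// String.hashCode() is used here purely to keep the sketch dependency-free.
final class KeyPartitioningSketch {

    /** Maps a key to a partition in [0, numPartitions). */
    static int partitionFor(String conversationId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(conversationId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // assumed partition count
        for (String id : new String[] {"conv-1", "conv-2", "conv-1"}) {
            System.out.println(id + " -> partition " + partitionFor(id, partitions));
        }
    }
}
```

Because the mapping depends on the partition count, the same key can map to a different partition number on a topic with a different count.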

## Materialize conversation memory in a KTable

Merge the conversation topics, group by `conversationId`, and aggregate each conversation's turns into a single context object held in a [state store](https://www.conduktor.io/kafka-streams/state-store):

```java
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Turn> turns = builder.stream("user.messages", Consumed.with(Serdes.String(), turnSerde))
    .merge(builder.stream("subagent.responses", Consumed.with(Serdes.String(), turnSerde)))
    .merge(builder.stream("agent.responses",    Consumed.with(Serdes.String(), turnSerde)));

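turns
    .groupByKey() // already keyed by conversationId, so no repartition topic
    .aggregate(
        ConversationContext::new,                         // initializer: empty context
        (conversationId, turn, ctx) -> ctx.append(turn),  // fold each turn into the context
        Materialized.<String, ConversationContext, KeyValueStore<Bytes, byte[]>>as("conversation-context")
            // merge() does not carry serdes forward, so set the key serde explicitly
            .withKeySerde(Serdes.String())
            .withValueSerde(contextSerde));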
```

The result is a `KTable<String, ConversationContext>`, the full transcript per conversation, kept in RocksDB and backed by a compacted changelog (`<application.id>-conversation-context-changelog`, auto-created with `cleanup.policy=compact`) so it survives restarts. The three input topics must be [co-partitioned](https://www.conduktor.io/kafka-streams/joins) on `conversationId` (same key, same partition count) so each conversation's turns converge on one task. One trap: Streams only enforces co-partitioning for joins, not for merged sources feeding an aggregation. With mismatched partition counts the app reaches RUNNING with no error or warning, and a conversation's turns silently split across tasks, leaving a partial transcript in the store. Verify the counts yourself. Because the store is named, its changelog is stable across [topology edits](https://www.conduktor.io/kafka-streams/topology-evolution).
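
The topology above refers to `Turn` and `ConversationContext` without defining them. The following is a minimal sketch of what they might look like; the field names (`role`, `text`, `timestampMs`) and the `turns()` accessor are assumptions for illustration. The only things the topology actually requires are a no-argument constructor and an `append` method that returns the updated context.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical value types for the topology above; field names are assumptions.
final class Turn {
    final String role;      // e.g. "user", "subagent", "agent"
    final String text;
    final long timestampMs;

    Turn(String role, String text, long timestampMs) {
        this.role = role;
        this.text = text;
        this.timestampMs = timestampMs;
    }
}

final class ConversationContext {
    private final List<Turn> turns;

    // No-arg constructor: matches the ConversationContext::new initializer.
    ConversationContext() {
        this(Collections.emptyList());
    }

    private ConversationContext(List<Turn> turns) {
        this.turns = turns;
    }

    // Returns a new context instead of mutating this one, so the aggregator
    // never modifies an instance it was handed.
    ConversationContext append(Turn turn) {
        List<Turn> next = new ArrayList<>(turns);
        next.add(turn);
        return new ConversationContext(Collections.unmodifiableList(next));
    }

    List<Turn> turns() {
        return turns;
    }
}
```

Copying the list on every append keeps the sketch simple at the cost of O(n) work per turn. `turnSerde` and `contextSerde` would also need to know how to serialize these types (JSON or Avro, for instance), which is left out here.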

## Track turn-rate with a window store

The same app can answer operational questions about a conversation. A five-minute [tumbling window](https://www.conduktor.io/kafka-streams/windowing) counting user turns gives you both a rate limit and a "this user is looping / stuck" signal:

```java
builder.stream("user.messages", Consumed.with(Serdes.String(), turnSerde))
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("turns-5m"));
```

One Streams app, two materialized views: the full transcript for prompt enrichment, and a windowed turn-count for quotas, both derived from the log the agents are already writing.
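
To make the windowing concrete, here is a dependency-free sketch of how a five-minute tumbling window buckets turns and how a quota check could read the count. It mimics the epoch-aligned bucketing of tumbling windows in plain Java rather than calling Kafka Streams; `TurnRateSketch`, `record`, `overQuota`, and the quota values are all hypothetical.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of tumbling-window counting; not the Streams API.
// Assumes non-negative epoch-millisecond timestamps.
final class TurnRateSketch {
    private static final long WINDOW_MS = Duration.ofMinutes(5).toMillis();

    // Key: conversationId + "@" + windowStart, value: turns seen in that window.
    private final Map<String, Long> counts = new HashMap<>();

    /** Tumbling windows are aligned to the epoch: start = ts - (ts % size). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Records one user turn and returns the updated count for its window. */
    long record(String conversationId, long timestampMs) {
        String key = conversationId + "@" + windowStart(timestampMs);
        return counts.merge(key, 1L, Long::sum);
    }

    /** True when the window containing timestampMs has exceeded maxTurns. */
    boolean overQuota(String conversationId, long timestampMs, long maxTurns) {
        String key = conversationId + "@" + windowStart(timestampMs);
        return counts.getOrDefault(key, 0L) > maxTurns;
    }
}
```

One consequence of tumbling windows worth keeping in mind: a burst that straddles a window boundary is split across two counts, so a quota based on a single window can undercount it.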

## Serve it to the agents with interactive queries

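A materialized store isn't only internal plumbing. With [interactive queries](https://www.conduktor.io/kafka-streams/state-store), the Streams app looks up a key in its local store and exposes the result to the agents over HTTP, with no separate database in the read path. Kafka Streams provides the lookup, not the HTTP layer, so the endpoint around it is yours to write: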

```java
ReadOnlyKeyValueStore<String, ConversationContext> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "conversation-context", QueryableStoreTypes.keyValueStore()));

// GET /conversation/{id}, enrich the next prompt with prior turns
ConversationContext ctx = store.get(conversationId);
```

The orchestrator calls `GET /conversation/{id}` to fold prior turns into the next prompt, or `GET /conversation/{id}/activity` to enforce a quota from the windowed store. Both answer in single-digit milliseconds because the state lives in-process, next to the agent's data plane.
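
The HTTP layer itself is not shown in the snippet above. A minimal sketch of the `GET /conversation/{id}` endpoint, using the JDK's built-in `com.sun.net.httpserver.HttpServer`, could look like this. The class name, the response format, and the `lookup` function (which stands in for `store.get(conversationId)` plus serialization) are assumptions, not part of the reference implementation.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical HTTP facade over the context store, using the JDK's built-in server.
final class ContextEndpointSketch {
    private static final String PREFIX = "/conversation/";

    /** Extracts {id} from /conversation/{id}; returns null if the path does not match. */
    static String conversationIdFrom(String path) {
        if (path == null || !path.startsWith(PREFIX)) {
            return null;
        }
        String id = path.substring(PREFIX.length());
        return (id.isEmpty() || id.contains("/")) ? null : id;
    }

    /**
     * Starts the server. `lookup` stands in for store.get(conversationId) and
     * returns the already-serialized context, or null when the key is unknown.
     */
    static HttpServer start(int port, Function<String, String> lookup) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext(PREFIX, exchange -> {
            if (!"GET".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // -1: no response body
                exchange.close();
                return;
            }
            String id = conversationIdFrom(exchange.getRequestURI().getPath());
            String body = (id == null) ? null : lookup.apply(id);
            byte[] bytes = (body == null ? "not found" : body).getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(body == null ? 404 : 200, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
        return server;
    }
}
```

This sketch only handles `/conversation/{id}`; the `/activity` route would need its own handler backed by the windowed store.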

The one operational detail to plan for: with more than one instance, the state is sharded across hosts, so a conversation may live on another instance. Call `streams.queryMetadataForKey(...)` to find which instance owns the key and route the request there (each instance has to advertise its endpoint through the `application.server` config for this to work), the same multi-instance routing covered in [state stores](https://www.conduktor.io/kafka-streams/state-store). Add an instance and it restores its share of the view from the changelog.
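
The routing decision itself is small. The sketch below takes the owner and the local endpoint as plain values so it stays dependency-free; in a real app the owner would come from the key metadata lookup mentioned above (its active host and port), and the local endpoint from the instance's own configuration. The class, the method name, and the URL shape are hypothetical, and conversation IDs are assumed to be URL-safe.

```java
// Hypothetical routing helper: decide whether to serve a key locally or forward it.
final class QueryRoutingSketch {

    /**
     * Returns null when this instance owns the key and should read its local
     * store; otherwise returns the URL of the owning instance to forward to.
     */
    static String forwardUrlOrNull(String ownerHost, int ownerPort,
                                   String selfHost, int selfPort,
                                   String conversationId) {
        boolean local = ownerHost.equals(selfHost) && ownerPort == selfPort;
        if (local) {
            return null;
        }
        return "http://" + ownerHost + ":" + ownerPort + "/conversation/" + conversationId;
    }
}
```

A forwarded call adds one network hop between instances, so latency for remotely owned keys will be higher than for local reads.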

## When this fits, and when it doesn't

Be honest about the boundary, the same way you would about adding Kafka at all. A single agent with modest traffic does not need this: a row in Postgres holds the conversation fine, and you skip the operational surface entirely. The context-store pattern earns its place when the agents *already* run on Kafka at scale, and you want their memory in the same governed, observable plane as the rest of the traffic rather than in a second system you have to keep in sync.

That last part is the quiet payoff: when agent memory is a Kafka topic and a Streams store, it inherits everything Kafka already gives you (ordering, replay, audit, and access control) instead of needing its own.

**Where does Kafka Streams fit in an AI agent architecture?**

It serves as the agents' context store: a Streams app reads the conversation topics, aggregates each conversation into a `KTable` keyed by `conversationId`, and the agents read that state over interactive queries to enrich the next prompt. The memory lives in the same data plane as the agent traffic, with no external database.

**Can Kafka Streams power real-time features for AI agents?**

Yes, for stateful enrichment and operational signals derived from the conversation log. The same app that materializes the transcript can run a five-minute tumbling window counting user turns to drive both a rate limit and a "user is looping / stuck" signal, both from the log the agents already write.

**How do agents read the context store with low latency?**

Through interactive queries: the Streams app exposes an HTTP endpoint whose handler calls `streams.store(...)` and reads the value from local, in-process state, so no external database sits in the read path and lookups typically answer in single-digit milliseconds. With more than one instance the state is sharded, so you ask `streams.queryMetadataForKey(...)` which instance owns the key and route the call there.

**Why not just use Postgres or Redis for agent memory?**

When agents communicate over Kafka, the conversation already exists as an ordered, durable log, so re-ingesting it into a second store is a pipeline you have to operate and keep in sync. A single agent with modest traffic is fine in Postgres; the Streams context store earns its place when the agents already run on Kafka at scale and you want their memory in the same governed, observable plane.

**Do the conversation topics need to be co-partitioned?**

Yes. The input topics (`user.messages`, `subagent.responses`, `agent.responses`) must share the same key (`conversationId`) and partition count so every turn of a conversation converges on one task and stays ordered. Streams does not enforce this for merged sources (only for joins): with mismatched partition counts the app runs without error and a conversation silently splits across tasks, so check the counts yourself.
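
A startup guard for that check might look like the following sketch. It takes the partition counts as a plain map so it has no dependencies; in practice the counts would be fetched from the cluster (for example with the Kafka Admin client) before the Streams app starts. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical startup guard: fail fast when merged input topics differ in partition count.
final class CoPartitionCheckSketch {

    /** Throws if the topics do not all share one partition count. */
    static void requireSamePartitionCount(Map<String, Integer> partitionsByTopic) {
        long distinct = partitionsByTopic.values().stream().distinct().count();
        if (distinct > 1) {
            // TreeMap gives a stable, sorted order in the error message.
            throw new IllegalStateException(
                "Input topics are not co-partitioned: " + new TreeMap<>(partitionsByTopic));
        }
    }
}
```

This only verifies partition counts. Co-partitioning also assumes every producer keys by `conversationId` and uses the same partitioning strategy, which a count check cannot confirm.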

> **See it in practice with Conduktor**
> Agent traffic is just Kafka topics, so you can observe and govern it like any other. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) shows the `user.messages` / `subagent.responses` / `agent.responses` topics, their consumer-group lag, and the changelog behind the context store: the full conversation trail in one place. And because the data plane is shared, [Gateway interceptors](https://www.conduktor.io/gateway) can redact sensitive fields or enforce quotas on agent topics with no change to the agent code. Conduktor also speaks [MCP](https://www.conduktor.io/mcp) for AI-driven Kafka operations.

## Next steps

- [State stores & interactive queries](https://www.conduktor.io/kafka-streams/state-store): the mechanism that serves context to the agents
- [Windowing](https://www.conduktor.io/kafka-streams/windowing): the tumbling windows behind turn-rate and quotas
- [The future of Kafka Streams](https://www.conduktor.io/kafka-streams/future): remote state, new rebalance protocols, and the AI direction
