# Kafka Streams interactive queries

*Serve your state store to the outside world.*

A [state store](https://www.conduktor.io/kafka-streams/state-store) is a queryable view of your data living right inside your app. Interactive queries are how you read it (from a REST endpoint, another service, an [AI agent](https://www.conduktor.io/kafka-streams/ai-agents)) without standing up a separate database. The store page covers how state is *kept*. This page covers how you *serve* it, which is where the multi-instance problems live.

The single fact that shapes everything here: state is **sharded across instances**, the same way partitions are. Each key's active copy lives on exactly one instance, and querying any other instance either returns nothing or throws. Getting interactive queries right is mostly about finding the instance that owns the key.

**What you'll learn:**
- How to open a read-only handle on a store with `StoreQueryParameters`
- Why a single-instance query breaks at two instances, and how to route to the owner
- Why you keep hitting `InvalidStateStoreException`, and the two fixes
- IQv1 vs IQv2 (KIP-796): the older API and the newer typed one

## Querying a store on one instance

With a single instance, interactive queries are trivial. You ask the running `KafkaStreams` object for a read-only view of a named store and call `get`:

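```java
// `streams` is a running KafkaStreams whose topology materialized "word-counts"
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));

Long count = store.get("kafka");                           // point lookup; null if not found here
try (KeyValueIterator<String, Long> all = store.all()) {   // full scan; iterators must be closed
    all.forEachRemaining(kv -> System.out.println(kv.key + " = " + kv.value));
}
```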

The store type must match what you materialized. The common ones:

| Store | `QueryableStoreTypes` factory | Reads |
|---|---|---|
| Key-value (`count`, `aggregate`, `reduce`) | `keyValueStore()` | `get(key)`, `range(from, to)`, `all()` |
| [Windowed](https://www.conduktor.io/kafka-streams/windowing) | `windowStore()` | `fetch(key, timeFrom, timeTo)` |
| Session | `sessionStore()` | `fetch(key)` |

The store must be **named** when you create it (`Materialized.as("word-counts")`). An unnamed store gets a positional name that you can't reliably reference and that shifts when you [edit the topology](https://www.conduktor.io/kafka-streams/topology-evolution). Kafka 4.3 logs a startup WARN listing every unnamed store and internal topic, and `ensure.explicit.internal.resource.naming=true` turns unnamed resources into a hard failure.

## Enabling discovery across instances

Run two instances and the picture changes. Each owns a subset of partitions, so each holds a *different slice* of the store. A query for `"kafka"` only works on whichever instance owns the partition `"kafka"` hashes to. The other instance has no idea what that key's value is.

To make instances discoverable, set `application.server` to a host:port that the *other* instances (and your own query layer) can reach:

```properties
application.server=10.0.1.42:8080
```

This doesn't open a port for you. It's metadata that Kafka Streams distributes through the consumer group's rebalance protocol, so every instance learns every other instance's address. You still build the HTTP (or gRPC) endpoint at that address yourself.
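The endpoint can be as small as the JDK's built-in `com.sun.net.httpserver.HttpServer`. The following is an illustration under assumptions, not a Kafka Streams API. It reads the `application.server` value from a `Properties` object, binds a server to that host and port, and serves `GET /store/word-counts/{key}` through `localLookup`. That function is a hypothetical hook you would back with the local store read shown earlier. The class name `QueryEndpoint`, the path layout, and the 404-for-missing convention are all illustrative choices.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Function;

/** Sketch: a per-instance query endpoint bound to the advertised application.server address. */
final class QueryEndpoint {
    private static final String PREFIX = "/store/word-counts/";  // illustrative path layout

    /** Splits "host:port" (hostname or IPv4; IPv6 brackets are not handled here). */
    static InetSocketAddress parse(String applicationServer) {
        int colon = applicationServer == null ? -1 : applicationServer.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("expected host:port, got " + applicationServer);
        }
        String host = applicationServer.substring(0, colon);
        int port = Integer.parseInt(applicationServer.substring(colon + 1));
        return new InetSocketAddress(host, port);
    }

    /** Starts the server; localLookup is a hypothetical hook into this instance's store. */
    static HttpServer start(Properties props, Function<String, Long> localLookup) throws IOException {
        HttpServer server = HttpServer.create(parse(props.getProperty("application.server")), 0);
        server.createContext(PREFIX, exchange -> {
            // getPath() is already percent-decoded, so the key arrives as plain text
            String key = exchange.getRequestURI().getPath().substring(PREFIX.length());
            Long value = localLookup.apply(key);
            byte[] body = (value == null ? "not found" : value.toString())
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(value == null ? 404 : 200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

Binding exactly the advertised address keeps the sketch simple. In containers or behind NAT, you would typically bind `0.0.0.0` and put the routable address in `application.server` instead.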

> 🚫 *"Interactive queries let me read any key from any instance, so I'll just query whichever one my load balancer hits."*

That's the misconception that bites everyone first. An instance can only answer for the keys whose partitions it owns. Query a key the local instance doesn't own and you get an empty result or an exception, not the value sitting on the neighbor. You have to find the owner and route to it.

## Routing to the instance that owns the key

The store metadata tells you which instance owns a given key. You ask `queryMetadataForKey`, compare the returned host to your own `application.server`, and either read locally or make an HTTP call to the owner.

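```java
KeyQueryMetadata md = streams.queryMetadataForKey(
    "word-counts", "kafka", Serdes.String().serializer());

// NOT_AVAILABLE while Kafka Streams is (re)initializing; null if no matching metadata is found
if (md == null || KeyQueryMetadata.NOT_AVAILABLE.equals(md)) {
    throw new IllegalStateException("no metadata for word-counts yet; retry");
}

HostInfo owner = md.activeHost();
if (owner.equals(myHostInfo)) {                 // myHostInfo: this instance's application.server
    return localStore.get("kafka");             // we own it: plain local read
} else {
    // hypothetical helper: GET http://owner.host():owner.port()/store/word-counts/kafka
    return httpClient.get(owner, "kafka");
}
```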
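The `httpClient.get(owner, "kafka")` call above is a placeholder. The following sketch fills it in using only the JDK's `java.net.http.HttpClient`. It rests on two assumptions you would adapt: the owner serves `GET /store/{store}/{key}` (the layout in the comment above), and it answers 404 for a key it has no value for. `OwnerClient` and its methods are hypothetical names. In the routing code you would pass `owner.host()` and `owner.port()` from the `HostInfo`.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/** Sketch of the HTTP hop to the instance that owns a key. */
final class OwnerClient {
    private static final HttpClient HTTP = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(2))
            .build();

    /** Builds http://host:port/store/{store}/{key}, percent-encoding the key as a path segment. */
    static URI ownerUri(String host, int port, String store, String key) {
        // URLEncoder is form encoding (space -> '+'); a path segment needs %20 instead
        String encodedKey = URLEncoder.encode(key, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create("http://" + host + ":" + port + "/store/" + store + "/" + encodedKey);
    }

    /** Returns the owner's value, or null on 404 (the owner has no value for this key). */
    static Long fetch(String host, int port, String store, String key)
            throws IOException, InterruptedException {
        HttpRequest req = HttpRequest.newBuilder(ownerUri(host, port, store, key))
                .timeout(Duration.ofSeconds(2))     // one slow owner must not stall the caller
                .GET()
                .build();
        HttpResponse<String> resp = HTTP.send(req, HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() == 404) return null;
        if (resp.statusCode() != 200) throw new IOException("owner returned " + resp.statusCode());
        return Long.parseLong(resp.body().trim());
    }
}
```

Any response other than 200 or 404 surfaces as an `IOException`, so the caller can retry or re-resolve the owner, which may have moved in a rebalance.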

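For a query that has to span the whole keyspace (a dashboard total, an export), there's no single owner, so you fan out. `streams.metadataForAllStreamsClients()` returns every instance and the partitions it holds. You call each one's endpoint and merge the results. If only one store matters, `streams.streamsMetadataForStore("word-counts")` narrows the list to the instances that host it. (The `allMetadata()` from older tutorials was removed in Kafka 4.x, not just deprecated. Code calling it won't compile against a 4.x client, and a jar built against an older client fails at runtime with `NoSuchMethodError`.)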

```java
for (StreamsMetadata instance : streams.metadataForAllStreamsClients()) {
    HostInfo host = instance.hostInfo();
    // GET http://host/store/word-counts/all  on every instance, then combine
}
```
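The fan-out loop above leaves the calls and the merge to you. The following dependency-free sketch covers that part. It issues one call per instance in parallel with `CompletableFuture`, unions the per-instance maps, and then reduces them to a dashboard-style total. `FanOut`, `queryAll`, and the `fetchAll` function are hypothetical; `fetchAll` stands in for the `GET .../all` call to one instance. To keep the sketch self-contained, hosts are plain `"host:port"` strings rather than `HostInfo`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch: scatter a whole-keyspace query to every instance, then gather and merge. */
final class FanOut {
    /** Calls fetchAll for every host in parallel and unions the per-instance results. */
    static <K, V> Map<K, V> queryAll(List<String> hosts, Function<String, Map<K, V>> fetchAll) {
        // Start every call before waiting on any of them
        List<CompletableFuture<Map<K, V>>> calls = hosts.stream()
                .map(h -> CompletableFuture.supplyAsync(() -> fetchAll.apply(h)))
                .collect(Collectors.toList());

        Map<K, V> merged = new HashMap<>();
        for (CompletableFuture<Map<K, V>> call : calls) {
            // join() rethrows (wrapped) if that instance's call failed
            call.join().forEach((k, v) -> {
                // Active partitions are disjoint, so a repeated key signals a problem
                if (merged.putIfAbsent(k, v) != null) {
                    throw new IllegalStateException("key " + k + " returned by two instances");
                }
            });
        }
        return merged;
    }

    /** A dashboard total is one more reduction over the merged view. */
    static long total(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

The duplicate-key check assumes each instance answers from its active stores only. In this sketch, one failed instance fails the whole query. For a dashboard, returning partial results with a "some instances missing" flag is the other reasonable design. The common pool that `supplyAsync` uses by default is fine for a sketch. For blocking HTTP calls, you would pass a dedicated executor instead.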

That fan-out REST layer (a controller that does a local read when it owns the key and an HTTP hop when it doesn't) is the part you write. Kafka Streams gives you the routing metadata; it does not give you the web server.

## Why you keep hitting InvalidStateStoreException

This is the most-pasted interactive-queries error, and it has two distinct causes that need different fixes.

```
org.apache.kafka.streams.errors.InvalidStateStoreException:
  The state store, word-counts, may have migrated to another instance.
```

**Cause 1: the app isn't `RUNNING` yet, or a rebalance is in flight.** During startup, a [rebalance](https://www.conduktor.io/kafka-streams/rebalancing), or a [state restore](https://www.conduktor.io/kafka-streams/state-restore), the store is being (re)built and isn't available to query, and a store handle you cached can go stale across a rebalance. The exact message depends on the phase:

- Before `start()`: `StreamsNotStartedException`.
- During startup or a rebalance: `streams.store()` says "Cannot get state store word-counts because the stream thread is STARTING, not RUNNING". The thread state named in the message varies, for example `PARTITIONS_ASSIGNED` mid-rebalance.
- On a stale handle: the "may have migrated to another instance" text above.

All of these are `InvalidStateStoreException` or one of its subclasses, so catch the parent. The fix is to treat the exception as *transient*: re-fetch the store handle and retry with backoff. Don't hold a `ReadOnlyKeyValueStore` reference for the life of the app. Re-acquire it per query, or after any `REBALANCING` transition.
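The "re-fetch and retry with backoff" fix fits in one small helper. The following generic Java sketch has no Kafka dependency. It re-runs a `Supplier` while a predicate says the failure is transient, doubling the sleep up to a cap. `Retry.withBackoff` is a hypothetical name, and the attempt count and delays in the usage comment are placeholders, not recommendations. The detail that matters for interactive queries is in that usage comment: the lambda calls `streams.store(...)` on every attempt, so each retry gets a fresh handle instead of reusing a stale one.

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch: retry transient failures with capped exponential backoff. */
final class Retry {
    /**
     * Runs call until it succeeds. Retries only exceptions that isTransient accepts,
     * sleeping initialBackoff, then doubling up to maxBackoff. Rethrows the failure
     * that ends the loop (non-transient, or attempts exhausted).
     */
    static <T> T withBackoff(Supplier<T> call, Predicate<RuntimeException> isTransient,
                             int maxAttempts, Duration initialBackoff, Duration maxBackoff) {
        long sleepMs = initialBackoff.toMillis();
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (!isTransient.test(e) || attempt >= maxAttempts) throw e;
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();   // preserve the interrupt for callers
                    throw e;
                }
                sleepMs = Math.min(sleepMs * 2, maxBackoff.toMillis());
            }
        }
    }
}

// Usage with Kafka Streams (not compiled here): the store is re-acquired on every attempt.
// Long count = Retry.withBackoff(
//     () -> streams.store(StoreQueryParameters.fromNameAndType(
//               "word-counts", QueryableStoreTypes.<String, Long>keyValueStore())).get("kafka"),
//     e -> e instanceof InvalidStateStoreException,
//     5, Duration.ofMillis(100), Duration.ofSeconds(2));
```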

**Cause 2: the key genuinely isn't on this instance.** You queried the local store for a key another instance owns. A plain `get()` here usually returns `null` silently rather than throwing, as long as the instance owns at least one partition of the store, so the miss is easy to mistake for a missing key. No retry will fix that; the value isn't here. The fix is the routing from the previous section: find the owner and call it.

If you can tolerate slightly stale data during a rebalance, IQv1 also lets you query a store that's still restoring rather than waiting for `RUNNING`:

```java
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters
        .fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore())
        .enableStaleStores());            // KIP-535: serve standby/restoring replicas
```

`enableStaleStores()` (KIP-535) lets a query hit a standby replica or a still-restoring active store. It trades freshness for availability, which helps when "an answer that's a few seconds behind" beats "an exception during every deploy." Pair it with [standby replicas](https://www.conduktor.io/kafka-streams/state-restore) (`num.standby.replicas`) so there's a warm copy to read from while the active restores. `queryMetadataForKey` reports where those copies live through `KeyQueryMetadata.standbyHosts()`.
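Reading from a standby takes two pieces. The queried instance must opt into stale stores as shown above, and the router must know where the standbys are, which `KeyQueryMetadata.standbyHosts()` provides next to `activeHost()`. The following dependency-free sketch shows only the ordering logic. It tries the active host first, falls back to each standby, and stops at the first instance that answers. `Failover.query` and its `fetch` function (a remote call such as the routing section's HTTP hop) are hypothetical, and hosts are plain strings rather than `HostInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

/** Sketch: active-first, standby-fallback ordering for a single-key read. */
final class Failover {
    static <V> Optional<V> query(String active, Set<String> standbys, Function<String, V> fetch) {
        List<String> order = new ArrayList<>();
        order.add(active);          // freshest copy first
        order.addAll(standbys);     // possibly lagging copies as fallback
        RuntimeException lastFailure = null;
        for (String host : order) {
            try {
                // A successful call ends the search, even if the key is absent (null)
                return Optional.ofNullable(fetch.apply(host));
            } catch (RuntimeException e) {
                lastFailure = e;    // down, restoring without stale reads, or mid-rebalance
            }
        }
        throw new IllegalStateException("no replica answered", lastFailure);
    }
}
```

A `null` from a host that did answer counts as a real "key absent" result, so the sketch consults standbys only when a call fails. A value served by a standby can lag the active copy, which is the freshness trade-off described above.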

## IQv1 vs IQv2

Everything above is **IQv1**: `streams.store(StoreQueryParameters...)` returning a typed `ReadOnly*Store`. It works, but each store type has its own interface and adding a new query shape means a new store type.

**IQv2** (KIP-796) is the newer, typed query API: instead of fetching a store interface, you build a `Query` object and execute it through `streams.query(...)`, getting back a `StateQueryResult` with per-partition results and position info.

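```java
StateQueryRequest<Long> request =
    StateQueryRequest.inStore("word-counts")
        .withQuery(KeyQuery.<String, Long>withKey("kafka"));   // KeyQuery returns the raw value

StateQueryResult<Long> result = streams.query(request);
// One QueryResult per partition queried on this instance; a failed one reports why
result.getPartitionResults().forEach((partition, qr) ->
    System.out.println(partition + " -> "
        + (qr.isSuccess() ? qr.getResult() : qr.getFailureReason())));
```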

(`KeyQuery` unwraps to the plain value; use `TimestampedKeyQuery` if you want the `ValueAndTimestamp` wrapper.)

IQv2 ships query types like `KeyQuery` (point lookup) and `RangeQuery` (scan), is extensible to custom queries, and exposes partition `Position` so you can reason about freshness. IQv1 is not deprecated and remains the most common code you'll see in the wild, so know both, but `query(...)` is the direction the project is moving, and it's the cleaner API for new code.

## Querying the whole pipeline, end to end

Putting it together, a production interactive-query service is three layers: the Streams app materializing named stores; an HTTP endpoint per instance bound to its `application.server` address; and routing logic that either reads locally or fans out to the owners. This is exactly the shape behind the [AI-agent context store](https://www.conduktor.io/kafka-streams/ai-agents), where an orchestrator calls `GET /conversation/{id}` and the service routes to whichever instance holds that conversation's partition.

**Why do I get InvalidStateStoreException?**

Two causes. Either the app isn't `RUNNING` yet or a rebalance/restore is in flight (transient: re-fetch the store handle and retry with backoff), or the key lives on a different instance (not transient: route to the owner via `queryMetadataForKey`). The second case often doesn't throw at all. A local `get()` for a key owned elsewhere usually returns `null`, and the exception typically appears only when this instance hosts none of the store's partitions. The message from `InvalidStateStoreException` and its subclasses (like `StreamsNotStartedException`) varies by phase. During startup or a rebalance, you see "Cannot get state store X because the stream thread is STARTING, not RUNNING". For stale handles and migrated stores, you see "may have migrated to another instance".

**Can I query a state store during a rebalance?**

Not the active store by default: it may be migrating or restoring. You can opt into stale reads with `StoreQueryParameters.enableStaleStores()` (KIP-535) to query a standby or still-restoring replica, trading freshness for availability. Otherwise, retry until the app returns to `RUNNING`.

**How do I query a state store on another instance?**

Set `application.server` so instances discover each other, call `streams.queryMetadataForKey(store, key, serializer)` to get the owning `HostInfo`, and make an HTTP/gRPC call to that instance's endpoint. You build the REST layer; Kafka Streams only provides the routing metadata.

**What's the difference between IQv1 and IQv2?**

IQv1 (`streams.store(StoreQueryParameters...)`) returns a typed read-only store interface per store type. IQv2 (KIP-796) uses `streams.query(...)` with composable `Query` objects (`KeyQuery`, `RangeQuery`) and returns a `StateQueryResult` with partition position info. IQv1 isn't deprecated; IQv2 is the newer, more extensible API.

**Do I need a database if I use interactive queries?**

Often not. A materialized store is already a queryable, fault-tolerant view rebuilt from its changelog, so for read-your-own-derived-state cases it replaces a separate database. You add a database when you need cross-key transactions, ad-hoc SQL, or to serve data the Streams app didn't materialize.

> **See it in practice with Conduktor**
> Interactive queries read from local stores backed by changelog topics on the cluster. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you see those `<application-id>-...-changelog` topics, check their compaction and size, and watch the restore consumer's lag, so when a query throws `InvalidStateStoreException` after a deploy, you can confirm whether a restore is still in progress instead of guessing.

## Next steps

- [State stores](https://www.conduktor.io/kafka-streams/state-store): how the data you're querying is kept and made fault-tolerant
- [Kafka Streams for AI agents](https://www.conduktor.io/kafka-streams/ai-agents): the context-store pattern that serves state over interactive queries
- [Monitoring Kafka Streams](https://www.conduktor.io/kafka-streams/monitoring): the lag and restore signals that explain query failures
