Kafka Streams interactive queries

Kafka Streams interactive queries: read state store data live, route to the owning instance, handle InvalidStateStoreException, and choose between IQv1 and IQv2 (KIP-796).

Serve your state store to the outside world.

A state store is a queryable view of your data that lives inside your app. Interactive queries are how you read it (from a REST endpoint, another service, or an AI agent) without standing up a separate database. The store page covers how state is kept. This page covers how you serve it, which is where the multi-instance problems live.

The single fact that shapes everything here: state is sharded across instances exactly the way input partitions are. Each key's active copy lives on exactly one instance, and querying any other instance returns nothing or throws. Getting interactive queries right is mostly about finding the instance that owns the key.

What you'll learn:

  • How to open a read-only handle on a store with StoreQueryParameters
  • Why a single-instance query breaks at two instances, and how to route to the owner
  • Why you keep hitting InvalidStateStoreException, and the two fixes
  • IQv1 vs IQv2 (KIP-796): the older API and the newer typed one

Querying a store on one instance

With a single instance, interactive queries are trivial. You ask the running KafkaStreams object for a read-only view of a named store and call get:

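ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));

Long count = store.get("kafka");                           // point lookup; null if the key isn't here
try (KeyValueIterator<String, Long> all = store.all()) {   // full scan; close iterators to release store resources
    all.forEachRemaining(kv -> System.out.println(kv.key + " = " + kv.value));
}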

The store type must match what you materialized. The common ones:

Store | QueryableStoreTypes factory | Reads
--- | --- | ---
Key-value (count, aggregate, reduce) | keyValueStore() | get(key), range(from, to), all()
Windowed | windowStore() | fetch(key, timeFrom, timeTo)
Session | sessionStore() | fetch(key)
The store must be named when you create it (Materialized.as("word-counts")). An unnamed store gets a positional name that you can't reliably reference and that shifts when you edit the topology. Kafka 4.3 logs a startup WARN listing every unnamed store and internal topic, and ensure.explicit.internal.resource.naming=true turns unnamed resources into a hard failure.

Enabling discovery across instances

Run two instances and the picture changes. Each owns a subset of partitions, so each holds a different slice of the store. A query for "kafka" only works on whichever instance owns the partition "kafka" hashes to. The other instance has no idea what that key's value is.
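To make "the partition "kafka" hashes to" concrete, here is a minimal stdlib sketch of the default key-to-partition mapping. It applies murmur2 to the UTF-8 key bytes and takes the positive result modulo the partition count, which is what Kafka's default partitioner does for non-null keys. This sketch assumes the input topic was written with that default partitioner and a String serializer. The class name and the 6-partition count are hypothetical. The hash only picks the partition; which instance hosts that partition is decided by Kafka Streams' task assignment. In real code you don't compute any of this yourself: queryMetadataForKey does it for you, and it also accepts a custom StreamPartitioner if you use one.

```java
import java.nio.charset.StandardCharsets;

// Sketch: which partition (and therefore which instance's slice of the store) a String key lands on.
// Assumes the topic was produced with Kafka's default partitioner and a UTF-8 String serializer.
final class KeyPartitioning {

    // murmur2, as used by Kafka's default partitioner for non-null keys.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (the fall-through between cases is intentional).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same key + same partition count => same partition, computed identically on every instance.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;   // clear the sign bit, then take the modulo
    }
}
```

Because every instance computes the same partition for "kafka", all of them agree on where it lives. Only the instance running that partition's task has the value.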

To make instances discoverable, set application.server to a host:port that the other instances (and your own query layer) can reach:

application.server=10.0.1.42:8080

This doesn't open any port for you. It's metadata that Kafka Streams shares with the rest of the consumer group during rebalances, so every instance learns every other instance's address. You still build the HTTP (or gRPC) endpoint at that address yourself.
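As a minimal sketch of that endpoint, here is a version using the JDK's built-in com.sun.net.httpserver.HttpServer (jdk.httpserver module). The class name, the /store/word-counts/{key} path, and the Function standing in for the local store read are all hypothetical. In a real service, the lookup would call the ReadOnlyKeyValueStore (or IQv2) code from this page. The only point it illustrates is that the server binds the same host:port you advertised in application.server.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical query endpoint bound to the address advertised in application.server.
final class StoreEndpoint {

    // Parses "host:port" exactly as written in application.server.
    static InetSocketAddress parseApplicationServer(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) throw new IllegalArgumentException("expected host:port, got " + hostPort);
        return new InetSocketAddress(hostPort.substring(0, colon),
                Integer.parseInt(hostPort.substring(colon + 1)));
    }

    // Serves GET /store/word-counts/{key}; localLookup returns null when this instance has no value.
    static HttpServer start(InetSocketAddress bindAddress, Function<String, Long> localLookup) throws IOException {
        HttpServer server = HttpServer.create(bindAddress, 0);
        server.createContext("/store/word-counts/", exchange -> {
            String key = exchange.getRequestURI().getPath().substring("/store/word-counts/".length());
            Long value = localLookup.apply(key);
            byte[] body = (value == null ? "" : value.toString()).getBytes(StandardCharsets.UTF_8);
            // 404 with no body when the value isn't here; 200 with the count otherwise.
            exchange.sendResponseHeaders(value == null ? 404 : 200, body.length == 0 ? -1 : body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

Binding to the advertised address, rather than a separately configured port, keeps the metadata other instances receive in sync with where the server actually listens.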

🚫 "Interactive queries let me read any key from any instance, so I'll just query whichever one my load balancer hits."

That's the misconception that bites everyone first. An instance can only answer for the keys whose partitions it owns. Query a key the local instance doesn't own and you get an empty result or an exception, not the value sitting on the neighbor. You have to find the owner and route to it.

Routing to the instance that owns the key

The store metadata tells you which instance owns a given key. You ask queryMetadataForKey, compare the returned host to your own application.server, and either read locally or make an HTTP call to the owner.

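KeyQueryMetadata md = streams.queryMetadataForKey(
    "word-counts", "kafka", Serdes.String().serializer());

if (md == null || KeyQueryMetadata.NOT_AVAILABLE.equals(md)) {
    // no routing metadata yet (startup or rebalance): treat as transient and retry
    throw new IllegalStateException("metadata for word-counts not available yet");
}

HostInfo owner = md.activeHost();
if (owner.equals(myHostInfo)) {                    // myHostInfo: HostInfo.buildFromEndpoint(<your application.server>)
    return localStore.get("kafka");                // we own it: read the local store
} else {
    // remote read; httpClient.get(...) stands in for your own HTTP/gRPC client
    return httpClient.get(owner, "kafka");         // GET http://owner.host():owner.port()/store/word-counts/kafka
}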

For a query that has to span the whole keyspace (a dashboard total, an export), there's no single owner, so you fan out. streams.metadataForAllStreamsClients() returns every instance and the partitions it holds; you call each one's endpoint and merge the results. (The allMetadata() method from older tutorials is removed in Kafka 4.x, not just deprecated: new code calling it won't compile, and code built against 3.x fails with NoSuchMethodError at runtime.)

for (StreamsMetadata instance : streams.metadataForAllStreamsClients()) {
    HostInfo host = instance.hostInfo();
    // GET http://<host.host()>:<host.port()>/store/word-counts/all on every instance, then combine
}

That fan-out REST layer (a controller that does a local read when it owns the key and an HTTP hop when it doesn't) is the part you write. Kafka Streams gives you the routing metadata; it does not give you the web server.
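Here is a minimal stdlib sketch of the fan-out-and-merge step, under the assumption that each instance exposes an "all" endpoint returning its slice as a Map. The class name, the "host:port" strings, and the fetchAll function (standing in for your HTTP call) are hypothetical; in a real app the instance list would come from metadataForAllStreamsClients(). The calls run in parallel, so the total latency is roughly that of the slowest instance, not the sum of all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical fan-out: query every instance's "all" endpoint in parallel and merge the slices.
final class FanOut {

    static Map<String, Long> queryAll(List<String> instances,
                                      Function<String, Map<String, Long>> fetchAll) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, instances.size()));
        try {
            List<CompletableFuture<Map<String, Long>>> calls = new ArrayList<>();
            for (String instance : instances) {
                calls.add(CompletableFuture.supplyAsync(() -> fetchAll.apply(instance), pool));
            }
            Map<String, Long> merged = new TreeMap<>();
            for (CompletableFuture<Map<String, Long>> call : calls) {
                // Each key has one owner, so slices shouldn't overlap; if they do
                // (e.g. mid-rebalance), keep the first value seen.
                // join() rethrows if any instance failed, failing the whole query.
                call.join().forEach(merged::putIfAbsent);
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Failing the whole query when one instance is down is a deliberate simplification. A dashboard might prefer to return partial results and flag which instances didn't answer.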

Why you keep hitting InvalidStateStoreException

This is the most-pasted interactive-queries error, and it has two distinct causes that need different fixes.

org.apache.kafka.streams.errors.InvalidStateStoreException:
  The state store, word-counts, may have migrated to another instance.

Cause 1: the app isn't RUNNING yet, or a rebalance is in flight. During startup, a rebalance, or a state restore, the store is being (re)built and isn't available to query, and a store handle you cached earlier can go stale across a rebalance. The exact exception depends on the phase:

- Before start(), you get StreamsNotStartedException.
- During startup or a rebalance, streams.store() throws with "Cannot get state store word-counts because the stream thread is STARTING, not RUNNING".
- A stale handle throws the "may have migrated" message shown above.

All of these are InvalidStateStoreException or one of its subclasses, so catch the parent. The fix is to treat the exception as transient: re-fetch the store handle and retry with backoff. Don't hold a ReadOnlyKeyValueStore reference for the life of the app. Re-acquire it per query, or at least after every transition through REBALANCING.
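A minimal stdlib sketch of "treat it as transient and retry with backoff" follows. The helper name, the attempt limit, and the 5-second backoff cap are arbitrary assumptions. The Supplier should contain the whole lookup, including re-acquiring the store handle, so every retry starts from a fresh handle rather than the stale one.

```java
import java.util.function.Supplier;

// Hypothetical helper: retry a query while it fails with a transient exception type.
final class TransientRetry {

    static <T> T withBackoff(Supplier<T> query,
                             Class<? extends RuntimeException> transientType,
                             int maxAttempts,
                             long initialBackoffMs) throws InterruptedException {
        long backoff = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return query.get();   // re-runs the whole lookup, including fetching the store handle
            } catch (RuntimeException e) {
                if (!transientType.isInstance(e) || attempt >= maxAttempts) {
                    throw e;          // not transient, or out of attempts: surface it
                }
                Thread.sleep(backoff);
                backoff = Math.min(backoff * 2, 5_000);   // exponential backoff, capped at an arbitrary 5 s
            }
        }
    }
}
```

With Kafka Streams on the classpath, a call might look like withBackoff(() -> streams.store(params).get("kafka"), InvalidStateStoreException.class, 5, 100); InvalidStateStoreException is a RuntimeException, so it fits the signature. Retrying cannot fix cause 2 below, because a key owned by another instance stays there no matter how many times you ask.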

Cause 2: the key genuinely isn't on this instance. You queried the local store for a key another instance owns. A plain get() here usually returns null silently rather than throwing, as long as the instance owns at least one partition of the store, so the miss is easy to mistake for a missing key. No retry will fix that; the value isn't here. The fix is the routing from the previous section: find the owner and call it.
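Here is a minimal sketch of that rule in plain Java (16+ for the record): only trust a local null after confirming this instance owns the key. Endpoint, OwnerAwareReader, and the three injected functions are hypothetical stand-ins. In a real app, ownerOf would wrap queryMetadataForKey(...).activeHost() (returning empty for NOT_AVAILABLE), localGet would call the local store's get(), and remoteGet would call your HTTP client.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for Kafka Streams' HostInfo.
record Endpoint(String host, int port) {}

// Checks ownership first, so an empty result means "the owner has no value", never "wrong instance".
final class OwnerAwareReader<V> {
    private final Endpoint self;
    private final Function<String, Optional<Endpoint>> ownerOf;   // empty while metadata is unavailable
    private final Function<String, V> localGet;                  // local store read; may return null
    private final BiFunction<Endpoint, String, V> remoteGet;     // call to the owner's endpoint

    OwnerAwareReader(Endpoint self,
                     Function<String, Optional<Endpoint>> ownerOf,
                     Function<String, V> localGet,
                     BiFunction<Endpoint, String, V> remoteGet) {
        this.self = self;
        this.ownerOf = ownerOf;
        this.localGet = localGet;
        this.remoteGet = remoteGet;
    }

    Optional<V> get(String key) {
        Endpoint owner = ownerOf.apply(key)
                .orElseThrow(() -> new IllegalStateException("owner unknown (rebalancing?); retry"));
        V value = owner.equals(self) ? localGet.apply(key) : remoteGet.apply(owner, key);
        return Optional.ofNullable(value);
    }
}
```

Unknown ownership surfaces as an exception (cause 1, retryable), while a key owned elsewhere is routed rather than read as a misleading local null (cause 2).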

If you can tolerate slightly stale data during a rebalance, IQv1 also lets you query a store that's still restoring rather than waiting for RUNNING:

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters
        .fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore())
        .enableStaleStores());            // KIP-535: serve standby/restoring replicas

enableStaleStores() (KIP-535) lets a query hit a standby replica or a still-restoring active store, trading freshness for availability. That's the right trade when "an answer that's a few seconds behind" beats "an exception during every deploy." Pair it with standby replicas (num.standby.replicas) so there's a warm copy to read from while the active restores.
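The instance settings this page relies on can be gathered in one place. The sketch below assumes plain string keys (application.server and num.standby.replicas are the real Kafka Streams config names; StreamsConfig exposes constants for them). The application id, bootstrap address, and class name are placeholders.

```java
import java.util.Properties;

// Sketch of a queryable instance's config; application id and bootstrap address are placeholders.
final class QueryableAppConfig {

    static Properties props(String applicationServer) {
        Properties p = new Properties();
        p.setProperty("application.id", "word-count-app");        // placeholder
        p.setProperty("bootstrap.servers", "localhost:9092");     // placeholder
        p.setProperty("application.server", applicationServer);   // this instance's query endpoint, shared with the group
        p.setProperty("num.standby.replicas", "1");               // one warm copy of each store on another instance
        return p;
    }
}
```

Each instance passes its own reachable host:port. The standby count trades extra disk and changelog reads for a copy that stale-store queries can hit during restores.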

IQv1 vs IQv2

Everything above is IQv1: streams.store(StoreQueryParameters...) returning a typed ReadOnly*Store. It works, but each store type has its own interface and adding a new query shape means a new store type.

IQv2 (KIP-796) is the newer, typed query API: instead of fetching a store interface, you build a Query object and execute it through streams.query(...), getting back a StateQueryResult with per-partition results and position info.

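StateQueryRequest<Long> request =
    StateQueryRequest.inStore("word-counts")
        .withQuery(KeyQuery.withKey("kafka"));   // KeyQuery returns the raw value

StateQueryResult<Long> result = streams.query(request);
Long count = null;
for (QueryResult<Long> partitionResult : result.getPartitionResults().values()) {
    if (partitionResult.isSuccess() && partitionResult.getResult() != null) count = partitionResult.getResult();   // the local partition holding "kafka"
}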

(KeyQuery unwraps to the plain value; use TimestampedKeyQuery if you want the ValueAndTimestamp wrapper.)

IQv2 ships query types like KeyQuery (point lookup) and RangeQuery (scan), is extensible to custom queries, and exposes partition Position so you can reason about freshness. IQv1 is not deprecated and remains the most common code you'll see in the wild, so know both, but query(...) is the direction the project is moving, and it's the cleaner API for new code.

Querying the whole pipeline, end to end

Putting it together, a production interactive-query service is three layers: the Streams app materializing named stores; an HTTP endpoint per instance bound to its application.server address; and routing logic that either reads locally or fans out to the owners. This is exactly the shape behind the AI-agent context store, where an orchestrator calls GET /conversation/{id} and the service routes to whichever instance holds that conversation's partition.

Why do I get InvalidStateStoreException?

Two causes. Either the app isn't RUNNING yet or a rebalance/restore is in flight (transient: re-fetch the store handle and retry with backoff), or the key lives on a different instance (not transient: route to the owner via queryMetadataForKey). The second case often shows up as a silent null from get() rather than an exception. The exceptions are InvalidStateStoreException or a subclass (like StreamsNotStartedException), and the message varies. You see "Cannot get state store X because the stream thread is STARTING, not RUNNING" during startup or a rebalance, and "may have migrated to another instance" for stale handles and migrated stores.

Can I query a state store during a rebalance?

Not the active store by default: it may be migrating or restoring. You can opt into stale reads with StoreQueryParameters.enableStaleStores() (KIP-535) to query a standby or still-restoring replica, trading freshness for availability. Otherwise, retry until the app returns to RUNNING.

How do I query a state store on another instance?

Set application.server so instances discover each other, call streams.queryMetadataForKey(store, key, serializer) to get the owning HostInfo, and make an HTTP/gRPC call to that instance's endpoint. You build the REST layer; Kafka Streams only provides the routing metadata.

What's the difference between IQv1 and IQv2?

IQv1 (streams.store(StoreQueryParameters...)) returns a typed read-only store interface per store type. IQv2 (KIP-796) uses streams.query(...) with composable Query objects (KeyQuery, RangeQuery) and returns a StateQueryResult with partition position info. IQv1 isn't deprecated; IQv2 is the newer, more extensible API.

Do I need a database if I use interactive queries?

Often not. A materialized store is already a queryable, fault-tolerant view rebuilt from its changelog, so for read-your-own-derived-state cases it replaces a separate database. You add a database when you need cross-key transactions, ad-hoc SQL, or to serve data the Streams app didn't materialize.

See it in practice with Conduktor

Interactive queries read from local stores backed by changelog topics on the cluster. Conduktor Console lets you see those -...-changelog topics, check their compaction and size, and watch the restore consumer's lag, so when a query throws InvalidStateStoreException after a deploy, you can confirm whether a restore is still in progress instead of guessing.

Next steps