# Kafka Streams joins

*Learn the four join types and the rules that silently break them.*

Joining is where Kafka Streams earns its keep: enrich an order stream with customer data, correlate clicks with impressions, stitch two changelogs into one view. The DSL makes the join itself a one-liner. What it does *not* make obvious is that every join carries a contract (co-partitioning, timing, key semantics), and that breaking the contract usually fails silently, with no exception and no output.

This page covers the four join types, what each one needs from you, and the three ways a join produces nothing while looking perfectly healthy.

**What you'll learn:**
- The four join types: KStream-KStream, KStream-KTable, KTable-KTable, GlobalKTable
- Why co-partitioning is mandatory, and how a partitioner mismatch silently corrupts results
- Why a KStream-KTable join drops records when the table isn't populated yet
- When to reach for a foreign-key join or a GlobalKTable instead

## The join matrix

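Kafka Streams gives you four joins, and which one you pick is dictated by the *type* of each side (stream or table), not by taste. The table-table join comes in two variants, key-based and foreign-key, so the matrix has five rows.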

| Left | Right | Join semantics | Windowed? | Co-partition required? |
|---|---|---|---|---|
| KStream | KStream | Both sides are events; match within a time window | Yes | Yes |
| KStream | KTable | Event looks up the latest table value for its key | No | Yes |
| KTable | KTable | Two changelogs combined into one updated view | No | Yes (on key) |
| KTable | KTable | Foreign-key: join on a non-key field | No | No (re-keyed internally) |
| KStream | GlobalKTable | Lookup against a fully replicated table, using a key derived from any field of the stream record | No | **No** |

The mental split: a **stream-stream** join asks "did these two things happen close together in time?" A **stream-table** join asks "what is the current state for this key?" A **table-table** join keeps a continuously-updated combined view. Get the question wrong and you've picked the wrong join; see [KStream, KTable & GlobalKTable](https://www.conduktor.io/kafka-streams/kstream-ktable-globalktable) for the underlying model.

## KStream-KStream: windowed event correlation

Two event streams, matched when records with the same key arrive within a time window. This is the join for "click within 10 minutes of an impression" or "payment within an hour of an order".

```java
KStream<String, Impression> impressions = builder.stream("impressions");
KStream<String, Click> clicks = builder.stream("clicks");

KStream<String, Attribution> attributed = impressions.join(
    clicks,
    (impression, click) -> new Attribution(impression, click),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)),
    StreamJoined.with(Serdes.String(), impressionSerde, clickSerde)
);
```

The join is symmetric and windowed: each side is buffered in a window store for the window duration, so a click can match an impression that arrived earlier *or* later within the window. That buffering is state: two more changelog-backed stores, sized by your window length and traffic. A wide window over high-volume topics is a real memory and disk cost, not a free lookup.
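The matching rule itself is small enough to sketch in plain Java. This is a toy model, not Kafka Streams code: `JoinWindowSketch` and `withinWindow` are hypothetical names, and treating the window boundary as inclusive is an assumption of the sketch.

```java
// Toy model of the stream-stream matching rule; not Kafka Streams code.
final class JoinWindowSketch {

    // Two records with the same key are join candidates when their timestamps
    // differ by at most the window size, in either direction.
    // Assumption: the boundary is treated as inclusive here.
    static boolean withinWindow(long leftTsMs, long rightTsMs, long windowMs) {
        return Math.abs(leftTsMs - rightTsMs) <= windowMs;
    }

    public static void main(String[] args) {
        long tenMinutesMs = 10 * 60 * 1000L;
        long impressionTs = 1_000_000L;

        // Click 4 minutes after the impression: inside the window.
        System.out.println(withinWindow(impressionTs, impressionTs + 240_000L, tenMinutesMs));
        // Click 4 minutes before the impression: also inside, the rule is symmetric.
        System.out.println(withinWindow(impressionTs, impressionTs - 240_000L, tenMinutesMs));
        // Click 11 minutes later: outside the window, so no join result.
        System.out.println(withinWindow(impressionTs, impressionTs + 660_000L, tenMinutesMs));
    }
}
```

Because the rule looks in both directions, each side has to keep its records around for the full window, which is where the two window stores come from.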

`JoinWindows` controls the matching span; the grace period controls how long late records can still join after the window's nominal end. The current factory methods make that choice explicit: `ofTimeDifferenceWithNoGrace` allows **no grace**, so a late event is refused by the join buffer and counted in the `dropped-records` metric, while `ofTimeDifferenceAndGrace` takes the tolerance as a second `Duration`. (A late event can still match on arrival if the other side's store hasn't expired the counterpart yet; it just can't match anything that arrives after it.) The interaction between stream-time, windows, and grace is the same one that bites windowing, covered in [windowing](https://www.conduktor.io/kafka-streams/windowing).

A **self-join**, joining a stream with itself to correlate events on the same topic (matching a login with a later logout, say), is a special case. Since Kafka 3.4 (KIP-862), an inner self-join on the key is optimized to use a *single* state store instead of two, halving its state cost, when [topology optimization](https://www.conduktor.io/kafka-streams/topology-evolution) is enabled. The optimization covers inner, key-based self-joins only, not N-way ones.

## KStream-KTable: the enrichment lookup

The most common join in production: a stream of events enriched with the current value from a table. An order stream joined against a `KTable` of customers, each order decorated with the customer's tier.

```java
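// Both sides must share the join key: here, orders are assumed to be keyed by customer ID.
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Customer> customers = builder.table("customers"); // keyed by customer ID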

KStream<String, EnrichedOrder> enriched = orders.join(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer)
);
```

This join is **not** windowed and it is **asymmetric**: only a record on the *stream* side triggers a lookup. A new customer record updates the table silently; it does not re-emit past orders. That asymmetry is correct for enrichment, and it's also the source of the timing trap below.

## The timing trap: stream arrives before the table is ready

Here is the failure that costs people an afternoon. A KStream-KTable join looks up the table value *as it exists when the stream record is processed*. If the stream record for a key is processed **before** the table has been populated for that key, the lookup finds nothing, and an inner `join` **silently drops** the record (a `leftJoin` would emit it with a `null` table value instead). No error. No log line. Not even a tick of the `dropped-records` metric. The join simply produced nothing for that record.

This is most visible on startup and after a restart. Both the stream topic and the table's source topic are being consumed concurrently, and there is no guarantee the table side is read first. If your stream is ahead, early records miss a table that hasn't caught up yet.
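The race is easy to reproduce in a toy model. The sketch below is plain Java, not Kafka Streams code; `StreamTableJoinSketch` and its methods are hypothetical, and a `HashMap` stands in for the table's state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an inner stream-table join; not Kafka Streams code.
final class StreamTableJoinSketch {
    private final Map<String, String> customerTier = new HashMap<>(); // the "table"
    private final List<String> output = new ArrayList<>();

    // A table-side record only updates state; it never emits anything.
    void onCustomer(String customerId, String tier) {
        customerTier.put(customerId, tier);
    }

    // A stream-side record triggers a lookup against the table as it is right now.
    void onOrder(String customerId, String orderId) {
        String tier = customerTier.get(customerId);
        if (tier != null) {
            output.add(orderId + ":" + tier);
        }
        // No match: the inner join emits nothing for this order.
    }

    // Replays a fixed scenario and returns what the join emitted.
    static List<String> demo() {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.onOrder("cust-1", "order-A");  // table still empty: order-A is lost
        join.onCustomer("cust-1", "gold");  // table catches up; order-A is not replayed
        join.onOrder("cust-1", "order-B");  // table populated: emitted
        return join.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only `order-B:gold` comes out. Nothing in the model (or in the real join) records that `order-A` ever existed, which is why the loss is invisible from the output side.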

The lever is `max.task.idle.ms`. A task processes whichever buffered record has the lowest timestamp, and since Kafka 3.0 (KIP-695) the default (`0`) already pauses when one input has no buffered data but non-zero lag: it waits for records that exist on the broker to be fetched. Setting `max.task.idle.ms` above zero makes the task additionally wait for records that haven't been *produced* yet, giving the table side time to deliver records up to the same timestamp before the stream record is processed, improving timestamp synchronization across the two inputs. (`-1` disables idling entirely.)

```properties
# Let a task wait up to 5s for a lagging input so the table side can catch up
max.task.idle.ms=5000
```

This reduces the drop window; it does not make it impossible (a table side that is genuinely far behind still loses the race). When a join "drops records on startup", this is usually why. The full diagnostic walkthrough lives in [joins that drop data](https://www.conduktor.io/kafka-streams/join-troubleshooting).

## Co-partitioning: the contract that breaks silently

Every join except the GlobalKTable join requires the two sides to be **co-partitioned**. Co-partitioning means two things, and both must hold:

1. **Same partition count** on both input topics.
2. **Same partitioning**: the same key produces the same partition number on both sides.

The reason is mechanical. A join task owns one partition number from each side and matches keys *within that partition only*. Records for key `acct-42` must land in the same partition on both topics, or the two halves of the join never meet in the same task.

The two ways to break it fail very differently:

| Violation | Detected? | Symptom |
|---|---|---|
| Partition-count mismatch | **Sometimes** | `TopologyException: ... not co-partitioned` at the first rebalance, but on 3.2+ only if the topology contains a repartition topic. Otherwise: silent wrong results |
| Partitioner mismatch (same count) | **No** | No exception. Wrong or empty results, forever |

A count mismatch *can* be the good failure: the check runs in the partition assignor during the first rebalance and kills the app with `TopologyException` "not co-partitioned" before it processes a single record. But since Kafka Streams 3.2, that check is skipped entirely when the topology contains no internal repartition topics, and the plain pre-keyed join above is exactly that case. The app starts, reports RUNNING, creates a task per partition of the larger topic, and joins only the overlapping partitions: the same silent wrong-results failure as a partitioner mismatch. Any operation that adds a repartition topic to the topology re-arms the check.

The partitioner mismatch is the dangerous one. Suppose both topics have 12 partitions but were written by producers using *different* partitioning logic: a custom partitioner on one side and the default murmur2 hash on the other, or one topic keyed by a string and the other by the same value serialized differently. Kafka Streams **cannot see how the upstream producers partitioned the data**. It assumes co-partitioning holds, finds no matches for most keys, and emits wrong or empty results with no warning whatsoever.

> 🚫 *"Both topics have the same number of partitions, so the join is co-partitioned."*

Same partition count is necessary, not sufficient. If the two sides were produced by different applications, with different key serializers or a custom partitioner on one side, the data can be mis-aligned even with matching counts, and you get a silently empty join.
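To make the serializer case concrete, here is a toy model in plain Java, not Kafka code. `CoPartitionSketch` and its methods are hypothetical, and `Arrays.hashCode` stands in for the producer's real hash (murmur2 by default), so the partition numbers are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy model of key-to-partition assignment; not the real producer partitioner.
final class CoPartitionSketch {

    // Stand-in hash over the serialized key bytes (assumption: the real default
    // partitioner hashes the same bytes, but with murmur2).
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    // Topic A's producer writes the id as a UTF-8 string.
    static int partitionOfStringKey(int id, int numPartitions) {
        byte[] bytes = Integer.toString(id).getBytes(StandardCharsets.UTF_8);
        return partitionFor(bytes, numPartitions);
    }

    // Topic B's producer writes the same id as a 4-byte big-endian integer.
    static int partitionOfIntKey(int id, int numPartitions) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(id).array();
        return partitionFor(bytes, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 12; // same partition count on both topics
        int misaligned = 0;
        for (int id = 0; id < 1000; id++) {
            if (partitionOfStringKey(id, partitions) != partitionOfIntKey(id, partitions)) {
                misaligned++;
            }
        }
        System.out.println(misaligned + " of 1000 keys land in different partitions");
    }
}
```

In this model, id 7 lands in partition 2 as a string and partition 8 as an integer, so those two records never meet in one task. Id 42 happens to land in partition 7 both ways, which is why a mis-partitioned join often still emits a few results and looks partly healthy.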

The fix when you can't guarantee upstream partitioning is to **repartition explicitly** before the join, forcing both sides through Kafka Streams' own partitioner:

```java
KStream<String, Order> repartitioned =
    orders.repartition(Repartitioned.with(Serdes.String(), orderSerde));
```

This writes through an internal repartition topic with a known partitioner, restoring the co-partitioning guarantee. Name the repartition topic (`Repartitioned.as("...")`) so it stays stable across topology edits; see [evolving a topology](https://www.conduktor.io/kafka-streams/topology-evolution).

## The null-key skip (a join that "broke after upgrade")

Inner joins have always skipped records with a **null key**, since a null key cannot be co-partitioned or looked up. Kafka 2.7 extended that skip to the left-join paths (with a WARN log and a tick of the `dropped-records` metric), which is why teams upgrading from older versions hit this as "our join started dropping data after the upgrade." **Kafka 3.7 (KIP-962) then relaxed it for left and outer joins:** `leftJoin`/`outerJoin` (and left foreign-key joins) no longer drop null-key records; instead they call the `ValueJoiner` with `null` for the missing side, matching documented left/outer semantics. **Inner joins still drop null-key records**, since an inner match is impossible. On an inner join (or before 3.7), set a key with `selectKey` *before* the join so those records survive.

## KTable-KTable: combining two changelogs

A table-table join keeps a continuously-maintained combined view: whenever either side updates for a key, the join output for that key is recomputed and re-emitted. Both sides must be co-partitioned on the key.

```java
KTable<String, Account> accounts = builder.table("accounts");
KTable<String, Profile> profiles = builder.table("profiles");

KTable<String, AccountView> view = accounts.join(
    profiles,
    (account, profile) -> new AccountView(account, profile)
);
```

Because the result is itself a KTable, a tombstone (null value) on either side flows through as a delete on the output. This is the right behavior for materialized views, and a footgun if you then `toStream()` the result and forget the values can be null.
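The recompute-on-either-side behavior, including the tombstone, can be sketched as a toy model in plain Java. This is not Kafka Streams code: `TableTableJoinSketch` is hypothetical, and emitting a delete only when a joined result previously existed is a simplification of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an inner table-table join on the key; not Kafka Streams code.
final class TableTableJoinSketch {
    private final Map<String, String> accounts = new HashMap<>();
    private final Map<String, String> profiles = new HashMap<>();
    private final Map<String, String> view = new HashMap<>();     // current join result
    private final List<String> changelog = new ArrayList<>();     // emitted "key=value" updates

    // A null value is a tombstone: it deletes the key on that side.
    void onAccount(String key, String value) { apply(accounts, key, value); recompute(key); }
    void onProfile(String key, String value) { apply(profiles, key, value); recompute(key); }

    private static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Either side changing triggers a recompute of the output for that key.
    private void recompute(String key) {
        String account = accounts.get(key);
        String profile = profiles.get(key);
        if (account != null && profile != null) {
            String joined = account + "+" + profile;
            view.put(key, joined);
            changelog.add(key + "=" + joined);
        } else if (view.remove(key) != null) {
            changelog.add(key + "=null"); // the delete flows through to the output
        }
    }

    static List<String> demo() {
        TableTableJoinSketch join = new TableTableJoinSketch();
        join.onAccount("k1", "acct-v1");  // no profile yet: nothing emitted
        join.onProfile("k1", "prof-v1");  // both sides present: first result
        join.onAccount("k1", "acct-v2");  // left side updates: result re-emitted
        join.onProfile("k1", null);       // tombstone on the right: delete emitted
        return join.changelog;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The last entry in the changelog is `k1=null`. A consumer of the real output stream sees the equivalent record with a null value, so any downstream `mapValues` or serializer has to tolerate it.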

## Foreign-key joins: joining tables on a non-key field

The standard table-table join matches on the key. But often you need to join on a *field inside the value*: orders keyed by `orderId`, each carrying a `customerId`, joined to a customers table keyed by `customerId`. That's a **foreign-key join**, available for KTable-KTable since Kafka 2.4.

```java
KTable<String, Order> orders = builder.table("orders");       // keyed by orderId
KTable<String, Customer> customers = builder.table("customers"); // keyed by customerId

KTable<String, EnrichedOrder> enriched = orders.join(
    customers,
    order -> order.getCustomerId(),                  // extract the foreign key
    (order, customer) -> new EnrichedOrder(order, customer)
);
```

The foreign-key join does **not** require you to co-partition the inputs yourself. It re-keys internally, using a pair of hidden subscription and response topics (they show up in your topic list as `<appId>-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-<n>-topic` and `...-SUBSCRIPTION-RESPONSE-<n>-topic`) to route updates both ways: a change to a customer must update every order that references it. That machinery is powerful and not free: at scale the internal topics carry real traffic, and the join is heavier than a same-key join. It also has sharper edges, such as out-of-order update warnings and null-foreign-key left-join semantics that have surprised people. Treat it as the right tool for genuine foreign-key relationships, not a default.
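The two-way routing is the part that makes this join expensive, and a toy model shows why. The sketch below is plain Java, not Kafka Streams code; `ForeignKeyJoinSketch` is hypothetical, and the linear scan over orders is a stand-in for the subscription machinery, not a description of how it is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an inner foreign-key table-table join; not Kafka Streams code.
final class ForeignKeyJoinSketch {
    private final Map<String, String> orderToCustomer = new TreeMap<>(); // orderId -> customerId
    private final Map<String, String> customers = new TreeMap<>();       // customerId -> name
    private final List<String> output = new ArrayList<>();

    // Left-side update: one lookup by the extracted foreign key.
    void onOrder(String orderId, String customerId) {
        orderToCustomer.put(orderId, customerId);
        String customer = customers.get(customerId);
        if (customer != null) {
            output.add(orderId + "=" + customer);
        }
    }

    // Right-side update: fans out to every order that references this customer.
    void onCustomer(String customerId, String name) {
        customers.put(customerId, name);
        for (Map.Entry<String, String> entry : orderToCustomer.entrySet()) {
            if (entry.getValue().equals(customerId)) {
                output.add(entry.getKey() + "=" + name);
            }
        }
    }

    static List<String> demo() {
        ForeignKeyJoinSketch join = new ForeignKeyJoinSketch();
        join.onOrder("o1", "c1");        // customer unknown: nothing emitted
        join.onOrder("o2", "c1");        // customer unknown: nothing emitted
        join.onCustomer("c1", "Ada");    // emits o1 and o2
        join.onCustomer("c1", "Ada L."); // emits o1 and o2 again
        join.onOrder("o3", "c2");        // customer unknown: nothing emitted
        return join.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Two customer updates produce four output records here. A customer referenced by many orders multiplies every update by that count, which is the traffic the internal topics end up carrying.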

Since Kafka 4.0 (KIP-1104), the foreign-key extractor can read the record's **key as well as its value** (a `BiFunction<K, V, KO>` overload), so you no longer have to copy a key field into the value just to join on it.

## GlobalKTable: the lookup with no co-partitioning

A `GlobalKTable` is replicated in full to **every** application instance, rather than partitioned across them. That single property removes the co-partitioning requirement: because every instance has the whole table locally, any record can be looked up regardless of how the stream side is partitioned.

```java
KStream<String, Order> orders = builder.stream("orders");
GlobalKTable<String, Country> countries = builder.globalTable("countries");

KStream<String, EnrichedOrder> enriched = orders.join(
    countries,
    (orderKey, order) -> order.getCountryCode(),     // map stream record to the table key
    (order, country) -> new EnrichedOrder(order, country)
);
```

Two things make GlobalKTable joins different from KTable joins:

- **You supply a `KeyValueMapper`** that derives the table key from the stream record. You are not restricted to the stream's key, and the stream does **not** need to be re-keyed or co-partitioned. This is the idiomatic way to join on a field in the value without a foreign-key join's overhead.
- **It is not time-synchronized.** A GlobalKTable is bootstrapped fully on startup and then updated continuously, but its updates are not aligned to the stream's timestamps the way a KTable's are. For small, slowly-changing reference data (country codes, currency tables, feature flags) that's fine; for large or fast-moving data, full replication to every instance is the cost you pay.

Use a GlobalKTable when the right side is small enough to replicate in full to every instance and you want to avoid co-partitioning gymnastics. Reach for a KTable when the data is large, partitioned, and you need timestamp-aligned semantics.

**What are the join types available in Kafka Streams?**

KStream-KStream (windowed event correlation), KStream-KTable (enrichment lookup of the latest table value), KTable-KTable (a continuously-updated combined view, on the key or on a foreign key), and joins against a GlobalKTable. Which one you use is dictated by the type of each side (stream or table), not by preference.

**What is co-partitioning and why do joins require it?**

Co-partitioning means both input topics have the same partition count and the same partitioning, so a given key lands in the same partition number on both sides. A join task owns one partition number from each side and matches keys within that partition only, so without it the two halves never meet. Every join except the GlobalKTable join requires it.

**Why did my Kafka Streams join start dropping records after an upgrade?**

Inner joins have always skipped null-key records, because a null key cannot be co-partitioned or looked up. Kafka 2.7 extended that skip to left joins, which teams hit as "the join started dropping data." Kafka 3.7 (KIP-962) relaxed it for `leftJoin`/`outerJoin`, but inner joins still drop null-key records, so set a key with `selectKey` before an inner join.

**Why does my KStream-KTable join produce nothing for some records?**

A KStream-KTable join looks up the table value as it exists when the stream record is processed, and if the stream record arrives before the table is populated for that key, the lookup finds nothing and the record is silently dropped with no error. Setting `max.task.idle.ms` lets the task wait briefly for the table side to catch up, narrowing the drop window.

**When should I use a foreign-key join or a GlobalKTable instead?**

Use a KTable-KTable foreign-key join (since Kafka 2.4) to join on a field inside the value rather than the key; it re-keys internally and does not require you to co-partition. Use a GlobalKTable when the right side is small enough to replicate fully to every instance, which removes co-partitioning entirely and lets you look up by any field.

> **See it in practice with Conduktor**
> A join's co-partitioning contract is visible in your cluster: the partition counts of the two input topics, and the internal repartition and subscription topics Kafka Streams creates to satisfy it. [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you inspect those topics, compare partition counts across both join sides, and watch consumer group lag on each input. These are the first things to check when a join produces nothing.

## Next steps

- [Joins that drop data](https://www.conduktor.io/kafka-streams/join-troubleshooting): a diagnostic walkthrough for "my join produces nothing"
- [KStream, KTable & GlobalKTable](https://www.conduktor.io/kafka-streams/kstream-ktable-globalktable): the stream-vs-table model joins are built on
- [Windowing](https://www.conduktor.io/kafka-streams/windowing): grace, stream-time, and why stream-stream joins need them
