# Kafka Streams serde errors

*Diagnose and fix the two serde exceptions everyone hits.*

Almost every Kafka Streams app throws a serde error before it ships. The two that fill Stack Overflow look unrelated but share a root cause: a serializer or deserializer was applied to a type it was never configured for. One fires when you change a value's type mid-topology and forget to tell Streams; the other fires when a plain deserializer meets bytes written by the Confluent Schema Registry.

Both come with a stack trace people paste verbatim, and both have a precise fix once you understand how Kafka Streams *resolves* which serde to use for a given operator. This page walks the two errors, then the rule that prevents them.

**What you'll learn:**
- Why `ClassCastException: java.lang.Long cannot be cast to java.lang.String` shows up after a `map` or `aggregate`
- What `Unknown magic byte!` actually means and how to fix it
- How serde resolution works: the default config vs per-operator overrides
- The principle: set the serde at every operator that changes a type

## Error 1: ClassCastException after a transform

You write a topology, set sensible default serdes, and the app compiles. Then at runtime, the first record through an `aggregate` blows up:

```
Exception in thread "...-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Exception caught in process. taskId=0_0, processor=KSTREAM-AGGREGATE-0000000002 ...
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer
(org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual
value type (value type: java.lang.Long). Change the default Serdes in StreamConfig or
provide correct Serdes via method parameters.
Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to
class java.lang.String (java.lang.Long and java.lang.String are in module java.base ...)
	at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
```

That is the 4.x wording, and it names both the offending type and the fix. On 2.8 and earlier the wrapper read `ClassCastException invoking Processor. Do the Processor's input types match the deserialized types?`, with the same `ClassCastException` underneath.

Here is the trap. You configured a **default value serde** of `String` for the whole application:

```properties
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
```
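If you build the configuration in code rather than in a properties file, the same defaults can be sketched as below. The application id and bootstrap address are placeholder values, and the keys are written as plain strings so the snippet needs only the JDK; in a real application you would normally use the `StreamsConfig` constants and pass the result to `KafkaStreams`.

```java
import java.util.Properties;

// Sketch: the same default serdes, built programmatically.
// "orders-app" and "localhost:9092" are placeholders, not values from this page.
class DefaultSerdeConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "orders-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Nested class names use '$', exactly as in the properties file
        props.put("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }
}
```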

Then somewhere in the topology you changed the value type to one Streams cannot infer. An `aggregate` builds a running `Long` total. A `mapValues` turns a JSON string into an `Order` object. The moment Streams needs to *write* that new type, to a changelog topic, a repartition topic, or your output topic, it reaches for the only serde it knows: the default `StringSerde`. `StringSerializer.serialize` receives a `Long`, casts it to `String`, and throws.

One operator gets a pass: `count()`. Streams knows its result is always a `Long`, so it materializes the store with `Serdes.Long()` and propagates that serde downstream through `toStream().to(...)`. The classic "count then to() blows up" example does not actually reproduce; it only fails if you force a wrong serde explicitly. `aggregate` gets no such inference, because its result type is whatever your initializer returns.

The exception fires deep in the serializer, but the bug is the missing serde declaration at the operator that changed the type.

```java
// Throws at runtime: aggregate() emits Long, default value serde is String
builder.<String, String>stream("orders")
    .groupByKey()
    .aggregate(() -> 0L,
        (key, value, agg) -> agg + 1)  // value type is now Long, serde not inferred
    .toStream()
    .to("order-counts");               // store, changelog, and sink all reach for StringSerde
```

The fix is to set the serde explicitly on the operator that introduced the new type. Every stateful and re-keying operator accepts a `...with(...)` configuration object for exactly this:

```java
// Tell each operator the real types
builder.<String, String>stream("orders",
        Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(() -> 0L,
        (key, value, agg) -> agg + 1,
        Materialized.with(Serdes.String(), Serdes.Long()))   // store + changelog: Long values
    .toStream()
    .to("order-counts", Produced.with(Serdes.String(), Serdes.Long()));  // output: Long values
```

`Consumed`, `Produced`, `Grouped`, and `Materialized` each carry a key serde and a value serde. Set them wherever the type differs from the default, and the `ClassCastException` is gone.

> 🚫 *"I set the default serde once at the top, so the whole topology is covered."*

The default serde only covers operators where the type matches the default. The instant an `aggregate` or a type-changing `mapValues` produces a type Streams cannot infer, the default is wrong for everything downstream of it, and you get the cast exception, not a "serde missing" message.

## How serde resolution works

Kafka Streams picks a serde for every read and write using a simple precedence:

| Source | Scope | How you set it |
|---|---|---|
| Per-operator serde | That one operator | `Consumed`, `Produced`, `Grouped`, `Materialized`, `StreamJoined`, `Repartitioned` `.with(...)` |
| Default serde | Every operator without an explicit serde | `default.key.serde` / `default.value.serde` config |
| (none configured) | N/A | `ConfigException` at startup: a serde is required |
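The precedence in the table can be modeled in a few lines. This is a toy sketch, not the Kafka Streams implementation: the class name, the method name, and the use of serde names as strings are all made up for illustration, and it throws `IllegalStateException` where Streams would raise its `ConfigException`.

```java
import java.util.Optional;

// Toy model of the lookup order in the table. Not Kafka Streams code.
class SerdeResolution {
    // Returns the name of the serde that would be used for one operator.
    static String resolve(Optional<String> operatorSerde, Optional<String> defaultSerde) {
        if (operatorSerde.isPresent()) {
            return operatorSerde.get();      // 1. explicit per-operator serde wins
        }
        if (defaultSerde.isPresent()) {
            return defaultSerde.get();       // 2. otherwise fall back to the default config
        }
        // 3. nothing configured: fail instead of guessing
        throw new IllegalStateException("no serde configured for this operator");
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of("LongSerde"), Optional.of("StringSerde"))); // LongSerde
        System.out.println(resolve(Optional.empty(), Optional.of("StringSerde")));         // StringSerde
    }
}
```

Note what is absent from the signature: the value's actual type is never consulted, which is why a wrong default goes unnoticed until a record is serialized.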

An explicit per-operator serde always wins over the default. There is no per-topic registry and no type inference from generics: the `<String, String>` you write on `stream(...)` is a compile-time hint for the Java compiler, not a runtime instruction to Streams. At runtime, Streams knows only the default config, whatever you passed to each operator, and the few result types it fixes itself, such as the `Long` from `count()`.

This is why the type-change case bites: generics make the code *look* type-safe, but the serializer is chosen from config, not from the generic parameter.
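The effect is easy to reproduce without Kafka. The sketch below uses a hand-written `Serializer` interface and a hypothetical `writeWithConfigured` helper that stands in for a framework choosing its serializer from config. Because generic parameters are erased, the call compiles and only fails once a `Long` reaches the `String` serializer.

```java
import java.nio.charset.StandardCharsets;

// Toy model: a hand-written serializer interface, not the Kafka one.
class ErasureDemo {
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stands in for a framework that picked the serializer from config.
    // The generic parameter is erased, so the cast below is unchecked.
    @SuppressWarnings("unchecked")
    static byte[] writeWithConfigured(Serializer<?> configured, Object value) {
        return ((Serializer<Object>) configured).serialize(value);
    }

    public static void main(String[] args) {
        Serializer<?> fromConfig = new StringSerializer();
        System.out.println(writeWithConfigured(fromConfig, "ok").length); // 2
        try {
            writeWithConfigured(fromConfig, 42L);   // compiles, fails at runtime
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```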

## Error 2: Unknown magic byte!

The second classic looks nothing like the first:

```
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(...)
```

That trace is from a Schema Registry 7.x client. On 8.x clients the wire-format parsing moved, so the top-level exception reads `SerializationException: Error deserializing schema ID` with `Unknown magic byte!` as the cause underneath. Same problem, same fix.

This one is about *wire format*, not types. The Confluent Schema Registry serializers do not write a bare Avro (or Protobuf, or JSON Schema) payload. They prefix every record value with a small envelope:

```
byte 0        : magic byte (always 0x00)
bytes 1-4     : 4-byte schema ID (big-endian int)
bytes 5..end  : the serialized payload
```
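The header check can be sketched with a `ByteBuffer`, following the 5-byte layout above. This is an illustration rather than the Confluent implementation: `WireFormat` and `readSchemaId` are made-up names, and the sketch throws `IllegalArgumentException` where the real deserializer throws `SerializationException`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the header check for the 5-byte envelope. Names are hypothetical.
class WireFormat {
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID, or throws if the record is not in the framed format.
    static int readSchemaId(byte[] record) {
        if (record == null || record.length < 5) {
            throw new IllegalArgumentException("record too short for a 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record);   // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();                        // bytes 1-4
    }

    public static void main(String[] args) {
        byte[] framed = ByteBuffer.allocate(7)
                .put(MAGIC_BYTE).putInt(42).put((byte) 'h').put((byte) 'i').array();
        System.out.println(readSchemaId(framed));      // 42

        byte[] plain = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            readSchemaId(plain);                       // first byte is 'h' (0x68), not 0x00
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());        // Unknown magic byte!
        }
    }
}
```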

`Unknown magic byte!` means a Schema-Registry-aware deserializer read a record whose first byte was *not* `0x00`, so it could not find the schema ID it needs. There are two ways to land here:

- **A plain serde wrote the data, a Schema Registry serde is reading it.** The bytes have no magic byte or schema ID, so the Avro deserializer rejects the very first byte. Common when one team produces with a `StringSerializer` and another consumes the topic expecting Avro.
- **A Schema Registry serde wrote the data, a plain serde is reading it.** This is the mirror image and usually surfaces differently: a `StringDeserializer` happily turns the magic byte and schema ID into garbage characters at the front of your string, so you get corrupted-looking values instead of an exception. Same root cause: format mismatch.
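The mirror-image case is worth seeing once, because nothing throws. The sketch below frames a payload with the 5-byte header and then decodes the whole record as UTF-8, which is roughly what a plain string deserializer does. The payload is plain text only to keep the output readable (a real Avro payload would be binary), and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch: framed bytes decoded as plain UTF-8 text. No exception, just a polluted string.
class MirrorCase {
    static String decodeAsPlainString(byte[] record) {
        return new String(record, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] framed = ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0).putInt(42).put(payload).array();

        String decoded = decodeAsPlainString(framed);
        System.out.println(decoded.length());            // 10: five header chars + "hello"
        System.out.println(decoded.endsWith("hello"));   // true
        System.out.println((int) decoded.charAt(4));     // 42, leaked from the schema ID
    }
}
```

Values that start with a few NUL characters and one stray symbol are a strong hint that the topic carries Schema Registry framing.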

The fix is to use the matching serde on both sides and point it at the registry. For Avro `SpecificRecord`:

```java
// Schema-Registry-aware Avro serde, configured with the registry URL
final Map<String, String> serdeConfig =
    Map.of("schema.registry.url", "http://schema-registry:8081");

final SpecificAvroSerde<Order> orderSerde = new SpecificAvroSerde<>();
orderSerde.configure(serdeConfig, false);  // false = value serde (true for keys)

builder.stream("orders",
        Consumed.with(Serdes.String(), orderSerde))   // matching deserializer on read
    .mapValues(order -> order.getTotal())
    .to("order-totals", Produced.with(Serdes.String(), Serdes.Long()));
```

If you are using the lower-level consumer config instead of a DSL serde, the equivalent is a `KafkaAvroDeserializer` with `schema.registry.url` set. The rule is the same: the deserializer must speak the same wire format as the serializer that wrote the bytes.

A frequent lookalike: the data is fine, but the consumer can't reach the registry, so it can't resolve the schema ID. That failure typically surfaces as a schema-lookup error rather than `Unknown magic byte!`, so check `schema.registry.url`, network reachability, and any auth the registry requires before assuming the records are bad.

## Windowed keys need a windowed serde

There is a third place serdes catch people: windowed aggregations. When you window a `count` or `aggregate`, the key is no longer your plain key. It is a `Windowed<K>` that wraps your key plus the window's time bounds, and a plain `StringSerde` cannot serialize that.

If you write a windowed result to a topic, or query a windowed store, you need a **windowed serde** that knows how to encode the key-plus-window:

```java
import org.apache.kafka.streams.kstream.WindowedSerdes;

builder.<String, String>stream("clicks")
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    // key is Windowed<String> now, wrap the inner serde in a time-windowed serde
    .to("clicks-per-window",
        Produced.with(
            WindowedSerdes.timeWindowedSerdeFrom(String.class, 5 * 60 * 1000L),
            Serdes.Long()));
```

`WindowedSerdes.timeWindowedSerdeFrom(...)` builds a serde for the windowed key from the inner type and the window size. (Session windows have their own `sessionWindowedSerdeFrom`.) Forget it and you are back to a `ClassCastException`, this time `Windowed cannot be cast to String`.
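To see why the serde needs the window size, here is a simplified model of a time-windowed key. It assumes the encoded key is the inner key's bytes followed by an 8-byte window start, with the end recomputed from the size on read. Treat that layout as an illustration only: the real byte format is a Kafka Streams implementation detail, and every name in the sketch is made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified model of a time-windowed key: inner key bytes + 8-byte window start.
// Assumed layout for illustration; not the Kafka Streams classes.
class WindowedKeyModel {
    static byte[] encode(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes).putLong(windowStartMs).array();
    }

    static String decodeKey(byte[] encoded) {
        return new String(encoded, 0, encoded.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    static long decodeWindowStart(byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong(encoded.length - Long.BYTES);
    }

    // The end is not stored in this model, so the reader needs the window size to rebuild it.
    static long windowEnd(byte[] encoded, long windowSizeMs) {
        return decodeWindowStart(encoded) + windowSizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        byte[] encoded = encode("user-1", 600_000L);
        System.out.println(decodeKey(encoded));               // user-1
        System.out.println(decodeWindowStart(encoded));       // 600000
        System.out.println(windowEnd(encoded, fiveMinutes));  // 900000
    }
}
```

Under this model a plain string decode of the key would also swallow the trailing timestamp bytes, which is the windowed equivalent of the garbage-prefix problem described for Schema Registry data.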

## The principle: serde at every type change

All three errors collapse into one rule:

**Set the serde explicitly at every operator that introduces a type the default serde doesn't cover.**

In practice that means:

- An `aggregate` or `reduce` produces a value type the default doesn't cover → set `Materialized.with(...)`, which covers the store *and* its changelog, and `Produced.with(...)` if you write the result out. Store writes are serialized with the changelog topic name as the serde's topic argument, which Schema Registry serdes also use to derive the registry subject. `count()` is exempt: Streams fills in `Serdes.Long()` for you.
- A `groupBy` / `groupByKey` before an aggregation → set `Grouped.with(...)` so the repartition topic is encoded correctly.
- A `mapValues` / `map` that changes a type, followed by a stateful op or a `to(...)` → set the serde on the downstream operator that serializes.
- A windowed aggregation written out or queried → use a windowed serde for the key.
- Any topic carrying Schema Registry data → use the matching `SpecificAvroSerde` / Schema Registry serde with `schema.registry.url`.

Reaching for a [poison-pill handler](https://www.conduktor.io/kafka-streams/dead-letter-queue) is *not* a substitute here. A serde misconfiguration is a deterministic bug in your topology, not a single bad record, so skipping the failure would skip every record. Fix the serde; reserve dead-letter handling for genuinely malformed input.

**What is a Serde in Kafka Streams?**

A Serde bundles a serializer and a deserializer for one type, so Kafka Streams can read records into Java objects and write them back to bytes. Kafka Streams resolves a Serde for every read and write from either the per-operator setting or the `default.key.serde`/`default.value.serde` config.

**What causes "Unknown magic byte!" in Kafka Streams?**

A Schema-Registry-aware deserializer (like Avro) read a record whose first byte wasn't the expected `0x00` magic byte, so it couldn't find the schema ID. It usually means a plain serde wrote the data while a Schema Registry serde is reading it, or the reverse. Fix it by using the matching serde on both sides and pointing it at `schema.registry.url`.

**Why do I get a ClassCastException after a map, groupBy, or aggregate?**

An operator changed the value type to one Streams can't infer (for example an `aggregate()` building a `Long` total) but the only Serde Streams knows downstream is the `String` default, so `StringSerializer.serialize` receives a `Long` and throws. Set the Serde explicitly on the operator that introduced the new type via `Materialized.with(...)`, `Grouped.with(...)`, or `Produced.with(...)`.

**How do I set a Serde per operation versus as a default?**

A per-operator Serde, passed through `Consumed`, `Produced`, `Grouped`, `Materialized`, `StreamJoined`, or `Repartitioned` `.with(...)`, always wins over the default. The `<String, String>` generics on `stream(...)` are only a compile-time hint; at runtime Streams picks the serializer from config or the operator override, never from the generic parameter.

**Why does a windowed aggregation need a windowed Serde?**

After `windowedBy(...)`, the key becomes a `Windowed<K>` wrapping your key plus the window bounds, which a plain `StringSerde` can't encode. Use `WindowedSerdes.timeWindowedSerdeFrom(...)` (or `sessionWindowedSerdeFrom` for session windows) when you write a windowed result out or query a windowed store; otherwise you get a `Windowed cannot be cast to String` error.

> **See it in practice with Conduktor**
> When a serde error fires, the fastest question to answer is "what's actually in the bytes?" [Conduktor Console](https://docs.conduktor.io/guide/manage-kafka/kafka-resources/topics) lets you inspect the records on the input, repartition, and changelog topics, including whether a value carries the Schema Registry magic byte and schema ID, or is plain text, so you can tell a type mismatch from a wire-format mismatch without attaching a debugger.

## Next steps

- [Dead letter queues in Kafka Streams](https://www.conduktor.io/kafka-streams/dead-letter-queue): handling the bad records a serde can't decode
- [KStream, KTable & GlobalKTable](https://www.conduktor.io/kafka-streams/kstream-ktable-globalktable): where types change and serdes matter
- [Build your first Kafka Streams app](https://www.conduktor.io/kafka-streams/getting-started): configuring default and per-operator serdes from the start
