# Safeguard Kafka from Timestamp-Based Data Loss

Kafka's default configuration trusts producers completely - including the timestamps they set. Combined with time-based retention, this creates a subtle failure mode: messages with old timestamps can be deleted immediately after ingestion.

The good news: Kafka 3.6 introduced asymmetric timestamp validation (KIP-937), and there are several safeguards you can apply today. This post covers the mechanism, detection strategies, and how to protect your topics.

## How It Happens

Every Kafka message carries a timestamp. By default (`message.timestamp.type=CreateTime`), producers set this timestamp when creating the record. Most producers use the current system time, and everything works fine.

But the Kafka producer API accepts any timestamp value. A business timestamp. An event timestamp. A timestamp from a source system.

```java
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",           // topic
    null,               // partition
    eventTimestamp,     // <-- any long value
    key,
    value
);
```

This flexibility is intentional. Replaying historical data, migrating from other systems, preserving event time semantics - all valid use cases. The problem is how retention interacts with these timestamps.

Kafka's time-based retention (`log.retention.ms` on the broker, overridable per topic with `retention.ms`) works on whole segments, checking them from oldest to newest. For each segment it compares the broker's current time against the *maximum* timestamp within that segment. This is typically the last message's timestamp, but that is not guaranteed with out-of-order producers.

Here's the sequence:

1. Broker receives messages with timestamps from 2023
2. Messages append to the active segment
3. Segment rolls (reaches `segment.bytes` or `segment.ms` threshold)
4. Log retention thread evaluates the closed segment
5. `(current_broker_time - segment_max_timestamp) > retention.ms`
6. Segment scheduled for deletion
7. Data removed from disk

The [segment configuration](https://www.conduktor.io/blog/kafka-log-retention-and-segments) matters here. With the default `segment.ms=168h` (7 days), you have a week before retention even evaluates the segment. With `segment.ms=1h`, this failure manifests within an hour.

**This failure mode is brutal to diagnose.** Metrics look normal - throughput shows messages arriving. Offsets advance correctly. [Consumer lag](https://www.conduktor.io/glossary/consumer-lag-monitoring) might show zero because auto-reset policies silently handle `OffsetOutOfRangeException`. The only clue: when you actually look at the topic contents, records are missing.

A typical scenario: a customer reports consumer lag spiking then dropping to zero, downstream reconciliation jobs failing, but all Kafka metrics looking healthy. After ruling out network issues and consumer bugs, consuming messages with `print.timestamp=true` reveals they were from 2023. A CDC pipeline was replaying historical events, preserving original timestamps - and retention was deleting them as fast as they arrived.

## Safeguards

### LogAppendTime

Kafka supports two timestamp modes at the topic level:

| Mode | Source | Use Case |
|------|--------|----------|
| `CreateTime` | Producer sets timestamp | Event-time processing, replay, migration |
| `LogAppendTime` | Broker sets timestamp on arrival | Ingestion-time semantics, deterministic retention |

With `LogAppendTime`, the broker overwrites whatever timestamp the producer sends. Retention becomes predictable - messages persist for the configured duration from when they arrived.

```bash
kafka-configs.sh --alter --topic orders \
  --add-config message.timestamp.type=LogAppendTime \
  --bootstrap-server localhost:9092
```

You trade event-time semantics for predictability. If you need the original timestamp, store it in a message header:

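```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, String> record = new ProducerRecord<>("orders", key, value);
// Encode explicitly as UTF-8 instead of relying on the JVM's default charset
record.headers().add("x-event-time", String.valueOf(eventTimestamp).getBytes(StandardCharsets.UTF_8));
producer.send(record);  // With LogAppendTime, the broker sets the record timestamp on arrival
```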
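Consumers then need the matching decode step. The following stdlib-only sketch pairs an encoder with a decoder for the decimal-string format used in the producer snippet; `EventTimeHeader` is a hypothetical helper. In a real consumer the bytes would come from `record.headers().lastHeader("x-event-time").value()`, and since `lastHeader` returns `null` when the header is absent, check for that first.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for an "x-event-time" header holding epoch milliseconds as text.
final class EventTimeHeader {

    static final String NAME = "x-event-time";

    // Producer side: epoch milliseconds as a decimal UTF-8 string.
    static byte[] encode(long eventTimeMs) {
        return Long.toString(eventTimeMs).getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: parse the header bytes back into epoch milliseconds.
    // Throws NumberFormatException if the header holds anything other than a number.
    static long decode(byte[] headerValue) {
        return Long.parseLong(new String(headerValue, StandardCharsets.UTF_8));
    }
}
```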

### Timestamp Validation (KIP-937)

Kafka has always had `log.message.timestamp.difference.max.ms` - a config that should reject messages with timestamps too far from broker time. The default is `Long.MAX_VALUE` (~292 million years) - effectively no limit. The protection exists but is disabled out of the box.

[KIP-937](https://cwiki.apache.org/confluence/display/KAFKA/KIP-937:+Improve+Message+Timestamp+Validation) (Kafka 3.6+) deprecated that single setting in favor of separate limits for past and future timestamps:

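| Broker config | Topic-level override | Controls | Default |
|---------------|----------------------|----------|---------|
| `log.message.timestamp.before.max.ms` | `message.timestamp.before.max.ms` | How far in the past | `Long.MAX_VALUE` |
| `log.message.timestamp.after.max.ms` | `message.timestamp.after.max.ms` | How far in the future | `Long.MAX_VALUE` |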

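This separation matters. Future timestamps are almost always bugs (clock skew, wrong units). Past timestamps might be intentional (replay, migration). Both limits apply only when `message.timestamp.type=CreateTime`; with `LogAppendTime` the broker assigns the timestamp itself and the checks are ignored.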

```properties
# 30 days in ms (30 * 24 * 60 * 60 * 1000)
log.message.timestamp.before.max.ms=2592000000

# 1 hour in ms - reject future timestamps beyond clock skew tolerance
log.message.timestamp.after.max.ms=3600000
```
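The two limits act as a pair of one-sided checks around the broker's clock. The sketch below models them as a plain Java predicate for reasoning about values like the ones above. It is a simplified model, not the broker's implementation; `TimestampBounds` is a hypothetical name, and treating a difference exactly equal to the limit as accepted is an assumption.

```java
// Hypothetical, simplified model of KIP-937 style validation (not broker code).
// Only meaningful for CreateTime topics; with LogAppendTime the broker sets the timestamp.
final class TimestampBounds {

    private final long beforeMaxMs; // mirrors log.message.timestamp.before.max.ms
    private final long afterMaxMs;  // mirrors log.message.timestamp.after.max.ms

    TimestampBounds(long beforeMaxMs, long afterMaxMs) {
        this.beforeMaxMs = beforeMaxMs;
        this.afterMaxMs = afterMaxMs;
    }

    // Returns true if a record with this timestamp would be accepted at brokerNowMs.
    boolean accepts(long recordTimestampMs, long brokerNowMs) {
        if (recordTimestampMs <= brokerNowMs) {
            return brokerNowMs - recordTimestampMs <= beforeMaxMs; // past: bounded by before.max.ms
        }
        return recordTimestampMs - brokerNowMs <= afterMaxMs;      // future: bounded by after.max.ms
    }
}
```

With the 30-day / 1-hour settings, a ten-day-old event passes, a 2023 replay is rejected, and a microsecond value misread as milliseconds (far in the future) is caught by the one-hour future limit.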

Future major versions will default `log.message.timestamp.after.max.ms` to one hour.

Explore all timestamp-related configs and their version history in the [Kafka Options Explorer](https://kafka-options-explorer.conduktor.io/?search=log.message.timestamp).

## Common Timestamp Bugs

### Wrong Time Units

The most common bug: passing seconds, microseconds, or nanoseconds instead of milliseconds.

```java
// Bug: Unix timestamp in seconds instead of milliseconds
long timestamp = Instant.now().getEpochSecond();  // e.g., 1738756800

// Kafka interprets this as milliseconds: 1738756800 ms ≈ Jan 21, 1970
// With 7-day retention, this is ~55 years old → deleted immediately
```

```java
// Bug: Epoch in microseconds (common in databases, IoT systems)
long timestamp = System.currentTimeMillis() * 1000;

// Kafka reads this as milliseconds: Year 50,000+
// Its segment never becomes eligible for time-based deletion, and time-based
// lookups such as offsetsForTimes can return misleading offsets
```
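When a source's units are uncertain, one defensive option is to infer the unit from the value's magnitude. This is a heuristic and an assumption, not a Kafka feature: every branch below maps to dates between March 1973 and roughly the year 5138, so the guess is only unambiguous for timestamps inside that window. `EpochNormalizer` is a hypothetical helper.

```java
// Hypothetical heuristic: infer the unit of an epoch value from its magnitude.
// Each branch corresponds to dates between March 1973 and ~5138; values outside
// that window (for example, millisecond timestamps from 1970-1972) are misclassified.
final class EpochNormalizer {

    static long toEpochMillis(long epochValue) {
        if (epochValue < 0) {
            throw new IllegalArgumentException("pre-1970 timestamps are not handled: " + epochValue);
        }
        if (epochValue < 100_000_000_000L) {               // < 1e11: seconds
            return epochValue * 1_000L;
        }
        if (epochValue < 100_000_000_000_000L) {           // < 1e14: already milliseconds
            return epochValue;
        }
        if (epochValue < 100_000_000_000_000_000L) {       // < 1e17: microseconds
            return epochValue / 1_000L;
        }
        return epochValue / 1_000_000L;                    // otherwise: nanoseconds
    }
}
```

In practice, log whenever a non-millisecond branch fires: a producer sending seconds is better fixed at the source than silently corrected forever.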

### Source System Timestamps

Data pipelines often propagate timestamps from source systems:

```java
// CDC from legacy database
long timestamp = legacyRecord.getTimestamp();  // Could be anything
```

If the source system uses a different epoch, different precision, or contains historical data, those timestamps flow directly into Kafka.

### Timezone and Parsing Errors

Timestamps parsed from strings that carry no offset or zone pick up whatever zone the parsing code assumes, often the JVM default:

```java
// Bug: ISO string without timezone offset
String eventTime = "2024-01-01T00:00:00";  // From external system
long timestamp = LocalDateTime.parse(eventTime)
    .atZone(ZoneId.systemDefault())  // Uses JVM's timezone
    .toInstant()
    .toEpochMilli();

// Result varies by server location - could be hours off
// If servers run in different timezones, timestamps become inconsistent
```
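The fix is to make the zone explicit. Prefer sources that send an offset and parse them with `OffsetDateTime.parse`; when a source cannot, pin its zone in code rather than inheriting the server's. Which zone a given source actually uses is an assumption to confirm with its owners, and `EventTimeParser` is a hypothetical helper.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Hypothetical helper: turn ISO-8601 strings into epoch milliseconds without
// depending on the JVM's default time zone.
final class EventTimeParser {

    // Preferred: the source sends an offset, e.g. "2024-01-01T00:00:00+01:00" or "...Z".
    static long parseWithOffset(String isoWithOffset) {
        return OffsetDateTime.parse(isoWithOffset).toInstant().toEpochMilli();
    }

    // Fallback: no offset in the string, so the caller must name the source system's zone.
    static long parseLocal(String isoLocal, ZoneId sourceZone) {
        return LocalDateTime.parse(isoLocal).atZone(sourceZone).toInstant().toEpochMilli();
    }
}
```

The same wall-clock string lands five hours apart depending on whether the source zone is UTC or `America/New_York`, which is exactly the inconsistency a JVM-default parse hides.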

## Gateway-Level Protection

Platform teams can intercept and normalize timestamps before they reach Kafka. A proxy layer like [Conduktor Gateway](https://www.conduktor.io/gateway) can:

1. Validate incoming timestamps against configurable bounds
2. Replace invalid timestamps with arrival time (instead of rejecting)
3. Preserve the original timestamp in a header for downstream processing
4. Apply different policies per topic or producer

This centralizes timestamp policy without requiring every producer team to implement validation correctly. Unlike broker-side validation, which rejects messages outright, a gateway can fix them transparently: producers don't fail, and the original value is preserved for debugging.
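The replace-and-preserve behavior in points 2 and 3 can be sketched as a small policy function. This is a hypothetical illustration of the idea, not Conduktor Gateway's implementation or configuration, and the header name `x-original-timestamp` is made up for the example.

```java
import java.util.Map;

// Hypothetical replace-and-preserve timestamp policy (illustration only).
final class TimestampPolicy {

    // What to write: the record timestamp plus any headers to add.
    record Decision(long timestampMs, Map<String, String> headersToAdd) {}

    private final long maxPastMs;
    private final long maxFutureMs;

    TimestampPolicy(long maxPastMs, long maxFutureMs) {
        this.maxPastMs = maxPastMs;
        this.maxFutureMs = maxFutureMs;
    }

    Decision apply(long producerTimestampMs, long arrivalMs) {
        boolean tooOld = arrivalMs - producerTimestampMs > maxPastMs;
        boolean tooFarAhead = producerTimestampMs - arrivalMs > maxFutureMs;
        if (!tooOld && !tooFarAhead) {
            return new Decision(producerTimestampMs, Map.of()); // in bounds: pass through
        }
        // Out of bounds: use arrival time and keep the original value for debugging.
        return new Decision(arrivalMs,
                Map.of("x-original-timestamp", Long.toString(producerTimestampMs)));
    }
}
```

Per-topic or per-producer policies (point 4) could be as simple as a `Map<String, TimestampPolicy>` keyed by topic name.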

## Detection and Recommendations

### How to Detect

**Inspect message timestamps** by consuming a sample:

```bash
kafka-console-consumer.sh --topic orders --from-beginning --max-messages 100 \
  --property print.timestamp=true --bootstrap-server localhost:9092
```

If timestamps are years in the past or future, you've found the problem. Tools like Conduktor Console show timestamps visually, making anomalies obvious.
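Eyeballing 100 lines works for obvious cases. For a larger sample, a small summary helps. The sketch below assumes you have already collected timestamps, for example from `ConsumerRecord.timestamp()` in a Java consumer; `TimestampAudit` is a hypothetical helper.

```java
// Hypothetical helper: summarize a sample of record timestamps (epoch ms).
final class TimestampAudit {

    record Summary(int total, int olderThanRetention, int inFuture, long minTs, long maxTs) {}

    static Summary summarize(long[] timestampsMs, long nowMs, long retentionMs, long futureToleranceMs) {
        if (timestampsMs.length == 0) {
            throw new IllegalArgumentException("empty sample");
        }
        int older = 0;
        int future = 0;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            if (nowMs - ts > retentionMs) {
                older++;        // already older than retention.ms as of nowMs
            } else if (ts - nowMs > futureToleranceMs) {
                future++;       // ahead of nowMs by more than the tolerance
            }
            min = Math.min(min, ts);
            max = Math.max(max, ts);
        }
        return new Summary(timestampsMs.length, older, future, min, max);
    }
}
```

Any non-zero `olderThanRetention` on a topic that should only hold recent data is worth investigating before touching configs.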

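**Check log start offset progression.** Record the earliest offset of each partition over time, for example with `kafka-get-offsets.sh --topic orders --time -2 --bootstrap-server localhost:9092` (`-2` requests the earliest offset). If it advances faster than expected (e.g., jumping ahead by days' worth of data), retention is deleting segments prematurely.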

**Monitor topic size.** If a topic with steady ingestion shows flat or declining storage, retention is removing data as fast as it arrives.
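Both signals boil down to one number: how much time's worth of data the partition actually holds. Given its earliest and latest offsets (for instance from `Admin.listOffsets` with `OffsetSpec.earliest()` and `OffsetSpec.latest()`) and an ingestion rate from your metrics, the sketch below estimates that window. It assumes a `delete` cleanup policy, a roughly steady rate, and a topic older than its retention; `RetainedWindow` and the one-half threshold are hypothetical choices.

```java
// Hypothetical helper: estimate how much wall-clock time of data a partition retains.
final class RetainedWindow {

    // (logEnd - logStart) records at recordsPerSecond is roughly this many ms of data.
    static long estimateMs(long logStartOffset, long logEndOffset, double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return (long) ((logEndOffset - logStartOffset) / recordsPerSecond * 1000.0);
    }

    // With current timestamps, a steady-state topic keeps at least about retention.ms
    // of data; a window far below that suggests segments are being deleted early.
    static boolean looksPremature(long retainedMs, long retentionMs) {
        return retainedMs < retentionMs / 2; // threshold is an arbitrary assumption
    }
}
```

For example, 3.6 million retained records at 1,000 records/s is about one hour of data, far short of a 7-day `retention.ms`.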

### Recommendations

**For new topics**: Start with `LogAppendTime` unless you have a specific need for event-time semantics. Add the original event timestamp as a message header if needed.

**For existing topics**: Audit timestamp distribution before changing configs. Consume a sample of messages and inspect their timestamps.

**For data migration pipelines**: Set explicit timestamp bounds. Accept that replaying 3-year-old data into a topic with 7-day retention will lose data by design.
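If replayed data must survive with its original `CreateTime` timestamps, the topic's `retention.ms` has to cover the age of the oldest event plus however long consumers need to read it. A minimal arithmetic sketch; `ReplayRetention` is a hypothetical helper and the processing buffer is an assumption you size to your pipeline.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: minimum retention.ms for a CreateTime topic receiving a replay.
final class ReplayRetention {

    static long requiredRetentionMs(Instant oldestEventTime, Instant replayStart, Duration processingBuffer) {
        // Age of the oldest record when the replay starts, plus time for consumers to read it.
        return Duration.between(oldestEventTime, replayStart).plus(processingBuffer).toMillis();
    }
}
```

Replaying from 2023-01-01 on 2025-02-05 with a 7-day buffer needs about 773.5 days of retention, at which point `LogAppendTime` plus an event-time header is usually the simpler option.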

**For platform teams**: Consider timestamp validation at the gateway level. Don't rely on every producer implementing correct timestamp handling.

---

**Troubleshooting Kafka? Join the [Conduktor Community Slack](https://www.conduktor.io/slack) where platform engineers share war stories and solutions.**
