Your First Kafka Producer: Python, Java, and Go Examples

Build your first Kafka producer in Python, Java, and Go. Working code examples, common connection errors, and essential configuration for reliable messaging.

Stéphane Derosiaux · January 28, 2025

Every Kafka journey starts with a producer. You want to get data in, see it arrive, and understand what happened. This is the shortest path from nothing to messages flowing.

I've helped dozens of teams get their first producer running. The code is simple. The errors are predictable. Here's what actually works.

"Our first producer took 3 days because we were debugging network issues that turned out to be a missing port mapping. Now we share this guide with every new developer."

Platform Engineer at a startup

Start Kafka

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 \
  -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  bitnami/kafka:latest
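
Connection problems like the missing port mapping above are the most common first hurdle. Before writing any client code, it can help to confirm the advertised port is actually reachable; a minimal stdlib-only check (the helper name is ours, not part of any Kafka client):

```python
import socket

def broker_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Once the container is up, broker_reachable('localhost', 9092) should return True; if it doesn't, fix the port mapping before touching producer code.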

Create a test topic (if the Kafka CLI scripts aren't installed locally, run the command inside the container with docker exec kafka; the Bitnami image has them on its PATH):

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic demo-topic --partitions 3 --replication-factor 1

Python (confluent-kafka)

pip install confluent-kafka

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err:
        print(f'Failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce('demo-topic', key='user-1', value='Hello Kafka', callback=delivery_report)
producer.flush()

Common error: BufferError: Local: Queue full

You're producing faster than librdkafka can send, so its local queue fills up. Call poll(0) between produces to serve delivery callbacks and free buffer space:

for i in range(1000000):
    producer.produce('demo-topic', value=f'message-{i}')
    producer.poll(0)
producer.flush()
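
The queue-full error is backpressure from librdkafka's bounded in-memory buffer. A toy model (our simplification, not the real client) shows why the buffer fills when produce outpaces the network, and why interleaving poll() fixes it:

```python
from collections import deque

class ToyProducerQueue:
    """Toy model of a bounded send buffer: produce() enqueues, poll() drains."""
    def __init__(self, max_messages: int):
        self.max_messages = max_messages
        self.queue = deque()

    def produce(self, msg):
        if len(self.queue) >= self.max_messages:
            raise BufferError('Local: Queue full')  # mirrors the librdkafka error
        self.queue.append(msg)

    def poll(self, drain: int = 10):
        # stand-in for network I/O: each poll ships up to `drain` messages
        for _ in range(min(drain, len(self.queue))):
            self.queue.popleft()
```

Producing 1,000 messages into a 100-slot queue without ever polling raises BufferError; calling poll() after each produce keeps the queue drained.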

Java

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
</dependency>

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "user-1", "Hello Kafka");
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            System.err.println("Failed: " + exception.getMessage());
        } else {
            System.out.printf("Delivered to %s [%d] @ %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
    producer.flush();
}

Common error: Missing required configuration "key.serializer"

Always specify both serializers. Kafka doesn't know how to convert your objects to bytes without them.
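
The serializer requirement is universal: every client ultimately hands Kafka raw bytes. A minimal sketch in Python, assuming JSON payloads (the helper names are ours):

```python
import json

def serialize(obj) -> bytes:
    """Encode a Python object as UTF-8 JSON bytes, ready to pass as a record value."""
    return json.dumps(obj).encode('utf-8')

def deserialize(data: bytes):
    """Inverse: decode bytes from a consumer back into a Python object."""
    return json.loads(data.decode('utf-8'))
```

confluent-kafka accepts bytes (or str) values directly; in Java the equivalent is a custom Serializer or ByteArraySerializer doing the same conversion.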

Production tip: for critical sends, block on the returned Future and catch transient errors (note this trades throughput for a per-record delivery guarantee):

try {
    producer.send(record, callback).get(10, TimeUnit.SECONDS);
} catch (TimeoutException | ExecutionException e) {
    log.error("Send failed, retrying: {}", e.getMessage());
    // Implement retry logic or dead-letter queue
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
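
The retry logic hinted at in the comment is language-agnostic; a minimal sketch with exponential backoff (in Python for brevity, helper names are ours):

```python
import time

def send_with_retry(send, record, max_attempts=3, base_delay=0.5):
    """Call send(record) until it succeeds, backing off exponentially.
    Re-raises the last error once attempts are exhausted (dead-letter it there)."""
    for attempt in range(max_attempts):
        try:
            return send(record)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```

Exponential backoff avoids hammering a broker that is already struggling; pair it with a dead-letter topic for records that exhaust all attempts.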

Go (franz-go)

go get github.com/twmb/franz-go/pkg/kgo

import (
    "context"
    "fmt"
    "log"

    "github.com/twmb/franz-go/pkg/kgo"
)

cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
if err != nil {
    log.Fatal(err)
}
defer cl.Close()

record := &kgo.Record{
    Topic: "demo-topic",
    Key:   []byte("user-1"),
    Value: []byte("Hello Kafka"),
}

if err := cl.ProduceSync(context.Background(), record).FirstErr(); err != nil {
    log.Fatalf("Failed: %v", err)
}
fmt.Printf("Delivered to %s [%d] @ %d\n", record.Topic, record.Partition, record.Offset)

Common error: UNKNOWN_TOPIC_OR_PARTITION

Topic doesn't exist and auto-creation is disabled. Create the topic first or enable auto.create.topics.enable=true on the broker.

Key Configuration

| Config | Default | Recommendation |
|---|---|---|
| acks | all (since Kafka 3.0; older clients default to 1) | all for durability |
| linger.ms | 0 | 5-20 for throughput |
| compression.type | none | lz4 or zstd |
acks=all waits for all in-sync replicas. Slower but durable.

linger.ms=20 batches messages for up to 20ms before sending. Under steady load this can cut the number of produce requests by an order of magnitude, at the cost of up to 20ms added latency.
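
The linger.ms trade-off reduces to simple arithmetic. A back-of-envelope sketch, assuming a steady message rate that always fills each batch window (a simplification; real batches also close on batch.size):

```python
def requests_per_second(msg_rate: float, linger_ms: float) -> float:
    """Approximate produce requests/sec: at most one batch per linger window."""
    if linger_ms == 0:
        return msg_rate  # worst case: one request per message
    return min(msg_rate, 1000.0 / linger_ms)

# At 1,000 msg/s: linger.ms=0 gives ~1,000 requests/s, linger.ms=20 gives ~50.
```

That 20x reduction in request count is where the throughput gain comes from; each request carries fixed framing and round-trip overhead.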

Verify Messages Arrived

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic demo-topic --from-beginning --max-messages 5

Or with more detail:

kcat -b localhost:9092 -t demo-topic -C -f 'Partition: %p, Offset: %o, Key: %k, Value: %s\n' -e

Building a producer is step one; monitoring what happens after is step two. Conduktor Console provides visibility into message delivery, partition distribution, and consumer lag. Book a demo to see where your messages land.