
How to Produce and Consume Protobuf Records in Apache Kafka

Protobuf is a powerful alternative to Apache Avro when it comes to data serialization and schema compatibility.

Stéphane Derosiaux
December 27th, 2020

Introduction

Protobuf (Protocol Buffers) is a language-neutral mechanism for serializing and deserializing structured data.

Data serialization is an important piece in building streaming solutions: it ensures our data stays consistent across applications and can be shared easily.

Protobuf messages are defined in .proto files, and Protobuf supports generating code bindings in many languages, including Python, Java, C#, C++, and Go.
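To make this concrete, here is a minimal serialize/deserialize round trip in Java. It's a sketch that assumes the ProductProtobuf.Product bindings generated from the schema we define later in this post:

```java
import com.google.protobuf.InvalidProtocolBufferException;
import conduktor.v1.ProductProtobuf;

public class RoundTrip {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a Product with the generated bindings...
        var product = ProductProtobuf.Product.newBuilder()
                .setProductId(123)
                .setName("Conduktor")
                .build();

        // ...serialize it to Protobuf's compact binary wire format...
        byte[] bytes = product.toByteArray();

        // ...and deserialize it back into a typed object.
        var decoded = ProductProtobuf.Product.parseFrom(bytes);
        System.out.println(decoded.getName()); // prints "Conduktor"
    }
}
```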

We'll see how to:

  1. Create a Kafka Topic and a Protobuf Schema using Conduktor Platform

  2. Produce Protobuf records using a Java Producer

  3. Consume Protobuf records from Conduktor Platform

Create a Kafka Topic linked to a Protobuf Schema

To follow this example, we'll use the Conduktor Platform demo environment, which has all the foundations set up for us, so we can jump straight in.

Head to the demo page to get started. You'll just need an email address to begin.

The demo environment already has clusters with a schema registry set up, but you can also use the free version of Conduktor Cloud; head to our docs to learn how to get set up.

We can create a topic from Conduktor Console: click the "Launch" button to open it, then select the "Create Topic" button in the top right-hand corner. In the screenshot below, we are creating the topic testprotobuftopic with 6 partitions and a replication factor of 2:
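If you prefer creating the topic programmatically instead of through the UI, here is a minimal sketch using Kafka's AdminClient (the bootstrap address is a placeholder; point it at your cluster):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        var props = new Properties();
        // Placeholder address: use your cluster's bootstrap servers
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (var admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 2, matching the UI example above
            var topic = new NewTopic("testprotobuftopic", 6, (short) 2);
            admin.createTopics(List.of(topic)).all().get(); // block until created
        }
    }
}
```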

Once the topic is created, it will appear in Conduktor with many details (min In-Sync replicas, records, compaction, topic size, etc.):

Create a Protobuf schema

Next, we need to create a Protobuf schema and attach it to the topic we just created.

To create a schema, we need to create a Subject first. A subject is a lineage of compatible schemas.

In Conduktor Console, choose Schema Registry on the left-hand menu, then click on "Create schema" in the top right corner.

To create a subject, we need to provide a bit of configuration:

  • Format -> Protobuf

  • Strategy -> Topic Name

  • Key or Value -> Value

  • Topic -> Type "testprotobuf" and then select the topic from the dropdown

Our Protobuf schema is shown below. Copy and paste it into the field, then create your schema.

syntax = "proto3"; package conduktor.v1; option java_outer_classname = "ProductProtobuf"; message Product { int32 product_id = 1; string name = 2; string description = 3; }

Once our schema is created, it will be shown in the Schema Registry page under the name $topicname-value (here, testprotobuftopic-value). This is because we chose the Topic Name strategy, which derives the subject name from the topic.
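To double-check the registration from code, here is a sketch using Confluent's CachedSchemaRegistryClient (the registry URL is a placeholder; use your own):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CheckSubject {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; the second argument is the client's cache capacity
        var client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // TopicNameStrategy derives the subject name from the topic name
        var metadata = client.getLatestSchemaMetadata("testprotobuftopic-value");
        System.out.println(metadata.getSchema()); // the registered .proto definition
    }
}
```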

Produce Protobuf records using a Java Producer

Next, we are going to use a Java Producer in a Maven-based project.

The producer will produce records with a String key and a value serialized with the Protobuf serializer:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import conduktor.v1.ProductProtobuf;

var properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.setProperty("bootstrap.servers", "34.140.204.135:12092");
properties.setProperty("schema.registry.url", "http://34.140.204.135/registry/");

var producer = new KafkaProducer<String, ProductProtobuf.Product>(properties);

var topic = "testprotobuftopic";
var key = "product_id";
var product = ProductProtobuf.Product.newBuilder()
        .setProductId(123)
        .setName("Conduktor")
        .setDescription("Conduktor Testing")
        .build();

var record = new ProducerRecord<>(topic, key, product);
producer.send(record);
producer.flush(); // make sure the record is actually sent before the app exits
producer.close();
```

Note: You may need to adjust the bootstrap.servers and schema.registry.url values depending on the cluster you used to create testprotobuftopic. The demo environment typically has 2 clusters, "internal" and "Secondary Cluster". Head to https://demo.conduktor.io/admin/clusters to get the details.

We are using the protoc-jar-maven-plugin in our Maven pom.xml. It generates the Protobuf code bindings from Product.proto so that we can produce records with the Product type.

The plugin generates the code bindings when we compile the project (mvn clean compile).
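A typical configuration looks something like this sketch (plugin version, protoc version, and directory paths are illustrative; adjust them to your project):

```xml
<plugin>
  <groupId>com.github.os72</groupId>
  <artifactId>protoc-jar-maven-plugin</artifactId>
  <version>3.11.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>run</goal>
      </goals>
      <configuration>
        <protocVersion>3.11.4</protocVersion>
        <!-- Where Product.proto lives -->
        <inputDirectories>
          <include>src/main/protobuf</include>
        </inputDirectories>
        <!-- Generate Java bindings and add them to the build -->
        <outputTargets>
          <outputTarget>
            <type>java</type>
            <outputDirectory>target/generated-sources/protobuf</outputDirectory>
          </outputTarget>
        </outputTargets>
      </configuration>
    </execution>
  </executions>
</plugin>
```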

Consuming Protobuf messages from Conduktor

Once Protobuf messages are produced to the topic, we can consume them from Conduktor Platform.

For this demo, we are going to consume data from our testprotobuftopic topic where we have produced some records. You just need to select the topic, and any consumed data will be displayed by default. The screenshot below shows the topic with a single test message, generated from the "Producer" tab. With Conduktor Platform, you can auto-generate messages that fit your existing schema!

Once you have run your producer and sent the messages, they will display in your topic:
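If you'd rather consume from code as well, here is a sketch of a Java consumer using Confluent's KafkaProtobufDeserializer, reusing the endpoints from the producer above (adjust them to your cluster). The SPECIFIC_PROTOBUF_VALUE_TYPE setting tells the deserializer to return our generated Product class:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import conduktor.v1.ProductProtobuf;

public class ProtobufConsumer {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "34.140.204.135:12092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-demo");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        properties.put("schema.registry.url", "http://34.140.204.135/registry/");
        // Deserialize into the generated Product class instead of a DynamicMessage
        properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
                ProductProtobuf.Product.class.getName());

        try (var consumer = new KafkaConsumer<String, ProductProtobuf.Product>(properties)) {
            consumer.subscribe(List.of("testprotobuftopic"));
            var records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r ->
                    System.out.printf("%s -> %s%n", r.key(), r.value().getName()));
        }
    }
}
```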

Conclusion

Protobuf (Protocol Buffers) defines how we want our data to be structured so that we can serialize and deserialize records easily and share data across our different applications in a safe manner.

We aim to accelerate Kafka projects delivery by making developers and organizations more efficient with Kafka.