How to Produce and Consume Protobuf Records in Apache Kafka

Introduction

Protobuf (Protocol Buffers) is a language-neutral method for serializing and deserializing structured data.

Data serialization is an important piece of building streaming solutions, as it ensures our data stays consistent across applications and can be shared easily.

Protobuf messages are defined in .proto files, and Protobuf supports generating code bindings in many languages, including Python, Java, C#, C++, and Go.
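
As a quick illustration, here is a minimal sketch of that round trip with a generated Java binding (it uses the Product message we will define later in this article; in real code, parseFrom throws a checked InvalidProtocolBufferException):

// Build a message with the generated binding, turn it into bytes,
// then parse it back; the byte format is the same in every language.
var product = ProductProtobuf.Product.newBuilder()
    .setProductId(123)
    .setName("Conduktor")
    .build();

byte[] bytes = product.toByteArray();                  // serialize
var parsed = ProductProtobuf.Product.parseFrom(bytes); // deserialize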

We’ll see how to:

  1. Create a Kafka Topic and a Protobuf Schema using Conduktor
  2. Produce Protobuf records using a Java Producer
  3. Consume Protobuf records from Conduktor

Create a Kafka Topic linked to a Protobuf Schema

Once we have configured and connected our cluster in Conduktor, we can create a topic by choosing Topics on the left and then clicking CREATE at the top right. The screenshot below shows the creation of the topic testprotobuftopic with 6 partitions and a replication factor of 3:

creating a kafka topic

Once the topic is created, it will appear in Conduktor with many details (in-sync replicas, leaders, compaction, the last time a record was published to it, its consumer groups, etc.):

topic details

Create a Protobuf schema

Next, we need to create a Protobuf schema and attach it to the topic we just created.

To create a schema, we need to create a Subject first. A subject is a lineage of compatible schemas.

In Conduktor, choose Schema Registry in the left menu, then click CREATE at the top right corner.

To create a subject, we need to provide a bit of configuration:

  • Format -> Protobuf
  • Strategy -> Topic Name
  • Key or Value -> Value
  • Topic -> Choose the topic from the drop-down

Our Protobuf schema is shown below. We are going to attach it to the testprotobuftopic topic created in the previous step.

syntax = "proto3";
package conduktor.v1;

option java_outer_classname = "ProductProtobuf";

message Product {
    int32 product_id = 1;
    string name = 2;
    string description = 3;
}

creating a protobuf subject

Once our schema is created, it will be shown on the Schema Registry page with the name $topicname-value (here, testprotobuftopic-value). This is because we chose the Topic Name strategy.

subject created in the schema registry

Produce Protobuf records using a Java Producer

Next, we are going to use a Java producer in a Maven-based project.

The producer will send records with a String key and a value serialized with the Protobuf serializer:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

var properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
// The Protobuf serializer registers and looks up schemas in the Schema Registry
properties.put("schema.registry.url", "http://localhost:8081");

var producer = new KafkaProducer<String, ProductProtobuf.Product>(properties);
var topic = "testprotobuftopic";
var key = "product_id";
var product = ProductProtobuf.Product.newBuilder()
    .setProductId(123)
    .setName("Conduktor")
    .setDescription("Conduktor Testing")
    .build();
var record = new ProducerRecord<String, ProductProtobuf.Product>(topic, key, product);
producer.send(record);
producer.flush(); // make sure the record is actually sent before the app exits
producer.close();

We are using the protoc-jar-maven-plugin in our Maven pom.xml. It is needed to generate the Protobuf code bindings from Product.proto so that we can produce records of the Product type.

The plugin generates the code bindings when we compile the project (mvn clean compile).
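
For reference, a minimal configuration of this plugin could look like the sketch below (the plugin version and the directory layout are assumptions; adjust them to your project):

<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <!-- where Product.proto lives (assumed layout) -->
                <inputDirectories>
                    <include>src/main/proto</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <outputDirectory>target/generated-sources/protobuf</outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>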

Consume Protobuf records from Conduktor

Once Protobuf records are produced to the topic, we can consume them from Conduktor.

For this demo, we are going to consume data from our testprotobuftopic topic, where we have produced some records. We select our topic, then click Consume Data at the top right corner.

topic details

From here, we have to configure some settings to start consuming Protobuf data from the topic:

  • Format
    • Key to String (default)
    • Value to Protobuf (Schema Registry)
  • Parameters
    • Start from to the beginning (earliest)

configuring protobuf deserializer

After choosing all the above settings, we click Start at the bottom left and can see Protobuf records flowing. (If you see odd characters here, double-check the settings; you may have forgotten to select the Protobuf deserializer.)

consuming protobuf records from Kafka
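
Conduktor handles the Protobuf deserialization for us here, but the same settings map onto a plain Java consumer. Below is a minimal sketch assuming Confluent's KafkaProtobufDeserializer and the bindings generated earlier; the group id is an arbitrary example:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;

var properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-demo");     // example group id
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // "start from the beginning"
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
// Return our generated Product class instead of a generic DynamicMessage
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
    ProductProtobuf.Product.class.getName());

try (var consumer = new KafkaConsumer<String, ProductProtobuf.Product>(properties)) {
    consumer.subscribe(List.of("testprotobuftopic"));
    var records = consumer.poll(Duration.ofSeconds(5));
    records.forEach(r ->
        System.out.printf("key=%s, product=%s%n", r.key(), r.value().getName()));
}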

Conclusion

Protobuf (Protocol Buffers) defines how we want our data to be structured so that we can serialize and deserialize records easily and share data across our different applications in a safe manner.