
How to Produce and Consume Protobuf Records in Apache Kafka


Protobuf is a powerful alternative to Apache Avro when it comes to data serialization and schema compatibility.

Written by
Stéphane Derosiaux
Published on Dec 27, 2020

    Introduction

    Protobuf is a method to serialize/deserialize structured data.

    Data serialization is an important piece in building streaming solutions, as it ensures that our data is consistent across our applications and can be shared easily.

    Protobuf messages are defined in .proto files, and Protobuf supports generating code bindings in many different languages, including Python, Java, C#, C++, and Go.

    We'll see how to:

    1. Create a Kafka Topic and a Protobuf Schema using Conduktor Platform
    2. Produce Protobuf records using a Java Producer
    3. Consume Protobuf records from Conduktor Platform

    Create a Kafka Topic linked to a Protobuf Schema

    To follow this example, we'll use the Conduktor Platform demo environment, which has all the foundations set up for us, so we can jump straight in.

    Head to the demo page to get started. You'll just need an email address to begin.

    The demo environment already has clusters with a schema registry set up, but you can also use the free version of Conduktor Cloud; just head to our docs to learn how to get set up.

    We can create a topic from Conduktor Console: click the "Launch" button to open it, then select the "Create Topic" button in the top right-hand corner to add a new topic. In the screenshot below, we are creating the topic testprotobuftopic with 6 partitions and a replication factor of 2:

    testprotobuf

    Once the topic is created, it will appear in Conduktor with many details (min In-Sync replicas, records, compaction, topic size, etc.):

    topic-details

    Create a Protobuf schema

    Next, we need to create a Protobuf schema and attach it to the topic created above.

    To create a schema, we need to create a Subject first. A subject is a lineage of compatible schemas.

    In Conduktor Console, choose Schema Registry on the left-hand menu, then click on "Create schema" in the top right corner.

    To create a subject, we need to provide a bit of configuration:

    • Format -> Protobuf
    • Strategy -> Topic Name
    • Key or Value -> Value
    • Topic -> Type "testprotobuftopic" and then select it from the drop-down

    Our Protobuf schema should look like the one below. Copy and paste it into the field, then create your schema.

    syntax = "proto3";
    package conduktor.v1;
    
    option java_outer_classname = "ProductProtobuf";
    
    message Product {
        int32 product_id = 1;
        string name = 2;
        string description = 3;
    }
    

    create-protobuf

    Once our schema is created, it will be shown in the Schema Registry page under the name $topicname-value, because we chose the Topic Name strategy.

    schema-display

    Produce Protobuf records using a Java Producer

    Next, we are going to use a Java Producer in a Maven-based project.

    The producer will produce records with a String key and a value serialized with the Protobuf serializer. We can look at the code below for insights:

    var properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "34.140.204.135:12092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
    // The serializer registers/looks up the schema against this registry
    properties.put("schema.registry.url", "http://34.140.204.135/registry/");
    
    var producer = new KafkaProducer<String, ProductProtobuf.Product>(properties);
    var topic = "testprotobuftopic";
    var key = "product_id";
    var product = ProductProtobuf.Product.newBuilder()
        .setProductId(123)
        .setName("Conduktor")
        .setDescription("Conduktor Testing")
        .build();
    var record = new ProducerRecord<>(topic, key, product);
    producer.send(record);
    // Flush and close so the record is actually sent before the JVM exits
    producer.flush();
    producer.close();
    

    Note: You may need to adjust the bootstrap.servers and schema.registry.url values depending on the cluster you used to create testprotobuftopic. The demo environment typically has 2 clusters, "internal" and "Secondary Cluster". Head to https://demo.conduktor.io/admin/clusters to get the details.

    We are using the protoc-jar-maven-plugin in our Maven pom.xml. It is necessary to generate Protobuf code bindings from Product.proto so that we can use them to produce records with the Product type.

    The plugin generates the code bindings when we compile the project (mvn clean compile).
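A typical configuration for this plugin might look like the sketch below (the version number and directory paths are assumptions; adjust them to your project layout):

```xml
<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <!-- Directory containing Product.proto -->
                <inputDirectories>
                    <include>src/main/proto</include>
                </inputDirectories>
                <!-- Where the generated Java bindings are written -->
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <outputDirectory>target/generated-sources/protobuf</outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>
```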

    Consuming Protobuf messages from Conduktor

    Once protobuf messages are produced to the topic, we can consume them from Conduktor Platform.

    For this demo, we are going to consume data from our testprotobuftopic topic, where we have produced some records. You just need to select the topic, and the consumed data will be displayed by default. The screenshot below shows the topic with a single test message, generated from the "Producer" tab. With Conduktor Platform, you can auto-generate messages that fit your existing schema!

    data-consume

    Once you have run your producer and sent the messages, they will display in your topic:

    consumed-records
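If you prefer consuming from code rather than from the UI, a minimal Java consumer might look like the sketch below. This is a sketch, not part of the original walkthrough: it assumes the same cluster and registry endpoints as the producer, and a hypothetical consumer group name. The specific.protobuf.value.type setting tells Confluent's KafkaProtobufDeserializer to return our generated Product class instead of a generic DynamicMessage.

```java
var properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "34.140.204.135:12092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-demo-consumer"); // hypothetical group id
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put("schema.registry.url", "http://34.140.204.135/registry/");
// Without this, the deserializer returns a DynamicMessage instead of our generated class
properties.put("specific.protobuf.value.type", ProductProtobuf.Product.class.getName());

try (var consumer = new KafkaConsumer<String, ProductProtobuf.Product>(properties)) {
    consumer.subscribe(List.of("testprotobuftopic"));
    while (true) {
        var records = consumer.poll(Duration.ofMillis(500));
        for (var record : records) {
            System.out.printf("key=%s, product=%s%n", record.key(), record.value().getName());
        }
    }
}
```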

    Conclusion

    Protobuf (Protocol Buffers) defines how we want our data to be structured, so that we can serialize and deserialize records easily and share data across our different applications in a safe manner.