Kafka Schema Registry Demo for Protobuf

Implements a Kafka Schema Registry demo example that stores and retrieves Protobuf schemas.

Maven Dependencies

Add the following repositories to the POM file to resolve Confluent and Data Fabric dependencies:
The following dependencies are needed for Protobuf and Kafka:


Create a Java class that corresponds to the Protobuf schema

Add the following plugin to the pom.xml file:
Create a file with the .proto extension in the scr/main/resources/proto directory. For example, person.proto. An example of a Protobuf schema is as follows:
syntax = "proto3";
package io.demo.example;
option java_outer_classname = "PersonImpl";

message Person {
  int32 id = 1;
  string firstName = 2;
  string lastName = 3;
  string email = 4;
The PersonImpl.class Java class is auto-generated in the src/main/java/io/demo/example directory after running the following commands:
$ mvn clean
$ mvn package 

You can use this class in your program to manage the class Person.class.

Create a Protobuf Producer

To create a Protobuf producer:
  1. Import the following properties for the Kafka Producer:
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
    import io.demo.example.PersonImpl.Person;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import java.util.Properties;
  2. Configure the following properties for the Event Data Streams:
    Properties properties = new Properties();
    // Configure the KafkaProtobufSerializer.
    // Schema registry location.
    KafkaProducer<Integer, Person> producer = 
            new KafkaProducer<>(properties);
  3. Use the following code to send n different objects of class Person.java to the topic proto_example in the /sample-stream stream:
    String topic = "/sample-stream:proto_example";
    Person person;
    for (int i = 0; i < n; i++) {
                Person person = Person.newBuilder()
                        .setEmail("john" + i + ".doe@mail.com")
                ProducerRecord<Integer, Person> record =
                        new ProducerRecord(topic, i, person);
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println("Success!" );
                    } else {

Create a Protobuf Consumer

To create a Protobuf consumer:
  1. Import the following properties for the Kafka Consumer:
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
    import io.demo.example.PersonImpl.Person;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
  2. Add the KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE property to the properties of the Kafka Consumer to deserialize the output to the needed class.
    Properties properties = new Properties();
    //Use Kafka Protobuf Deserializer.
    //A class generated by Protocol buffers that the message value should be deserialized to. 
    //Schema registry location.
    KafkaConsumer<Integer, Person> consumer = 
            new KafkaConsumer<>(properties);
  3. Use the following code to read objects of the Person.java class from the proto_example topic in the /sample-stream stream:
    String topic = "/sample-stream:proto_example";
    try {
        while (true) {
                    ConsumerRecords<Integer, Person> records =
                    records.forEach(record -> {
                        Person personRecord = record.value();
                        System.out.printf("%s %d %d %s \n", record.topic(),
                                record.partition(), record.offset(), personRecord);
    } finally {