Kafka Schema Registry Demo for Protobuf

Implements a Kafka Schema Registry demo example that stores and retrieves Protobuf schemas.

Maven Dependencies

Add the following repositories to the POM file to resolve Confluent and Data Fabric dependencies:
<repositories>
   <repository>
       <id>confluent</id>
       <url>http://packages.confluent.io/maven/</url>
   </repository>
   <repository>
       <id>mapr-maven</id>
       <url>https://repository.mapr.com/maven/</url>
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>true</enabled></snapshots>
   </repository>
</repositories>
The following dependencies are needed for Protobuf and Kafka:
<dependency>
     <groupId>com.google.protobuf</groupId>
     <artifactId>protobuf-java</artifactId>
     <version>3.17.3</version>
</dependency>

<dependency>
     <groupId>io.confluent</groupId>
     <artifactId>kafka-protobuf-serializer</artifactId>
     <version>6.0.0.0-eep-800</version>
</dependency>

Create a Java class that corresponds to the Protobuf schema

Add the following plugin to the pom.xml file:
<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <inputDirectories>
                    <include>src/main/resources/</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <addSources>none</addSources>
                        <outputDirectory>src/main/java/</outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>
Create a file with the .proto extension in the scr/main/resources/proto directory. For example, person.proto. An example of a Protobuf schema is as follows:
syntax = "proto3";
package io.demo.example;
option java_outer_classname = "PersonImpl";

message Person {
  int32 id = 1;
  string firstName = 2;
  string lastName = 3;
  string email = 4;
}
The PersonImpl.class Java class is auto-generated in the src/main/java/io/demo/example directory after running the following commands:
$ mvn clean
$ mvn package 

You can use this class in your program to manage the class Person.class.

Create a Protobuf Producer

To create a Protobuf producer:
  1. Import the following properties for the Kafka Producer:
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
    import io.demo.example.PersonImpl.Person;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    
    import java.util.Properties;
  2. Configure the following properties for the Event Data Streams:
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class.getName());
    
    // Configure the KafkaProtobufSerializer.
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaProtobufSerializer.class.getName());
    
    // Schema registry location.
    properties.setProperty(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");
    
    KafkaProducer<Integer, Person> producer = 
            new KafkaProducer<>(properties);
  3. Use the following code to send n different objects of class Person.java to the topic proto_example in the /sample-stream stream:
    String topic = "/sample-stream:proto_example";
    Person person;
    
    for (int i = 0; i < n; i++) {
                Person person = Person.newBuilder()
                        .setId(i)
                        .setFirstName("John")
                        .setLastName("Doe")
                        .setEmail("john" + i + ".doe@mail.com")
                        .build();
    
                ProducerRecord<Integer, Person> record =
                        new ProducerRecord(topic, i, person);
    
                producer.send(record, (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println("Success!" );
                        System.out.println(recordMetadata.toString());
                    } else {
                        e.printStackTrace();
                    }
                });
    }
    
    producer.flush();
    producer.close();

Create a Protobuf Consumer

To create a Protobuf consumer:
  1. Import the following properties for the Kafka Consumer:
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
    import io.demo.example.PersonImpl.Person;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
  2. Add the KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE property to the properties of the Kafka Consumer to deserialize the output to the needed class.
    Properties properties = new Properties();
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName());
    
    //Use Kafka Protobuf Deserializer.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaProtobufDeserializer.class.getName());
    
    //A class generated by Protocol buffers that the message value should be deserialized to. 
    properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
            Person.class.getName());
    
    //Schema registry location.
    properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");
    
    KafkaConsumer<Integer, Person> consumer = 
            new KafkaConsumer<>(properties);
  3. Use the following code to read objects of the Person.java class from the proto_example topic in the /sample-stream stream:
    
    String topic = "/sample-stream:proto_example";
    consumer.subscribe(Collections.singletonList(topic));
    
    try {
        while (true) {
                    ConsumerRecords<Integer, Person> records =
                            consumer.poll(Duration.ofMillis(100));
    
                    records.forEach(record -> {
    
                        Person personRecord = record.value();
    
                        System.out.printf("%s %d %d %s \n", record.topic(),
                                record.partition(), record.offset(), personRecord);
                    });
        }
    } finally {
        consumer.close();
    }