Kafka Schema Registry Demo for Protobuf
This demo implements a Kafka Schema Registry example that stores and retrieves Protobuf schemas.
Maven Dependencies
Add the following repositories to the pom.xml file to resolve the Confluent and Data Fabric dependencies:
<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
    <repository>
        <id>mapr-maven</id>
        <url>https://repository.mapr.com/maven/</url>
        <releases><enabled>true</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
</repositories>
The following dependencies are needed for Protobuf and Kafka:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.17.3</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-protobuf-serializer</artifactId>
    <version>6.0.0.0-eep-800</version>
</dependency>
Create a Java class that corresponds to the Protobuf schema
Add the following plugin to the pom.xml file:

<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
                <inputDirectories>
                    <include>src/main/resources/</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <addSources>none</addSources>
                        <outputDirectory>src/main/java/</outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>
Create a file with the .proto extension in the src/main/resources/proto directory, for example, person.proto. An example of a Protobuf schema is as follows:

syntax = "proto3";
package io.demo.example;
option java_outer_classname = "PersonImpl";
message Person {
int32 id = 1;
string firstName = 2;
string lastName = 3;
string email = 4;
}
The PersonImpl Java class is auto-generated in the src/main/java/io/demo/example directory after running the following commands:

$ mvn clean
$ mvn package

Because the schema sets java_outer_classname = "PersonImpl", the Person message type is generated as a nested class. Use io.demo.example.PersonImpl.Person in your program to build and read Person messages.
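Once the build has generated PersonImpl, messages are created through the builder API that Protocol Buffers generates for every message type. A brief sketch follows; it assumes the Maven build above has already run, so that the generated io.demo.example.PersonImpl.Person class and protobuf-java are on the classpath:

```java
import io.demo.example.PersonImpl.Person;

public class PersonExample {
    public static void main(String[] args) {
        // Build an immutable Person message with the generated builder.
        Person person = Person.newBuilder()
                .setId(1)
                .setFirstName("John")
                .setLastName("Doe")
                .setEmail("john.doe@mail.com")
                .build();

        // Generated getters mirror the schema fields.
        System.out.println(person.getFirstName() + " " + person.getLastName());
    }
}
```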
Create a Protobuf Producer
To create a Protobuf producer:
- Import the following classes for the Kafka producer:

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import io.demo.example.PersonImpl.Person;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.Properties;
- Configure the following properties for the Event Data Streams:

Properties properties = new Properties();
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// Configure the KafkaProtobufSerializer.
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class.getName());
// Schema registry location.
properties.setProperty(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");

KafkaProducer<Integer, Person> producer = new KafkaProducer<>(properties);
- Use the following code to send n different objects of the Person class to the proto_example topic in the /sample-stream stream:

String topic = "/sample-stream:proto_example";
for (int i = 0; i < n; i++) {
    Person person = Person.newBuilder()
            .setId(i)
            .setFirstName("John")
            .setLastName("Doe")
            .setEmail("john" + i + ".doe@mail.com")
            .build();
    ProducerRecord<Integer, Person> record = new ProducerRecord<>(topic, i, person);
    producer.send(record, (recordMetadata, e) -> {
        if (e == null) {
            System.out.println("Success!");
            System.out.println(recordMetadata.toString());
        } else {
            e.printStackTrace();
        }
    });
}
producer.flush();
producer.close();
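When the producer sends its first record, the KafkaProtobufSerializer registers the Person schema with the Schema Registry. Under Confluent's default TopicNameStrategy, the subject name is derived from the topic name by appending -value (or -key for key schemas). A minimal sketch of that derivation, assuming the default strategy (the helper class below is illustrative, not part of the Confluent API):

```java
public class SubjectNames {
    // Default TopicNameStrategy: value schemas are registered under "<topic>-value".
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    public static void main(String[] args) {
        // The topic used in this demo.
        System.out.println(valueSubject("/sample-stream:proto_example"));
    }
}
```

Knowing the subject name is useful when you later query or manage the registered schema through the registry.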
Create a Protobuf Consumer
To create a Protobuf consumer:
- Import the following classes for the Kafka consumer:

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.demo.example.PersonImpl.Person;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
- Add the KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE property to the properties of the Kafka consumer to deserialize the output to the needed class:

Properties properties = new Properties();
// Consumer group ID (required for subscribe()).
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-demo-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
// Use the Kafka Protobuf deserializer.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class.getName());
// A class generated by Protocol Buffers that the message value should be deserialized to.
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Person.class.getName());
// Schema registry location.
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");

KafkaConsumer<Integer, Person> consumer = new KafkaConsumer<>(properties);
- Use the following code to read objects of the Person class from the proto_example topic in the /sample-stream stream:

String topic = "/sample-stream:proto_example";
consumer.subscribe(Collections.singletonList(topic));
try {
    while (true) {
        ConsumerRecords<Integer, Person> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> {
            Person personRecord = record.value();
            System.out.printf("%s %d %d %s%n",
                    record.topic(), record.partition(), record.offset(), personRecord);
        });
    }
} finally {
    consumer.close();
}
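After the producer has run, you can confirm that the schema was registered by querying the Schema Registry REST API. A quick check, assuming the registry is reachable at http://localhost:8087 as configured above; note that slashes and colons in the subject name must be URL-encoded, and the exact subject name depends on the subject-name strategy in use:

```shell
# List all registered subjects.
curl http://localhost:8087/subjects

# Fetch the latest schema version for the demo topic's value subject
# ("/sample-stream:proto_example-value", URL-encoded).
curl http://localhost:8087/subjects/%2Fsample-stream%3Aproto_example-value/versions/latest
```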