Kafka Schema Registry Demo for Protobuf
Implements a Kafka Schema Registry demo example that stores and retrieves Protobuf schemas.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and Data Fabric
The following dependencies are needed for Protobuf and
Create a Java class that corresponds to the Protobuf schema
Add the following plugin to the
Create a file with the
extension in the
directory. For example,
. An example of a Protobuf schema is as
follows:syntax = "proto3";
package io.demo.example;
option java_outer_classname = "PersonImpl";
message Person {
int32 id = 1;
string firstName = 2;
string lastName = 3;
string email = 4;
Java class is auto-generated in the
directory after running the following
commands:$ mvn clean
$ mvn package
You can use this class in your program to manage the class
Create a Protobuf Producer
To create a Protobuf producer:
- Import the following properties for the Kafka
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig; import io.demo.example.PersonImpl.Person; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import java.util.Properties;
- Configure the following properties for the Event Data
Properties properties = new Properties(); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); // Configure the KafkaProtobufSerializer. properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class.getName()); // Schema registry location. properties.setProperty(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaProducer<Integer, Person> producer = new KafkaProducer<>(properties);
- Use the following code to send
different objects of classPerson.java
to the topicproto_example
in the/sample-stream
stream:String topic = "/sample-stream:proto_example"; Person person; for (int i = 0; i < n; i++) { Person person = Person.newBuilder() .setId(i) .setFirstName("John") .setLastName("Doe") .setEmail("john" + i + ".doe@mail.com") .build(); ProducerRecord<Integer, Person> record = new ProducerRecord(topic, i, person); producer.send(record, (recordMetadata, e) -> { if (e == null) { System.out.println("Success!" ); System.out.println(recordMetadata.toString()); } else { e.printStackTrace(); } }); } producer.flush(); producer.close();
Create a Protobuf Consumer
To create a Protobuf consumer:
- Import the following properties for the Kafka
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; import io.demo.example.PersonImpl.Person; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties;
- Add the
property to the properties of the Kafka Consumer to deserialize the output to the needed class.Properties properties = new Properties(); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); //Use Kafka Protobuf Deserializer. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class.getName()); //A class generated by Protocol buffers that the message value should be deserialized to. properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Person.class.getName()); //Schema registry location. properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaConsumer<Integer, Person> consumer = new KafkaConsumer<>(properties);
- Use the following code to read objects of the
class from theproto_example
topic in the/sample-stream stream
:String topic = "/sample-stream:proto_example"; consumer.subscribe(Collections.singletonList(topic)); try { while (true) { ConsumerRecords<Integer, Person> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { Person personRecord = record.value(); System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), personRecord); }); } } finally { consumer.close(); }