Kafka Schema Registry Demo for JSON Schema
This example demonstrates how to use Kafka Schema Registry to store and retrieve schemas in JSON Schema format.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and Data Fabric
dependencies:
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
  <repository>
    <id>mapr-maven</id>
    <url>https://repository.mapr.com/maven/</url>
    <releases><enabled>true</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>
The following dependencies are needed for JSON Schema and
Kafka:
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.10.5</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-json-schema-serializer</artifactId>
  <version>6.0.0.0-eep-800</version>
</dependency>
Create a Java class that corresponds to JSON Schema
Create a Java class that includes Jackson annotations, for
example:
import com.fasterxml.jackson.annotation.JsonProperty;

public class User {
    @JsonProperty
    public String firstName;
    @JsonProperty
    public String lastName;
    @JsonProperty
    public short age;

    // A no-argument constructor is needed so that Jackson can instantiate the class.
    public User() {}

    public User(String firstName, String lastName, short age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    @Override
    public String toString() {
        // Pass the values as arguments so that a '%' in a name is not read as a format specifier.
        return String.format("first name: %s; last name: %s; age: %d",
                firstName, lastName, age);
    }
}
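To make the mapping concrete, the following sketch shows the JSON object that a User instance would be expected to produce, with one property per annotated field. It builds the JSON by hand using only the JDK, so it illustrates the payload shape and is not what the serializer runs (the serializer relies on Jackson). The names UserJsonSketch, escape, and toJson are hypothetical, and the property order shown is an assumption.

```java
// Hypothetical helper: illustrates the JSON shape expected for a User value.
// Not part of the demo; the real serialization is performed by Jackson.
final class UserJsonSketch {

    // Escape backslashes and double quotes so the output stays valid JSON
    // for simple names. Control characters are not handled in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Build a JSON object whose property names match the User fields.
    static String toJson(String firstName, String lastName, short age) {
        return "{\"firstName\":\"" + escape(firstName) + "\","
             + "\"lastName\":\"" + escape(lastName) + "\","
             + "\"age\":" + age + "}";
    }

    public static void main(String[] args) {
        // Prints {"firstName":"John0","lastName":"Doe","age":30}
        System.out.println(toJson("John0", "Doe", (short) 30));
    }
}
```

Because age is declared as a short, it appears as a bare JSON number, while the two names appear as JSON strings.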
Create a JSON Schema Producer
- Import the following classes for the Kafka Producer:
  import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
  import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
  import io.demo.example.User;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.IntegerSerializer;
  import java.util.Properties;
- Configure the following properties for the Event Data Streams:
  Properties properties = new Properties();
  properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      IntegerSerializer.class.getName());
  // Configure the KafkaJsonSchemaSerializer.
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      KafkaJsonSchemaSerializer.class.getName());
  // Schema registry location.
  properties.setProperty(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
      "http://localhost:8087");
  KafkaProducer<Integer, User> producer = new KafkaProducer<>(properties);
- Use the following code to send n different objects of class User.java to the
  json-schema_example topic in the /sample-stream stream:
  String topic = "/sample-stream:json-schema_example";
  int n = 10; // Number of records to send; 10 is an arbitrary example value.
  for (int i = 0; i < n; i++) {
      User user = new User("John" + i, "Doe", (short) (i + 30));
      ProducerRecord<Integer, User> record = new ProducerRecord<>(topic, i, user);
      // The callback runs when the send completes or fails.
      producer.send(record, (recordMetadata, e) -> {
          if (e == null) {
              System.out.println("Success!");
              System.out.println(recordMetadata.toString());
          } else {
              e.printStackTrace();
          }
      });
  }
  // Send any buffered records before closing the producer.
  producer.flush();
  producer.close();
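The topic string used by the producer combines the stream path and the topic name, separated by a colon. The following JDK-only sketch splits such a string into its two parts, which can help when validating configuration values before creating a producer. StreamTopicPath and parse are hypothetical names and are not part of any Data Fabric or Kafka API. The sketch assumes that the topic name itself contains no colon.

```java
// Hypothetical helper: splits a "/<stream-path>:<topic>" string into its parts.
final class StreamTopicPath {
    final String stream;
    final String topic;

    private StreamTopicPath(String stream, String topic) {
        this.stream = stream;
        this.topic = topic;
    }

    // Split at the last colon; assumes the topic name contains no colon.
    static StreamTopicPath parse(String fullName) {
        int sep = fullName.lastIndexOf(':');
        boolean valid = fullName.startsWith("/")
                && sep > 0
                && sep < fullName.length() - 1;
        if (!valid) {
            throw new IllegalArgumentException(
                    "Expected /<stream-path>:<topic>, got: " + fullName);
        }
        return new StreamTopicPath(
                fullName.substring(0, sep), fullName.substring(sep + 1));
    }

    public static void main(String[] args) {
        StreamTopicPath p = parse("/sample-stream:json-schema_example");
        System.out.println(p.stream); // /sample-stream
        System.out.println(p.topic);  // json-schema_example
    }
}
```

Failing fast on a malformed string gives a clearer error than a failed send later on.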
Create a JSON Schema Consumer
- Import the following classes for the Kafka Consumer:
  import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
  import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig;
  import io.demo.example.User;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.IntegerDeserializer;
  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
- Add the KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE property to the
  Kafka Consumer properties so that message values are deserialized to the
  required class:
  Properties properties = new Properties();
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      IntegerDeserializer.class.getName());
  // Use the Kafka JSON Schema Deserializer.
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      KafkaJsonSchemaDeserializer.class.getName());
  // The class that the message value should be deserialized to.
  properties.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE,
      User.class.getName());
  // Schema registry location.
  properties.put(KafkaJsonSchemaDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
      "http://localhost:8087");
  KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(properties);
- Use the following code to read objects of the User.java class from the
  json-schema_example topic in the /sample-stream stream:
  String topic = "/sample-stream:json-schema_example";
  consumer.subscribe(Collections.singletonList(topic));
  try {
      // Poll until the application is stopped.
      while (true) {
          ConsumerRecords<Integer, User> records = consumer.poll(Duration.ofMillis(100));
          records.forEach(record -> {
              User userRecord = record.value();
              System.out.printf("%s %d %d %s%n", record.topic(),
                  record.partition(), record.offset(), userRecord);
          });
      }
  } finally {
      consumer.close();
  }
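The consumer settings can also be kept outside the code. The following JDK-only sketch loads the same four settings from properties-file text. It assumes that the configuration constants resolve to the string keys key.deserializer, value.deserializer, json.value.type, and schema.registry.url, and that User lives in the io.demo.example package as in the import list. ConsumerPropsSketch, CONFIG_TEXT, and load are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads consumer settings from properties-file text.
final class ConsumerPropsSketch {

    // Assumed string keys behind the configuration constants used in the demo.
    static final String CONFIG_TEXT = String.join("\n",
            "key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer",
            "value.deserializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
            "json.value.type=io.demo.example.User",
            "schema.registry.url=http://localhost:8087");

    // Parse the text with the standard java.util.Properties loader.
    static Properties load(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load(CONFIG_TEXT);
        System.out.println(properties.getProperty("json.value.type"));
    }
}
```

The resulting Properties object could then be passed to the KafkaConsumer constructor in place of the one built in code.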