Kafka Schema Registry Demo for JSON Schema

Implements a Kafka Schema Registry demo that produces and consumes messages whose schemas are registered and retrieved in JSON Schema format.

Maven Dependencies

Add the following repositories to the POM file to resolve Confluent and Data Fabric dependencies:
<repositories>
   <repository>
       <id>confluent</id>
       <url>https://packages.confluent.io/maven/</url>
   </repository>
   <repository>
       <id>mapr-maven</id>
       <url>https://repository.mapr.com/maven/</url>
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>true</enabled></snapshots>
   </repository>
</repositories>
The following dependencies are needed for JSON Schema and Kafka:
<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.10.5</version>
</dependency>

<dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-schema-serializer</artifactId>
            <version>6.0.0.0-eep-800</version>
</dependency>

Create a Java class that corresponds to the JSON Schema

Create a Java class that includes Jackson annotations, for example:
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Simple POJO that is serialized to and from JSON Schema by the Confluent
 * KafkaJsonSchemaSerializer/Deserializer. The Jackson {@code @JsonProperty}
 * annotations mark the fields that participate in (de)serialization.
 */
public class User {
    @JsonProperty
    public String firstName;
    @JsonProperty
    public String lastName;
    @JsonProperty
    public short age;

    /** No-arg constructor required by Jackson for deserialization. */
    public User() {}

    public User(String firstName, String lastName, short age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    @Override
    public String toString() {
        // Use real format specifiers instead of concatenating the values into
        // the format string: a '%' character inside firstName/lastName would
        // make String.format throw UnknownFormatConversionException.
        return String.format("first name: %s; last name: %s; age: %d",
                firstName, lastName, age);
    }
}

Create a JSON Schema Producer

  1. Import the following properties for the Kafka Producer:
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
    import io.demo.example.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    
    import java.util.Properties;
    
  2. Configure the following properties for the Event Data Streams:
    // Producer configuration: integer keys, JSON-Schema-serialized User values.
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class.getName());
    
    // Configure the KafkaJsonSchemaSerializer.
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaJsonSchemaSerializer.class.getName());
    
    // Schema registry location.
    // NOTE(review): assumes the registry listens on localhost:8087 — adjust to
    // your deployment.
    properties.setProperty(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");
    
    KafkaProducer<Integer, User> producer = 
            new KafkaProducer<>(properties);
  3. Use the following code to send n different objects of class User.java to the topic json-schema_example in the /sample-stream stream:
    // Stream path and topic name separated by ':' (Data Fabric stream syntax).
    String topic = "/sample-stream:json-schema_example";

    // Produce n User records, keyed by their index.
    for (int i = 0; i < n; i++) {
        User user = new User("John" + i, "Doe", (short) (i + 30));

        // Diamond operator instead of the raw ProducerRecord constructor,
        // which compiles only with an unchecked-conversion warning.
        ProducerRecord<Integer, User> record =
                new ProducerRecord<>(topic, i, user);

        // send() is asynchronous; the callback reports per-record delivery
        // success or failure.
        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                System.out.println("Success!");
                System.out.println(recordMetadata.toString());
            } else {
                e.printStackTrace();
            }
        });
    }

    // Block until every buffered record is delivered, then release resources.
    producer.flush();
    producer.close();

Create a JSON Schema Consumer

  1. Import the following properties for the Kafka Consumer:
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig;
    import io.demo.example.User;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
  2. Add the KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE property to the properties of the Kafka Consumer to deserialize the output to the needed class.
    // Consumer configuration: integer keys, JSON-Schema-deserialized User values.
    Properties properties = new Properties();
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName());

    // Use the Kafka JSON Schema Deserializer for message values.
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaJsonSchemaDeserializer.class.getName());

    // Target class that each message value is deserialized into.
    properties.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE,
            User.class.getName());

    // Schema registry location.
    // NOTE(review): assumes the registry listens on localhost:8087 — adjust to
    // your deployment.
    properties.put(KafkaJsonSchemaDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");

    KafkaConsumer<Integer, User> consumer =
            new KafkaConsumer<>(properties);
  3. Use the following code to read objects of the User.java class from the json-schema_example topic in the /sample-stream stream:
    // Stream path and topic name separated by ':' (Data Fabric stream syntax).
    String topic = "/sample-stream:json-schema_example";
    consumer.subscribe(Collections.singletonList(topic));
    
    // Poll forever; try/finally guarantees the consumer is closed (and its
    // group resources released) even if polling throws.
    // NOTE(review): a production consumer would add a shutdown hook calling
    // consumer.wakeup() to exit this loop cleanly.
    try {
        while (true) {
                    ConsumerRecords<Integer, User> records =
                            consumer.poll(Duration.ofMillis(100));
    
                    records.forEach(record -> {
    
                        // Value is already deserialized into User by
                        // KafkaJsonSchemaDeserializer (JSON_VALUE_TYPE config).
                        User userRecord = record.value();
    
                        System.out.printf("%s %d %d %s \n", record.topic(),
                                record.partition(), record.offset(), userRecord);
                    });
        }
    } finally {
        consumer.close();
    }