Kafka Schema Registry Demo for JSON Schema
Implements a Kafka Schema Registry demo example that stores and retrieves schemas in JSON Schema format.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and Data Fabric
        dependencies:
      <repositories>
        <!-- Confluent repository. HTTPS is used because Maven 3.8.1 and later
             block plain-HTTP repositories by default. -->
        <repository>
          <id>confluent</id>
          <url>https://packages.confluent.io/maven/</url>
        </repository>
        <!-- Data Fabric (MapR) repository, with both releases and snapshots enabled. -->
        <repository>
          <id>mapr-maven</id>
          <url>https://repository.mapr.com/maven/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots><enabled>true</enabled></snapshots>
        </repository>
      </repositories>
The following dependencies are needed for JSON Schema and Kafka:
    <!-- Jackson annotations (provides @JsonProperty, used by the User class). -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.10.5</version>
    </dependency>
    <!-- Kafka JSON Schema serializer for the Schema Registry. -->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-json-schema-serializer</artifactId>
      <version>6.0.0.0-eep-800</version>
    </dependency>
Create a Java class that corresponds to JSON Schema
Create a Java class that includes Jackson annotations, for
        example:
    import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Simple data class whose public fields are exposed to Jackson through
 * {@code @JsonProperty}. It is the record value type used by the producer
 * and consumer examples.
 */
public class User {
    @JsonProperty
    public String firstName;
    @JsonProperty
    public String lastName;
    @JsonProperty
    public short age;

    /** No-argument constructor; leaves all fields at their default values. */
    public User() {}

    /**
     * Creates a user with every field set.
     *
     * @param firstName the user's first name
     * @param lastName  the user's last name
     * @param age       the user's age
     */
    public User(String firstName, String lastName, short age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    /**
     * Returns the fields as {@code "first name: X; last name: Y; age: N"}.
     *
     * @return a human-readable description of this user
     */
    @Override
    public String toString() {
        // The field values are passed as format arguments, never as part of the
        // format string, so a '%' inside a name cannot be misread as a format
        // specifier.
        return String.format("first name: %s; last name: %s; age: %d",
                firstName, lastName, age);
    }
}
Create a JSON Schema Producer
- Import the following classes for the Kafka Producer:
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
import io.demo.example.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import java.util.Properties;
- Configure the following properties for the Event Data Streams:
Properties properties = new Properties(); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); // Configure the KafkaJsonSchemaSerializer. properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName()); // Schema registry location. properties.setProperty(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaProducer<Integer, User> producer = new KafkaProducer<>(properties); - Use the following code to send 
ndifferent objects of classUser.javato the topicjson-schema_examplein the/sample-streamstream:String topic = "/sample-stream:json-schema_example"; for (int i = 0; i < n; i++) { User user = new User("John" + i, "Doe", (short) (i + 30)); ProducerRecord<Integer, User> record = new ProducerRecord(topic, i, user); producer.send(record, (recordMetadata, e) -> { if (e == null) { System.out.println("Success!" ); System.out.println(recordMetadata.toString()); } else { e.printStackTrace(); } }); } producer.flush(); producer.close(); 
Create a JSON Schema Consumer
- Import the following classes for the Kafka Consumer:
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig;
import io.demo.example.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
- Add the 
KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPEproperty to the properties of the Kafka Consumer to deserialize the output to the needed class.Properties properties = new Properties(); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); //Use Kafka JSON Schema Deserializer. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class.getName()); //A class that the message value should be deserialized to. properties.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, User.class.getName()); //Schema registry location. properties.put(KafkaJsonSchemaDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(properties); - Use the following code to read objects of the 
User.javaclass from thejson-schema_exampletopic in the/sample-streamstream:String topic = "/sample-stream:json-schema_example"; consumer.subscribe(Collections.singletonList(topic)); try { while (true) { ConsumerRecords<Integer, User> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { User userRecord = record.value(); System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), userRecord); }); } } finally { consumer.close(); }