Kafka Schema Registry Demo for Avro
This example demonstrates how to use Kafka Schema Registry to store and retrieve Avro schemas.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and Data Fabric dependencies:
<repositories>
   <repository>
       <id>confluent</id>
       <url>http://packages.confluent.io/maven/</url>
   </repository>
   <repository>
       <id>mapr-maven</id>
       <url>https://repository.mapr.com/maven/</url>
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>true</enabled></snapshots>
   </repository>
</repositories>
The following dependencies are needed for Avro and Kafka:
<dependency>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro</artifactId>
   <version>1.9.2</version>
</dependency>
<dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-avro-serializer</artifactId>
   <version>5.1.2.0-mapr-700</version>
</dependency>
Creating a Java class that corresponds to the Avro schema
Add the following plugins to the pom.xml file:
- Plugin to build code:
<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.8.2</version>
   <executions>
      <execution>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
            <goal>protocol</goal>
            <goal>idl-protocol</goal>
         </goals>
         <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
            <stringType>String</stringType>
            <createSetters>false</createSetters>
            <enableDecimalLogicalType>true</enableDecimalLogicalType>
            <fieldVisibility>private</fieldVisibility>
         </configuration>
      </execution>
   </executions>
</plugin>
- Plugin to force the discovery of the generated classes:
<plugin>
   <groupId>org.codehaus.mojo</groupId>
   <artifactId>build-helper-maven-plugin</artifactId>
   <version>3.0.0</version>
   <executions>
      <execution>
         <id>add-source</id>
         <phase>generate-sources</phase>
         <goals>
            <goal>add-source</goal>
         </goals>
         <configuration>
            <sources>
               <source>target/generated-sources/avro</source>
            </sources>
         </configuration>
      </execution>
   </executions>
</plugin>
- Create a file with the filename extension .avsc under the src/main/resources directory, in the avro/ subdirectory that the sourceDirectory setting of the avro-maven-plugin points to.
An example of an Avro schema is as follows:
{
   "namespace": "com.example",
   "type": "record",
   "name": "Employee",
   "doc": "Represents an Employee at a company",
   "fields": [
      {"name": "firstName", "type": "string", "doc": "The persons given name"},
      {"name": "lastName", "type": "string"},
      {"name": "age", "type": "int", "default": -1},
      {"name": "emails", "default": [], "type": {"type": "array", "items": "string"}},
      {"name": "phoneNumber", "type": "string"}
   ]
}
The Employee.class Java class is auto-generated in the target/classes/com/example directory after executing the following commands:
$ mvn clean
$ mvn package
You can use this class in your program after performing these steps.
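To make the effect of the schema defaults concrete, the following is a minimal, hand-written stand-in for the generated class. It is not the code that avro-maven-plugin emits: the class name EmployeeSketch and the use of IllegalStateException are assumptions made for illustration only. The sketch shows how a builder can fill in age and emails from the schema defaults while rejecting a record that omits a field with no default.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the generated com.example.Employee class.
// The real class is produced by avro-maven-plugin; this sketch only
// mimics how a builder applies the defaults declared in the schema.
final class EmployeeSketch {
    private final String firstName;
    private final String lastName;
    private final int age;
    private final List<String> emails;
    private final String phoneNumber;

    private EmployeeSketch(Builder b) {
        this.firstName = b.firstName;
        this.lastName = b.lastName;
        this.age = b.age;
        this.emails = Collections.unmodifiableList(new ArrayList<>(b.emails));
        this.phoneNumber = b.phoneNumber;
    }

    static Builder newBuilder() { return new Builder(); }

    String getFirstName() { return firstName; }
    String getLastName() { return lastName; }
    int getAge() { return age; }
    List<String> getEmails() { return emails; }
    String getPhoneNumber() { return phoneNumber; }

    static final class Builder {
        private String firstName;
        private String lastName;
        private int age = -1;                            // "default": -1 in the schema
        private List<String> emails = new ArrayList<>(); // "default": [] in the schema
        private String phoneNumber;

        Builder setFirstName(String v) { firstName = v; return this; }
        Builder setLastName(String v) { lastName = v; return this; }
        Builder setAge(int v) { age = v; return this; }
        Builder setEmails(List<String> v) { emails = v; return this; }
        Builder setPhoneNumber(String v) { phoneNumber = v; return this; }

        EmployeeSketch build() {
            // Fields without a schema default must be set explicitly.
            if (firstName == null || lastName == null || phoneNumber == null) {
                throw new IllegalStateException(
                    "firstName, lastName, and phoneNumber have no default");
            }
            return new EmployeeSketch(this);
        }
    }

    public static void main(String[] args) {
        EmployeeSketch e = EmployeeSketch.newBuilder()
            .setFirstName("John")
            .setLastName("Doe")
            .setPhoneNumber("+1-202-555-0000")
            .build();
        // age and emails were never set, so the schema defaults apply.
        System.out.println(e.getAge() + " " + e.getEmails());
    }
}
```

In this sketch, a record built without age or emails reports -1 and an empty list, which matches the defaults in the example schema.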
Creating an Avro Producer
- Import the following classes for the Kafka producer:
import com.example.Employee;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
- Configure the following properties for Event Data Streams:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// Configure the KafkaAvroSerializer.
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Schema registry location.
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");
KafkaProducer<Integer, Employee> producer = new KafkaProducer<>(properties);
- The following code sends n different Employee objects to the avro_example topic in the /sample-stream stream:
String topic = "/sample-stream:avro_example";
Employee employee;
// n is the number of records to send; define it before this loop.
for (int i = 0; i < n; i++) {
    // Record i carries i email addresses.
    List<String> emails = new ArrayList<>();
    for (int j = 0; j < i; j++) {
        emails.add("john" + j + ".doe" + i + "@mail.com");
    }
    employee = Employee.newBuilder()
        .setFirstName("John" + i)
        .setLastName("Doe")
        .setAge(i + 5)
        .setEmails(emails)
        .setPhoneNumber("+1-202-555-" + i + i + i + i)
        .build();
    ProducerRecord<Integer, Employee> record = new ProducerRecord<>(topic, i, employee);
    // The callback runs when the send completes or fails.
    producer.send(record, (recordMetadata, e) -> {
        if (e == null) {
            System.out.println("Success!");
            System.out.println(recordMetadata.toString());
        } else {
            e.printStackTrace();
        }
    });
}
producer.flush();
producer.close();
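To see which field values the producer loop generates without running a cluster, the two expressions that depend on the loop index can be pulled into plain helper methods. The class and method names below (ProducerLoopValues, emailsFor, phoneNumberFor) are hypothetical and exist only for this sketch; the expressions themselves are copied from the loop above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the values built inside the producer loop.
final class ProducerLoopValues {

    // Mirrors the inner loop: record i carries i email addresses.
    static List<String> emailsFor(int i) {
        List<String> emails = new ArrayList<>();
        for (int j = 0; j < i; j++) {
            emails.add("john" + j + ".doe" + i + "@mail.com");
        }
        return emails;
    }

    // String concatenation appends the decimal form of i four times.
    static String phoneNumberFor(int i) {
        return "+1-202-555-" + i + i + i + i;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("key=" + i
                + " firstName=John" + i
                + " age=" + (i + 5)
                + " emails=" + emailsFor(i)
                + " phoneNumber=" + phoneNumberFor(i));
        }
    }
}
```

For i = 0 the emails list is empty and the phone number ends in 0000; for i = 2 the list holds john0.doe2@mail.com and john1.doe2@mail.com and the phone number ends in 2222. Once i reaches 10, the suffix grows to eight digits (10101010), because the index is concatenated as text rather than formatted to a fixed width.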
Creating an Avro Consumer
- Import the following classes for the Kafka consumer:
import com.example.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
- The properties to configure are similar to those of the Kafka producer, except that deserializers must be used instead of serializers. Add one more property, KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG:
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
// Use the Kafka Avro deserializer.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
// Use SpecificRecord; otherwise you get an Avro GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
// Schema registry location.
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");
KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer<>(properties);
- The following code reads Employee objects from the avro_example topic in the /sample-stream stream:
String topic = "/sample-stream:avro_example";
consumer.subscribe(Collections.singletonList(topic));
try {
    // Poll until the process is stopped.
    while (true) {
        ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> {
            Employee employeeRecord = record.value();
            System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), employeeRecord);
        });
    }
} finally {
    consumer.close();
}
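The serializer and deserializer used above connect each message to a schema in the registry through a small header. The sketch below illustrates the framing used by the Confluent Avro serializer: one magic byte with value 0, a 4-byte big-endian schema ID, then the Avro-encoded payload. It assumes the Data Fabric build of kafka-avro-serializer keeps this framing. The class and method names (WireFormatSketch, frame, schemaIdOf) are hypothetical and are not part of any library.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the message framing; not a library class.
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + 4; // magic byte + schema ID

    // Builds: magic byte, 4-byte big-endian schema ID, then the payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId)   // ByteBuffer is big-endian by default
            .put(avroPayload)
            .array();
    }

    // Reads the schema ID that a deserializer would look up in the registry.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a schema-registry framed message");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // The payload bytes here are placeholders, not real Avro data.
        byte[] message = frame(42, new byte[] {1, 2, 3});
        System.out.println("schema id = " + schemaIdOf(message));
    }
}
```

Because only the ID travels with each record, the consumer needs the same schema registry URL as the producer to resolve the writer schema.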