Kafka Schema Registry Demo for Avro
This example demonstrates how the Kafka Schema Registry stores and retrieves Avro schemas.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and Data Fabric dependencies:
<repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven/</url>
    </repository>
    <repository>
        <id>mapr-maven</id>
        <url>https://repository.mapr.com/maven/</url>
        <releases><enabled>true</enabled></releases>
        <snapshots><enabled>true</enabled></snapshots>
    </repository>
</repositories>
The following dependencies are needed for Avro and Kafka:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.1.2.0-mapr-700</version>
</dependency>
Creating a Java Class that Corresponds to the Avro Schema
Add the following plugins to the pom.xml file:
- Plugin to generate Java classes from the Avro schema:
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                <stringType>String</stringType>
                <createSetters>false</createSetters>
                <enableDecimalLogicalType>true</enableDecimalLogicalType>
                <fieldVisibility>private</fieldVisibility>
            </configuration>
        </execution>
    </executions>
</plugin>
- Plugin to force the discovery of the generated classes:
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>build-helper-maven-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>add-source</goal>
            </goals>
            <configuration>
                <sources>
                    <source>target/generated-sources/avro</source>
                </sources>
            </configuration>
        </execution>
    </executions>
</plugin>
- Create a file with the filename extension .avsc in the src/main/resources/avro directory (the sourceDirectory configured for the avro-maven-plugin above). An example of an Avro schema is as follows:
{
    "namespace": "com.example",
    "type": "record",
    "name": "Employee",
    "doc": "Represents an Employee at a company",
    "fields": [
        {"name": "firstName", "type": "string", "doc": "The person's given name"},
        {"name": "lastName", "type": "string"},
        {"name": "age", "type": "int", "default": -1},
        {"name": "emails", "default": [], "type": {"type": "array", "items": "string"}},
        {"name": "phoneNumber", "type": "string"}
    ]
}
The Employee.class Java class is auto-generated in the target/classes/com/example directory after executing the following commands:
$ mvn clean
$ mvn package
You can use this class in your program after performing these steps.
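As a quick sanity check of the generated class, you can print the schema that the Avro plugin embedded in it. This is a minimal sketch; the Employee class and com.example package come from the example schema above:
import com.example.Employee;
import org.apache.avro.Schema;

public class SchemaCheck {
    public static void main(String[] args) {
        // Generated Avro classes embed their schema; getClassSchema()
        // returns the Schema parsed from the .avsc definition.
        Schema schema = Employee.getClassSchema();
        System.out.println(schema.toString(true)); // pretty-printed JSON
    }
}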
Creating an Avro Producer
- Import the following classes for the Kafka Producer:
import com.example.Employee;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
- Configure the following properties for Event Data Streams:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// Configure the KafkaAvroSerializer.
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// Schema Registry location.
properties.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");

KafkaProducer<Integer, Employee> producer = new KafkaProducer<>(properties);
- The following code sends n different Employee objects to the avro_example topic in the /sample-stream stream:
String topic = "/sample-stream:avro_example";
Employee employee;

for (int i = 0; i < n; i++) {
    List<String> emails = new ArrayList<>();
    for (int j = 0; j < i; j++) {
        emails.add("john" + j + ".doe" + i + "@mail.com");
    }

    employee = Employee.newBuilder()
            .setFirstName("John" + i)
            .setLastName("Doe")
            .setAge(i + 5)
            .setEmails(emails)
            .setPhoneNumber("+1-202-555-" + i + i + i + i)
            .build();

    ProducerRecord<Integer, Employee> record = new ProducerRecord<>(topic, i, employee);
    producer.send(record, (recordMetadata, e) -> {
        if (e == null) {
            System.out.println("Success!");
            System.out.println(recordMetadata.toString());
        } else {
            e.printStackTrace();
        }
    });
}

producer.flush();
producer.close();
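To verify that the serializer registered the schema, you can query the Schema Registry with the Confluent client. This is a minimal sketch, assuming the kafka-schema-registry-client artifact is on the classpath and that the serializer used the default subject naming strategy, <topic>-value:
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // The second argument is the capacity of the client's local schema cache.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8087", 100);
        // Assumed subject name: the topic name with a "-value" suffix.
        SchemaMetadata latest =
                client.getLatestSchemaMetadata("/sample-stream:avro_example-value");
        System.out.println("id=" + latest.getId() + ", version=" + latest.getVersion());
        System.out.println(latest.getSchema());
    }
}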
Creating an Avro Consumer
- Import the following classes for the Kafka Consumer (java.time.Duration is needed for the poll() call below):
import com.example.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
- The properties to configure are similar to those for the Kafka producer, except that deserializers must be used instead of serializers. Add one more property, KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG (a GenericRecord variant is sketched after this procedure):
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
// Use the Kafka Avro Deserializer.
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
// Use a specific record; otherwise you get an Avro GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
// Schema Registry location.
properties.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");
// subscribe() requires a consumer group id; the name used here is arbitrary.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");

KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer<>(properties);
- The following code reads Employee objects from the avro_example topic in the /sample-stream stream:
String topic = "/sample-stream:avro_example";
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> {
            Employee employeeRecord = record.value();
            System.out.printf("%s %d %d %s%n",
                    record.topic(), record.partition(), record.offset(), employeeRecord);
        });
    }
} finally {
    consumer.close();
}
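If the generated Employee class is not available on the consumer's classpath, you can omit SPECIFIC_AVRO_READER_CONFIG and read values as Avro GenericRecord objects instead. The following is a minimal sketch of that variant, using the same stream and topic as above (the group id is an arbitrary example value):
import org.apache.avro.generic.GenericRecord;

Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
// Without SPECIFIC_AVRO_READER_CONFIG, values are returned as GenericRecord.
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-generic-group");

KafkaConsumer<Integer, GenericRecord> genericConsumer = new KafkaConsumer<>(props);
genericConsumer.subscribe(Collections.singletonList("/sample-stream:avro_example"));

ConsumerRecords<Integer, GenericRecord> records = genericConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
    GenericRecord value = record.value();
    // Fields are looked up by name instead of through generated getters.
    System.out.println(value.get("firstName") + " " + value.get("lastName"));
});
genericConsumer.close();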