Building Consumers for CDC
HPE Ezmeral Data Fabric Streams consumers read and process CDC change data records. Consumers are built with the OJAI API library.
Description
- Set the consumer properties using Apache Kafka and HPE Ezmeral Data Fabric configuration parameters.
- Subscribe to the stream topic.
- Consume the events and determine record type.
- Process the change data records.
The following examples refer to the MapR CDC Sample. See the QueryResult for specific API information.
Set Configuration
This code snippet configures the consumer properties using the Apache Kafka configuration parameters. See HPE Ezmeral Data Fabric Streams Configuration Parameters for Consumers. The configuration can be stored in an external file or hard-coded in the application. The following code examples show both methods.
value.deserializer must be set to com.mapr.db.cdc.ChangeDataRecordDeserializer.
// Consumer configuration parameters specified in application
Properties consumerProperties = new Properties();
consumerProperties.setProperty("group.id", "cdc.consumer.demo_table.fts_geo");
consumerProperties.setProperty("enable.auto.commit", "true");
consumerProperties.setProperty("auto.offset.reset", "latest");
consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
# Consumer configuration parameters specified in an external file
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.mapr.db.cdc.ChangeDataRecordDeserializer
enable.auto.commit=true
auto.offset.reset=latest
group.id=cdc.consumer.demo_table.fts_geo
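The external-file method can be sketched with the standard java.util.Properties loader. This is a minimal illustration rather than code from the MapR CDC Sample: the ConsumerConfigLoader class and its load method are hypothetical names, and the check on value.deserializer is an added convenience based on the requirement stated above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the MapR CDC Sample.
final class ConsumerConfigLoader {

    private static final String REQUIRED_VALUE_DESERIALIZER =
        "com.mapr.db.cdc.ChangeDataRecordDeserializer";

    /** Reads consumer settings in java.util.Properties format from any Reader. */
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read consumer configuration", e);
        }
        // CDC consumers need the ChangeDataRecord deserializer, so fail fast if it is missing.
        String configured = props.getProperty("value.deserializer");
        if (!REQUIRED_VALUE_DESERIALIZER.equals(configured)) {
            throw new IllegalArgumentException(
                "value.deserializer must be " + REQUIRED_VALUE_DESERIALIZER
                + ", found: " + configured);
        }
        return props;
    }
}
```

In an application, the Reader would typically come from a file, for example Files.newBufferedReader(Paths.get("consumer.properties")), where the file name is only a placeholder. The returned Properties object can then be passed to the consumer constructor.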
Subscribe to topic
This code snippet creates the consumer and subscribes to the HPE Ezmeral Data Fabric Streams topic that contains the change data records. The consumer is created with a byte[] key and a ChangeDataRecord object for the value.
// Consumer used to consume MapR-DB CDC events
KafkaConsumer<byte[], ChangeDataRecord> consumer = new KafkaConsumer<byte[], ChangeDataRecord>(consumerProperties);
consumer.subscribe(Arrays.asList("/demo_changelog:demo_table"));
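The topic name passed to subscribe combines a stream path and a topic, separated by a colon. The sketch below builds such a name from its two parts. StreamTopicName is a hypothetical helper, and the validation rules (an absolute stream path, no colon in the topic) are assumptions made for illustration, not documented constraints.

```java
// Hypothetical helper, not part of the MapR CDC Sample.
final class StreamTopicName {

    /** Joins a stream path and a topic name as "<stream path>:<topic>". */
    static String of(String streamPath, String topic) {
        // Assumption: stream paths are absolute file system paths.
        if (streamPath == null || !streamPath.startsWith("/")) {
            throw new IllegalArgumentException("stream path must be absolute: " + streamPath);
        }
        // Assumption: the colon is reserved as the separator, so it may not appear in the topic.
        if (topic == null || topic.isEmpty() || topic.contains(":")) {
            throw new IllegalArgumentException("invalid topic name: " + topic);
        }
        return streamPath + ":" + topic;
    }
}
```

With this helper, StreamTopicName.of("/demo_changelog", "demo_table") yields the same string used in the subscribe call.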
Consume the events and determine record type
This code snippet polls the topic to determine whether there are any changes and, if so, iterates through the change data records to retrieve the change data record IDs based on the change data record type. The ChangeDataRecordType interface is used to determine the type of record and the ChangeDataRecord interface is used to retrieve the record type and record ID.
// insertAndUpdateDocument, deleteDocument, and producer are defined elsewhere in the sample application
while (true) {
  ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(500);
  Iterator<ConsumerRecord<byte[], ChangeDataRecord>> iter = changeRecords.iterator();
  while (iter.hasNext()) {
    ConsumerRecord<byte[], ChangeDataRecord> crec = iter.next();
    // The ChangeDataRecord contains all the changes made to a document
    ChangeDataRecord changeDataRecord = crec.value();
    String documentId = changeDataRecord.getId().getString();
    if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_INSERT) {
      System.out.println("\n\t Document Inserted " + documentId);
      insertAndUpdateDocument(changeDataRecord, producer);
    } else if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_UPDATE) {
      System.out.println("\n\t Document Updated " + documentId);
      insertAndUpdateDocument(changeDataRecord, producer);
    } else if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_DELETE) {
      System.out.println("\n\t Document Deleted " + documentId);
      deleteDocument(changeDataRecord, producer);
    }
  }
}
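The branching on record type can also be written as a switch, which makes it explicit that inserts and updates share one code path. The sketch below uses only the JDK so that it runs on its own. RecordType and Change are stand-ins for ChangeDataRecordType and ChangeDataRecord, and the returned strings merely label which handler would be called.

```java
// Stand-alone sketch; RecordType and Change are stand-ins, not OJAI types.
final class ChangeDispatchSketch {

    // Mirrors the three record types used in the sample.
    enum RecordType { RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE }

    // Minimal stand-in for a change data record: a document ID plus a record type.
    static final class Change {
        final String id;
        final RecordType type;

        Change(String id, RecordType type) {
            this.id = id;
            this.type = type;
        }
    }

    /** Returns a label for the handler that would process the change. */
    static String route(Change change) {
        switch (change.type) {
            case RECORD_INSERT:
            case RECORD_UPDATE:
                // Inserts and updates go through the same handler in the sample.
                return "upsert:" + change.id;
            case RECORD_DELETE:
                return "delete:" + change.id;
            default:
                throw new IllegalStateException("Unknown record type: " + change.type);
        }
    }
}
```

The default branch guards against record types that may be added later, which the if/else chain would silently ignore.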
Process the records
This code snippet processes the change data records. The type of event (insert, update, or delete) is determined with ChangeDataRecordType and the changeDataRecord.getType() method. The individual changes are then read by iterating over the ChangeNode entries of the record.
// Use the ChangeNode Iterator to capture all the individual changes
Iterator<KeyValue<FieldPath, ChangeNode>> cdrItr = changeDataRecord.iterator();
while (cdrItr.hasNext()) {
  Map.Entry<FieldPath, ChangeNode> changeNodeEntry = cdrItr.next();
  String fieldPathAsString = changeNodeEntry.getKey().asPathString();
  ChangeNode changeNode = changeNodeEntry.getValue();
  ...
  ...
}
To process a newly inserted document, check whether the field path is null or empty. When a new document is inserted, all of its fields arrive in a single object represented as a Map. Retrieve the value with the changeNode.getMap() or changeNode.getString() method, depending on the type of the field value.
if (fieldPathAsString == null || fieldPathAsString.equals("")) { // Insert
  Map<String, Object> documentInserted = changeNode.getMap();
  if (documentInserted.containsKey("firstName")) {
    fieldToIndex.put("firstName", (String) documentInserted.get("firstName"));
    sendIndexingMessage = true;
  }
  if (documentInserted.containsKey("lastName")) {
    fieldToIndex.put("lastName", (String) documentInserted.get("lastName"));
    sendIndexingMessage = true;
  }
  if (documentInserted.containsKey("address")) {
    addressMessage.set("address", jsonMapper.convertValue((Map) documentInserted.get("address"), JsonNode.class));
    sendAddressMessage = true;
  }
}
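The casts to String in the insert handling fail with a ClassCastException if a field holds another type. The following sketch shows one defensive variant that copies only string-valued fields. It works on a plain java.util.Map, standing in for the result of changeNode.getMap(), and InsertedDocumentSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch; the input map stands in for the value of changeNode.getMap().
final class InsertedDocumentSketch {

    private static final String[] INDEXED_FIELDS = {"firstName", "lastName"};

    /** Copies the indexed fields that are present and hold String values. */
    static Map<String, String> fieldsToIndex(Map<String, Object> documentInserted) {
        Map<String, String> fieldToIndex = new HashMap<>();
        for (String name : INDEXED_FIELDS) {
            Object value = documentInserted.get(name);
            // instanceof is false for null, so absent fields are skipped as well.
            if (value instanceof String) {
                fieldToIndex.put(name, (String) value);
            }
        }
        return fieldToIndex;
    }
}
```

An indexing message would then be sent only when the returned map is not empty, which plays the role of the sendIndexingMessage flag.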
To process an updated document, check the field path and retrieve the value according to the expected value type. When a document is updated, the iterator contains one ChangeNode per updated field, so you can access the field path and value directly. Retrieve the value with the changeNode.getMap() or changeNode.getString() method, depending on the type of the field value.
if (fieldPathAsString.equalsIgnoreCase("firstName")) {
  fieldToIndex.put("firstName", changeNode.getString());
  sendIndexingMessage = true;
} else if (fieldPathAsString.equalsIgnoreCase("lastName")) {
  fieldToIndex.put("lastName", changeNode.getString());
  sendIndexingMessage = true;
} else if (fieldPathAsString.equalsIgnoreCase("address")) {
  addressMessage.set("address", jsonMapper.convertValue(changeNode.getMap(), JsonNode.class));
  sendAddressMessage = true;
}
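When more fields need to be tracked, the chain of equalsIgnoreCase checks can be replaced by a case-insensitive lookup table. The sketch below is one possible arrangement using only the JDK. UpdateRoutingSketch and its Target enum are hypothetical names, and the two targets correspond to the indexing message and the address message in the sample.

```java
import java.util.Map;
import java.util.TreeMap;

// Stand-alone sketch; class and enum names are hypothetical.
final class UpdateRoutingSketch {

    // Where a change to a given field should be sent.
    enum Target { INDEX, ADDRESS }

    // Case-insensitive keys give the same matching behavior as equalsIgnoreCase.
    private static final Map<String, Target> ROUTES =
        new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

    static {
        ROUTES.put("firstName", Target.INDEX);
        ROUTES.put("lastName", Target.INDEX);
        ROUTES.put("address", Target.ADDRESS);
    }

    /** Returns the target for a field path, or null when the field is not of interest. */
    static Target routeFor(String fieldPathAsString) {
        // The comparator does not accept null keys, so guard before the lookup.
        return fieldPathAsString == null ? null : ROUTES.get(fieldPathAsString);
    }
}
```

Adding a field then means adding one entry to the table instead of another else-if branch.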
To process delete operations, you can directly retrieve the document ID using the changeDataRecord.getId() method and process the document deletion with the deleteDocument method. The delete operation is a single change data record.