Building Consumers for CDC

HPE Ezmeral Data Fabric Streams consumers read and process CDC change data records. You build the consumer with the OJAI API library.

Description

When building a consumer, the general steps are to:
  • Set the consumer properties using Apache Kafka and HPE Ezmeral Data Fabric configuration parameters.
  • Subscribe to the stream topic.
  • Consume the events and determine record type.
  • Process the change data records.

The following examples refer to the MapR CDC Sample. See the QueryResult for specific API information.

Set Configuration

This code snippet configures the consumer properties using the Apache Kafka configuration parameters. See HPE Ezmeral Data Fabric Streams Configuration Parameters for Consumers. You can hard code the properties in the application code or externalize them in a file. The following code examples show both methods.

NOTE
CDC uses an optimized serialization format for all events, so value.deserializer must be set to com.mapr.db.cdc.ChangeDataRecordDeserializer.
// Consumer configuration parameters specified in application

    Properties consumerProperties = new Properties();
    consumerProperties.setProperty("group.id", "cdc.consumer.demo_table.fts_geo");
    consumerProperties.setProperty("enable.auto.commit", "true");
    consumerProperties.setProperty("auto.offset.reset", "latest");
    consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
      
// Consumer configuration parameters specified in an external file

key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.mapr.db.cdc.ChangeDataRecordDeserializer
enable.auto.commit=true
auto.offset.reset=latest
group.id=cdc.consumer.demo_table.fts_geo
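
The external-file method needs a loading step before the properties reach the consumer. The following is a minimal sketch of that step using only java.util.Properties. The ConsumerConfigLoader class and its method names are hypothetical and are not part of the MapR CDC Sample. Because Properties.load() keeps trailing whitespace in values, the sketch trims each value so that a stray space after a class name or group ID does not end up in the configuration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the MapR CDC Sample.
class ConsumerConfigLoader {

    // Parses key=value text in java.util.Properties format and trims each value.
    static Properties parse(String text) {
        try (Reader reader = new StringReader(text)) {
            return readTrimmed(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loads the same format from a file; the caller chooses the path.
    static Properties load(Path file) throws IOException {
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            return readTrimmed(reader);
        }
    }

    // Properties.load() preserves trailing whitespace in values, so copy
    // every entry into a new Properties object with the value trimmed.
    private static Properties readTrimmed(Reader reader) throws IOException {
        Properties raw = new Properties();
        raw.load(reader);
        Properties trimmed = new Properties();
        for (String name : raw.stringPropertyNames()) {
            trimmed.setProperty(name, raw.getProperty(name).trim());
        }
        return trimmed;
    }

    public static void main(String[] args) {
        // Note the trailing space after the group ID; parse() removes it.
        Properties consumerProperties = parse(
            "group.id=cdc.consumer.demo_table.fts_geo \n"
            + "enable.auto.commit=true\n");
        System.out.println(consumerProperties.getProperty("group.id"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as the hard-coded consumerProperties.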
      

Subscribe to topic

This code snippet creates the consumer and subscribes to the HPE Ezmeral Data Fabric Streams topic that contains the change data records. The consumer is created using a key (byte[]) and a ChangeDataRecord object for the value.

// Consumer used to consume MapR-DB CDC events

KafkaConsumer<byte[], ChangeDataRecord> consumer = new KafkaConsumer<byte[], ChangeDataRecord>(consumerProperties);
consumer.subscribe(Arrays.asList("/demo_changelog:demo_table"));

Consume the events and determine record type

This code snippet polls the topic for changes and, if there are any, iterates through the change data records, retrieving each record's ID and handling the record according to its type. The ChangeDataRecord interface provides the record type and record ID, and the ChangeDataRecordType constants (RECORD_INSERT, RECORD_UPDATE, and RECORD_DELETE) identify the type of record.

    while (true) {
      ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(500);
      Iterator<ConsumerRecord<byte[], ChangeDataRecord>> iter = changeRecords.iterator();

      while (iter.hasNext()) {
        ConsumerRecord<byte[], ChangeDataRecord> crec = iter.next();
        // The ChangeDataRecord contains all the changes made to a document
        ChangeDataRecord changeDataRecord = crec.value();
        String documentId = changeDataRecord.getId().getString();

        if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_INSERT) {
          System.out.println("\n\t Document Inserted " + documentId);
          insertAndUpdateDocument(changeDataRecord, producer);
        } else if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_UPDATE) {
          System.out.println("\n\t Document Updated " + documentId);
          insertAndUpdateDocument(changeDataRecord, producer);
        } else if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_DELETE) {
          System.out.println("\n\t Document Deleted " + documentId);
          deleteDocument(changeDataRecord, producer);
        }
      }
    }
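
The branching in the snippet above sends inserts and updates to one handler and deletes to another. The following sketch shows the same routing as a switch statement, which makes an unhandled record type fail loudly instead of being skipped silently. It is an illustration only: RecordType is a local stand-in that mirrors the ChangeDataRecordType constants, and ChangeDispatcher and route() are hypothetical names that do not appear in the MapR CDC Sample.

```java
// Hypothetical sketch; RecordType is a local stand-in that mirrors the
// ChangeDataRecordType constants used in the snippet above.
class ChangeDispatcher {

    enum RecordType { RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE }

    // Returns a description of the action taken for a record of the given type.
    // In a real consumer, each branch would call the matching handler instead.
    static String route(RecordType type, String documentId) {
        switch (type) {
            case RECORD_INSERT:
            case RECORD_UPDATE:
                // Inserts and updates share one handler, as in the sample.
                return "upsert " + documentId;
            case RECORD_DELETE:
                return "delete " + documentId;
            default:
                // Reached only if a new constant is added and not handled here.
                throw new IllegalArgumentException("Unhandled record type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(route(RecordType.RECORD_INSERT, "user0001"));
        System.out.println(route(RecordType.RECORD_DELETE, "user0001"));
    }
}
```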
     

Process the records

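This code snippet processes a change data record by iterating through its individual changes. Each entry pairs the field path that changed with a ChangeNode that holds the new value. How you handle the entries depends on the type of event (insert, update, or delete), which you determine by comparing the result of the changeDataRecord.getType() method with the ChangeDataRecordType constants.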

// Use the ChangeNode Iterator to capture all the individual changes

    Iterator<KeyValue<FieldPath, ChangeNode>> cdrItr = changeDataRecord.iterator();

    while (cdrItr.hasNext()) {
      KeyValue<FieldPath, ChangeNode> changeNodeEntry = cdrItr.next();
      String fieldPathAsString = changeNodeEntry.getKey().asPathString();
      ChangeNode changeNode = changeNodeEntry.getValue();
      ...
      ...
    }

To process and retrieve a newly inserted document, check whether the field path is null or empty. When a new document is inserted, all the changes are made in a single object represented as a Map. You then retrieve the value by using the changeNode.getMap() or changeNode.getString() method, depending on the field value.

    if (fieldPathAsString == null || fieldPathAsString.equals("")) { // Insert
      Map<String, Object> documentInserted = changeNode.getMap();

      if (documentInserted.containsKey("firstName")) {
        fieldToIndex.put("firstName", (String) documentInserted.get("firstName"));
        sendIndexingMessage = true;
      }

      if (documentInserted.containsKey("lastName")) {
        fieldToIndex.put("lastName", (String) documentInserted.get("lastName"));
        sendIndexingMessage = true;
      }

      if (documentInserted.containsKey("address")) {
        addressMessage.set("address", jsonMapper.convertValue((Map) documentInserted.get("address"), JsonNode.class));
        sendAddressMessage = true;
      }
    }
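
The casts in the snippet above assume that firstName and lastName always hold strings. As a hedged alternative, the following sketch copies only the fields that are present and are strings, so an unexpected value type is skipped rather than causing a ClassCastException. InsertFieldExtractor and fieldsToIndex() are hypothetical names, and a plain Map stands in for the value returned by changeNode.getMap().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the MapR CDC Sample.
class InsertFieldExtractor {

    // Returns the requested fields whose values are Strings, in request order.
    // Missing fields and non-String values are left out of the result.
    static Map<String, String> fieldsToIndex(Map<String, Object> documentInserted,
                                             String... fieldNames) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String name : fieldNames) {
            Object value = documentInserted.get(name);
            if (value instanceof String) {
                result.put(name, (String) value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for the Map returned by changeNode.getMap() on an insert.
        Map<String, Object> documentInserted = new LinkedHashMap<>();
        documentInserted.put("firstName", "Ada");
        documentInserted.put("age", 36);

        Map<String, String> fieldToIndex =
            fieldsToIndex(documentInserted, "firstName", "lastName");
        boolean sendIndexingMessage = !fieldToIndex.isEmpty();
        System.out.println(fieldToIndex + " send=" + sendIndexingMessage);
    }
}
```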

To process and retrieve updated documents, check the field path and retrieve the value according to the expected value type. When a document is updated, the iterator contains one ChangeNode per updated field, so you can access the field path and value directly. You retrieve the value by using the changeNode.getMap() or changeNode.getString() method, depending on the field value.

    if (fieldPathAsString.equalsIgnoreCase("firstName")) {
      fieldToIndex.put("firstName", changeNode.getString());
      sendIndexingMessage = true;
    } else if (fieldPathAsString.equalsIgnoreCase("lastName")) {
      fieldToIndex.put("lastName", changeNode.getString());
      sendIndexingMessage = true;
    } else if (fieldPathAsString.equalsIgnoreCase("address")) {
      addressMessage.set("address", jsonMapper.convertValue(changeNode.getMap(), JsonNode.class));
      sendAddressMessage = true;
    }
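
As the number of watched fields grows, the chain of field path comparisons can be collected into one lookup. The following sketch maps a field path to the kind of message it should trigger, using a case-insensitive match like the snippet above and ignoring any other path. UpdateRouter, Target, and targetFor() are hypothetical names chosen for this illustration.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of the MapR CDC Sample.
class UpdateRouter {

    enum Target { INDEX, ADDRESS, IGNORE }

    // Maps an updated field path to the message it should trigger.
    // Matching is case-insensitive; unknown or null paths are ignored.
    static Target targetFor(String fieldPath) {
        if (fieldPath == null) {
            return Target.IGNORE;
        }
        // Locale.ROOT keeps lowercasing independent of the default locale.
        switch (fieldPath.toLowerCase(Locale.ROOT)) {
            case "firstname":
            case "lastname":
                return Target.INDEX;
            case "address":
                return Target.ADDRESS;
            default:
                return Target.IGNORE;
        }
    }

    public static void main(String[] args) {
        System.out.println(targetFor("firstName"));
        System.out.println(targetFor("address"));
        System.out.println(targetFor("phone"));
    }
}
```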

To process delete operations, you can directly retrieve the document ID using the changeDataRecord.getId() method and process the document deletion with the deleteDocument method. The delete operation is a single change data record.