Consumer Application for CDC Binary Data

This example consumes changed data records from HPE Ezmeral Data Fabric Database Binary tables.

Example of Consuming Binary Changed Data Records

In this example, the following occurs:
  • Initialize the consumer properties using Apache Kafka and HPE Ezmeral Data Fabric configuration parameters.
  • Display the change data record properties.
  • Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
  • Display the change data record values by using the ChangeNode interface.
  • Subscribe to the stream topic, consume the events, and determine the record type.
For changed data records from HPE Ezmeral Data Fabric Database Binary table data, the following points are specific to binary data:
  • An additional package must be imported: java.nio.ByteBuffer
  • Each field holds a single value, which can be retrieved through the ChangeNode interface. See the code line: ByteBuffer value = changeNode.getBinary();
package com.mapr.qa.cdc.tests.binary;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.Connection;
import org.ojai.store.Driver;
import org.ojai.store.DriverManager;
import org.ojai.store.cdc.*;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Example consumer that reads changed data records (CDC) for a binary table
 * from a changelog stream topic and prints every propagated change as JSON.
 *
 * <p>Two ways of walking a {@link ChangeDataRecord} are shown: iterating over
 * its field/change-node pairs, and pulling events from its
 * {@link ChangeDataReader}.
 */
public class CDCBinaryExample {

    /**
     * Builds the consumer configuration used by {@link #consume(long, String, boolean)}.
     *
     * @return properties holding the bootstrap server, the key and value
     *         deserializers, the fetch tuning values and the offset reset policy
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDC Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        // Mind the word order: the Kafka consumer setting is "fetch.max.wait.ms";
        // "fetch.wait.max.ms" is not a recognized key and would be ignored.
        props.put("fetch.max.wait.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    /**
     * Prints a single propagated change as a JSON document on standard output.
     *
     * <p>Every nullable argument is simply left out of the printed document
     * when it is {@code null}.
     *
     * @param consumerRecordkey      key of the consumed record, may be {@code null}
     * @param id                     id of the changed record, may be {@code null}
     * @param changeDataRecordType   record-level change type, may be {@code null}
     * @param recordOpTime           record-level operation timestamp
     * @param recordServerOpTime     record-level server timestamp
     * @param field                  field the change applies to, may be {@code null}
     * @param op                     change operation of the node, may be {@code null}
     * @param changeNodeOpTime       node-level operation timestamp
     * @param changeNodeServerOpTime node-level server timestamp
     * @param valueType              type of the changed value, may be {@code null}
     * @param value                  changed value, may be {@code null}; its remaining
     *                               bytes are printed as UTF-8 text
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        ByteBuffer value) {

        Connection connection = DriverManager.getConnection("ojai:mapr:");
        try {
            Driver driver = connection.getDriver();
            Document document = driver.newDocument();

            if (consumerRecordkey != null) {
                document.set("consumerRecordkey", consumerRecordkey);
            }
            if (id != null) {
                document.set("id", id);
            }
            if (changeDataRecordType != null) {
                document.set("changeDataRecordType", changeDataRecordType.name());
            }

            document.set("recordOpTime", recordOpTime);
            document.set("recordServerOpTime", recordServerOpTime);

            if (field != null) {
                document.set("field", field);
            }
            if (op != null) {
                document.set("op", op.name());
            }

            document.set("changeNodeOpTime", changeNodeOpTime);
            document.set("changeNodeServerOpTime", changeNodeServerOpTime);

            if (valueType != null) {
                document.set("valueType", valueType.name());
            }
            if (value != null) {
                document.set("value", decode(value));
            }

            System.out.println("\t\n********* Propagated Change **************************\t\n");
            System.out.println("\t\n" + document.asJsonString() + "\t\n");
            System.out.println("\t\n******************************************************\t\n");
        } finally {
            // The connection is only needed to build the document; release it
            // on every path so repeated calls do not accumulate open connections.
            connection.close();
        }
    }

    /**
     * Decodes the remaining bytes of a buffer as UTF-8 text.
     *
     * <p>Works on a duplicate so the caller's position and limit are untouched,
     * and copies through {@code get} so read-only and direct buffers (which do
     * not expose a backing array) are handled as well.
     */
    private static String decode(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Walks a change data record through its iterator interface and displays
     * each field/change-node pair.
     *
     * @param id                   id of the changed record
     * @param changeDataRecordType record-level change type
     * @param recordOpTime         record-level operation timestamp
     * @param recordServerOpTime   record-level server timestamp
     * @param consumerRecordkey    key of the consumed record
     * @param changeDataRecord     record whose change nodes are displayed
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {

        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {

            // field if operation was done on a field
            String field = fieldChangePair.getKey().asJsonString();

            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();

            // Change Op, based on op done can be NULL, PUT, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();

            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime = changeNode.getServerTimestamp();

            // the value type BINARY, if it is non delete operation
            Value.Type valueType = changeNode.getType();

            // value of the operation
            ByteBuffer value = changeNode.getBinary();

            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Walks a change data record through its reader interface and displays
     * every {@code NODE} event.
     *
     * @param id                   id of the changed record
     * @param changeDataRecordType record-level change type
     * @param recordOpTime         record-level operation timestamp
     * @param recordServerOpTime   record-level server timestamp
     * @param consumerRecordkey    key of the consumed record
     * @param changeDataRecord     record whose change events are displayed
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {

        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();

        // Drain the reader completely: next() returns null once there are no
        // more events, so the loop must not bail out after the first one.
        ChangeEvent changeEvent;
        while ((changeEvent = changeDataReader.next()) != null) {
            if (changeEvent != ChangeEvent.NODE) {
                // only NODE events are displayed
                continue;
            }

            System.out.println("node event get the value type");
            Value.Type valueType = changeDataReader.getType();
            String field = changeDataReader.getFieldName();
            Long serverTimestamp = changeDataReader.getServerTimestamp();
            Long opTimestamp = changeDataReader.getOpTimestamp();
            ChangeOp op = changeDataReader.getOp();
            ByteBuffer value = changeDataReader.getBinary();

            display(consumerRecordkey, id, changeDataRecordType,
                    recordOpTime, recordServerOpTime, field, op, opTimestamp,
                    serverTimestamp, valueType, value);
        }
    }

    /**
     * Subscribes to a changelog topic, polls it once and displays every
     * change data record that was returned.
     *
     * @param pollTimeout timeout handed to the consumer's {@code poll} call
     * @param topics      the single topic (stream:topic) to subscribe to
     * @param method      {@code true} to walk records with the iterator interface,
     *                    {@code false} to use the reader interface
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
                (getBasicListnerProperties());

        try {
            // subscribe to /stream:topic
            List<String> topicList = new ArrayList<String>();
            topicList.add(topics);
            consumer.subscribe(topicList);
            // NOTE(review): the no-argument form only exists in older client APIs,
            // and partitions may not be assigned before the first poll - confirm
            // this call is still needed given auto.offset.reset=earliest.
            consumer.seekToBeginning();

            // Get consumer records
            ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);

            // iterate over consumer records
            for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {

                ChangeDataRecord changeDataRecord = consumerRecord.value();
                if (changeDataRecord == null) {
                    // nothing to parse for a record without a payload
                    continue;
                }

                // a record is not guaranteed to carry a key
                String rawKey = consumerRecord.key();
                String consumerRecordkey = (rawKey == null) ? null : rawKey.trim();

                // record key for the change
                Value id = changeDataRecord.getId();

                // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
                ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();

                // record level op-time & server op-time
                Long recordOpTime = changeDataRecord.getOpTimestamp();
                Long recordServerOpTime = changeDataRecord.getServerTimestamp();

                if (method) {
                    // Method 1 - via iterator interface
                    iteratorDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                } else {
                    // Method 2 - via reader interface
                    readerDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                }
            }
        } finally {
            // release the consumer even when parsing or display fails
            consumer.close();
        }
    }

    /**
     * Driver.
     *
     * @param args {@code args[0]} poll timeout, {@code args[1]} topic,
     *             {@code args[2]} {@code true} for the iterator interface or
     *             {@code false} for the reader interface
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: CDCBinaryExample <pollTimeout> <stream:topic> <useIterator:true|false>");
            System.exit(1);
        }
        long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDCBinaryExample cdcBinaryExample = new CDCBinaryExample();
        cdcBinaryExample.consume(pollTimeout, topic, method);
    }
}