Consumer Application for CDC JSON Data

This example consumes changed data records from HPE Ezmeral Data Fabric Database JSON tables.

Example of Consuming JSON Changed Data Records

In this example, the following occurs:
  • Initialize the consumer properties using Apache Kafka and Data Fabric configuration parameters.
  • Display the change data record properties.
  • Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
  • Retrieve the properties of each individual change node (for example: data type, field name, field value, and so on) by using the various methods of the ChangeDataReader interface.
  • Display the change data record values by using the ChangeNode interface.
  • Subscribe to the stream topic, consume the events, and determine record type.
The following characteristics are specific to changed data records from HPE Ezmeral Data Fabric Database JSON tables:
  • There are multiple property values that can be retrieved through the ChangeDataReader interface. For example, getDouble or getFloat.
  • A single field in a document can hold a value of several different types, which can be retrieved through the ChangeNode interface. See the code line: Value value = changeNode.getValue();
package example.cdps;

import com.mapr.db.MapRDB;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.cdc.*;
import java.util.*;

/**
 * Sample consumer that reads change data records for JSON tables from a stream
 * topic and prints every propagated change as a JSON document on standard output.
 *
 * <p>Two ways of walking a {@link ChangeDataRecord} are demonstrated: iterating over
 * its {@link ChangeNode}s ({@link #iteratorDisplay}) and pulling events from its
 * {@link ChangeDataReader} ({@link #readerDisplay}).
 */
public class CDPConsumer {

    /**
     * Builds the basic consumer configuration.
     *
     * @return a new {@link Properties} instance holding the bootstrap server, the
     *         key/value deserializers, the fetch settings, the offset reset policy
     *         and the auto-commit flag used by this example
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "mfs220.qa.lab:9211");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDP Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        // NOTE(review): the Apache Kafka consumer names this setting
        // "fetch.max.wait.ms" - confirm this key is honored by the client in use.
        props.put("fetch.wait.max.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        return props;
    }

    /**
     * Stores a boxed timestamp in the document, skipping it when it is absent so
     * that unboxing can never raise a {@link NullPointerException}.
     */
    private static void putIfPresent(Document document, String name, Long timestamp) {
        if (timestamp != null) {
            document.set(name, timestamp.longValue());
        }
    }

    /**
     * Display utility: collects the given attributes of one change into a document
     * and prints it as JSON. Every attribute that is {@code null} is left out of
     * the printed document.
     *
     * @param consumerRecordkey      key of the consumed stream record
     * @param id                     id of the changed table record
     * @param changeDataRecordType   record level operation type
     * @param recordOpTime           record level operation timestamp
     * @param recordServerOpTime     record level server timestamp
     * @param field                  field the change applies to
     * @param op                     change node operation
     * @param changeNodeOpTime       change node operation timestamp
     * @param changeNodeServerOpTime change node server timestamp
     * @param valueType              type of the changed value
     * @param value                  the changed value
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        Value value) {

        Document document = MapRDB.newDocument();

        if (consumerRecordkey != null) {
            document.set("consumerRecordkey", consumerRecordkey);
        }

        if (id != null) {
            document.set("id", id);
        }

        if (changeDataRecordType != null) {
            document.set("changeDataRecordType", changeDataRecordType.name());
        }

        putIfPresent(document, "recordOpTime", recordOpTime);
        putIfPresent(document, "recordServerOpTime", recordServerOpTime);

        if (field != null) {
            document.set("field", field);
        }

        // Guarded like the other optional attributes instead of dereferencing blindly.
        if (op != null) {
            document.set("op", op.name());
        }

        putIfPresent(document, "changeNodeOpTime", changeNodeOpTime);
        putIfPresent(document, "changeNodeServerOpTime", changeNodeServerOpTime);

        if (valueType != null) {
            document.set("valueType", valueType.name());
        }

        if (value != null) {
            document.set("value", value);
        }

        System.out.println("\t\n********* Propagated Change **************************\t\n");
        System.out.println("\t\n" + document.asJsonString() + "\t\n");
        System.out.println("\t\n******************************************************\t\n");
    }

    /**
     * Parses the change node contents via the iterator interface and displays each
     * field/change-node pair of the record.
     *
     * @param id                   id of the changed table record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumed stream record
     * @param changeDataRecord     the change data record to walk
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {

        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {

            // field, if the operation was done on a field
            String field = fieldChangePair.getKey().asJsonString();

            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();

            // Change Op, based on op done can be NULL, SET, MERGE, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();

            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime = changeNode.getServerTimestamp();

            // the value type if it was non delete operation, such as insert replace etc
            Value.Type valueType = changeNode.getType();

            // value of the operation such as insert value or replace
            Value value = changeNode.getValue();

            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Reads the current value from the reader with the getter that matches
     * {@code valueType} and returns it wrapped as a {@link Value}.
     *
     * @param changeDataReader reader positioned on the node whose value is wanted
     * @param field            field name; when {@code null} the literal name
     *                         {@code "null"} is used as the scratch document key
     * @param valueType        type of the value to read
     * @return the value read from the reader; {@code null} when {@code valueType}
     *         is {@code null}; otherwise, for a type not handled here, whatever the
     *         scratch document holds for the key (nothing unless {@code field}
     *         was {@code null})
     */
    public Value getValue(ChangeDataReader changeDataReader, String field, Value.Type valueType) {
        // No type means there is nothing to read; switching on null would throw.
        if (valueType == null) {
            return null;
        }

        Document valDoc = MapRDB.newDocument();

        // Work on a local key rather than reassigning the parameter.
        String key = field;
        if (key == null) {
            key = "null";
            valDoc.setNull(key);
        }

        switch (valueType) {
            case NULL:
                valDoc.setNull(key);
                break;
            case BOOLEAN:
                valDoc.set(key, changeDataReader.getBoolean());
                break;
            case STRING:
                valDoc.set(key, changeDataReader.getString());
                break;
            case SHORT:
                valDoc.set(key, changeDataReader.getShort());
                break;
            case BYTE:
                valDoc.set(key, changeDataReader.getByte());
                break;
            case INT:
                valDoc.set(key, changeDataReader.getInt());
                break;
            case LONG:
                valDoc.set(key, changeDataReader.getLong());
                break;
            case FLOAT:
                valDoc.set(key, changeDataReader.getFloat());
                break;
            case DOUBLE:
                valDoc.set(key, changeDataReader.getDouble());
                break;
            case DECIMAL:
                valDoc.set(key, changeDataReader.getDecimal());
                break;
            case DATE:
                valDoc.set(key, changeDataReader.getDate());
                break;
            case TIME:
                valDoc.set(key, changeDataReader.getTime());
                break;
            case TIMESTAMP:
                valDoc.set(key, changeDataReader.getTimestamp());
                break;
            case INTERVAL:
                valDoc.set(key, changeDataReader.getInterval());
                break;
            case BINARY:
                valDoc.set(key, changeDataReader.getBinary());
                break;
            default:
                // remaining types are not read through a scalar getter here
                break;
        }
        return valDoc.getValue(key);
    }

    /**
     * Parses the change node contents via the reader interface and displays every
     * {@code NODE} event produced by the record's reader.
     *
     * @param id                   id of the changed table record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumed stream record
     * @param changeDataRecord     the change data record to walk
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {
        System.out.println("Reader");
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();

        // Drain the reader until it reports no further event. The loop must not be
        // left after the first event, otherwise all later changes are dropped.
        ChangeEvent changeEvent;
        while ((changeEvent = changeDataReader.next()) != null) {
            switch (changeEvent) {
                case NODE:
                    System.out.println("node event get the value type");
                    Value.Type valueType = changeDataReader.getType();
                    String field = changeDataReader.getFieldName();
                    Long serverTimestamp = changeDataReader.getServerTimestamp();
                    Long opTimestamp = changeDataReader.getOpTimestamp();
                    ChangeOp op = changeDataReader.getOp();
                    Value value = getValue(changeDataReader, field, valueType);

                    display(consumerRecordkey, id, changeDataRecordType,
                            recordOpTime, recordServerOpTime, field, op, opTimestamp,
                            serverTimestamp, valueType, value);
                    break;
                default:
                    // only NODE events are displayed by this example
                    break;
            }
        }
    }

    /**
     * Consumes one batch from a changelog topic and displays every change in it.
     * The consumer is closed before this method returns, also when it fails.
     *
     * @param pollTimeout timeout passed to the consumer's {@code poll} call
     * @param topics      the stream topic to subscribe to
     * @param method      {@code true} to walk records via the iterator interface,
     *                    {@code false} to walk them via the reader interface
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
                (getBasicListnerProperties());

        try {
            // subscribe to /stream:topic
            List<String> topicList = new ArrayList<String>();
            topicList.add(topics);
            consumer.subscribe(topicList);
            // NOTE(review): called without partitions right after subscribe();
            // confirm this has the intended effect on the client version in use.
            consumer.seekToBeginning();

            // Get consumer records
            ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);

            // iterate over consumer records
            for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {

                ChangeDataRecord changeDataRecord = consumerRecord.value();
                if (changeDataRecord == null) {
                    // nothing to display for a record without a payload
                    continue;
                }

                // tolerate records that carry no key
                String rawKey = consumerRecord.key();
                String consumerRecordkey = (rawKey == null) ? null : rawKey.trim();

                // record key for the change
                Value id = changeDataRecord.getId();

                // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
                ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();

                // record level op-time & server op-time
                Long recordOpTime = changeDataRecord.getOpTimestamp();
                Long recordServerOpTime = changeDataRecord.getServerTimestamp();

                if (method) {
                    // Method 1 - via iterator interface
                    iteratorDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                } else {
                    // Method 2 - via reader interface
                    readerDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                }
            }
        } finally {
            // release the consumer even when subscribing, polling or display fails
            consumer.close();
        }
    }

    /**
     * Driver.
     *
     * @param args {@code args[0]} poll timeout, {@code args[1]} topic,
     *             {@code args[2]} {@code true} for the iterator interface or
     *             {@code false} for the reader interface
     */
    public static void main(String[] args) {
        if (args == null || args.length < 3) {
            System.err.println("Usage: CDPConsumer <pollTimeout> <topic> <useIterator:true|false>");
            System.exit(1);
            return;
        }
        long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDPConsumer cdpConsumer = new CDPConsumer();
        cdpConsumer.consume(pollTimeout, topic, method);
    }
}