Consumer Application for CDC Binary Data
This example consumes changed data records from HPE Ezmeral Data Fabric Database Binary tables.
Example of Consuming Binary Changed Data Records
In this example, the following occurs:
- Initialize the consumer properties using Apache Kafka and HPE Ezmeral Data Fabric configuration parameters.
- Display the change data record properties.
- Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
- Display the change data record values by using the ChangeNode interface.
- Subscribe to the stream topic, consume the events, and determine the record type.
For changed data records from HPE Ezmeral Data Fabric Database Binary table data, the following are unique:
- An additional package must be imported:
java.nio.ByteBuffer
- Each field in a document has a single value, which can be retrieved through the
ChangeNode interface. See the code line:
ByteBuffer value = changeNode.getBinary();
package com.mapr.qa.cdc.tests.binary;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.Connection;
import org.ojai.store.Driver;
import org.ojai.store.DriverManager;
import org.ojai.store.cdc.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * Example consumer that reads change data records (CDC) propagated from a
 * binary table, and prints every change node as a JSON document.
 *
 * <p>Two equivalent ways of walking a {@link ChangeDataRecord} are shown:
 * the iterator interface ({@link #iteratorDisplay}) and the reader interface
 * ({@link #readerDisplay}).
 */
public class CDCBinaryExample {

    /** OJAI connection URL used to obtain a driver for building the display document. */
    private static final String OJAI_CONNECTION_URL = "ojai:mapr:";

    /**
     * Initialize Basic Consumer Properties
     *
     * @return a new {@link Properties} instance holding the consumer configuration
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDC Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        // KafkaConsumer names this setting "fetch.max.wait.ms"; the previously used
        // "fetch.wait.max.ms" is the legacy consumer's key and is not recognized here.
        props.put("fetch.max.wait.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    /**
     * Display Utility: collects the given change properties into an OJAI document
     * and prints it to standard output as JSON. Arguments that are {@code null}
     * are left out of the printed document.
     *
     * @param consumerRecordkey      key of the consumed Kafka record
     * @param id                     id of the changed record
     * @param changeDataRecordType   record level operation type
     * @param recordOpTime           record level operation timestamp
     * @param recordServerOpTime     record level server timestamp
     * @param field                  field the change node refers to
     * @param op                     operation carried by the change node
     * @param changeNodeOpTime       change node operation timestamp
     * @param changeNodeServerOpTime change node server timestamp
     * @param valueType              type of the changed value
     * @param value                  binary value of the change
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        ByteBuffer value) {
        // The connection is only needed to build the document; close it when done
        // so that repeated calls do not leak connections.
        try (Connection connection = DriverManager.getConnection(OJAI_CONNECTION_URL)) {
            Driver driver = connection.getDriver();
            Document document = driver.newDocument();
            if (consumerRecordkey != null) {
                document.set("consumerRecordkey", consumerRecordkey);
            }
            if (id != null) {
                document.set("id", id);
            }
            if (changeDataRecordType != null) {
                document.set("changeDataRecordType", changeDataRecordType.name());
            }
            setIfPresent(document, "recordOpTime", recordOpTime);
            setIfPresent(document, "recordServerOpTime", recordServerOpTime);
            if (field != null) {
                document.set("field", field);
            }
            if (op != null) {
                document.set("op", op.name());
            }
            setIfPresent(document, "changeNodeOpTime", changeNodeOpTime);
            setIfPresent(document, "changeNodeServerOpTime", changeNodeServerOpTime);
            if (valueType != null) {
                document.set("valueType", valueType.name());
            }
            if (value != null) {
                document.set("value", binaryToString(value));
            }
            System.out.println("\t\n********* Propagated Change **************************\t\n");
            System.out.println("\t\n" + document.asJsonString() + "\t\n");
            System.out.println("\t\n******************************************************\t\n");
        }
    }

    /** Stores a timestamp in the document unless it is {@code null} (avoids an unboxing NPE). */
    private static void setIfPresent(Document document, String fieldName, Long timestamp) {
        if (timestamp != null) {
            document.set(fieldName, timestamp.longValue());
        }
    }

    /**
     * Decodes the readable bytes of the buffer as UTF-8 text. Works on a duplicate so
     * the caller's position is untouched, and honors position/limit instead of dumping
     * the whole backing array (which may be larger, or unavailable for read-only or
     * direct buffers).
     */
    private static String binaryToString(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Parse change node contents via iterator
     *
     * @param id                   id of the changed record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumed Kafka record
     * @param changeDataRecord     change data record whose change nodes are displayed
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {
        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {
            // field if operation was done on a field; display() skips a missing field
            FieldPath fieldPath = fieldChangePair.getKey();
            String field = (fieldPath == null) ? null : fieldPath.asJsonString();
            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();
            // Change Op, based on op done can be NULL, PUT, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();
            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime = changeNode.getServerTimestamp();
            // the value type BINARY, if it is non delete operation
            Value.Type valueType = changeNode.getType();
            // value of the operation
            ByteBuffer value = changeNode.getBinary();
            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Parse change node contents via reader
     *
     * @param id                   id of the changed record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumed Kafka record
     * @param changeDataRecord     change data record whose change events are displayed
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {
        ChangeEvent changeEvent;
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();
        // Walk every event until the reader is exhausted. (An unconditional break
        // after the switch used to stop the loop after the very first event.)
        while ((changeEvent = changeDataReader.next()) != null) {
            // parse through change events
            switch (changeEvent) {
                case NODE:
                    System.out.println("node event get the value type");
                    Value.Type valueType = changeDataReader.getType();
                    String field = changeDataReader.getFieldName();
                    Long serverTimestamp = changeDataReader.getServerTimestamp();
                    Long opTimestamp = changeDataReader.getOpTimestamp();
                    ChangeOp op = changeDataReader.getOp();
                    ByteBuffer value = changeDataReader.getBinary();
                    display(consumerRecordkey, id, changeDataRecordType,
                            recordOpTime, recordServerOpTime, field, op, opTimestamp,
                            serverTimestamp, valueType, value);
                    break;
                default:
                    // only NODE events are displayed; other events are skipped
                    break;
            }
        }
    }

    /**
     * Consume from changelog topics: subscribes to the topic, polls once and
     * displays every change data record returned by that poll.
     *
     * @param pollTimeout timeout passed to the consumer's poll call
     * @param topics      the /stream:topic to subscribe to
     * @param method      {@code true} to parse records via the iterator interface,
     *                    {@code false} to parse them via the reader interface
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer; try-with-resources closes it even if processing fails
        try (KafkaConsumer<String, ChangeDataRecord> consumer =
                     new KafkaConsumer<String, ChangeDataRecord>(getBasicListnerProperties())) {
            // subscribe to /stream:topic
            consumer.subscribe(Collections.singletonList(topics));
            consumer.seekToBeginning();
            // Get consumer records
            ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);
            // iterate over consumer records
            for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {
                // a record may carry no key; do not fail on it
                String rawKey = consumerRecord.key();
                String consumerRecordkey = (rawKey == null) ? null : rawKey.trim();
                ChangeDataRecord changeDataRecord = consumerRecord.value();
                if (changeDataRecord == null) {
                    // nothing to parse for this record
                    continue;
                }
                // record key for the change
                Value id = changeDataRecord.getId();
                // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
                ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();
                // record level op-time & server op-time
                Long recordOpTime = changeDataRecord.getOpTimestamp();
                Long recordServerOpTime = changeDataRecord.getServerTimestamp();
                if (method) {
                    // Method 1 - via iterator interface
                    iteratorDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                } else {
                    // Method 2 - via reader interface
                    readerDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                }
            }
        }
    }

    /**
     * Driver
     *
     * @param args {@code args[0]} poll timeout, {@code args[1]} topic,
     *             {@code args[2]} {@code true} for the iterator method, anything else
     *             for the reader method
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println(
                    "Usage: CDCBinaryExample <pollTimeout> </stream:topic> <useIterator:true|false>");
            System.exit(1);
        }
        long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDCBinaryExample cdcBinaryExample = new CDCBinaryExample();
        cdcBinaryExample.consume(pollTimeout, topic, method);
    }
}