Consumer Application for CDC JSON Data
This example consumes changed data records from HPE Ezmeral Data Fabric Database JSON tables.
Example of Consuming JSON Changed Data Records
In this example, the following occurs (the last two items are unique to changed data records from HPE Ezmeral Data Fabric Database JSON table data):
- Initialize the consumer properties using Apache Kafka and Data Fabric configuration parameters.
- Display the change data record properties.
- Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
- Retrieve the properties of an individual change node (for example: data type, field name, field value, and so on) by using the various methods of the ChangeDataReader interface.
- Display the change data record values by using the ChangeNode interface.
- Subscribe to the stream topic, consume the events, and determine record type.
- Multiple typed property values can be retrieved through the ChangeDataReader interface, for example by calling getDouble or getFloat.
- Multiple values for a single document field can be retrieved through the
ChangeNode interface. See the code line:
Value value = changeNode.getValue();
package example.cdps;
import com.mapr.db.MapRDB;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.cdc.*;
import java.util.*;
/**
 * Example consumer for change data (CDC) records propagated from a JSON table.
 *
 * <p>The consumer subscribes to a changelog topic, polls it once, and prints every
 * change it finds. A record can be walked in two ways, selected by the {@code method}
 * flag of {@link #consume(long, String, boolean)}: through the record's iterator of
 * {@code ChangeNode}s, or through its {@code ChangeDataReader}.
 */
public class CDPConsumer {

    /** Usage line printed by {@link #main(String[])} when arguments are missing. */
    private static final String USAGE =
            "Usage: CDPConsumer <pollTimeoutMs> <changelog-topic> <useIterator:true|false>";

    /**
     * Initialize the basic consumer properties.
     *
     * @return a new {@link Properties} instance holding the consumer configuration
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "mfs220.qa.lab:9211");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDP Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        // The Kafka consumer setting is "fetch.max.wait.ms"; the previous key
        // "fetch.wait.max.ms" is not a consumer configuration name.
        props.put("fetch.max.wait.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        return props;
    }

    /**
     * Display utility: collects the supplied properties of one change into a document
     * and prints it as JSON to standard output. Any argument that is {@code null} is
     * left out of the printed document.
     *
     * @param consumerRecordkey key of the consumer record that carried the change
     * @param id id of the changed record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime record level op-time
     * @param recordServerOpTime record level server op-time
     * @param field field the change node refers to
     * @param op operation of the change node
     * @param changeNodeOpTime change node op-time
     * @param changeNodeServerOpTime change node server op-time
     * @param valueType type of the changed value
     * @param value the changed value
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        Value value) {
        Document document = MapRDB.newDocument();
        if (consumerRecordkey != null) {
            document.set("consumerRecordkey", consumerRecordkey);
        }
        if (id != null) {
            document.set("id", id);
        }
        if (changeDataRecordType != null) {
            document.set("changeDataRecordType", changeDataRecordType.name());
        }
        // The timestamps arrive boxed; unboxing a null would throw, so guard each one.
        if (recordOpTime != null) {
            document.set("recordOpTime", recordOpTime.longValue());
        }
        if (recordServerOpTime != null) {
            document.set("recordServerOpTime", recordServerOpTime.longValue());
        }
        if (field != null) {
            document.set("field", field);
        }
        if (op != null) {
            document.set("op", op.name());
        }
        if (changeNodeOpTime != null) {
            document.set("changeNodeOpTime", changeNodeOpTime.longValue());
        }
        if (changeNodeServerOpTime != null) {
            document.set("changeNodeServerOpTime", changeNodeServerOpTime.longValue());
        }
        if (valueType != null) {
            document.set("valueType", valueType.name());
        }
        if (value != null) {
            document.set("value", value);
        }
        System.out.println("\t\n********* Propagated Change **************************\t\n");
        System.out.println("\t\n" + document.asJsonString() + "\t\n");
        System.out.println("\t\n******************************************************\t\n");
    }

    /**
     * Parse change node contents via the record's iterator and display each change.
     *
     * @param id id of the changed record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime record level op-time
     * @param recordServerOpTime record level server op-time
     * @param consumerRecordkey key of the consumer record that carried the change
     * @param changeDataRecord the change data record to walk
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {
        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {
            // field, if the operation was done on a field
            FieldPath fieldPath = fieldChangePair.getKey();
            String field = (fieldPath == null) ? null : fieldPath.asJsonString();
            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();
            // Change Op, based on op done can be NULL, SET, MERGE, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();
            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime = changeNode.getServerTimestamp();
            // the value type if it was non delete operation, such as insert replace etc
            Value.Type valueType = changeNode.getType();
            // value of the operation such as insert value or replace
            Value value = changeNode.getValue();
            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Get the parsed value at the reader's current position.
     *
     * <p>The typed getter matching {@code valueType} is called on the reader, the result
     * is stored in a scratch document under {@code field}, and the stored entry is
     * returned as a {@link Value}. A {@code null} field name is replaced by the literal
     * name {@code "null"}, which is pre-populated with a null value. Types without a
     * case below (and a {@code null} type) store nothing, so the result is whatever the
     * scratch document holds for the field.
     *
     * @param changeDataReader reader positioned on the node to read
     * @param field name of the field, may be {@code null}
     * @param valueType type reported by the reader for the current node
     * @return the value stored for the field; presumably {@code null} when nothing was
     *         stored (depends on {@code Document.getValue} - TODO confirm)
     */
    public Value getValue(ChangeDataReader changeDataReader, String field, Value.Type valueType) {
        Document valDoc = MapRDB.newDocument();
        if (field == null) {
            valDoc.setNull("null");
            field = "null";
        }
        // Switching on a null enum reference throws, so treat it like an unhandled type.
        if (valueType != null) {
            switch (valueType) {
                case NULL:
                    valDoc.setNull(field);
                    break;
                case BOOLEAN:
                    valDoc.set(field, changeDataReader.getBoolean());
                    break;
                case STRING:
                    valDoc.set(field, changeDataReader.getString());
                    break;
                case SHORT:
                    valDoc.set(field, changeDataReader.getShort());
                    break;
                case BYTE:
                    valDoc.set(field, changeDataReader.getByte());
                    break;
                case INT:
                    valDoc.set(field, changeDataReader.getInt());
                    break;
                case LONG:
                    valDoc.set(field, changeDataReader.getLong());
                    break;
                case FLOAT:
                    valDoc.set(field, changeDataReader.getFloat());
                    break;
                case DOUBLE:
                    valDoc.set(field, changeDataReader.getDouble());
                    break;
                case DECIMAL:
                    valDoc.set(field, changeDataReader.getDecimal());
                    break;
                case DATE:
                    valDoc.set(field, changeDataReader.getDate());
                    break;
                case TIME:
                    valDoc.set(field, changeDataReader.getTime());
                    break;
                case TIMESTAMP:
                    valDoc.set(field, changeDataReader.getTimestamp());
                    break;
                case INTERVAL:
                    valDoc.set(field, changeDataReader.getInterval());
                    break;
                case BINARY:
                    valDoc.set(field, changeDataReader.getBinary());
                    break;
                default:
                    // remaining types are not read here
                    break;
            }
        }
        return valDoc.getValue(field);
    }

    /**
     * Parse change node contents via the record's reader and display every NODE event.
     *
     * @param id id of the changed record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime record level op-time
     * @param recordServerOpTime record level server op-time
     * @param consumerRecordkey key of the consumer record that carried the change
     * @param changeDataRecord the change data record to read
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {
        System.out.println("Reader");
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();
        ChangeEvent changeEvent;
        // Parse through ALL change events. The loop used to end with an unconditional
        // break, which stopped after the first event and dropped the remaining changes.
        while ((changeEvent = changeDataReader.next()) != null) {
            if (changeEvent != ChangeEvent.NODE) {
                // only NODE events are displayed
                continue;
            }
            System.out.println("node event get the value type");
            Value.Type valueType = changeDataReader.getType();
            String field = changeDataReader.getFieldName();
            Long serverTimestamp = changeDataReader.getServerTimestamp();
            Long opTimestamp = changeDataReader.getOpTimestamp();
            ChangeOp op = changeDataReader.getOp();
            Value value = getValue(changeDataReader, field, valueType);
            display(consumerRecordkey, id, changeDataRecordType,
                    recordOpTime, recordServerOpTime, field, op, opTimestamp,
                    serverTimestamp, valueType, value);
        }
    }

    /**
     * Consume from a changelog topic: subscribe, poll once, and display every change
     * data record returned by that poll. The consumer is closed before returning, also
     * when processing fails.
     *
     * @param pollTimeout timeout passed to the consumer's {@code poll} call
     * @param topics the single topic to subscribe to (comment in the original names the
     *               form {@code /stream:topic})
     * @param method {@code true} to walk each record via the iterator interface,
     *               {@code false} to walk it via the reader interface
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer =
                new KafkaConsumer<String, ChangeDataRecord>(getBasicListnerProperties());
        try {
            // subscribe to /stream:topic
            List<String> topicList = new ArrayList<String>();
            topicList.add(topics);
            consumer.subscribe(topicList);
            consumer.seekToBeginning();
            // Get consumer records
            ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);
            // iterate over consumer records
            for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {
                ChangeDataRecord changeDataRecord = consumerRecord.value();
                if (changeDataRecord == null) {
                    // nothing to parse for this record
                    continue;
                }
                String rawKey = consumerRecord.key();
                String consumerRecordkey = (rawKey == null) ? null : rawKey.trim();
                // record key for the change
                Value id = changeDataRecord.getId();
                // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
                ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();
                // record level op-time & server op-time
                Long recordOpTime = changeDataRecord.getOpTimestamp();
                Long recordServerOpTime = changeDataRecord.getServerTimestamp();
                if (method) {
                    // Method 1 - via iterator interface
                    iteratorDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                } else {
                    // Method 2 - via reader interface
                    readerDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                }
            }
        } finally {
            // release the consumer even if parsing or display throws
            consumer.close();
        }
    }

    /**
     * Driver.
     *
     * @param args {@code args[0]} poll timeout, {@code args[1]} topic,
     *             {@code args[2]} {@code true} for the iterator interface, anything else
     *             for the reader interface
     * @throws NumberFormatException if {@code args[0]} is not a valid long
     */
    public static void main(String[] args) {
        if (args == null || args.length < 3) {
            System.err.println(USAGE);
            System.exit(1);
            return;
        }
        long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDPConsumer cdpConsumer = new CDPConsumer();
        cdpConsumer.consume(pollTimeout, topic, method);
    }
}