Open Format

Describes the CDC open format.

Open Format Mapping

The following shows the mapping between the HPE Ezmeral Data Fabric CDC data types and the JSON open format data types.

{
        "map": {
        "null": null,
        "boolean" : true,
        "string": "eureka",
        "byte" : {"$numberByte": 127},
        "short": {"$numberShort": 32767},
        "int": {"$numberInt": 2147483647},
        "long": {"$numberLong":9223372036854775807},
        "float" : {"$numberFloat":3.4028235E38},
        "double" : 1.7976931348623157e308,
        "decimal": {"$decimal": "12345678901234567890189012345678901.23456789"},
        "date": {"$dateDay": "yyyy-mm-dd"},
        "time" : {"$time" : "HH:mm:ss[.sss]"},
        "timestamp" : {"$date" : "yyyy-MM-ddTHH:mm:ss.SSSXXX"},
        "interval" : {"$interval" : number_of_milliseconds},
        "binary" : {"$binary" : "base64_encoded_binary_value"},
        "array" : [42, "open sesame", 3.14, {"$dateDay": "2015-01-21"}]
        }
}
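
As a rough illustration of how a consumer might turn these tagged values back into Java types, the following sketch assumes the record has already been parsed by a JSON library into ordinary Map, List, Number, and String objects. The class name OpenFormatDecoder, the method decode, and the choice of target Java types are hypothetical and are not part of the CDC API. It also assumes that the date, time, and timestamp strings are in the ISO forms accepted by java.time.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;

// Illustrative sketch; the class and method names are not part of the CDC API.
class OpenFormatDecoder {

    // Converts one parsed open format value to a Java type.
    // Untagged values (null, boolean, string, double, array) are returned as-is.
    static Object decode(Object value) {
        if (!(value instanceof Map) || ((Map<?, ?>) value).size() != 1) {
            return value;
        }
        Map.Entry<?, ?> tagged = ((Map<?, ?>) value).entrySet().iterator().next();
        Object v = tagged.getValue();
        switch (String.valueOf(tagged.getKey())) {
            case "$numberByte":  return ((Number) v).byteValue();
            case "$numberShort": return ((Number) v).shortValue();
            case "$numberInt":   return ((Number) v).intValue();
            case "$numberLong":  return ((Number) v).longValue();
            case "$numberFloat": return ((Number) v).floatValue();
            case "$decimal":     return new BigDecimal((String) v);
            case "$dateDay":     return LocalDate.parse((String) v);
            case "$time":        return LocalTime.parse((String) v);
            case "$date":        return OffsetDateTime.parse((String) v);
            case "$interval":    return Duration.ofMillis(((Number) v).longValue());
            case "$binary":      return Base64.getDecoder().decode((String) v);
            default:             return value; // an ordinary single-key map
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(Collections.singletonMap("$numberInt", 2147483647))); // 2147483647
        System.out.println(decode(Collections.singletonMap("$dateDay", "2015-01-21"))); // 2015-01-21
        System.out.println(decode(Collections.singletonMap("$interval", 90000)));       // PT1M30S
    }
}
```

A JSON parser that reads every number as a double would lose precision on large long values before decode is ever called, so a parser that keeps integers exact is the safer choice here.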
      

JSON Record Format

When the consumer retrieves the changed data record (by the key-value pair), the record is returned as a string in JSON format (a readable open format). The information about the mutation is returned as an array where each array element is one (1) change.
NOTE
If you use the default print, the string contains float values with up to six (6) digits of precision and double values with up to fifteen (15) digits. If a value needs more digits than this default and you need the exact number, use the CDC API that returns a float or double value.
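
The effect described in the note can be illustrated with a small sketch. It does not call the CDC print routine. It only assumes that printing keeps six significant digits for a float and fifteen for a double, and then checks whether parsing the printed text gives back the original value. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.MathContext;

// Illustrative sketch; it models limited-precision printing, not the CDC API itself.
class PrecisionSketch {

    // Renders a value with the given number of significant digits.
    static String print(double value, int significantDigits) {
        return new BigDecimal(value).round(new MathContext(significantDigits)).toString();
    }

    // True if a float printed with 6 significant digits parses back to the same float.
    static boolean floatSurvives(float value) {
        return Float.parseFloat(print(value, 6)) == value;
    }

    // True if a double printed with 15 significant digits parses back to the same double.
    static boolean doubleSurvives(double value) {
        return Double.parseDouble(print(value, 15)) == value;
    }

    public static void main(String[] args) {
        System.out.println(floatSurvives(1.5f));            // true
        System.out.println(floatSurvives(Float.MAX_VALUE)); // false
        System.out.println(doubleSurvives(0.25));           // true
        System.out.println(doubleSurvives(0.1 + 0.2));      // false
    }
}
```

Values that fit within the printed digits come back unchanged, while values such as Float.MAX_VALUE or 0.1 + 0.2 do not, which is the case where the typed API is the better choice.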

The following example changed data record shows two (2) mutations.

{
        "_id":"row1",
        "$opType":"$RECORD_UPDATE",
        "$opTime":1518654391801,

        "$mutations":[
        {"$fieldPath":"arrayB",
        "$fieldOp":"$SET",
        "$fieldValue":[{"$numberInt":100},false,"set a map"]
        },
        {"$fieldPath":"arrayC",
        "$fieldOp":"$SET",
        "$fieldValue":[{"$numberInt":200},false,"set a map"]
        }
        ]
}
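
One way to walk the mutations array of such a record is sketched below. To stay self-contained it includes a deliberately minimal JSON reader. In practice a JSON library would normally take its place. The class name ChangeRecordSketch and the method describeMutations are hypothetical, the reader handles string escapes only in a simplified way, and the guard for a missing "$mutations" field is an assumption about records that carry no field-level changes.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch; the class and method names are not part of the CDC API.
class ChangeRecordSketch {
    private final String s; // JSON text being parsed
    private int i = 0;      // current read position

    private ChangeRecordSketch(String s) { this.s = s; }

    // Minimal JSON reader: objects, arrays, strings, numbers, true/false/null.
    static Object parse(String text) { return new ChangeRecordSketch(text).value(); }

    // Skips whitespace and returns the next character without consuming it.
    private char peek() {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        if (i >= s.length()) throw new IllegalArgumentException("Unexpected end of input");
        return s.charAt(i);
    }

    private void expect(char c) {
        if (peek() != c) throw new IllegalArgumentException("Expected '" + c + "' at " + i);
        i++;
    }

    private Object value() {
        char c = peek();
        if (c == '{') return object();
        if (c == '[') return array();
        if (c == '"') return string();
        if (s.startsWith("true", i)) { i += 4; return Boolean.TRUE; }
        if (s.startsWith("false", i)) { i += 5; return Boolean.FALSE; }
        if (s.startsWith("null", i)) { i += 4; return null; }
        int start = i;
        while (i < s.length() && "+-0123456789.eE".indexOf(s.charAt(i)) >= 0) i++;
        return new BigDecimal(s.substring(start, i)); // NumberFormatException if malformed
    }

    private Map<String, Object> object() {
        Map<String, Object> m = new LinkedHashMap<>();
        expect('{');
        if (peek() == '}') { i++; return m; }
        while (true) {
            String key = string();
            expect(':');
            m.put(key, value());
            if (peek() == ',') { i++; continue; }
            expect('}');
            return m;
        }
    }

    private List<Object> array() {
        List<Object> list = new ArrayList<>();
        expect('[');
        if (peek() == ']') { i++; return list; }
        while (true) {
            list.add(value());
            if (peek() == ',') { i++; continue; }
            expect(']');
            return list;
        }
    }

    private String string() {
        expect('"');
        StringBuilder sb = new StringBuilder();
        while (i < s.length() && s.charAt(i) != '"') {
            // Simplification: an escaped character is kept literally.
            if (s.charAt(i) == '\\' && i + 1 < s.length()) i++;
            sb.append(s.charAt(i++));
        }
        expect('"');
        return sb.toString();
    }

    // Returns one entry per mutation in the form "<fieldOp> <fieldPath>".
    @SuppressWarnings("unchecked")
    static List<String> describeMutations(String recordJson) {
        Map<String, Object> record = (Map<String, Object>) parse(recordJson);
        List<String> out = new ArrayList<>();
        Object mutations = record.get("$mutations");
        if (!(mutations instanceof List)) return out; // assumption: no mutations present
        for (Object element : (List<Object>) mutations) {
            Map<String, Object> mutation = (Map<String, Object>) element;
            out.add(mutation.get("$fieldOp") + " " + mutation.get("$fieldPath"));
        }
        return out;
    }
}
```

For a well-formed record with two $SET mutations on arrayB and arrayC, describeMutations returns a list with the entries "$SET arrayB" and "$SET arrayC", one per change.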

Example

The following sample code initializes the consumer properties for open format and consumes the changelog data from the topic.

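import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/*
 * Initialize basic consumer properties for open format.
 * Both the key and the value are deserialized as strings.
 * @return the consumer properties
 */
private Properties getOpenFormatListenerProperties() {
   Properties props = new Properties();
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
   return props;
}

/*
 * Consume from the changelog topic.
 * pollTimeout is assumed to be defined elsewhere in the enclosing class.
 */
public void startConsume(String topic) {
   KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getOpenFormatListenerProperties());
   List<String> topicList = new ArrayList<>();
   topicList.add(topic);
   consumer.subscribe(topicList);

   ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
   Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
   while (iterator.hasNext()) {
      ConsumerRecord<String, String> record = iterator.next();
      // The value is the changed data record as a JSON string.
      String cdcResult = record.value();
   }
}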