Developing an HPE Ezmeral Data Fabric Streams C Application
This topic includes basic information about how to develop an HPE Ezmeral Data Fabric Streams C application. Sample applications are provided.
Before You Begin
Confirm that your environment meets the following requirements:
- The HPE Ezmeral Data Fabric cluster version is 5.2.1 or greater.
- The HPE Ezmeral Data Fabric core client (mapr-client) package is installed on the node and configured to access the cluster, or the node is an HPE Ezmeral Data Fabric cluster node. See Installing the Data Fabric Client (Non-FIPS) for more information.
- The HPE Ezmeral Data Fabric Streams C Client is installed and configured on the node. See Configuring the HPE Ezmeral Data Fabric Streams C Client.
- GNU Compiler Collection (GCC) is installed on the node.
Creating, Compiling, and Running C Apps
The following sections describe how to create a producer and consumer in C, compile the source code, generate executables, and run the applications.
To create a producer:
- Include the rdkafka.h header file (/opt/mapr/include/librdkafka/rdkafka.h).
- Use rd_kafka_conf_new() to create the producer configuration.
- Use rd_kafka_new() to create the producer handle.
- Use rd_kafka_topic_conf_new() to create the topic configuration.
- Use rd_kafka_topic_new() to create a topic handle for the producer.
- Use rd_kafka_produce() to produce messages.
- Optionally, use rd_kafka_poll() to poll for callbacks. This is useful to see whether there are messages that have yet to be sent to the server.
- Use rd_kafka_topic_destroy() to destroy the topic handle.
- Use rd_kafka_destroy() to destroy the producer handle.
For example, the following source code produces 5 messages to topic /MapR_Streams:MapR-Topic1:
/*
* This file contains the producer function.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rdkafka.h>
#include <errno.h>
/* msgDeliveryCB is the delivery report callback.
* The delivery report callback will be called once for each message
* accepted by rd_kafka_produce() with err set to indicate
* the result of the produce request. An application must call rd_kafka_poll()
* at regular intervals to serve queued delivery report callbacks.
*/
static void msgDeliveryCB (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("FAILURE: Message not delivered to partition.\n");
printf("ERROR: %s", rd_kafka_err2str(rkmessage->err));
} else {
printf("Produced: %.*s\n",(int)rkmessage->len, (const char*)rkmessage->payload);
}
}
/*
* Method : int producer(int nummsgs_p, const char *fullTopicName)
* Description : This is a simple producer method. In this method the producer
* produces messages to a topic.
*/
int producer(int nummsgs_p, const char *fullTopicName) {
printf("************************ PRODUCER ************************\n");
rd_kafka_t *prodHndle;
rd_kafka_conf_t *prodCfg;
char errstr[1000];
int totalMsgs = nummsgs_p;
printf("Create producer configuration object\n");
/*
* rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
* be passed at the time of producer object creation using rd_kafka_new call.
*/
prodCfg = rd_kafka_conf_new();
if (prodCfg == NULL) {
printf("Failed to create conf\n");
return (EXIT_FAILURE);
}
/* rd_kafka_conf_set_dr_msg_cb(): This API sets the producer callback
* 'msgDeliveryCB' in producer config 'prodCfg'
* The delivery report callback will be called once for each message
* accepted by rd_kafka_produce() with err set to indicate
* the result of the produce request. An application must call rd_kafka_poll()
* at regular intervals to serve queued delivery report callbacks.
*/
rd_kafka_conf_set_dr_msg_cb(prodCfg, msgDeliveryCB);
printf("Create Producer Kafka handle\n");
/*
* rd_kafka_new(): Creates a new Kafka handle and starts its operation
* according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
* The prodCfg object passed here is freed by this function and must not be used
* or destroyed by the application subsequently. errstr must be a pointer to
* memory of at least size errstr_size, where rd_kafka_new() may write a
* human-readable error message if creation of a new handle fails, in which
* case the function returns NULL.
*/
prodHndle = rd_kafka_new(RD_KAFKA_PRODUCER, prodCfg, errstr, sizeof(errstr));
if (prodHndle == NULL) {
printf("Failed to create producer: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* The following code does the following:
* 1. Create a topic handle for each producer-topic combination
* 2. Produce 'totalMsgs' # of messages using topic handle created in step 1
* 3. Wait for all messages to be produced and callback to be delivered.
* 4. Move on to next topic and repeat.
*/
int totalTopics = 1;
for (int nTopics = 0; nTopics < totalTopics ; nTopics++) {
printf("Create topic handle\n");
rd_kafka_topic_conf_t *prodTopicCfg;
/*
* rd_kafka_topic_conf_new(): This API creates the topic conf object
* required to create the topic handle, which is then used for each
* producer-topic combination.
*/
prodTopicCfg = rd_kafka_topic_conf_new();
if (prodTopicCfg == NULL) {
printf("Failed to create new topic conf\n");
return (EXIT_FAILURE);
}
rd_kafka_topic_t *prodTopicHndl;
/*
* rd_kafka_topic_new(): This API creates a topic handle for a given
* producer, topic name, and topic config. Topic handles are refcounted
* internally, and calling rd_kafka_topic_new() again with the same topic
* name returns the previous topic handle without updating the original
* handle's configuration.
* Applications must eventually call rd_kafka_topic_destroy() for each
* successful call to rd_kafka_topic_new() to clean up resources.
*/
prodTopicHndl = rd_kafka_topic_new(prodHndle, fullTopicName, prodTopicCfg);
if (prodTopicHndl == NULL) {
printf("Failed to create new topic handle\n");
return (EXIT_FAILURE);
}
prodTopicCfg = NULL; /* Now owned by topic */
const char* key ="Key";
printf("Send/Produce message to topic: %s\n", fullTopicName);
for (int i = 0; i < totalMsgs; i++) {
char payload[1000];
if (i == 0)
sprintf(payload, "%s", "Welcome to MapR Streams CAPI");
else
sprintf(payload, "MapR Streams CAPI Message Payload %d", i);
/*
* rd_kafka_produce(): This API produces a single message
* to the cluster. prodTopicHndl must have been created using the
* rd_kafka_topic_new() API. This is an asynchronous, non-blocking API.
* RD_KAFKA_PARTITION_UA indicates automatic partitioning using the
* topic's partitioner; a fixed partition can be provided instead.
* The RD_KAFKA_MSG_F_COPY flag indicates that the library copies the
* payload and the application manages its own payload memory. If the API
* fails to send, errno is set accordingly, and the librdkafka-specific
* error can be retrieved using the rd_kafka_last_error() API.
*/
if (rd_kafka_produce(prodTopicHndl,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
payload,
strlen(payload),
key,
strlen(key),
NULL) == -1) {
int errNum = errno;
printf("Failed to produce to topic : %s\n", rd_kafka_topic_name(prodTopicHndl));
printf("Error Number: %d ERROR NAME: %s\n"
,errNum, rd_kafka_err2str(rd_kafka_last_error()));
return (errNum);
}
}
printf("Wait for messages to be delivered\n");
/*
* rd_kafka_outq_len(): This API returns the number of messages in the out
* queue waiting to be sent to, or acknowledged by, the server.
* An application should wait for this queue to reach zero before
* terminating to make sure outstanding requests are fully processed.
*
* rd_kafka_poll(): This API polls the producer handle for events,
* which will cause application provided callbacks to be called.
* An application must call rd_kafka_poll() at regular intervals to
* serve queued delivery report callbacks. In this case
* 'msgDeliveryCB' will get called.
*/
while (rd_kafka_outq_len(prodHndle) > 0)
rd_kafka_poll(prodHndle, 100);
printf("\nDestroy topic handle\n");
/*
* Applications must eventually call rd_kafka_topic_destroy() for each
* successful call to rd_kafka_topic_new() to clean up resources.
*/
rd_kafka_topic_destroy(prodTopicHndl);
}
printf("Destroy producer handle\n");
/*
* rd_kafka_destroy(): This API destroys the producer handle created using
* rd_kafka_new call and frees resources.
*/
rd_kafka_destroy(prodHndle);
return(EXIT_SUCCESS);
}
/* MAIN */
int main(int argc, char *argv[]) {
/* Number of messages the producer will produce */
int nummsgs_p = 5;
/* This is a pre-created stream with one topic and one partition */
const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
int ret_val;
/* Produce Messages */
ret_val = producer(nummsgs_p, fullTopicName);
if (EXIT_SUCCESS != ret_val) {
printf("\nFAIL: producer failed\n");
} else {
printf("\nPASS: %d messages produced and sent to topic partition %s \n", nummsgs_p, fullTopicName);
}
}
To create a consumer:
- Include the rdkafka.h header file (/opt/mapr/include/librdkafka/rdkafka.h).
- Use rd_kafka_conf_new() to create the consumer configuration.
- Use rd_kafka_conf_set() to set the configuration parameters. For this API, you must set the "group.id".
- Use rd_kafka_new() to create the consumer handle.
- Use rd_kafka_subscribe() or rd_kafka_assign() to specify which topics to consume.
- Use rd_kafka_consumer_poll() to poll for messages that are ready to be consumed.
- Use rd_kafka_consumer_close() to perform auto commits and prepare to destroy the consumer handle.
- Use rd_kafka_destroy() to destroy the consumer handle.
For example, the following source code consumes 5 messages from topic /MapR_Streams:MapR-Topic1:
/*
* This file contains the consumer function.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <rdkafka.h>
#include <string.h>
#include <stdbool.h> /* for the 'false' argument passed to rd_kafka_commit() */
/*
* Method : int consumer(int expected_nummsgs, const char *fullTopicName)
* Description : This is a simple consumer method. In this method the consumer
* consumes messages from a topic.
*/
int consumer(int expected_nummsgs, const char *fullTopicName) {
printf("********* CONSUMER START *********\n");
rd_kafka_t *consHndle;
rd_kafka_conf_t *consCfg;
rd_kafka_topic_conf_t *consTopicCfg;
char errstr[1000];
rd_kafka_resp_err_t errCode;
printf("Create new consumer configuration object\n");
/*
* rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
* be passed at the time of consumer object creation using rd_kafka_new call.
*/
consCfg = rd_kafka_conf_new();
if(consCfg == NULL) {
printf("Failed to create consumer conf\n");
return(EXIT_FAILURE);
}
/*
* rd_kafka_conf_set(): This API is used to set config parameters in the
* rd_kafka_conf_t object. group.id must be set for all consumers.
* All changes to consCfg must be done before creating the consumer object.
*/
if(RD_KAFKA_CONF_OK != rd_kafka_conf_set(consCfg,
"group.id", "consumerGroup",
errstr, sizeof(errstr))) {
printf("rd_kafka_conf_set() failed with error: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* rd_kafka_topic_conf_new(): This API Creates topic conf object
* required to set the default topic configuration.
*/
printf("Set topic configurations\n");
consTopicCfg = rd_kafka_topic_conf_new();
/* rd_kafka_topic_conf_set(): This API sets a config property by name.
* consTopicCfg must have been created previously with rd_kafka_topic_conf_new().
* The property set in this call is 'auto.offset.reset'. When set to
* 'earliest', rd_kafka_consumer_poll() returns messages from the beginning
* of the topic (for a first-time consumer) or from the last committed
* offset (for a returning consumer). When set to 'latest', it returns only
* messages produced after the consumer started (for a first-time consumer),
* or from the last committed offset (for a returning consumer).
*/
if (RD_KAFKA_CONF_OK != rd_kafka_topic_conf_set(consTopicCfg, "auto.offset.reset",
"earliest" ,errstr, sizeof(errstr))) {
printf("rd_kafka_topic_conf_set() failed with error: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* rd_kafka_conf_set_default_topic_conf(): This API sets the default topic
* configuration to use for automatically subscribed topics
* The topic config object is not usable after this call.
*/
rd_kafka_conf_set_default_topic_conf(consCfg, consTopicCfg);
printf("Create consumer Kafka handle\n");
/*
* rd_kafka_new(): Creates a new Kafka handle and starts its operation
* according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
* The consCfg object passed here is freed by this function and must not be used
* or destroyed by the application subsequently. errstr must be a pointer to
* memory of at least size errstr_size, where rd_kafka_new() may write a
* human-readable error message if creation of a new handle fails, in which
* case the function returns NULL.
*/
consHndle = rd_kafka_new(RD_KAFKA_CONSUMER, consCfg, errstr, sizeof(errstr));
if(consHndle == NULL) {
printf("Failed to create consumer:%s", errstr);
return (EXIT_FAILURE);
}
/* rd_kafka_poll_set_consumer() redirects the main queue, which is
* normally serviced using rd_kafka_poll(), to rd_kafka_consumer_poll(). With
* the single API rd_kafka_consumer_poll(), both callbacks and messages are
* serviced. Once the queue is forwarded using this API, it is not permitted
* to call rd_kafka_poll() to service non-message-delivery callbacks.
*/
rd_kafka_poll_set_consumer(consHndle);
/* The topic partition list (tp_list) is supplied as an input to the consumer
* subscription (using rd_kafka_subscribe()). The rd_kafka_subscribe() API
* expects the partition argument to be set to RD_KAFKA_PARTITION_UA;
* internally, all partitions are assigned to the consumer.
* Note: partition balancing/assignment is done if more consumers are part
* of the same consumer group.
*/
printf("Create topic partition list for topic: %s\n", fullTopicName);
rd_kafka_topic_partition_list_t *tp_list = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_t* tpObj = rd_kafka_topic_partition_list_add(tp_list,
fullTopicName, RD_KAFKA_PARTITION_UA);
if (NULL == tpObj) {
printf("Could not add the topic partition to the list.\n");
return (EXIT_FAILURE);
}
printf("Subscribe consumer to the topic:\n");
/*
* rd_kafka_subscribe(): This API subscribes the given consumer to the topic
* list provided in tp_list. Depending on the number of consumers in a
* consumer group, partitions are balanced and assigned to each consumer.
*/
errCode = rd_kafka_subscribe(consHndle, tp_list);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("Topic partition subscription failed. ERROR: %d\n", errCode);
return(errCode);
}
printf("Destroy topic partition list:\n");
/*
* rd_kafka_topic_partition_list_destroy(): This API is used to free all
* resources used by the list and the list itself.
*/
rd_kafka_topic_partition_list_destroy(tp_list);
printf("\nStart message consumption:\n");
int msg_count = 0;
while(1) {
/*
* rd_kafka_consumer_poll(): This API returns one message or callback at
* a time. An application should make sure to call rd_kafka_consumer_poll()
* at regular intervals, even if no messages are expected, to serve any
* queued callbacks waiting to be called. When the application is finished
* with a message, it must call rd_kafka_message_destroy() to destroy the
* message.
*/
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consHndle, 1000);
if (msg != NULL) {
if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
msg_count++;
printf("%d Consumed: %.*s\n", msg_count,(int) msg->len,
(const char*)msg->payload);
if (msg_count == expected_nummsgs){
rd_kafka_message_destroy(msg);
break;
}
}
rd_kafka_message_destroy(msg);
}
}
printf("\nCommit the offsets before closing the consumer\n");
/*
* Commit offsets on the broker for the provided list of topic partitions.
* When the input is NULL, the current partition assignment is used instead.
* If async is false, this operation blocks until the offset commit
* is done, returning the resulting success or error code.
* This call is made to be sure that offsets are committed before closing
* the consumer.
*/
int retVal = rd_kafka_commit(consHndle, NULL, false/*async*/);
if(retVal != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("rd_kafka_commit() failed");
return(EXIT_FAILURE);
}
printf("\nClose and destroy consumer handle\n");
/*
* Consumer shutdown sequence:
* 1. rd_kafka_consumer_close(): This is a blocking call. It makes sure to
* revoke assignments, commit offsets, and leave the consumer group.
* The application still needs to call rd_kafka_destroy() after
* this call finishes to clean up the underlying handle resources.
* 2. rd_kafka_destroy(): This API destroys the consumer handle created using
* rd_kafka_new call and frees resources
*/
rd_kafka_consumer_close(consHndle);
rd_kafka_destroy(consHndle);
return(EXIT_SUCCESS);
}
/* MAIN */
int main(int argc, char *argv[]) {
/* Number of expected messages for the consumer */
int expected_nummsgs = 5;
/* This is a pre-created stream with one topic and one partition */
const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
int ret_val;
/* Consume Messages */
ret_val = consumer(expected_nummsgs, fullTopicName);
if (EXIT_SUCCESS != ret_val) {
printf("\nFAIL: consumer failed\n");
} else {
printf("\nPASS: %d messages consumed from topic %s\n", expected_nummsgs, fullTopicName);
}
}
The following steps compile the source code and generate executables in the same directory as the Makefile. For example, in the librdkafka_example directory, the consumer and producer executables are generated from the consumer.c and producer.c source files.
- On your node, create a directory. For example: librdkafka_example.
- In your directory (librdkafka_example), create a producer application. For
example, if you are using the provided sample producer application:
- Create a file named producer.c.
- Copy the contents of the sample producer application into that file.
- In your directory (librdkafka_example), create a consumer application. For
example, if you are using the provided sample consumer application:
- Create a file named consumer.c.
- Copy the contents of the sample consumer application into that file.
- In your directory (librdkafka_example), create a file named Makefile with the
following content:
CC= g++
HEADERDIR=/opt/mapr/include/librdkafka/
CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
export LD_LIBRARY_PATH=/opt/mapr/lib
LIBDIR= /opt/mapr/lib/
%.o: %.c
	gcc $(CCFLAGS) -c $<
consumer: consumer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
producer: producer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
all: consumer producer
clean:
	/bin/rm -f *.o consumer producer
IMPORTANT: For MapR 6.0.0 and earlier, use the following Makefile:
CC= g++
HEADERDIR=/opt/mapr/include/librdkafka/
CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
#Edit JAVA_HOME to be appropriate for your environment
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/
export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
LIBDIR= /opt/mapr/lib/
%.o: %.c
	gcc $(CCFLAGS) -c $<
consumer: consumer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
producer: producer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
all: consumer producer
clean:
	/bin/rm -f *.o consumer producer
- Complete the following edits to the Makefile:
  - For Mac users, locate the following line of code:
    export LD_LIBRARY_PATH=/opt/mapr/lib
    Then, replace this line with the following line of code:
    export DYLD_LIBRARY_PATH=/opt/mapr/lib
  IMPORTANT: For MapR 6.0.0 and earlier, the following steps apply:
  - For Mac users, locate the following line of code:
    export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
    Then, replace this line with the following line of code:
    export DYLD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/server
  - Based on your environment, edit JAVA_HOME. This ensures that LD_LIBRARY_PATH or DYLD_LIBRARY_PATH includes the full path to the directory containing the libjvm library.
    NOTE: You can use find / -name libjvm* to determine the JAVA_HOME directory on your machine. However, note that the results of this command include the full path to the libjvm file, not just the JAVA_HOME directory. For example, JAVA_HOME may be set to /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/ on a Mac, and JAVA_HOME may be set to /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/ on Linux.
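    For example (a hypothetical find result; the actual path depends on your JDK), if find / -name libjvm* returns /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/server/libjvm.so, then JAVA_HOME is /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64, and the directory to add to the library path is /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/server.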
- From your directory (librdkafka_example), run the following commands to compile
the source code:
make clean
make all
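If you prefer to compile without make, the following commands are equivalent to what the Makefile runs (a sketch based on the Makefile above; adjust the header and library paths for your environment):
gcc -Wall -I/opt/mapr/include/librdkafka -g -std=c99 -o producer producer.c -L/opt/mapr/lib -lrdkafka
gcc -Wall -I/opt/mapr/include/librdkafka -g -std=c99 -o consumer consumer.c -L/opt/mapr/lib -lrdkafka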
Once you have the application executables, complete the following steps to run the application:
- On a cluster node, use the maprcli to create a stream. For example, MapR_Streams.
maprcli stream create -path /MapR_Streams
NOTE: As long as autocreate is enabled for the stream when you run stream create, the producer will create the topic. By default, autocreate is enabled. For more information, see stream create. If autocreate is disabled on your stream, you can create the topic explicitly, as shown below.
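The following is a sketch of explicit topic creation with maprcli (the command form is assumed from standard maprcli usage; verify the options against your release):
maprcli stream topic create -path /MapR_Streams -topic MapR-Topic1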
- At the command line, set the library path to include /opt/mapr/lib and the path to the directory that contains the libjvm library. For more information, see Configuring the HPE Ezmeral Data Fabric Streams C Client.
NOTE: You must complete this step at the command line even though you already set the library path in the Makefile. If you do not complete this step, an error similar to the following displays when you run the application in the next step:
error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
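For example, on Linux (an illustrative JVM path; substitute the libjvm directory you identified earlier, and use DYLD_LIBRARY_PATH on a Mac):
export LD_LIBRARY_PATH=/opt/mapr/lib:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/server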
- From your directory (librdkafka_example), run the producer application from the
command line. For example, if the application is called
producer:
./producer
The following appears on the console, assuming that the stream name is MapR_Streams:
************************ PRODUCER ************************
Create producer configuration object
Create Producer Kafka handle
Create topic handle
Send/Produce message to topic: /MapR_Streams:MapR-Topic1
Wait for messages to be delivered
Produced: Welcome to MapR Streams CAPI
Produced: MapR Streams CAPI Message Payload 1
Produced: MapR Streams CAPI Message Payload 2
Produced: MapR Streams CAPI Message Payload 3
Produced: MapR Streams CAPI Message Payload 4
Destroy topic handle
Destroy producer handle

PASS: 5 messages produced and sent to topic partition /MapR_Streams:MapR-Topic1
- From your directory (librdkafka_example), run the consumer application from the
command line. For example, if the application is called consumer:
./consumer
The following appears on the console, assuming that the stream name is MapR_Streams:
********* CONSUMER START *********
Create new consumer configuration object
Set topic configurations
Create consumer Kafka handle
Create topic partition list for topic: /MapR_Streams:MapR-Topic1
Subscribe consumer to the topic:
Destroy topic partition list:

Start message consumption:
1 Consumed: Welcome to MapR Streams CAPI
2 Consumed: MapR Streams CAPI Message Payload 1
3 Consumed: MapR Streams CAPI Message Payload 2
4 Consumed: MapR Streams CAPI Message Payload 3
5 Consumed: MapR Streams CAPI Message Payload 4

Commit the offsets before closing the consumer

Close and destroy consumer handle

PASS: 5 messages consumed from topic /MapR_Streams:MapR-Topic1
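When you finish experimenting, you can remove the stream and its topics (the command form is assumed from standard maprcli usage; verify the options against your release):
maprcli stream delete -path /MapR_Streams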