Developing HPE Data Fabric Streams Python Applications

This topic includes basic information about how to develop an HPE Data Fabric Streams Python application, along with example producer and consumer programs that you can run.

Before You Begin

Confirm that your environment meets the following requirements:
  • HPE Data Fabric cluster version 5.2.1 or later.
  • HPE Data Fabric core client (mapr-client) package. See Installing the Data Fabric Client (Non-FIPS) for more information.
  • HPE Data Fabric Streams C Client (mapr-librdkafka) is installed and configured on the node. See Configuring the HPE Data Fabric Streams C Client.
  • HPE Data Fabric Streams Python Client (mapr-streams-python) is installed on the node. See Installing HPE Data Fabric Streams Python Client.
  • Python installed on the node.
    • For a cluster running Release 7.9 or an earlier core version, Python version 2.7.x through 3.6.x.
    • For a cluster running Release 7.10 or a later core version, Python version 3.7 through 3.12.
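
A quick way to confirm that the Python client and the underlying C library are visible to your interpreter is to import the module and print its version information. This is a minimal check, assuming the mapr-streams-python package exposes the upstream confluent_kafka module name and its version functions, as the examples in this topic do:

$ python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"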

Create an HPE Data Fabric Streams Producer Application

In general, you want to create a producer that performs the following steps:
  1. Import the producer class.
  2. Define the producer and its configuration.
  3. Produce data.
  4. Wait for all outstanding messages to be delivered.
As of EEP 5.0 of the HPE Data Fabric Streams Python Client, the following example code produces three messages to a topic named mytopic in a stream named /my_stream.
from confluent_kafka import Producer

# Create a producer that writes to the default stream /my_stream
p = Producer({'streams.producer.default.stream': '/my_stream'})

some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    p.produce('mytopic', data.encode('utf-8'))

# Wait until all buffered messages have been delivered
p.flush()
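
The produce() call is asynchronous: messages are buffered and sent in the background. If you want per-message delivery confirmation, the upstream confluent_kafka API lets you register a delivery callback; the following sketch assumes the HPE client accepts the same callback parameter:

from confluent_kafka import Producer

def delivery_report(err, msg):
    # Called once per message when delivery succeeds or fails
    if err is not None:
        print('Delivery failed: %s' % err)
    else:
        print('Delivered to %s [partition %d]' % (msg.topic(), msg.partition()))

p = Producer({'streams.producer.default.stream': '/my_stream'})
for data in ["msg1", "msg2", "msg3"]:
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# flush() also serves the delivery callbacks for all outstanding messages
p.flush()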

Create an HPE Data Fabric Streams Consumer Application

In general, you want to create a consumer that performs the following steps:
  1. Import the consumer class.
  2. Define the consumer and its configuration.
  3. Consume data.
  4. Wait for all messages to be consumed.
As of EEP 5.0 of the HPE Data Fabric Streams Python Client, in the following example code the consumer subscribes to the topic mytopic in the stream /my_stream and prints the content of each message that it reads.
from confluent_kafka import Consumer, KafkaError

# Create a consumer in the consumer group 'mygroup' that starts reading
# at the earliest available offset
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])

running = True
while running:
    msg = c.poll(timeout=1.0)  # wait up to one second for a message
    if msg is None:
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Stop on any error other than reaching the end of a partition
        print(msg.error())
        running = False
c.close()
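
By default, the consumer commits offsets automatically in the background. For at-least-once processing, a common pattern is to disable auto-commit and commit each offset only after the message has been handled. This is a minimal sketch, assuming the client follows the upstream confluent_kafka commit API; process() stands in for your own (hypothetical) application logic:

from confluent_kafka import Consumer, KafkaError

c = Consumer({'group.id': 'mygroup',
              'enable.auto.commit': False,  # commit manually after processing
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])

while True:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            break
        continue
    process(msg)           # hypothetical application logic
    c.commit(message=msg)  # record progress only after the message is handled
c.close()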

Run the Example Applications

To run the sample producer and consumer applications:
  1. Create a stream named /my_stream. (A sample maprcli command is shown after these steps.)
  2. Create a file named producer.py.
  3. Add the producer example code into the producer.py file.
  4. Create a file named consumer.py.
  5. Add the consumer example code into the consumer.py file.
  6. Verify that you have completed the steps to configure the HPE Data Fabric Streams C client, or complete them now. See Configuring the HPE Data Fabric Streams C Client.
    NOTE
    The HPE Data Fabric Streams Python Client depends on the HPE Data Fabric Streams C Client. Therefore, you must configure the C Client before you can run the application.
  7. Run producer.py from the command line to generate messages.
    $ python producer.py
  8. Run consumer.py from the command line:
    $ python consumer.py
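
For step 1, you can create the stream with the maprcli command-line tool; a minimal example, using the /my_stream path that the example code expects:

$ maprcli stream create -path /my_stream

If everything is configured correctly, the consumer prints one Received message: line for each message that the producer sent.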