Developing HPE Ezmeral Data Fabric Streams Python Applications
This topic describes the basics of developing an HPE Ezmeral Data Fabric Streams Python application and includes an example program that you can run.
Before you Begin
Confirm that your environment meets the following requirements:
- HPE Ezmeral Data Fabric cluster version 5.2.1 or greater.
- HPE Ezmeral Data Fabric core client (mapr-client) package. See Installing the Data Fabric Client (Non-FIPS) for more information.
- HPE Ezmeral Data Fabric Streams C Client (mapr-librdkafka) is installed and configured on the node. See Configuring the HPE Ezmeral Data Fabric Streams C Client.
- HPE Ezmeral Data Fabric Streams Python Client (mapr-streams-python) is installed on the node. See Installing HPE Ezmeral Data Fabric Streams Python Client.
- Python is installed on the node (version 2.7.x through 3.6.x).
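The Python-related requirements above can be checked from the interpreter itself. The following is a minimal sketch, not part of the product: the helper names python_version_supported, client_importable, and report are hypothetical, and the import check only confirms that the confluent_kafka module (the module name used by the example code in this topic) can be loaded, not that the C client is configured correctly.

```python
import sys


def python_version_supported(version_info):
    """Return True for Python 2.7.x through 3.6.x (the range listed above)."""
    major_minor = (version_info[0], version_info[1])
    return (2, 7) <= major_minor <= (3, 6)


def client_importable():
    """Return True if the confluent_kafka module can be imported."""
    try:
        import confluent_kafka  # noqa: F401 (imported only to test availability)
    except ImportError:
        return False
    return True


def report():
    """Print a short summary of both checks."""
    print('Python version supported: %s' % python_version_supported(sys.version_info))
    print('confluent_kafka importable: %s' % client_importable())
```

Calling report() on the node prints the result of both checks.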
Create an HPE Ezmeral Data Fabric Streams Producer Application
In general, you want to create a producer that performs the following steps:
- Import the producer class.
- Define the producer and its configuration.
- Produce data.
- Wait for all queued messages to be delivered to the stream.
As of EEP 5.0 HPE Ezmeral Data Fabric Streams Python Client: In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
from confluent_kafka import Producer

p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    p.produce('mytopic', data.encode('utf-8'))
p.flush()
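The producer example does not check whether each message was actually delivered. In the upstream confluent-kafka-python API, produce() accepts a callback argument that is invoked with (err, msg) once delivery succeeds or fails, and callbacks are served from poll() and flush(). The sketch below assumes that the HPE Ezmeral Data Fabric Streams Python Client behaves the same way; DeliveryLog and produce_all are hypothetical helpers, not part of the client.

```python
class DeliveryLog(object):
    """Collects the outcome of each produce() call."""

    def __init__(self):
        self.delivered = 0
        self.errors = []

    def __call__(self, err, msg):
        # err is None on success; otherwise it describes the failure.
        if err is None:
            self.delivered += 1
        else:
            self.errors.append(str(err))


def produce_all(producer, topic, messages, log):
    """Produce every message, then block until all have been reported."""
    for data in messages:
        producer.produce(topic, data.encode('utf-8'), callback=log)
        producer.poll(0)  # serve any delivery callbacks that are ready
    producer.flush()      # wait for the remaining deliveries
    return log


def main():
    # Imported here so the helpers above can be used without the client installed.
    from confluent_kafka import Producer
    p = Producer({'streams.producer.default.stream': '/my_stream'})
    log = produce_all(p, 'mytopic', ['msg1', 'msg2', 'msg3'], DeliveryLog())
    print('delivered=%d errors=%s' % (log.delivered, log.errors))
```

Calling main() on a configured node produces the same three messages as the example and then prints how many were reported as delivered.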
Create an HPE Ezmeral Data Fabric Streams Consumer Application
In general, you want to create a consumer that performs the following steps:
- Import the consumer class.
- Define the consumer and its configuration.
- Consume data.
- Wait for all messages to be consumed.
As of EEP 5.0 HPE Ezmeral Data Fabric Streams Python Client: In the following example code, the HPE Ezmeral Data Fabric Streams consumer is subscribed to /my_stream:mytopic and prints the content of each message that it reads.
from confluent_kafka import Consumer, KafkaError

c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()
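The consumer loop in the example stops only when an error occurs, so it keeps polling after the topic has been read to the end. One possible way to implement the "wait for all messages to be consumed" step is to stop after several consecutive empty polls and to close the consumer in a finally block. This is a sketch under assumptions: drain, max_idle_polls, and the idle-poll heuristic are hypothetical and not part of the client, and the end-of-partition code is passed in by the caller (KafkaError._PARTITION_EOF in the example).

```python
def drain(consumer, handle, eof_code, max_idle_polls=5, timeout=1.0):
    """Read messages until max_idle_polls consecutive polls bring nothing new.

    Returns the number of messages passed to handle(). The consumer is
    always closed, even if handle() or a stream error raises.
    """
    idle = 0
    handled = 0
    try:
        while idle < max_idle_polls:
            msg = consumer.poll(timeout=timeout)
            if msg is None:
                idle += 1          # nothing arrived within the timeout
                continue
            err = msg.error()
            if not err:
                handle(msg.value().decode('utf-8'))
                handled += 1
                idle = 0           # progress was made; reset the counter
            elif err.code() == eof_code:
                idle += 1          # end of partition: treat as an idle poll
            else:
                raise RuntimeError(str(err))
    finally:
        consumer.close()
    return handled


def main():
    # Imported here so drain() can be exercised without the client installed.
    from confluent_kafka import Consumer, KafkaError
    c = Consumer({'group.id': 'mygroup',
                  'default.topic.config': {'auto.offset.reset': 'earliest'}})
    c.subscribe(['/my_stream:mytopic'])
    received = []
    count = drain(c, received.append, KafkaError._PARTITION_EOF)
    print('Consumed %d messages: %s' % (count, received))
```

The idle-poll limit is a heuristic: a slow producer can pause for longer than max_idle_polls * timeout seconds, so raise the limit if the consumer exits too early.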
Run the Example Applications
To run the sample producer and consumer applications:
- Create a stream named my_stream (the example code refers to it by the path /my_stream).
- Create a file named producer.py.
- Add the producer example code into the producer.py file.
- Create a file named consumer.py.
- Add the consumer example code into the consumer.py file.
- Verify that you have completed the steps to configure the HPE Ezmeral Data Fabric Streams C Client, or complete the steps now. See Configuring the HPE Ezmeral Data Fabric Streams C Client.
NOTE: The HPE Ezmeral Data Fabric Streams Python Client depends on the HPE Ezmeral Data Fabric Streams C Client. Therefore, the C Client must be configured before you can run the applications.
- Run producer.py from the command line to generate messages:
$ python producer.py
- Run consumer.py from the command line:
$ python consumer.py
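The first step in the list above, creating the stream, could also be scripted from Python. The sketch below assumes that the maprcli command-line tool is on the PATH of the node and that the stream is created with maprcli stream create -path; the helper names are hypothetical, and any additional options that your cluster needs (permissions, for example) are not shown.

```python
import subprocess


def stream_create_command(stream_path):
    """Build the argument list for creating a stream at stream_path."""
    if not stream_path.startswith('/'):
        raise ValueError('stream path must be absolute, for example /my_stream')
    return ['maprcli', 'stream', 'create', '-path', stream_path]


def create_stream(stream_path):
    """Run the command; raises CalledProcessError if maprcli reports failure."""
    subprocess.check_call(stream_create_command(stream_path))
```

Under these assumptions, create_stream('/my_stream') creates the stream that the example producer and consumer use.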