Sample Kafka Python Producer and Consumer
This topic provides kafka-python
examples with SASL and SSL client
configurations for Apache Kafka Wire Protocol Service.
-
If you have not done so already, install
kafka-python
:pip3 install kafka-python $pip3 install kafka-python $pip3 list | grep kafka-python
-
Save the Sample Kafka Python Producer and Consumer code in a file with the following name:
- SASL:
saslPlaintextKafkaPythonClient.py
- SSL:
sslPlaintextKafkaPythonClient.py
- SASL:
-
Run the sample code with the following command:NOTEWhen running the SASL sample code, you must fill in the username and password. When running the SSL sample code, you must fill the
ssl_password
from the file/opt/mapr/conf/store-passwords.txt
.
For example:python3 saslPlaintextKafkaPythonClient.py <broker> <topicName>
python3 saslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SASL
For example:python3 sslPlaintextKafkaPythonClient.py <broker> <topicName>
python3 sslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SSL
Sample Kafka Python Producer and Consumer
from kafka import KafkaProducer
from kafka import KafkaConsumer
import sys
import time
#Specify 'broker' and 'topic' arguments. Example: 'python3 saslPlaintestKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SASL'
server = sys.argv[1]
print("BROKER: " + server)
topic = sys.argv[2]
print("TOPIC: " + topic)
#PRODUCER
print("\n**Starting Producer**")
producer=KafkaProducer(bootstrap_servers=[server],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username='<username>',
sasl_plain_password='<user-password>')
numMsgProduced = 0
for _ in range(100):
producer.send(topic, b'msg')
numMsgProduced += 1
producer.flush()
print("Messages produced: " + str(numMsgProduced))
time.sleep(2)
# CONSUMER
print("\n**Starting Consumer**")
consumer = KafkaConsumer(bootstrap_servers=[server],
auto_offset_reset='earliest',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username='<username>',
sasl_plain_password='<user-password>')
consumer.subscribe(topic)
numMsgConsumed = 0
for _ in range(10):
records = consumer.poll(timeout_ms=500)
for topic_data, consumer_records in records.items():
for consumer_record in consumer_records:
# print("Received message: " + str(consumer_record.value.decode('utf-8')))
numMsgConsumed += 1
print("Messages consumed: " + str(numMsgConsumed))
from kafka import KafkaProducer
from kafka import KafkaConsumer
import sys
import time
#Specify 'broker' and 'topic' arguments. Example 'python3 sslPlaintestKafkaPythonClientJSK.py localhost:9092 topicTestKafkaPythonClient_SSL'
server = sys.argv[1]
print("BROKER: " + server)
topic = sys.argv[2]
print("TOPIC: " + topic)
#PRODUCER
print("\n**Starting Producer**")
producer=KafkaProducer(bootstrap_servers=[server],
security_protocol='SSL',
ssl_check_hostname=False,
ssl_password='<>', #from /opt/mapr/conf/store-passwords.txt ssl.server.keystore.password
ssl_cafile='/opt/mapr/conf/ssl_truststore.pem',
ssl_certfile='/opt/mapr/conf/ssl_keystore-signed.pem',
ssl_keyfile='/opt/mapr/conf/ssl_keystore.pem')
numMsgProduced = 0
for _ in range(100):
producer.send(topic, b'msg')
numMsgProduced += 1
producer.flush()
print("Messages produced: " + str(numMsgProduced))
time.sleep(2)
# CONSUMER
print("\n**Starting Consumer**")
consumer = KafkaConsumer(bootstrap_servers=[server],
auto_offset_reset='earliest',
security_protocol='SSL',
ssl_check_hostname=False,
ssl_password='<>', #from /opt/mapr/conf/store-passwords.txt ssl.server.keystore.password
ssl_cafile='/opt/mapr/conf/ssl_truststore.pem',
ssl_certfile='/opt/mapr/conf/ssl_keystore-signed.pem',
ssl_keyfile='/opt/mapr/conf/ssl_keystore.pem')
consumer.subscribe(topic)
numMsgConsumed = 0
for _ in range(10):
records = consumer.poll(timeout_ms=500)
for topic_data, consumer_records in records.items():
for consumer_record in consumer_records:
# print("Received message: " + str(consumer_record.value.decode('utf-8')))
numMsgConsumed += 1
print("Messages consumed: " + str(numMsgConsumed))