API for HPE Ezmeral Data Fabric Streams Python Client

HPE Ezmeral Data Fabric Streams Python Client is a binding for librdkafka and it supports the following APIs.

As of HPE Ezmeral Data Fabric 5.2.1, you can create python applications for HPE Ezmeral Data Fabric Streams using the HPE Ezmeral Data Fabric Streams Python client. The HPE Ezmeral Data Fabric Streams Python client is a binding for librdkafka and the HPE Ezmeral Data Fabric Streams C Client is a distribution of librdkafka that works with HPE Ezmeral Data Fabric Streams.

Table 1. Supported Apache Kafka librdkafka versions
Core release EEP Release Kafka librdkafka version
As of HPE Ezmeral Data Fabric 6.0.1 As of 5.0 0.11.3
As of HPE Ezmeral Data Fabric 5.2.1 through 6.0.0 As of 3.0 0.9.0

class mapr_streams_python.Consumer

A high-level Kafka Consumer.

Method Behavior
Consumer(**kwargs) Create new Consumer instance using provided configuration dictionary.
assign(partitions) Set consumer partition assignment to the provided list of TopicPartition and starts consuming.
Parameters(s):
  • partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming
unassign()

Unassign from all TopicPartitions that have been assigned with the .assign(*topic_partition_list) method.

NOTE
This method is applicable as of HPE Ezmeral Data Fabric Streams Python Client EEP 5.0 which is associated with librdkafka 0.11.3.
assignment()

Return a list of assignments for a consumer object.

NOTE
This method is applicable as of HPE Ezmeral Data Fabric Streams Python Client EEP 5.0 which is associated with librdkafka 0.11.3.
close() Close down and terminate the Kafka Consumer.
Actions(s):
  • Stops consuming
  • Commits offsets
  • Leave consumer group
commit([message=None][, offsets=None][, async=True])

Commit a message or a list of offsets.

Message and offsets are mutually exclusive, if neither is set the current partition assignment’s offsets are used instead.

Parameters(s):
  • message (confluent_kafka.Message) – Commit message’s offset+1.
  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
  • async (bool) – Asynchronous commit, return immediately.
committed(partitions[, timeout=None])

Retrieve committed offsets for the list of partitions.

Parameters(s):
  • partitions (list(TopicPartition)) - List of topic+partitions to query for stored offsets.
  • timeout (float) – Request timeout

Returns: List of topic+partitions with offset and possibly error set.

Return type: list(TopicPartition)

Raises: KafkaException

NOTE
As of HPE Ezmeral Data Fabric 6.0, the message offset in a partition starts from zero (0). If you are upgrading and do not enable the HPE Ezmeral Data Fabric Database/HPE Ezmeral Data Fabric Streams feature, mfs.feature.db.streams.v6.support, the message offset in a partition starts from one (1).
on_commit(err, partitions)

A callback for Consumer.commit() that triggers custom actions when a commit request completes.

Parameters(s):
  • err (KafkaError) – Commit error object, or None on success.
  • Partitions (list(TopicPartition)) – List of partitions with their committed offsets or per-partition errors
poll([timeout=None])

Consume messages, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).

Parameter(s): timeout (float) – Maximum time to block waiting for message, event or callback

Returns: A Message object or None on timeout

Return type: Message or None

position(partitions[, timeout=None])

Retrieve current positions (offsets) for the list of partitions.

Parameter(s): partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1

Returns: List of topic+partitions with offset and possibly error set.

Return type: list(TopicPartition)

Raises: KafkaException

This function returns 0 when the messages have not yet been consumed from partitions. librdkafka returns -1001 instead.
subscribe(topics[, listener=None])

Set subscription to supplied list of topics This replaces a previous subscription.

Parameters:
  • topics (list(str)) – List of topics (strings) to subscribe to.

  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.

  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.

Raises: KafkaException
NOTE
You cannot use the rd_kafka_subscribe API to subscribe a consumer to topics when that consumer is already assigned to topics. If you call this API for an assigned consumer, error RD_KAFKA_RESP_ERR__CONFLICT is returned.
on_assign(consumer, partitions) Same as librdkafka.
unsubscribe() Same as librdkafka.
on_revoke(consumer, partitions) Parameter(s):
  • consumer (Consumer) – Consumer instance.

  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.

get_watermark_offsets(confluent_kafka.TopicPartition) Get WatermarkOffsets for a given Topic Partition.

Parameter(s): TopicPartition - Gets the watermark offset

NOTE
This method is applicable as of HPE Ezmeral Data Fabric Streams Python Client EEP 5.0 which is associated with librdkafka 0.11.3.

сlass mapr_streams_python.Producer

Asynchronous Kafka Producer.

Method Behavior
Producer(**kwargs)

Create new Producer instance using provided configuration dict.

len() This API returns a positive number to indicate that messages are waiting to be produced to a streams topic but the value does not indicate the actual number of messages. librdkafka returns the actual number of messages that are waiting to be sent to or acknowledged by the broker.

Return type: int

flush()

Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len()is zero.

poll([timeout])

Polls the producer for events and calls the corresponding callbacks (if registered).

Parameter(s):
  • timeout (float) – Maximum time to block waiting for events

Returns: Number of events processed (callbacks served).

Return type: int

produce(topic[, value][, key][, partition][, callback])

Produce message to topic. This is an asynchronous operation, an application may use the callback( alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

Parameters:
  • topic (str) – Topic to produce message to

  • value (str|bytes) – Message payload
  • key (str|bytes) – Message key
  • partition (int) – Partition to produce to, else uses the configured partitioner.
  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed deliver
Raises:
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
  • KafkaException – for other errors, see exception code
NOTE
When this function is called with NULL payload, an invalid argument error is sent to the callback. librdkafka creates a message with NULL payload and key value instead.

class mapr_streams_python.Message

The Message object represents either a single consumed or produced message, or an event . An application must check with error() to see if the object is a proper message (error() returns None) or an error/event. This class is not user-instantiable.

Method Behavior
len()

Returns: Message value (payload) size in bytes.

Return type: int

error()

The message object is also used to propagate errors and events. Applications must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object)

Return type: None or KafkaError

key()

Returns: message key or None if not available

Return type: str|bytes or None

offset()

Returns: message offset or None if not available

Return type: int or None

partition()

Returns: partition number or None if not available

Return type: int or None

topic()

Returns: topic name or None if not available

Return type: str or None

value()

Returns: message value (payload) or None if not available

Return type: str|bytes or None

timestamp() Returns: message timestamp
NOTE
This method is applicable as of HPE Ezmeral Data Fabric Streams Python Client EEP 5.0 which is associated with librdkafka 0.11.3.

class mapr_streams_python.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it. It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

Method Behavior
TopicPartition(topic[, partition][, offset])

Instantiate a TopicPartition object.

Parameter(s)
  • topic (string) – Topic name

  • partition (int) – Partition id

  • offset (int) – Initial partition offset

Return type: TopicPartition

error

Attribute that indicates an error (with KafkaError) unless None.

offset Attribute for offset.
partition Attribute for partition number.
topic Attribute for topic name.

class mapr_streams_python.KafkaError

Kafka error and event object.

The KafkaError class serves multiple purposes:
  • Propagation of errors
  • Propagation of events
  • Exceptions

This class is not user-instantiable.

Method Behavior
code()

Returns the error/event code for comparison toKafkaError.<ERR_CONSTANTS>.

Returns: error/event code

Return type: int

name()

Returns the enum name for error/event.

Returns: error/event enum name string

Return type: str

str()

Returns the human-readable error/event string.

Returns: error/event enum message string

Return type: str