HPE Data Fabric Streams Java Applications
This section contains information about developing client applications with Java, including information about the HPE Data Fabric Streams and Apache Kafka Java APIs, configuration parameters, and compiling and running producers and consumers.
Apache Kafka Support
| Core version | Apache Kafka API |
|---|---|
| 7.8 and later | 3.6.1 |
| 7.0 through 7.7 | 2.6.1 |
| 6.2 and later | 2.1 |
| 6.1 and later | 1.1 |
| 6.0.1 and later | 1.0 |
| 6.0.0 and earlier | 0.9.0 |
Partition Assignment in Release 7.8.0 and Later
Release 7.8.0 added a significant change to the Consumer behavior related to partition assignment flow. In release 7.8.0 and later, partition assignment happens synchronously, according to the Apache Kafka contract defined in ConsumerRebalanceListener.
The process of updating the consumer assignment from the client side and invoking all
related callbacks (in ConsumerRebalanceListener) happens within the
consumer.poll() call in the user thread. This means that even after you
call consumer.assign() / consumer.subscribe(), the new
assignment will not come up to your consumer (the result of the
consumer.assignment() call will not update) until you invoke
consumer.poll(). You might need to do this multiple times or configure a
longer timeout to complete the assignment in time. Also, the callback methods of the ConsumerRebalanceListener class are now guaranteed
to be called synchronously from within the poll call by the user thread.
By comparison, with the old behavior, the partition assignment happened asynchronously in a
background thread after you called consumer.assign() /
consumer.subscribe(). It was enough to sleep for a couple of seconds to
update your consumer.assignment() result and invoke the callback methods of
the ConsumerRebalanceListener class in a background
thread.
The new (synchronous) behavior is toggleable and can be disabled (switched to asynchronous
behavior) by setting streams.async.subscription.callback.enabled=true in
core-site.xml. In release 7.8.0 and later, the default value is
false, meaning that by default the new behavior is enabled.
consumer.assign() /
consumer.subscribe() until partition assignment happens (that is, ConsumerRebalanceListener callbacks are invoked and
the consumer.assignment() result is updated), do one of the following:- Disable the new behavior.
- Redesign your application so that it starts polling without the new assignment and uses ConsumerRebalanceListener#onPartitionsAssigned() as a way to notify about successful assignment.
Synchronous Consumer Partition Assignment
As of HPE Data Fabric 7.8,
synchronous consumer partition assignment is supported. For streams created with HPE Data Fabric 7.8 and later, this
feature is enabled by default. To disable this feature, set
streams.async.subscription.callback.enabled=true
in core-site.xml. By default, this value is set to
false.
With this feature enabled, consumer assignment updates and the invocation of related
callbacks (in ConsumerRebalanceListener) occur synchronously
within the consumer.poll() call in the user thread.
consumer.assignment()
does not update until you invoke consumer.poll().consumer.poll() multiple times, or with
a long enough timeout for assignment to be completed. - Disable synchronous consumer partition assignment with
streams.async.subscription.callback.enabled=trueincore-site.xml. - Enable the application to begin polling without the new user assignment. You can use ConsumerRebalanceListener#onPartitionsAssigned() to notify of successful consumer partition assignment.
Log Compaction
As of HPE Data Fabric 6.1, log compaction is supported. Log compaction can be enabled for streams created with HPE Data Fabric core 6.1 and later. In addition, clients older than HPE Data Fabric 6.1 are prevented from consuming from streams that have had log compaction enabled on them at least once in their lifetime.
- If a replica cluster has been upgraded and the stream data for a source cluster is compacted (that is, one or more messages have been deleted), then the source cluster replicates the compacted data to the replica cluster.
- If a replica cluster has not been upgraded, then the source cluster fails the replication and an error is generated that requests an replica cluster upgrade.
In the context of a scan by a client that is not upgraded, the (upgraded) server inspects the row header to check if it is serving a compacted row. If it is serving a compacted row, then the server fails the consumer request. This behavior applies both to a stream that is explicitly configured for compaction and a replica that has received a compacted row.
-force
option can be used. The -force option should only be used when ALL clients
have been upgraded to HPE Data Fabric 6.1.Idempotent Producer
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)The idempotent producer feature is supported by EEP HPE Data Fabric 6.0 clients and HPE Data Fabric 6.1.0 servers.
- You must upgrade all servers to v6.1.0 and enable all the v6.1.0 features, before you enable the idempotent producer.
- If you use a pre-HPE Data Fabric 6.1 client and a HPE Data Fabric 6.1 server, and if a group of messages are atomically persisted without a valid producer ID, the server treats the request as a non-idempotent producer.
- If you use a HPE Data Fabric 6.1 client and a pre-HPE Data Fabric 6.1 server, the idempotent producer is not supported. In
this case, the idempotent producer fails to produce to the stream and the following
exception is
thrown:
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Operation not permitted (1) null at com.mapr.streams.impl.producer.MarlinFuture.valueOrError(MarlinFuture.java:46) at com.mapr.streams.impl.producer.MarlinFuture.get(MarlinFuture.java:41) at com.mapr.streams.impl.producer.MarlinFuture.get(MarlinFuture.java:17) at com.mapr.qa.marlin.common.StandaloneProducer.main(StandaloneProducer.java:75) Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Operation not permitted (1) null
TimestampType Permissions
The following discussion describes the Access Control Expression (ACE) permissions that you need when using the timestamp type parameter. See Stream Security for general information about HPE Data Fabric Streams streams security.
A HPE Data Fabric Streams stream topic inherits the default timestamp type value from its stream. To override the stream's default value, set the timestamp type for the topic to a different value.
- Setting the value at the stream-level requires
adminpermpermissions. The stream-level timestamp type parameter isdefaulttimestamptype. See stream create and stream edit for more information on setting this parameter using themaprclicommand. - Setting the
timestamptypeat the topic-level requirestopicpermpermissions. The topic-level timestamp type parameter istimestamptype. See stream topic create and stream topic edit for more information on setting this parameter using themaprclicommand.
User Impersonation
As of HPE Data Fabric 6.0, user impersonation is supported for HPE Data Fabric Streams.
You can set up user impersonation programmatically. To do so, use the
UserGroupInformation.doAs() method in the Hadoop documentation. See Class UserGroupInformation for more information.
If you are setting up user impersonation in a secure cluster, you need to generate an impersonation ticket. See the Generating and Printing Service with Impersonation Ticket section in the maprlogin Command Examples topic.
- Ensure that user
mapruser1has read permissions on the ticket. - If you moved the ticket file to a different location, set the
$MAPR_TICKETFILE_LOCATIONenvironment variable with the appropriate path.
Backward Compatibility
As of HPE Data Fabric 6.0.1,
along with the support of Apache Kafka, the java.util.Collection interface
is being used. This impacts applications using certain APIs. See HPE Data Fabric Streams Java API Library for detailed information.
References
- HPE Data Fabric Streams Sample Programs on GitHub.