Class StickyAssignor
- java.lang.Object
  - org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
    - org.apache.kafka.clients.consumer.StickyAssignor
- All Implemented Interfaces:
org.apache.kafka.clients.consumer.internals.PartitionAssignor
public class StickyAssignor extends org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
This class is not supported.
The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
- the numbers of topic partitions assigned to consumers differ by at most one; or
- each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
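Second, it preserves as many existing assignments as possible when a reassignment occurs, which saves some of the overhead of moving topic partitions from one consumer to another.
When starting fresh, it distributes the partitions over consumers as evenly as possible. Although this may sound similar to how the round robin assignor works, the second example below shows that it is not. During a reassignment, it performs the reassignment so that in the new assignment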
- topic partitions are still distributed as evenly as possible, and
- topic partitions stay with their previously assigned consumers as much as possible.
Example 1. Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics. The assignment with both the sticky and round robin assignors will be:
C0: [t0p0, t1p1, t3p0]
C1: [t0p1, t2p0, t3p1]
C2: [t1p0, t2p1]
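Now suppose C1 is removed and a reassignment is about to happen. The round robin assignor would produce:
C0: [t0p0, t1p0, t2p0, t3p0]
C2: [t0p1, t1p1, t2p1, t3p1]
while the sticky assignor would produce:
C0: [t0p0, t1p1, t3p0, t2p0]
C2: [t1p0, t2p1, t0p1, t3p1]
preserving all the previous assignments (unlike the round robin assignor).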
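The round robin results in Example 1 can be reproduced with a small model of round robin assignment. This is a simplified sketch, not the source of Kafka's RoundRobinAssignor. It assumes that partitions arrive sorted by topic and partition number, that consumers are ordered by name, and that each partition goes to the next consumer in circular order that subscribes to its topic. `RoundRobinSketch` is a hypothetical name, and partitions are modeled as strings such as "t0p0".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified round robin model for the examples on this page (hypothetical,
// not Kafka's RoundRobinAssignor). Partition names are assumed to look like "t1p0".
final class RoundRobinSketch {

    static Map<String, List<String>> assign(List<String> sortedPartitions,
                                            Map<String, Set<String>> subscriptions) {
        // Consumers in name order; result keeps that order for printing.
        List<String> consumers = new ArrayList<>(new TreeMap<>(subscriptions).keySet());
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        int next = 0; // index of the next consumer to try
        for (String partition : sortedPartitions) {
            String topic = partition.substring(0, partition.lastIndexOf('p'));
            // Walk the consumers circularly, skipping those not subscribed to the topic.
            for (int tried = 0; tried < consumers.size(); tried++) {
                String consumer = consumers.get((next + tried) % consumers.size());
                if (subscriptions.get(consumer).contains(topic)) {
                    result.get(consumer).add(partition);
                    next = (next + tried + 1) % consumers.size();
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of(
                "t0p0", "t0p1", "t1p0", "t1p1", "t2p0", "t2p1", "t3p0", "t3p1");
        Set<String> all = Set.of("t0", "t1", "t2", "t3");
        System.out.println(assign(partitions, Map.of("C0", all, "C1", all, "C2", all)));
        // {C0=[t0p0, t1p1, t3p0], C1=[t0p1, t2p0, t3p1], C2=[t1p0, t2p1]}
        System.out.println(assign(partitions, Map.of("C0", all, "C2", all)));
        // {C0=[t0p0, t1p0, t2p0, t3p0], C2=[t0p1, t1p1, t2p1, t3p1]}
    }
}
```

Because unsubscribed consumers are skipped, the same model also reproduces the round robin assignments in Example 2.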
Example 2. There are three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. The round robin assignor would come up with the following assignment:
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
which is not as balanced as the assignment produced by the sticky assignor, because C1 has three fewer partitions than C2 even though it is subscribed to t1 and could take t1p1:
C0: [t0p0]
C1: [t1p0, t1p1]
C2: [t2p0, t2p1, t2p2]
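The claim that the round robin result is less balanced can be checked mechanically against the balance definition at the top of this page. The sketch below is illustrative only, not Kafka code; `BalanceCheck` and `topicOf` are hypothetical names, and it assumes partitions are named like "t1p0" (topic "t1", partition 0). It uses `Map.of`, `List.of`, and `Set.of`, so it needs Java 9 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the balance definition on this page (hypothetical, not Kafka code).
// Assumes partition names look like "t1p0".
final class BalanceCheck {

    // Hypothetical helper: "t2p1" -> "t2".
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('p'));
    }

    // True if no consumer with 2+ fewer partitions than another consumer is
    // subscribed to the topic of any of that consumer's partitions. When all
    // counts differ by at most one, no pair is checked and the result is true.
    static boolean isBalanced(Map<String, List<String>> assignment,
                              Map<String, Set<String>> subscriptions) {
        for (Map.Entry<String, List<String>> low : assignment.entrySet()) {
            for (Map.Entry<String, List<String>> high : assignment.entrySet()) {
                if (low.getValue().size() + 2 > high.getValue().size()) {
                    continue; // "high" has at most one more partition than "low"
                }
                Set<String> lowTopics = subscriptions.get(low.getKey());
                for (String partition : high.getValue()) {
                    if (lowTopics.contains(topicOf(partition))) {
                        return false; // this partition could move to the smaller consumer
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = Map.of(
                "C0", Set.of("t0"),
                "C1", Set.of("t0", "t1"),
                "C2", Set.of("t0", "t1", "t2"));
        Map<String, List<String>> roundRobin = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0"),
                "C2", List.of("t1p1", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> sticky = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0", "t1p1"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        System.out.println(isBalanced(roundRobin, subs)); // false
        System.out.println(isBalanced(sticky, subs));     // true
    }
}
```

In the sticky result, C0 still has two fewer partitions than C2, but that is allowed: C0 is subscribed only to t0, so none of C2's t2 partitions could be transferred to it.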
Now, if consumer C0 is removed, these two assignors would produce the following assignments.
Round robin (preserves 3 partition assignments):
C1: [t0p0, t1p1]
C2: [t1p0, t2p0, t2p1, t2p2]
Sticky (preserves 5 partition assignments):
C1: [t1p0, t1p1, t0p0]
C2: [t2p0, t2p1, t2p2]
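The "preserves 3" and "preserves 5" counts above can be checked by counting the partitions that stay with the same consumer across the reassignment. `PreservedCount` is a hypothetical helper written for illustration, with partitions modeled as strings.

```java
import java.util.List;
import java.util.Map;

// Counts partitions that remain with the same consumer after a reassignment
// (hypothetical helper, not part of Kafka).
final class PreservedCount {

    static int preserved(Map<String, List<String>> before,
                         Map<String, List<String>> after) {
        int count = 0;
        for (Map.Entry<String, List<String>> entry : after.entrySet()) {
            // A consumer absent from "before" contributes nothing.
            List<String> previous = before.getOrDefault(entry.getKey(), List.of());
            for (String partition : entry.getValue()) {
                if (previous.contains(partition)) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, List<String>> rrBefore = Map.of(
                "C0", List.of("t0p0"), "C1", List.of("t1p0"),
                "C2", List.of("t1p1", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> rrAfter = Map.of(
                "C1", List.of("t0p0", "t1p1"),
                "C2", List.of("t1p0", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> stickyBefore = Map.of(
                "C0", List.of("t0p0"), "C1", List.of("t1p0", "t1p1"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> stickyAfter = Map.of(
                "C1", List.of("t1p0", "t1p1", "t0p0"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        System.out.println(preserved(rrBefore, rrAfter));         // 3
        System.out.println(preserved(stickyBefore, stickyAfter)); // 5
    }
}
```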
Impact on ConsumerRebalanceListener
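The sticky assignment strategy can provide some optimization for consumers that have partition cleanup code in the onPartitionsRevoked() callback of their ConsumerRebalanceListener. The cleanup code is placed in that callback because, with the range or round robin assignor, the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance. The listener code would look like this:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class TheOldRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // No partition is expected to come back, so commit and clean up all of them.
        for (TopicPartition partition : partitions) {
            commitOffsets(partition);
            cleanupState(partition);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Every assigned partition is treated as new.
        for (TopicPartition partition : partitions) {
            initializeState(partition);
            initializeOffset(partition);
        }
    }

    // Application-specific hooks; bodies are left to the application.
    void commitOffsets(TopicPartition partition) { }
    void cleanupState(TopicPartition partition) { }
    void initializeState(TopicPartition partition) { }
    void initializeOffset(TopicPartition partition) { }
}
```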
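Because the sticky assignor generally reduces the number of partitions that move from one consumer to another during a reassignment, it lets consumers do their cleanup more efficiently. Consumers can still perform partition cleanup in the onPartitionsRevoked() listener, but they can be more efficient by noting their partitions before and after the rebalance and, after the rebalance, cleaning up only the partitions they have lost (normally only a few). The code snippet below illustrates this:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class TheNewRebalanceListener implements ConsumerRebalanceListener {
    private Collection<TopicPartition> lastAssignment = Collections.emptyList();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Only commit here; cleanup waits until the new assignment is known.
        for (TopicPartition partition : partitions)
            commitOffsets(partition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
        // Clean up partitions lost in this rebalance; initialize newly gained ones.
        for (TopicPartition partition : difference(lastAssignment, assignment))
            cleanupState(partition);
        for (TopicPartition partition : difference(assignment, lastAssignment))
            initializeState(partition);
        for (TopicPartition partition : assignment)
            initializeOffset(partition);
        this.lastAssignment = new HashSet<>(assignment); // copy, in case the caller reuses the collection
    }

    // Elements of a that are not in b.
    private static Set<TopicPartition> difference(Collection<TopicPartition> a, Collection<TopicPartition> b) {
        Set<TopicPartition> result = new HashSet<>(a);
        result.removeAll(b);
        return result;
    }

    // Application-specific hooks; bodies are left to the application.
    void commitOffsets(TopicPartition partition) { }
    void cleanupState(TopicPartition partition) { }
    void initializeState(TopicPartition partition) { }
    void initializeOffset(TopicPartition partition) { }
}
```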
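To make the before-and-after bookkeeping concrete, the sketch below applies the same set difference to consumer C0 from Example 1, comparing the sticky and round robin reassignments. Partitions are modeled as strings such as "t0p0" rather than TopicPartition objects, an assumption made so the sketch runs without the Kafka client library; `CleanupDiff` is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration of lost/gained partitions for consumer C0 in Example 1.
// Partitions are modeled as strings (hypothetical simplification of TopicPartition).
final class CleanupDiff {

    // Elements of a that are not in b, in a's order.
    static Set<String> difference(List<String> a, List<String> b) {
        Set<String> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        List<String> before = List.of("t0p0", "t1p1", "t3p0");
        List<String> afterSticky = List.of("t0p0", "t1p1", "t3p0", "t2p0");
        List<String> afterRoundRobin = List.of("t0p0", "t1p0", "t2p0", "t3p0");

        // Sticky: nothing to clean up, one partition to initialize.
        System.out.println(difference(before, afterSticky));     // []
        System.out.println(difference(afterSticky, before));     // [t2p0]
        // Round robin: one partition to clean up, two to initialize.
        System.out.println(difference(before, afterRoundRobin)); // [t1p1]
        System.out.println(difference(afterRoundRobin, before)); // [t1p0, t2p0]
    }
}
```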
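A consumer that uses sticky partition assignment can register TheNewRebalanceListener when subscribing:

```java
consumer.subscribe(topics, new TheNewRebalanceListener());
```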
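A consumer opts into this assignor through its `partition.assignment.strategy` setting. The sketch below builds such a configuration with `java.util.Properties`; the broker address and group id are placeholders, and `StickyConfig` is a hypothetical name. Other settings a real consumer needs, such as key and value deserializers, are omitted.

```java
import java.util.Properties;

// Sketch of a consumer configuration that selects the sticky assignor.
// StickyConfig is hypothetical; the address and group id are placeholders.
final class StickyConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder consumer group
        // Same key as ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

Once deserializer settings are added, the resulting Properties can be passed to the KafkaConsumer constructor before calling subscribe().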
Constructor Summary
- StickyAssignor()
Method Summary
- java.util.Map<java.lang.String,java.util.List<TopicPartition>> assign(java.util.Map<java.lang.String,java.lang.Integer> partitionsPerTopic, java.util.Map<java.lang.String,org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription> subscriptions)
- java.lang.String name()
- void onAssignment(org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment assignment)
- org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription subscription(java.util.Set<java.lang.String> topics)
Method Detail
assign
public java.util.Map<java.lang.String,java.util.List<TopicPartition>> assign(java.util.Map<java.lang.String,java.lang.Integer> partitionsPerTopic, java.util.Map<java.lang.String,org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription> subscriptions)
Specified by: assign in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
onAssignment
public void onAssignment(org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment assignment)
Specified by: onAssignment in interface org.apache.kafka.clients.consumer.internals.PartitionAssignor
Overrides: onAssignment in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
subscription
public org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription subscription(java.util.Set<java.lang.String> topics)
Specified by: subscription in interface org.apache.kafka.clients.consumer.internals.PartitionAssignor
Overrides: subscription in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
name
public java.lang.String name()