In this article, we discuss how partitions are assigned to consumers and what happens when a consumer dies. We look at how partitions can be distributed among the available consumers for better utilization, and which strategy gives better throughput and availability. Once we understand all the assignment strategies, we can decide which one to set for our use case.
When a consumer dies, all of its partitions get assigned to other available consumers and that rebalance of partitions impacts the throughput, availability, and latency of the application. Using the right partition assignment setting for our use case can significantly minimize the impact.
When a new consumer group is created, a consumer group coordinator is elected. The coordinator lives on a broker assigned by a hash() of the consumer group name. This coordinator is responsible for various tasks, including partition assignment, offset commits, and deciding when a consumer should be marked “failed”. Consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes: it leaves the group, and the partitions it used to consume are picked up by the remaining consumers.
Reassignment of partitions to consumers also happens when the topics the consumer group is consuming are modified, e.g. new partitions are added to the topic.
PartitionAssignor is the interface whose implementations decide which partitions will be assigned to which consumer. When creating a new Kafka consumer, we can configure the strategy that will be used to assign the partitions amongst the consumers. We can set it using the configuration partition.assignment.strategy.
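As a minimal sketch of this configuration, the snippet below builds consumer properties using only java.util.Properties. The broker address, the group name, and the helper name consumerProps are assumptions made for illustration; the assignor class name is one of Kafka's built-in strategies.

```java
import java.util.Properties;

final class AssignmentStrategyConfig {
    // Hypothetical helper: builds consumer properties with the chosen assignor class.
    static Properties consumerProps(String groupId, String assignorClassName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy", assignorClassName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
                "demo-group", // assumed group name
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

In a real application these properties, together with the key and value deserializer settings, would be passed to the KafkaConsumer constructor.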
All the Kafka consumers that belong to the same consumer group must share at least one common assignment strategy. If a consumer attempts to join a consumer group whose members use an assignment strategy it does not support, it will get an InconsistentGroupProtocolException.
By default, Kafka has the following assignment strategies.
The aim of the RangeAssignor strategy is to have a single consumer instance co-host matching partitions from 1 or more topics. This is useful to join records from topics that have the same number of partitions. This strategy will put all the consumers in an order of member_id assigned by the coordinator and after that, the partitions are assigned starting from the first consumer.
We can see that the matching partitions of each topic (partition 0 of every topic, and likewise partition 1) are assigned to the same consumer, but Consumer-3 is sitting idle. So if you’re planning to consume from multiple topics and you don’t need co-localized partitions, you should not use the default partition assignment strategy.
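The range behaviour described above can be reproduced with a small, self-contained simulation. This is a sketch of the idea rather than Kafka's own code: the class and method names are hypothetical, and the scenario (topics A and B with two partitions each, three consumers) is an assumption chosen to show an idle consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeAssignmentSketch {
    // topics: topic name -> number of partitions; consumers: member ids.
    static Map<String, List<String>> assign(Map<String, Integer> topics, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // consumers are ordered before assignment
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }
        // Each topic is split into contiguous ranges, independently of other topics.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / sorted.size();
            int extra = partitions % sorted.size(); // the first `extra` consumers get one more
            for (int i = 0; i < sorted.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = assign(
                Map.of("A", 2, "B", 2),
                List.of("Consumer-1", "Consumer-2", "Consumer-3"));
        // prints {Consumer-1=[A-0, B-0], Consumer-2=[A-1, B-1], Consumer-3=[]}
        System.out.println(assignment);
    }
}
```

Because every topic is divided on its own, the last consumer in the ordering receives nothing whenever each topic has fewer partitions than there are consumers.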
With RoundRobinAssignor, the partitions are evenly distributed among all the consumers in the Consumer group. As with RangeAssignor, the partitions and consumers are put in lexicographic order before the assignment. The RoundRobin Assignor gives the advantage of effective usage of all the available Consumers and better performance.
Even if RoundRobin provides the advantage of maximizing the number of consumers used, it has one major drawback: it does not attempt to reduce partition movements when the number of consumers changes (i.e. when a rebalance occurs). Let’s try to understand this through the following diagram where Consumer-2 is disconnected:
Here we can see that once a Consumer gets disconnected, the assignment of Partitions A-1, B-0, and B-1 changes. So 3 out of 4 Partitions get re-assigned to the remaining Consumers. This unnecessary partition movement can impact Consumer performance.
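The partition movement described here can be checked with a simplified round-robin simulation. This sketch assumes every consumer subscribes to both topics, and that topics A and B have two partitions each; the class and method names are hypothetical and this is not Kafka's own implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RoundRobinAssignmentSketch {
    // Returns partition -> owning consumer, dealing sorted partitions to sorted consumers in turn.
    static Map<String, String> assign(List<String> partitions, List<String> consumers) {
        List<String> sortedPartitions = new ArrayList<>(partitions);
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedPartitions); // plain string order; fine for single-digit partition ids
        Collections.sort(sortedConsumers);
        Map<String, String> owner = new TreeMap<>();
        for (int i = 0; i < sortedPartitions.size(); i++) {
            owner.put(sortedPartitions.get(i), sortedConsumers.get(i % sortedConsumers.size()));
        }
        return owner;
    }

    // Lists the partitions whose owner differs between two assignments.
    static List<String> moved(Map<String, String> before, Map<String, String> after) {
        List<String> changed = new ArrayList<>();
        for (String partition : before.keySet()) {
            if (!before.get(partition).equals(after.get(partition))) {
                changed.add(partition);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("A-0", "A-1", "B-0", "B-1");
        Map<String, String> before =
                assign(partitions, List.of("Consumer-1", "Consumer-2", "Consumer-3"));
        Map<String, String> after =
                assign(partitions, List.of("Consumer-1", "Consumer-3")); // Consumer-2 is gone
        System.out.println(before); // {A-0=Consumer-1, A-1=Consumer-2, B-0=Consumer-3, B-1=Consumer-1}
        System.out.println(after);  // {A-0=Consumer-1, A-1=Consumer-3, B-0=Consumer-1, B-1=Consumer-3}
        System.out.println(moved(before, after)); // [A-1, B-0, B-1]
    }
}
```

Only A-1 had to move, because its owner left; B-0 and B-1 change hands purely as a side effect of recomputing the assignment from scratch.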
The StickyAssignor addresses the main drawback of the RoundRobinAssignor (unnecessary partition movement). It keeps as many existing assignments in place as possible while still producing an assignment that is as balanced as possible among the available Consumers.
So let us try to understand the situation when the Consumer-2 instance is marked down or failed in the previous example with StickyAssignor:
In cases where consumers in the same group subscribe to different topics, the assignment achieved by StickyAssignor is more balanced than that of the RoundRobinAssignor. This normally improves throughput and latency, and reduces rebalance time.
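To illustrate the sticky idea for the case where Consumer-2 leaves, here is a deliberately simplified sketch: surviving consumers keep what they already own, and only the orphaned partitions are handed to the least-loaded survivor. It is not the real StickyAssignor algorithm (which also rebalances when consumers join and handles differing subscriptions); the class name, method name, and starting assignment are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StickyReassignmentSketch {
    // previous: consumer -> owned partitions; survivors: consumers still in the group.
    static Map<String, List<String>> reassign(Map<String, List<String>> previous,
                                              List<String> survivors) {
        if (survivors.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer must remain");
        }
        Map<String, List<String>> next = new TreeMap<>();
        for (String consumer : survivors) {
            // Survivors keep everything they owned before.
            next.put(consumer, new ArrayList<>(previous.getOrDefault(consumer, List.of())));
        }
        List<String> orphaned = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : previous.entrySet()) {
            if (!next.containsKey(entry.getKey())) {
                orphaned.addAll(entry.getValue());
            }
        }
        Collections.sort(orphaned);
        for (String partition : orphaned) {
            String leastLoaded = null;
            for (String consumer : next.keySet()) {
                if (leastLoaded == null
                        || next.get(consumer).size() < next.get(leastLoaded).size()) {
                    leastLoaded = consumer;
                }
            }
            next.get(leastLoaded).add(partition);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<String>> previous = Map.of(
                "Consumer-1", List.of("A-0", "B-1"),
                "Consumer-2", List.of("A-1"),
                "Consumer-3", List.of("B-0"));
        // prints {Consumer-1=[A-0, B-1], Consumer-3=[B-0, A-1]}
        System.out.println(reassign(previous, List.of("Consumer-1", "Consumer-3")));
    }
}
```

In this scenario only A-1 moves, compared with three partitions under the round-robin recomputation.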
To understand CooperativeStickyAssignor, we first need to understand how cooperative rebalances work, and before that, what a rebalance is. A rebalance is the movement of partition ownership from one consumer to another. It happens in the following cases:
A new Consumer is added to the Consumer group,
A Consumer is removed from the Consumer group, and
New Partitions are added to an existing Topic.
We have two kinds of rebalances:
During an Eager rebalance, all consumers stop consuming the data. The current assignment of partitions expires and Consumers need to rejoin the group, then the new partition assignment happens. So during this time, the consumer group remains idle and does not consume any messages or allow offset commits. Eager rebalances have two distinct phases: first, all consumers give up their assigned partitions, and second, they rejoin the group, get new partition assignments, and resume consuming messages and processing.
Cooperative rebalances (also called incremental rebalances) do not stop all the consumer instances from consuming from the partitions; they only affect the small set of partitions that are moved from one consumer to another. So only the specific partitions which need to be reassigned will not be consumed from and the rest of the partitions will work as usual.
The rebalance is done in two or more phases. In the first phase, the leader informs all the consumers that they will lose ownership of some partitions, and those consumers give up their ownership of these partitions. In the next phase, the coordinator assigns these orphaned partitions to their new Consumers. A cooperative rebalance may take a few iterations until a stable partition assignment is achieved, but it avoids the unavailability that occurs with the eager approach. This is especially important in large consumer groups, where a rebalance takes more time.
As we can see in the above diagram, when Consumer-C joins the group, one of the partitions from Consumer-A gets revoked for better usage of all the available Consumers. This was the first phase of the cooperative rebalance; the other partitions (P0 and P1) are still consumable. In the next rebalance, P2 gets assigned to the new Consumer and now it is also back in business. So we have seen that a cooperative rebalance is very effective and tries to minimize the effect of rebalancing.
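The two phases can be sketched as a small simulation over ownership maps. This is only a model of the idea, not the protocol Kafka runs: the class and method names are hypothetical, and the starting ownership (Consumer-A owning P0 and P2, Consumer-B owning P1) is an assumption, since the text only says that P2 is taken from Consumer-A.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class CooperativeRebalanceSketch {
    // Phase 1: only partitions whose owner changes in the target assignment are revoked.
    static Set<String> toRevoke(Map<String, String> current, Map<String, String> target) {
        Set<String> revoked = new TreeSet<>();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            if (!entry.getValue().equals(target.get(entry.getKey()))) {
                revoked.add(entry.getKey());
            }
        }
        return revoked;
    }

    // Ownership after phase 1: revoked partitions have no owner, the rest keep being consumed.
    static Map<String, String> afterFirstPhase(Map<String, String> current,
                                               Map<String, String> target) {
        Map<String, String> ownership = new TreeMap<>(current);
        ownership.keySet().removeAll(toRevoke(current, target));
        return ownership;
    }

    // Phase 2: partitions without an owner are handed to their new owner.
    static Map<String, String> afterSecondPhase(Map<String, String> afterFirst,
                                                Map<String, String> target) {
        Map<String, String> ownership = new TreeMap<>(afterFirst);
        for (Map.Entry<String, String> entry : target.entrySet()) {
            ownership.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return ownership;
    }

    public static void main(String[] args) {
        // Assumed starting point before Consumer-C joins.
        Map<String, String> current = Map.of("P0", "Consumer-A", "P1", "Consumer-B", "P2", "Consumer-A");
        Map<String, String> target = Map.of("P0", "Consumer-A", "P1", "Consumer-B", "P2", "Consumer-C");

        Map<String, String> first = afterFirstPhase(current, target);
        System.out.println(toRevoke(current, target));       // [P2]
        System.out.println(first);                           // {P0=Consumer-A, P1=Consumer-B}
        System.out.println(afterSecondPhase(first, target)); // {P0=Consumer-A, P1=Consumer-B, P2=Consumer-C}
    }
}
```

Between the two phases P0 and P1 never lose their owner, which is the property that distinguishes a cooperative rebalance from an eager one.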
Now finally coming back to CooperativeStickyAssignor: it is identical to the Sticky Assignor, but supports cooperative rebalances in which consumers can continue consuming from the partitions that are not reassigned.
Creating a Custom Partition Assignment Strategy
The partition.assignment.strategy configuration allows you to choose a partition assignment strategy from the four strategies above. For most cases, one of these built-in options is sufficient, but for more advanced scenarios, one can implement a custom assignment strategy.
To implement the custom assignor, one can use the PartitionAssignor interface. It has the following 4 methods:
Subscription subscription(Set<String> topics);
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);
void onAssignment(Assignment assignment);
String name();
The subscription() method is invoked on all consumers, which are responsible for creating the Subscription that will be sent to the broker coordinator. A Subscription contains the set of topics that the consumer subscribes to.
The Partition assignment will be done using the assign() method.
All the consumers will receive their assignment from the leader and the onAssignment() method will be invoked.
Finally, a PartitionAssignor must have a unique name, returned by the name() method. Here you can provide any custom name for the assignor.
In this blog, we have discussed all the available Partition assignment strategies in Kafka. We can choose any of the strategies based on the use case and we can also embed our own protocol to customize how partitions are assigned to the group members for complex and different use cases.
The partition.assignment.strategy configuration allows you to choose a strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor, which implements the Range strategy described above. You can replace it with org.apache.kafka.clients.consumer.RoundRobinAssignor, org.apache.kafka.clients.consumer.StickyAssignor, or org.apache.kafka.clients.consumer.CooperativeStickyAssignor. A more advanced option is to implement your own assignment strategy, in which case partition.assignment.strategy should point to the name of the custom class.