In the previous article, we discussed in detail the design ideas and underlying principles of the Kafka Producer. In this article we turn to the design ideas of the Kafka Consumer and the underlying principles inside the consumer.
In Kafka, the party that consumes messages is called the Consumer, and it is one of Kafka’s core components. Its main job is to consume and process the messages produced by the Producer. So how are the messages generated by producers consumed by consumers? Which consumption mode is used, what partition allocation strategies are available, how do consumer groups and the rebalance mechanism work, how are offsets committed and stored, how is consumption progress monitored, and how can we make sure that consumption completes? Let’s go through each of these in turn.
We know that message queues generally deliver messages in one of two ways: (1) push mode and (2) pull mode. Which one does the Kafka Consumer use? The Kafka Consumer uses pull mode: it actively pulls data from the broker. Both approaches have advantages and disadvantages, so let’s analyze them:
1) Why not use push mode? The biggest disadvantage of push mode is that the broker does not know how fast each consumer can consume, yet the broker controls the push rate. Messages can easily pile up: if the work done in the consumer is time-consuming, the consumer falls further and further behind, and in severe cases the system may crash.
2) Why use pull mode? In pull mode, the Consumer can pull data according to its own situation and state, and can also delay processing. But pull mode has a shortcoming of its own: if the Kafka broker has no messages, every pull returns empty data, and the Consumer may spin in a loop that keeps returning nothing. How does Kafka solve this? Each call to poll() takes a timeout parameter; when no data is available, the call blocks in a long poll and returns only when data arrives or the timeout expires, after which the Consumer polls again.
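To make the difference between spinning on empty results and a blocking poll with a timeout concrete, here is a minimal sketch in plain Java. It does not use the Kafka client: the class LongPollSketch and its in-memory queue are hypothetical stand-ins for a broker partition, and only the block-until-data-or-timeout behavior is meant to resemble what poll() does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broker partition; this is NOT a Kafka class.
class LongPollSketch {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    // Producer side: append one message.
    void produce(String message) {
        messages.add(message);
    }

    // Consumer side: wait up to timeoutMs for a message instead of returning
    // immediately. Returns null if nothing arrived before the timeout.
    String poll(long timeoutMs) {
        try {
            return messages.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        LongPollSketch partition = new LongPollSketch();
        System.out.println(partition.poll(100)); // no data yet: waits, then gives up
        partition.produce("m1");
        System.out.println(partition.poll(100)); // data available: returns at once
    }
}
```

The point of the design is that an idle consumer sleeps inside the call rather than burning CPU on repeated empty pulls.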
Having covered the consumption modes, their trade-offs, and how Kafka addresses the shortcoming of pull mode, let’s look at what consumer initialization does.
Kafka consumer initialization code:
As you can see from the code, there are 4 steps to initialize a Consumer:
1) Construct a Properties object holding the consumer-related configuration;
2) Create a KafkaConsumer object;
3) Subscribe to the corresponding list of topics;
4) Call the Consumer’s poll() method to pull the subscribed messages.
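The four steps can be sketched as follows. To keep the example runnable without the kafka-clients library, only step 1 is executable here; steps 2 to 4 appear as comments. The broker address localhost:9092 is a hypothetical placeholder, and the group and topic names are purely illustrative.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Step 1: build the consumer configuration.
    static Properties buildConfig(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group name.
        Properties props = buildConfig("localhost:9092", "test-group-1");

        // Steps 2-4 need kafka-clients on the classpath, so they are comments:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // step 2
        //   consumer.subscribe(Arrays.asList("topic-order"));                    // step 3
        //   ConsumerRecords<String, String> records =
        //           consumer.poll(Duration.ofMillis(1000));                      // step 4

        System.out.println(props.getProperty("group.id"));
    }
}
```

In a real consumer, step 4 runs inside a loop so that the consumer keeps pulling new batches.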
The Kafka consumer consumption flow diagram is as follows:
Consumer Group mechanism
Having covered the initialization process of the Consumer, let’s talk about the Consumer Group mechanism. Why did Kafka design the Consumer Group; would a single Consumer not be enough? We know that Kafka is a high-throughput, low-latency, highly concurrent, and highly scalable message queue. If a topic holds millions or tens of millions of messages and only a single consumer process consumes them, consumption will clearly be slow. A more scalable mechanism is needed to keep consumption on pace, and this is where the Consumer Group comes in. The Consumer Group is a scalable and fault-tolerant consumer mechanism provided by Kafka.
A Kafka Consumer Group has the following features:
1) Each Consumer Group contains one or more consumers;
2) Each Consumer Group has a public and unique Group ID;
3) When a Consumer Group consumes a topic, each partition of the topic can be assigned to only one consumer in the group. As long as a message is consumed once by any consumer in the group, it is considered successfully consumed by the whole consumer group.
Partition allocation policy mechanism
We know that a consumer group contains multiple consumers and a topic has multiple partitions, so partitions must be allocated: we need to determine which partition is consumed by which consumer.
The Kafka client provides 3 partition allocation strategies: RangeAssignor, RoundRobinAssignor, and StickyAssignor. The first two are relatively simple; the StickyAssignor scheme is a bit more complex.
RangeAssignor is Kafka’s default partition allocation algorithm. It allocates on a per-topic basis: for each topic, it first sorts the partitions by partition ID, then sorts the consumers in the group that subscribe to the topic, and finally assigns contiguous ranges of partitions to the consumers as evenly as possible. When the number of partitions is not divisible by the number of consumers, the consumers that come first in the sort order each receive one extra partition and may become overloaded.
The partition allocation scenario analysis is shown in the following figure (multiple consumers under the same consumer group):
Conclusion: the obvious problem with this allocation method is that the imbalance grows as consumers subscribe to more topics, because the same leading consumers receive the extra partitions of every topic.
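The range allocation described above can be sketched in a few lines of plain Java. This is a simplified illustration for a single topic, not the client’s actual RangeAssignor code; the class name and the consumer names C0 to C2 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignSketch {
    // Assigns partitions 0..numPartitions-1 of ONE topic to the consumers
    // in contiguous ranges, following the sorted consumer order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        if (sorted.isEmpty()) {
            return result;
        }
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size(); // the first 'extra' consumers get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = new ArrayList<>();
        Collections.addAll(group, "C0", "C1", "C2");
        System.out.println(assign(group, 7));
    }
}
```

With 3 consumers and 7 partitions, C0 receives partitions 0 to 2 while C1 and C2 receive two each. Because the allocation is repeated independently for every topic, two such topics leave C0 with 6 partitions against 4 for each of the others.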
RoundRobinAssignor sorts the partitions of all topics subscribed to by the consumer group and then assigns them to the consumers one by one, in order, as evenly as possible. If every Consumer in the Consumer Group subscribes to the same topics, the result is balanced. If the subscriptions differ, the result is not guaranteed to be as balanced as possible, because some consumers do not take part in the allocation of some topics.
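A minimal round-robin sketch in plain Java shows both outcomes. It is a simplified illustration rather than the client’s implementation; the class name, the consumer names C0 to C2, and the topics t0 to t2 are hypothetical.

```java
import java.util.*;

class RoundRobinAssignSketch {
    // subscriptions: consumer -> topics it subscribes to.
    // partitionCounts: topic -> number of partitions.
    static Map<String, List<String>> assign(Map<String, List<String>> subscriptions,
                                            Map<String, Integer> partitionCounts) {
        List<String> consumers = new ArrayList<>(subscriptions.keySet());
        Collections.sort(consumers);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        int cursor = 0; // position in the circular list of consumers
        for (String topic : new TreeSet<>(partitionCounts.keySet())) {
            for (int p = 0; p < partitionCounts.get(topic); p++) {
                // Skip consumers that do not subscribe to this topic.
                int tried = 0;
                while (tried < consumers.size()
                        && !subscriptions.get(consumers.get(cursor)).contains(topic)) {
                    cursor = (cursor + 1) % consumers.size();
                    tried++;
                }
                if (tried == consumers.size()) {
                    continue; // nobody subscribes to this topic
                }
                result.get(consumers.get(cursor)).add(topic + "-" + p);
                cursor = (cursor + 1) % consumers.size();
            }
        }
        return result;
    }

    // Hypothetical scenario with different subscriptions per consumer.
    static Map<String, List<String>> skewedExample() {
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("C0", Arrays.asList("t0"));
        subs.put("C1", Arrays.asList("t0", "t1"));
        subs.put("C2", Arrays.asList("t0", "t1", "t2"));
        Map<String, Integer> counts = new HashMap<>();
        counts.put("t0", 1);
        counts.put("t1", 2);
        counts.put("t2", 3);
        return assign(subs, counts);
    }

    public static void main(String[] args) {
        System.out.println(skewedExample());
    }
}
```

In the skewed scenario C2 ends up with four partitions (t1-1 and all of t2) while C0 and C1 hold one each, because C2 is the only subscriber of t2.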
The partition allocation scenario analysis is shown in the following figure:
1) When every consumer in the group subscribes to the same topics:
2) When consumers in the group subscribe to different topics, the partition allocation may become skewed:
StickyAssignor is the most complex allocation strategy provided by the Kafka Java client. It can be selected through the partition.assignment.strategy parameter and was introduced in version 0.11. Its purpose is to change as little of the previous allocation as possible when a new allocation is performed, and it pursues the following 2 goals:
1) The distribution of topic partitions should be as balanced as possible.
2) When a Rebalance occurs, the new allocation should stay as close as possible to the previous one.
Note: when the two goals conflict, the first takes priority, which keeps the distribution more even. All 3 allocation strategies try to achieve the first goal; the second goal is the essence of this algorithm.
Let’s compare RoundRobinAssignor and StickyAssignor with an example.
The partition allocation scenario analysis is shown in the following figure:
1) When every consumer in the group subscribes to the same topics, RoundRobinAssignor and StickyAssignor produce the same allocation:
When a Rebalance occurs in this situation, the results may differ. Suppose C1 fails and goes offline:
Conclusion: as the results after the Rebalance show, both strategies end up evenly distributed, but RoundRobinAssignor redistributes everything from scratch, while StickyAssignor reaches a balanced state on the basis of the original allocation.
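The idea of reaching balance on the basis of the original allocation can be illustrated with a deliberately simplified sketch. This is not the real StickyAssignor algorithm, which is considerably more involved; it only keeps the survivors’ partitions in place and hands each orphaned partition to the currently least-loaded survivor. The class name and the scenario data (three consumers, four topics with two partitions each) are hypothetical.

```java
import java.util.*;

class StickyReassignSketch {
    // previous: consumer -> partitions it owned before the rebalance.
    // departed: the consumer that went offline.
    static Map<String, List<String>> reassignAfterLeave(Map<String, List<String>> previous,
                                                        String departed) {
        // Survivors keep everything they already own (sorted by name for stable ties).
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : previous.entrySet()) {
            if (!e.getKey().equals(departed)) {
                result.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        // Each orphaned partition goes to the least-loaded survivor.
        List<String> orphaned = previous.getOrDefault(departed, Collections.<String>emptyList());
        for (String partition : orphaned) {
            String target = null;
            for (String c : result.keySet()) {
                if (target == null || result.get(c).size() < result.get(target).size()) {
                    target = c;
                }
            }
            if (target != null) {
                result.get(target).add(partition);
            }
        }
        return result;
    }

    // Hypothetical scenario: C1 fails and goes offline.
    static Map<String, List<String>> example() {
        Map<String, List<String>> previous = new HashMap<>();
        previous.put("C0", Arrays.asList("t0p0", "t1p1", "t3p0"));
        previous.put("C1", Arrays.asList("t0p1", "t2p0", "t3p1"));
        previous.put("C2", Arrays.asList("t1p0", "t2p1"));
        return reassignAfterLeave(previous, "C1");
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

In this scenario C0 and C2 keep all of their original partitions and only the three partitions of C1 move, whereas a fresh round-robin pass over the two survivors would reshuffle partitions that had no need to move.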
2) When consumers in the group subscribe to different topics:
When a Rebalance occurs in this situation, the results may differ. Suppose C1 fails and goes offline. RoundRobinAssignor:
As the results above show, the RoundRobinAssignor strategy caused a severe allocation skew after the Rebalance. Therefore, if you want to reduce the overhead caused by reallocation in a production environment, you can choose the StickyAssignor partition allocation strategy.
Having covered consumer groups and partition allocation strategies, let’s talk about the Rebalance mechanism in the consumer group. Consumers may join or leave a consumer group at any time, and any change in the consumer list inevitably causes partitions to be reallocated. We call this reallocation process Consumer Rebalance. It is completed for the entire consumer group with the help of the Coordinator component on the broker side. It is also triggered by listening to ZooKeeper’s /admin/reassign_partitions node.
Rebalance triggers and notifications
There are three trigger conditions for Rebalance:
1) The number of consumer group members changes (a member actively joins or leaves the group, fails and goes offline, and so on);
2) The number of subscribed topics changes;
3) The number of partitions of a subscribed topic changes.
How does Rebalance notify other consumer processes?
Rebalance’s notification mechanism relies on the heartbeat thread on the consumer side, which periodically sends heartbeat requests to the Coordinator on the broker side. When the Coordinator decides to start a Rebalance, it puts “REBALANCE_IN_PROGRESS” into the response to the heartbeat request and sends it to the consumer. When the consumer finds that the heartbeat response contains “REBALANCE_IN_PROGRESS”, it knows that a Rebalance has begun.
In fact, Rebalance is essentially a set of protocols that the Consumer Group and the Coordinator use together to complete the Consumer Group’s Rebalance. Let’s look at what these 5 protocols are and what each one does:
- Heartbeat request: the Consumer periodically sends a heartbeat to the Coordinator to prove that it is alive.
- LeaveGroup request: the Consumer proactively tells the Coordinator that it is leaving the group.
- SyncGroup request: the group leader Consumer tells all members of the group the allocation plan.
- JoinGroup request: a member requests to join the group.
- DescribeGroup request: displays all the information about the group, including member information, protocol name, assignment scheme, and subscription information. This request is usually used by administrators.
The Coordinator mainly uses the first four requests during a Rebalance.
Consumer Group state machine
Once a Rebalance occurs, the consumer group inevitably moves through a series of states. Kafka designed a complete state machine to help the broker-side Coordinator carry out the whole rebalancing process. Understanding this state flow gives us insight into the design principles of the Consumer Group.
The five states are defined as follows:
The Empty state indicates that the group currently has no members, but there may still be unexpired offset data committed by the Consumer Group. In this state the group can only respond to JoinGroup requests.
The Dead state indicates that the group has no members and its metadata has been removed by the Coordinator on the broker side. In this state every request is answered with the response UNKNOWN_MEMBER_ID.
The PreparingRebalance state indicates that the group is preparing to start a new Rebalance and is waiting for all members to rejoin the group.
The CompletingRebalance state indicates that all members have joined the group and are waiting for the allocation plan. In older versions this state was called “AwaitingSync”.
The Stable state indicates that the Rebalance has completed and the consumers in the group can start consuming.
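The five states can be modeled as a small state machine. The transition table below is an assumption based on a reading of the broker’s group coordinator, not something taken from this article, so treat it as a sketch rather than a specification. The class and method names are hypothetical.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class GroupStateSketch {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // For each target state, the states a group may come from (assumed table).
    private static final Map<State, Set<State>> VALID_PREVIOUS = new EnumMap<>(State.class);
    static {
        VALID_PREVIOUS.put(State.EMPTY,
                EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.PREPARING_REBALANCE,
                EnumSet.of(State.STABLE, State.COMPLETING_REBALANCE, State.EMPTY));
        VALID_PREVIOUS.put(State.COMPLETING_REBALANCE,
                EnumSet.of(State.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(State.STABLE,
                EnumSet.of(State.COMPLETING_REBALANCE));
        VALID_PREVIOUS.put(State.DEAD,
                EnumSet.allOf(State.class)); // a group can be removed from any state
    }

    static boolean canTransition(State from, State to) {
        return VALID_PREVIOUS.get(to).contains(from);
    }

    public static void main(String[] args) {
        // A normal rebalance: Stable -> PreparingRebalance -> CompletingRebalance -> Stable.
        System.out.println(canTransition(State.STABLE, State.PREPARING_REBALANCE));
        System.out.println(canTransition(State.PREPARING_REBALANCE, State.COMPLETING_REBALANCE));
        System.out.println(canTransition(State.COMPLETING_REBALANCE, State.STABLE));
    }
}
```

Under this table a group cannot jump straight from Empty to Stable; it always passes through the two rebalance states first.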
The flow diagram of the five states is as follows:
Rebalance Process Analysis
Next, let’s look at the Rebalance process. As the 5 states above suggest, a Rebalance has two main steps: joining the group (the JoinGroup request) and waiting for the leader Consumer’s allocation plan (the SyncGroup request).
1) JoinGroup request: every member of the group sends a JoinGroup request to the Coordinator, asking to join the group and reporting the topics it subscribes to. In this way the Coordinator collects the JoinGroup requests and subscription information of all members and selects one of them as the leader of the consumer group (normally, the first consumer to send a request becomes the leader). The leader here is a specific consumer whose task is to collect the subscription information of all members and draw up a concrete partition allocation plan. After selecting the leader, the Coordinator puts the subscription information of the Consumer Group into the response to the JoinGroup request and sends it to the leader. The leader then makes a unified allocation plan and moves on to the next step, as shown below:
2) SyncGroup request: the leader draws up the allocation plan, that is, which consumer is responsible for consuming which partitions of which topics. Once the allocation is complete, the leader puts the plan into a SyncGroup request and sends it to the Coordinator. The other members also send SyncGroup requests, but with empty content. After receiving the allocation plan, the Coordinator puts it into the SyncGroup responses and sends them to the members of the group, so that each member knows which partitions it should consume, as shown in the following figure:
Rebalance scenario analysis
Having covered Rebalance’s state flow and process in detail, let’s now analyze several scenarios through sequence diagrams to deepen our understanding of Rebalance.
Scenario 1: A new member (C1) joins the group
Scenario 2: A member (C2) actively leaves the group
Scenario 3: Member (C2) is kicked out of the group
Scenario 4: A member (C2) commits offset data
Automatic commit means that the Kafka Consumer silently commits offsets for us in the background, and the user does not need to care about it. To enable automatic offset commit, set the parameter enable.auto.commit = true when initializing KafkaConsumer (the default is true). Once it is enabled, another parameter works together with it: auto.commit.interval.ms, which sets how often the Kafka Consumer automatically commits offsets. The default is 5 seconds (5000 ms).
Automatic commit looks great, but can it lose consumed data? When enable.auto.commit = true is set, Kafka guarantees that when the poll() method is called, the offsets of the previous batch of messages are committed before the next batch is processed, so consumption is not lost. But automatic commit also has a design flaw: it can cause duplicate consumption. If a Rebalance occurs in the middle of an auto-commit interval, before the latest offsets have been committed, then after the Rebalance completes, consumers have to re-consume the messages that were processed after the last commit and before the Rebalance.
The counterpart to automatic commit is manual commit. To turn on manual offset commit, set the parameter enable.auto.commit = false when initializing KafkaConsumer. Setting it to false is not enough, though: it only tells the Kafka Consumer not to commit offsets automatically. You also need to call the corresponding Consumer API to commit offsets manually after processing the messages. Manual commit is divided into synchronous commit and asynchronous commit.
1) Synchronous commit API:
KafkaConsumer#commitSync(): this method commits the latest offsets returned by the KafkaConsumer#poll() method. It is a synchronous operation that blocks until the offsets are successfully committed, and it throws an exception if an error occurs during the commit. The right time to call commitSync() is therefore after all the messages returned by poll() have been processed; if the commit happens too early, consumed data may be lost.
2) Asynchronous commit API:
KafkaConsumer#commitAsync(): this method commits asynchronously. The call returns immediately without blocking, so it does not affect the consumer’s TPS. Kafka also provides a callback for it, which makes it convenient to implement post-commit logic such as logging or exception handling. Because it is an asynchronous operation, it is not retried if something goes wrong: by the time of a retry, the offset being committed may no longer be the latest value, so retrying would be meaningless.
3) Mixed commit mode:
From the analysis above, commitSync and commitAsync each have their own flaws. Combining them gives the best results: it does not affect the consumer’s TPS, and it takes advantage of commitSync’s automatic retry to ride out transient errors (network jitter, GC, Rebalance issues). The mixed commit mode is recommended in production environments to improve the robustness of the Consumer.
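One common way to combine the two calls is to commit asynchronously after each batch and synchronously once when the consumer shuts down. The sketch below shows only that control flow. To stay runnable without a broker it uses OffsetCommitter, a hypothetical stand-in interface for the two KafkaConsumer commit methods, and a list of in-memory batches in place of repeated poll() calls.

```java
import java.util.List;

class MixedCommitSketch {
    // Hypothetical stand-in for KafkaConsumer#commitAsync() and #commitSync().
    interface OffsetCommitter {
        void commitAsync();
        void commitSync();
    }

    // Processes every batch, committing asynchronously after each one,
    // and always finishes with one blocking commit.
    static void consume(List<List<String>> batches, OffsetCommitter committer) {
        try {
            for (List<String> batch : batches) { // stands in for the poll() loop
                for (String message : batch) {
                    process(message);
                }
                // Non-blocking: a failed commit here is superseded by a later one.
                committer.commitAsync();
            }
        } finally {
            // Blocking: the last offsets must be stored before the consumer exits.
            committer.commitSync();
        }
    }

    // Placeholder for the business logic applied to each message.
    static void process(String message) {
    }
}
```

With a real KafkaConsumer the same shape applies: commitAsync() inside the poll loop, and commitSync() in the finally block just before the consumer is closed.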
We now know that the Consumer needs to commit offsets after consuming data. So where exactly is the committed offset data stored, and how? Let’s look at how the old and new versions of Kafka store offsets.
Older versions of Kafka (before 0.8) relied heavily on ZooKeeper for all kinds of coordination, and the Consumer Group of those versions stored its offsets in ZooKeeper to reduce the overhead of state storage on the broker side. However, ZooKeeper’s storage architecture is not suited to frequent write updates, and offset commits from Consumer Groups are a high-frequency write operation that slows down the ZooKeeper cluster. So in the new version of Kafka, the community redesigned offset management for consumer groups and stores offsets inside Kafka itself (because a Kafka topic naturally supports high-frequency writes and persistence). This is the well-known __consumer_offsets topic.
__consumer_offsets is used to save the offset information committed by Kafka Consumers. It is created automatically by Kafka and is otherwise the same as a normal topic. Its message format is defined by Kafka itself and cannot be modified. What does that message format look like? Let’s analyze it together.
__consumer_offsets message format analysis:
The message format can be simply understood as a KV pair. Key and Value represent the key and the body of the message, respectively.
So what does the Key store? Since the topic stores Consumer offset information and Kafka has a large number of Consumers, there must be a field that identifies which Consumer the offset data belongs to. How is the Consumer identified? From the earlier explanation of the Consumer Group, we know that the group shares a public and unique Group ID. Is it enough to save just that? We know that the Consumer commits offsets per partition, so the Key clearly must also hold the partition for which the Consumer commits the offset.
To sum up, 3 parts should be stored in the Key of the offsets topic: (Group ID, topic name, partition number).
The Value can be simply thought of as storing the offset value. Under the hood it also stores some other metadata that helps Kafka carry out other operations, such as deleting expired offset data.
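The KV shape can be illustrated with an in-memory map. This sketch shows only the logical key and value; it is not how the broker serializes or stores the records, and the class OffsetStoreSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of the KV shape; NOT the broker's storage format.
class OffsetStoreSketch {
    // Key: (group ID, topic name, partition number). Value: committed offset.
    private final Map<List<Object>, Long> offsets = new HashMap<>();

    void commit(String groupId, String topic, int partition, long offset) {
        // A later commit for the same key replaces the earlier value.
        offsets.put(Arrays.<Object>asList(groupId, topic, partition), offset);
    }

    // Returns the last committed offset, or null if this key was never committed.
    Long fetch(String groupId, String topic, int partition) {
        return offsets.get(Arrays.<Object>asList(groupId, topic, partition));
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("order-group-1", "topic-order", 0, 36672L);
        System.out.println(store.fetch("order-group-1", "topic-order", 0));
    }
}
```

Because the partition is part of the key, the same group can hold an independent offset for every partition it consumes.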
__consumer_offsets Message format diagram:
__consumer_offsets creation process
Having covered the message format, let’s talk about how __consumer_offsets is created. When the first Consumer in a Kafka cluster starts, Kafka automatically creates __consumer_offsets. As mentioned earlier, it is an ordinary topic, so it has a number of partitions. If Kafka creates it automatically, how is the number of partitions set? That depends on the broker-side parameter offsets.topic.num.partitions (default 50), so Kafka automatically creates __consumer_offsets with 50 partitions. That is why we see many directories like __consumer_offsets-xxx under the Kafka log path. Since there is a number of partitions, there is also a corresponding number of replicas, which is set by another broker-side parameter, offsets.topic.replication.factor (default 3). To summarize, if __consumer_offsets is created automatically by Kafka, the topic has 50 partitions and 3 replicas. The partition that stores the offsets of a specific group is computed as abs(GroupId.hashCode()) % NumPartitions. This ensures that the Consumer offset information is on the same broker node as the Coordinator for that consumer group.
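The partition formula is easy to try out in plain Java. The sketch below is a direct transcription of abs(GroupId.hashCode()) % NumPartitions; the class and method names are hypothetical, and the special case for Integer.MIN_VALUE is added here as a guard because Math.abs cannot represent its positive counterpart.

```java
class OffsetsPartitionSketch {
    // Returns the __consumer_offsets partition for a group, per the formula
    // abs(groupId.hashCode()) % numPartitions.
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map that value to 0.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default value of offsets.topic.num.partitions.
        System.out.println(partitionFor("test-group-1", 50));
    }
}
```

The result is the xx to pass as --partition when reading a group’s records from __consumer_offsets.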
View __consumer_offsets data
By default, Kafka provides scripts for users to view Consumer information in the following ways:
:9092 --list
:9092 --group test-group-1 --describe
abs(GroupId.hashCode()) % NumPartitions
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --partition xx
./bin/kafka-console-consumer.sh --bootstrap-server message-1:9092 --topic __consumer_offsets --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --partition xx
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition xx --broker-list :9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition xx --broker-list :9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
[order-group-1,topic-order,0]::[OffsetMetadata[36672,NO_METADATA],CommitTime 1633694193000,ExpirationTime 1633866993000]
Having covered the implementation details of the Consumer, let’s talk about the most important thing for a Consumer: monitoring its consumption progress, or in other words its degree of lag (how far the Consumer currently lags behind the Producer). The professional term for this is Consumer Lag. For example, if a Kafka Producer has successfully produced 10 million messages to a topic and the Consumer has consumed 9 million of them, the Consumer lags by 1 million messages, that is, Lag equals 1 million.
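The arithmetic behind Lag can be written down directly. The sketch below assumes lag is tracked per partition as the log end offset minus the committed offset and then summed; the class and method names are hypothetical, and the offsets are passed in as plain maps rather than fetched from a cluster.

```java
import java.util.Collections;
import java.util.Map;

class LagSketch {
    // endOffsets: partition -> latest offset written by producers.
    // committed:  partition -> offset the consumer group has committed.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // A partition with no committed offset counts from 0 in this sketch.
            long done = committed.getOrDefault(e.getKey(), 0L);
            total += Math.max(0L, e.getValue() - done);
        }
        return total;
    }

    public static void main(String[] args) {
        // The example from the text: 10 million produced, 9 million consumed.
        Map<Integer, Long> end = Collections.singletonMap(0, 10_000_000L);
        Map<Integer, Long> done = Collections.singletonMap(0, 9_000_000L);
        System.out.println(totalLag(end, done)); // prints 1000000
    }
}
```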
For consumers, Lag should be considered the most important monitoring metric. It directly reflects how a Consumer is performing. A small Lag value means the Consumer is consuming the messages produced by the Producer in time; if the value keeps growing, messages may be piling up, which will seriously slow down downstream processing.
How can we monitor such an important metric? There are several main methods:
1) Use Kafka’s own command-line tool, the kafka-consumer-groups script;
2) Use the Kafka Java Consumer API programmatically;
3) Use the JMX monitoring metrics that come with Kafka;
4) If you are using a cloud product, use the monitoring functions that come with the cloud product directly.
That completes a comprehensive, in-depth analysis of the underlying design principles of the Kafka Consumer and brings the chapters on Kafka principles to a close. Follow-up articles will cover special topics and source code analysis of detailed Kafka technical points, so stay tuned.