

Hello everyone, I am Viagra, author of “RocketMQ Technology Insider”, chief evangelist of the RocketMQ community, author of the Geek Time column “Middleware Core Technology and Practice”, and senior architect on the infrastructure team at Zhongtong Express. The harder I work, the luckier I get; perseverance is the only way, and I share this encouragement with all of you.

Recently I have been wrestling with a Kafka log cluster. As the number of applications deployed in the company kept growing, the log collection programs started to see large delays when writing logs to the Kafka cluster, and the overall TPS could not be pushed any higher. To avoid affecting the business teams who troubleshoot problems through these logs, we took the approach of fixing the problem first and investigating the root cause later, so the Kafka cluster was expanded. Embarrassingly, after the expansion, two of the five newly added machines showed message send response times significantly higher than the other machines. To keep the messaging service stable, the cluster was temporarily scaled back down and those machines were removed, which was done rather crudely with a kill pid command, and that is when the accident happened.

The Java client reported the following error:

The error reported by the Go client is as follows:

It can basically be concluded that some partitions have no live leader, so messages cannot be sent to them successfully.

So why does this problem occur? When a Kafka node goes offline, shouldn’t failover be triggered automatically and a new partition leader be elected? With this question in mind, let’s start today’s exploration.

First, let’s look at the routing information of the partitions that are currently having problems. The first screenshot shows that sends to partition 101 of the topic dw_test_kafka_0816000 failed, so let’s check its state in Zookeeper; the specific command is as follows:

The command shows the state information of the corresponding partition, as shown in the following figure:

Here the leader is shown as -1 and the isr list contains only one replica, located on broker 1, yet the machine with broker id 1 has already gone offline. So why was a partition leader re-election not triggered?
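For reference, the partition state node in Zookeeper is a small piece of JSON. The snippet below is an illustrative reconstruction of what it looked like in this incident (the two epoch values are made up; the leader and isr fields match what the screenshot shows):

```json
{"controller_epoch": 20, "leader": -1, "version": 1, "leader_epoch": 5, "isr": [1]}
```

leader is -1 (no live leader) and the isr array contains only broker 1, the broker that was just taken offline.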

In fact, at this point you only need to think for a moment to spot the clue: the isr field contains only the value 1, which means the partition has a single replica, i.e. its data is stored on just one broker. Once that broker goes offline, no other broker in the cluster holds the partition’s data, so a failover is impossible; if one were forced anyway, the partition’s data would be lost, and the impact would be very serious.

So why was the replication factor of these topics set to 1? Because the pressure on the cluster was already very high: the amount of data replicated between nodes was huge and the network cards were basically running at full capacity. Since this is a log cluster with a fairly high tolerance for data loss, the replication factor of the topics was set to 1 to avoid massive data replication inside the cluster.

However, taking cluster nodes down for maintenance is unavoidable, and every maintenance shutdown brings a period during which writes fail. To solve this problem, we need to migrate the topic’s partitions before the shutdown, moving the partitions off the broker that needs to go down.

For the specific practice of topic partition migration, please refer to the third part of my earlier article, Kafka Topic Migration Practices.

After a node goes offline, single-replica topics in a Kafka cluster cannot complete partition failover. To grasp some of the underlying implementation details, I want to dig deeper into Kafka’s failover mechanism when a node goes offline.

Tips: What follows explores the implementation principles in depth from the source code, to deepen the understanding of the process. If you are not interested, you can jump straight to Part 4 of this article: the summary.

The Zookeeper ensemble that Kafka depends on stores the information of the brokers currently alive in the cluster; the specific path is /{namespace}/brokers/ids, as shown below:

Each node under ids records information about that broker, such as the protocol and port of its externally exposed listeners. It is worth noting that these are ephemeral nodes, as shown in the following figure:

This way, once the corresponding broker goes offline, its node is deleted. The Controller of the Kafka cluster registers a listener on the children of this path at startup and reacts to changes, eventually calling KafkaController’s onBrokerFailure method, as follows:
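As a rough orientation, here is a simplified sketch of the broker-change handling (modeled on the Kafka 2.x KafkaController, abridged rather than the verbatim code shown in the original figure):

```scala
// Simplified sketch of the controller's BrokerChange handling (modeled on Kafka 2.x, abridged)
private def processBrokerChange(): Unit = {
  if (!isActive) return
  // broker ids currently registered under /brokers/ids (ephemeral znodes)
  val curBrokerIds  = zkClient.getAllBrokersInCluster.map(_.id).toSet
  // broker ids the controller still considers alive in its in-memory context
  val liveBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  val deadBrokerIds = liveBrokerIds -- curBrokerIds   // ephemeral nodes that disappeared
  val newBrokerIds  = curBrokerIds -- liveBrokerIds

  if (newBrokerIds.nonEmpty)  onBrokerStartup(newBrokerIds.toSeq)
  if (deadBrokerIds.nonEmpty) onBrokerFailure(deadBrokerIds.toSeq)  // entry point for failover
}
```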

The implementation of onBrokerFailure is rather complicated. We will not digress too much here but focus on locating the partition failover mechanism, so we will specifically analyze KafkaController’s onReplicasBecomeOffline method and explore how a partition fails over.

Because the implementation of this method is complex, it is explained step by step.

Step1: Group the replicas that need to be transitioned to the offline state by whether their topic needs to be deleted; the set that does not trigger deletion is represented by newOfflineReplicasNotForDeletion, and the set that needs to be deleted is represented by newOfflineReplicasForDeletion.
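A minimal sketch of this grouping (modeled on Kafka 2.x’s onReplicasBecomeOffline, abridged) is:

```scala
// Sketch: split the offline replicas by whether their topic has been queued for deletion
val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
  newOfflineReplicas.partition(replica => topicDeletionManager.isTopicQueuedUpForDeletion(replica.topic))
```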

Step2: Pick out the partitions that are left without a Leader, represented by partitionsWithoutLeader; the code is shown in the following figure:

The criterion for a partition without a Leader is that the broker hosting the partition’s leader replica is no longer online and the partition’s topic has not been queued for deletion.
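In sketch form (modeled on the Kafka 2.x controller, abridged), the filter reads roughly as:

```scala
// Sketch: a partition is "without a leader" when its current leader replica sits on a broker
// that is no longer online and its topic is not queued for deletion
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo
  .filter { case (partition, leaderIsrAndControllerEpoch) =>
    !controllerContext.isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, partition) &&
      !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
  }.keySet
```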

Step3: Change the state of the partitions without a Leader to OfflinePartition. The state update here is kept in the Kafka Controller’s memory, in a structure of type Map[TopicPartition, PartitionState].

Step4: The Kafka partition state machine drives (triggers) the transition of partitions whose state is OfflinePartition or NewPartition to OnlinePartition. The state transition mainly includes two important steps:

Because of the length of this article, we will not systematically introduce the implementation details of the Kafka partition state machine here, but focus first on the process of converting from OfflinePartition to OnlinePartition.

Let’s first explain the meaning of the parameters used when converting from OfflinePartition to OnlinePartition:

The check of whether a partition is valid here is based mainly on the transition rules defined by the state machine; for example, only the three states OnlinePartition, NewPartition, and OfflinePartition can be converted to OnlinePartition.
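The allowed source states are declared on the target state itself. The following is a condensed sketch of those transition rules (modeled on the PartitionState definitions in Kafka 2.x, abridged):

```scala
// Sketch of the transition rules the partition state machine enforces
sealed trait PartitionStateSketch { def validPreviousStates: Set[PartitionStateSketch] }

case object NewPartition extends PartitionStateSketch {
  val validPreviousStates: Set[PartitionStateSketch] = Set(NonExistentPartition)
}
case object OnlinePartition extends PartitionStateSketch {
  // only these three states may be driven to OnlinePartition
  val validPreviousStates: Set[PartitionStateSketch] = Set(NewPartition, OnlinePartition, OfflinePartition)
}
case object OfflinePartition extends PartitionStateSketch {
  val validPreviousStates: Set[PartitionStateSketch] = Set(NewPartition, OnlinePartition, OfflinePartition)
}
case object NonExistentPartition extends PartitionStateSketch {
  val validPreviousStates: Set[PartitionStateSketch] = Set(OfflinePartition)
}
```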

Next, we focus on the specific implementation logic of the transition to OnlinePartition; the specific code is as follows:

The implementation is divided into 3 steps:
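The original post lists the three steps alongside the source figure. As a rough orientation, here is a condensed sketch of the OnlinePartition branch (modeled on Kafka 2.x’s ZkPartitionStateMachine.doHandleStateChanges, heavily abridged, not the verbatim code):

```scala
// Sketch of the OnlinePartition branch (Kafka 2.x ZkPartitionStateMachine, abridged)
case OnlinePartition =>
  // 1. partitions coming from NewPartition: write their initial leader and ISR to Zookeeper
  val uninitializedPartitions = validPartitions.filter(p => partitionState(p) == NewPartition)
  // 2. partitions coming from OfflinePartition / OnlinePartition: run a leader election
  val partitionsToElectLeader = validPartitions.filter { p =>
    partitionState(p) == OfflinePartition || partitionState(p) == OnlinePartition
  }
  if (uninitializedPartitions.nonEmpty)
    initializeLeaderAndIsrForPartitions(uninitializedPartitions)
  if (partitionsToElectLeader.nonEmpty)
    // 3. elect a leader, persist the result and move the partition to OnlinePartition in memory
    electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategy)
```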

Next, we focus on the partition leader election performed when the offline state changes to OnlinePartition. The specific method is PartitionStateMachine’s electLeaderForPartitions; the code is as follows:

The structure of this method is relatively simple: the return value consists of two sets, one of partitions whose election succeeded and one of partitions whose election failed, and if a recoverable exception occurs during the election it is retried.
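A compact sketch of that retry structure (modeled on Kafka 2.x, abridged) is:

```scala
// Sketch: keep re-running the election for partitions that hit a recoverable error,
// e.g. a conditional-update (version) conflict when writing the new state to Zookeeper
var remaining = partitions
var successfulElections = Map.empty[TopicPartition, LeaderAndIsr]
while (remaining.nonEmpty) {
  val (success, updatesToRetry) = doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)
  successfulElections ++= success
  remaining = updatesToRetry
}
```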

Each round of the election is carried out by the doElectLeaderForPartitions method, whose implementation is very complex.

Partition leader election is implemented by PartitionStateMachine’s doElectLeaderForPartitions method, which we will explain step by step.

Step1: First obtain from Zookeeper the meta information of the partitions that need a leader election; the code is as follows:

The routing information of Kafka topics is stored in Zookeeper under the path /{namespace}/brokers/topics/{topicName}/partitions/{partition}/state, as follows:

Step2: Assemble the queried topic partition meta information into a Map[TopicPartition, LeaderIsrAndControllerEpoch] structure; the code is as follows:

Step3: Compare the controllerEpoch stored in the partition state with the epoch of the current Kafka Controller, and filter the partitions into invalid and valid sets, as follows:

If the controllerEpoch of the current controller is smaller than the controllerEpoch recorded in the partition state, it means another broker has replaced the current broker as the cluster’s new Controller; in that case no leader can be elected for the partition and a log message is printed.
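In sketch form (modeled on Kafka 2.x’s doElectLeaderForPartitions, abridged; the log text below is paraphrased, not Kafka’s exact message):

```scala
// Sketch: a partition whose stored controllerEpoch is newer than this controller's epoch
// must have been written by a newer controller, so this controller must not elect a leader for it
val (invalidPartitionsForElection, validPartitionsForElection) =
  leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
    leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
  }
invalidPartitionsForElection.foreach { case (partition, state) =>
  error(s"Aborted leader election for $partition: state was written by a controller with epoch " +
    s"${state.controllerEpoch}, newer than our epoch ${controllerContext.epoch}")
}
```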

Step4: Perform the leader election according to the leader election strategy; the code is as follows:

Since this transition is from the OfflinePartition state to the OnlinePartition state, the branch taken is leaderForOffline, which we will describe in detail later. The election returns two sets: partitionsWithoutLeaders, the partitions for which no leader was elected, and partitionsWithLeaders, the partitions for which a leader was successfully elected.
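Condensed, the dispatch and the split of the results look roughly like this (modeled on Kafka 2.x, abridged; the exact shape of the result type differs slightly between versions):

```scala
// Sketch: pick the election algorithm that matches the strategy, then split the results
val electionResults = partitionLeaderElectionStrategy match {
  case OfflinePartitionLeaderElectionStrategy            => leaderForOffline(validPartitionsForElection)
  case ReassignPartitionLeaderElectionStrategy           => leaderForReassign(validPartitionsForElection)
  case PreferredReplicaPartitionLeaderElectionStrategy   => leaderForPreferredReplica(validPartitionsForElection)
  case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(validPartitionsForElection)
}
// partitionsWithoutLeaders: no suitable leader found; partitionsWithLeaders: election succeeded
val (partitionsWithoutLeaders, partitionsWithLeaders) =
  electionResults.partition(result => result.leaderAndIsr.isEmpty)
```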

Step5: For the partitions that failed to elect a Leader, print the corresponding log and add them to the failure collection, as shown in the following figure:

Step6: Update the election results to Zookeeper, as shown in the following figure:

Step7: Synchronize the latest partition election results to the other broker nodes.

After the other brokers receive the LEADER_AND_ISR request that updates the partition state, each of them becomes the leader or a follower of the partition according to the leader and replica information it carries; the implementation details of this part will be covered in subsequent articles of this column.

So how exactly does the OfflinePartitionLeaderElectionStrategy carry out the election? Let’s explore the details of its implementation.

The implementation of the OfflinePartitionLeaderElectionStrategy election can be found in PartitionStateMachine’s leaderForOffline method; let’s go through it step by step.

Step1: First initialize several collections; the code is as follows:

A brief introduction to the above variables:
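Since the figure is hard to reproduce here, the following sketch (modeled on Kafka 2.x’s leaderForOffline, abridged) shows how the main collections are built:

```scala
// Sketch: split the partitions by whether any replica in the ISR is still online
// partitionsWithLiveInSyncReplicas  : at least one ISR replica is still on a live broker
// partitionsWithNoLiveInSyncReplicas: every ISR replica is offline (our single-replica case)
val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) =
  leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
    leaderIsrAndControllerEpoch.leaderAndIsr.isr
      .filter(replica => controllerContext.isReplicaOnline(replica, partition))
      .isEmpty
  }
// For the partitions with no live ISR replica, the topic-level unclean.leader.election.enable
// setting is then looked up, yielding partitionsWithUncleanLeaderElectionState: for each partition,
// its current state plus a flag saying whether an unclean (potentially data-losing) election is allowed.
```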

Step2: Perform the partition Leader election, the specific implementation code is as follows:

First, explain the meaning of the following variables:

The specific election algorithm is as follows:

The election algorithm for the offline-to-online transition is relatively simple: pick the first replica in the surviving ISR set as the partition Leader; if no ISR replica survives and unclean.leader.election.enable=true, pick any replica that is still online; otherwise return NONE, meaning no suitable Leader could be elected.
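To make the rule concrete, here is a small self-contained model of that decision. It mirrors the behavior of Kafka’s offline partition leader election as described above, but it is a simplified sketch, not the Kafka source:

```scala
// Self-contained sketch of the offline -> online leader election rule
object OfflineElectionSketch {

  def electLeader(assignment: Seq[Int],      // all replicas of the partition, in assignment order
                  isr: Seq[Int],             // the in-sync replica set recorded in Zookeeper
                  liveReplicas: Set[Int],    // replicas whose broker is currently online
                  uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
    // 1. prefer the first live replica that is still in the ISR
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
      // 2. no live ISR replica: fall back to any live replica only if unclean election is allowed
      if (uncleanLeaderElectionEnabled) assignment.find(liveReplicas.contains)
      else None                              // 3. otherwise the partition stays leaderless
    }
  }

  def main(args: Array[String]): Unit = {
    // The case from this article: a single replica on broker 1, and broker 1 has gone offline
    println(electLeader(Seq(1), Seq(1), Set.empty, uncleanLeaderElectionEnabled = false))       // None
    // With a second replica on a live broker 2, broker 2 would be elected instead
    println(electLeader(Seq(1, 2), Seq(1, 2), Set(2), uncleanLeaderElectionEnabled = false))    // Some(2)
  }
}
```

The first call reproduces the failure in this article: with no live replica and unclean election disabled, the result is None and the partition stays leaderless.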

The election results are then returned, and this round of election is complete.

This article started from a production failure. The analysis shows that when a node in the cluster goes offline, some partitions of single-replica topics hosted on that node can no longer be written to. The solution is to perform a topic partition migration first, i.e. move the partitions hosted on the broker that needs to be stopped to other brokers; this process affects neither message sending nor message consumption.

Finally, if, like me, you enjoy looking at the implementation details of partition failover, I hope walking through the source code together has deepened your understanding of the partition election mechanism; let’s keep learning from each other.

If this article has helped or inspired you, please scan the QR code and follow the account; your support is my biggest motivation to keep writing.

I’d also appreciate the one-click triple: like, share, and tap “Looking”.

Follow the public account “Middleware Interest Circle”. Reply “PDF” in the account to receive a large set of study materials, reply “column” to get 15 source-code analysis columns on mainstream Java middleware, and reply “add group” to join a group and exchange ideas with engineers from many leading companies.

About the author

Some advice for 10-year IT veterans for newcomers to the workplace

“I” was spoiled by Alibaba

How programmers can increase their influence

A must-have skill for a good programmer is how to read source code efficiently

Another way for me to participate in the RocketMQ open source community
