Kafka is a distributed message queue with high performance, persistence, multi-copy backup, and scale-out capabilities. The producer writes messages to the queue, and the consumer takes the messages from the queue for business logic. In general, it plays the role of decoupling, peak shaving, and asynchronous processing in the architecture design.
Kafka uses the concept of Topic externally, where producers write messages into Topics and consumers read messages from them.
In order to achieve horizontal scaling, a topic is actually composed of multiple partitions, and when a bottleneck is encountered, you can scale out by increasing the number of partitions. Within a single Parition is to keep messages organized.
Every time a new message is written, Kafka is written in the corresponding file append, so the performance is very high.
The basic process is this:
Serialize first, and then follow Topic and Partition to put them into the corresponding send queue. Kafka Produce is a batch of requests, will accumulate a batch, and then send together, not to call send() immediately network packets.
If the Partition is not filled in, then the situation will look like this:
Key has to be filled. Hash by Key and go to a partition for the same Key. (If the number of partitions is expanded, then it is not guaranteed)
Key is not filled. Round-Robin to select Partition.
These requests to the same partition are configured to be sent in waves, and then sent by a separate thread at once.
There is the High Level API, which has done a lot of things for us, Offset, routing and everything has been done for us, and it is very simple to use.
And then there’s the Simple API, and Offset is something we have to keep track of. (Note: When the message is consumed, first of all, we must know where to consume, this is the route, after consumption, to record the consumption order, is Offset)
When there are multiple replicas, try to assign multiple replicas to different brokers.
Kafka chooses a Leader for the Partition, after which all requests for that Partition are actually Leader, and then synchronized to other Followers.
When a broker stops cooking, all Leaders on that Broker’s partition are re-elected to elect a Leader. (The number of replicas that are not automatically replicated here is not the case with distributed file storage systems.)
Then there are two details involved:
How to assign a partition
How to choose Leader
Regarding the distribution of partitions, and the election of Leaders, there must be an executor. In Kafka, this performer is called the Controller.
Kafka uses ZK to elect a Controller in the Broker for partition assignment and Leader election.
Sort all brokers (assuming there are n brokers) and partitions to be assigned.
Assign the ith partition to the (i mod n) brokers (this is the Leader).
Assign the jth replica of the ith partition to the (i + j) mode n) brokers.
Leader disaster recovery
The Controller registers the Watch on ZK’s /brokers/ids node, which it knows as soon as the Broker goes down.
When the Broker goes down, the Controller selects a new Leader for the affected partition.
The controller reads the list of ISRs (in-sync replica synchronized copies) of the corresponding partition from ZK’s /brokers/topics/[topic]/partitions/[partition]/state, and selects one to be the Leader.
After the Leader is selected, update the ZK, and then send the LeaderAndISRRequest to the affected brokers to let them know about changing the matter.
Why instead of using ZK notifications here, but sending RPC requests directly to the broker, my understanding may be that ZK has performance problems with doing so.
If the ISR list is empty, then according to the configuration, randomly choose a replica as the leader, or simply this partition is a stop.
If there are machines on the ISR list, but they are also closed, then you can wait for the ISR’s machines to come alive.
The strategy here, the processing on the server side is that Follower pulls data from the Leader in bulk to synchronize. But the specific reliability is determined by the producer.
When producers produce messages, set the reliability of the data through the request.required.acks parameter.
The machines in the ISR list here change, and depending on the configuration replica.lag.time.max.ms, how long it has not been synchronized will be excluded from the ISR list.
In the past, there were also knocks out of the ISR based on how many messages were behind, but it was removed after version 1.0, because this value is difficult to take, and it is easy to have nodes constantly coming in and out of the ISR list at peak times.
After selecting a Leader from the ISA, Follower removes the record after the last high watermark in its log and goes to get new data with the Leader.
Because after the new Leader is selected, the data on the Follower may be more than the new Leader, so it is necessary to intercept.
The meaning of the high water mark here, for Partition and Leader, is the latest record in all ISRs. At most, consumers can only read high water marks.
From Leader’s point of view, a high-water update is delayed by one round, such as writing a new message that the brokers in the ISR have arrived, but the brokers in the ISR can only tell the Leader in the next round of Fetch.
It is also because of this high water delay that in some cases, Kafka will have data loss and active and standby data inconsistencies, starting from 0.11, using Leader Epoch instead of the high water mark.
Think: When Acks=-1
Will Follwers return to success after all come to Fetch, or will they wait for Follwers to come to Fetch in the second round?
The Leader has been written locally, but some machines in the ISR have failed, so how to deal with it?
A subscription topic is a subscription to a consumer group, which can have multiple consumers. Two consumers in the same consumer group will not consume a partition at the same time.
In other words, it is a partition that can only be consumed by one consumer in the consumer group, but can be consumed by multiple consumer groups at the same time.
Therefore, if there are more consumers in the consumer group than in the partition, then there will be individual consumers who are idle all the time.
When subscribing to a Topic, you can use a regular expression, and if there is a new topic match, it can automatically subscribe to it.
Offset for saving
A consumer group consumes a partition, you need to save the Offset record where to consume, previously saved in ZK, due to the poor write performance of ZK, the previous solution is to Consumer every minute to report.
The performance of ZK here severely affects the speed of consumption, and it is prone to duplicate consumption. After version 0.10, Kafka stripped this offset save from the ZK total and saved it in a topic called consumeroffsets topic.
The Key written into the message consists of Groupid, Topic, and Partition, and Value is the offset offset. The cleanup policy configured by Topic is Compact. Always keep the latest key, delete the rest.
In general, the offset of each key is cached in memory, and the query does not have to traverse the partition, if there is no cache, the first time it will traverse the partition to establish the cache, and then the query returns.
Thinking: If the service that is running has modified offsets.topic.num.partitions, is the save of Offset messed up?
The broker assigns a partition during the production process, and here in the consumption process, it also assigns a partition to the consumer.
Similar to the Broker to choose a Controller out, the consumer must also choose a Coordinator from the Broker for the allocation of partitions.
The following is explained from top to bottom:
How to choose Coordinator
Here we can see that the Consumer Group’s Coordinator and the Partition Leader that holds the Consumer Group’s offset are the same machine.
When Consumer starts, or the Coordinator goes down, Consumer arbitrarily requests a broker to send a ConsumerMetadataRequest request.
The broker will select the address of the Consumer corresponding to the Coordinator according to the method described above.
Consumer sends a Heartbeat request to the Coordinator and returns IllegalGeneration, which means that the Consumer information is old and needs to be re-added for Reblance.
Returns success, and Consumer continues execution from the last allocated partition.
Consumer sends a JoinGroupRequest request to the Coordinator.
When other consumers send Heartbeat requests, the Coordinator tells them that Reblance is coming.
Other consumers send JoinGroupRequest requests.
After all the recorded consumers have sent a JoinGroupRequest request, the Coordinator will randomly select a leader in the Consumer here.
Then go back to JoinGroupRespone, which tells Consumer whether you’re a Follower or a Leader, and for Leader, brings Follower’s information to it and lets it assign a Partition based on that information.
Consumer sends a SyncGroupRequest to the Coordinator, where the Leader’s SyncGroupRequest contains the assignment.
The Coordinator returns the package and tells the Consumer, including the Leader, about the assignment.
When the number of partitions or consumers changes, Reblance is done.
To list the cases of Reblance:
Consumers voluntarily shut down
The consumer is down
Coordinator himself went down
Message delivery semantics
Kafka supports 3 message delivery semantics:
At most once: At most once, messages may be lost, but not repeated.
At least once: At least once, messages are not lost and may be duplicated.
In the business, the model at least once is often used, and if reentrant is required, it is often implemented by the business itself.
At least once
First obtain the data, then carry out the business processing, after the successful business processing Commit Offset:
Producer production message exception, whether the message was successfully written is uncertain, redo, may write duplicate messages.
After the consumer processes the message, the update offset fails after the business is successfully processed, and the consumer restarts and repurchases it.
At most once
First get the data, then Commit Offset, and finally do the business processing:
The producer produces the message abnormally, no matter what, the next message is produced, and the message is lost.
The consumer processes the message, first updates the Offset, and then does the business processing, the business processing fails, the consumer restarts, and the message is lost.
The idea is this, first of all, to ensure that the news is not lost, and then to ensure that it is not repeated. So stare at the reasons at least once to get involved.
First come up with it:
Producer redo results in repeated write messages: production guarantees idempotence.
Consumer Repeat Consumption: Eliminate duplicate consumption, or business interfaces ensure idempotent duplicate consumption.
Since whether the business interface is idempotent, is not guaranteed by Kafka, so the Exactly once provided by Kafka here is limited, and the downstream of the consumer must also be Kafka.
So the following discussion, without special instructions, the downstream system of consumers is Kafka (Note: using Kafka Conector, it adapts some systems to achieve Exactly once). Producer idempotency is easy to do, no problem.
Downstream systems guarantee idempotence, and duplicate consumption does not result in multiple records.
Originally Exactly once implemented point 1 OK. However, in some usage scenarios, our data source may be multiple topics, and after processing, output to multiple topics, we will expect the output to either all succeed or all fail. This requires transactionality.
Since you want to do a transaction, simply solve the problem of duplicate consumption from the root, and bind Commit Offset and output to other topics into a transaction.
The idea is to assign each Producer a Pid as a unique identifier for that Producer.
Producer maintains a monotonically increasing Seq for each one. Similarly, Broker records the latest Seq for each one.
The broker accepts the message when req_seq == broker_seq+1 because:
When the Seq of the message is larger than the Seq of the broker, it means that there is data in the middle that has not been written, that is, it is out of order.
The Seq of the message is not smaller than the Seq of the broker, which means that the message has been saved.
The scenario goes like this:
Get data from multiple source topics first.
Do business processing, write to the downstream of multiple purposes of the topic.
Points 2 and 3 are a transaction that either all succeed or all fail. This is thanks to the fact that Offset is actually saved with a special Topic, both of which are combined into transactional processing of writing multiple topics.
The introduction of a Tid (transaction id), unlike Pid, is provided by the application to identify the transaction and does not matter who the Producer is.
That is, any producer can use this Tid to do transactions, so that transactions that die halfway through can be restored by another producer.
At the same time, in order to record the state of the transaction, similar to the handling of Offset, the Transaction Coordinator was introduced to record the Transaction Log.
There will be multiple Transaction Coordinators in the cluster, each of which corresponds to a unique Transaction Coordinator.
When doing a transaction, first mark the open transaction, write data, and all success is recorded in the Transaction Log as Prepare Commit status, otherwise the state of Prepare Abort is written.
First use Tid to request any broker (the broker with the lowest load is written in the code) and find the corresponding Transaction Coordinator.
Request the Transaction Coordinator to get the corresponding Pid, and the corresponding Epoch of the Pid, which is used to prevent the resurgence of the zombie process from causing the message to be messed up.
When the Epoch of the message is smaller than the Epoch currently maintained, it is rejected. Tids and PIDs have a one-to-one correspondence relationship, so that the same Pid is returned for the same Tid.
The client first requests the transaction state recorded by the Transaction Coordinator, the initial state is Begin, and if it is the first in the transaction to arrive, the transaction is timed.
The client outputs data to the relevant partition; The client then requests the Transaction Coordinator to record the transaction status of the Resolved; The client sends Offset Commit to the corresponding Offset Partition.
The client sends a Commit request, the Transaction Coordinator logs Prepare Commit/Abort, and then sends a Marker to the relevant Partition.
After all success, record the status of Commit/Abrot, and the last record does not need to wait for the ACK of other replicas, because Prepare does not lose to ensure the final correctness.
Here the state of Prepare is mainly used for transaction recovery, such as sending a control message to the relevant Partition, which is down without finishing, and after the standby is up, when the Producer sends a request to get the Pid, it will complete the incomplete transaction.
When the Marker of Commit is written to the Partition, the related message is read. So during the time period from Prepare Commit to Commit for Kafka transactions, messages are gradually visible, not at the same time.
The front is all about looking at things from a production perspective. There are also some issues that need to be considered from the perspective of consumption.
When consuming, there will be some messages in the partition that are not in the commit state, that is, the messages that the business side should not see, and it is necessary to filter these messages so that the business does not see them, and Kafka chooses to filter in the consumer process instead of filtering in the broker, the main consideration is performance.
A key point of Kafka’s high performance is Zero Copy, if you need to filter in the broker, then you will inevitably need to read the message content to memory, you will lose the characteristics of Zero Copy.
Kafka’s data is actually stored in the file system as files. There is a partition under Topic, there is a Segment under Partition, Segment is an actual file, and Topic and Partition are abstract concepts.
Under the directory /partitionid}/, the actual Log file (i.e. Segment) is stored, along with the corresponding index file.
Each Segment has an equal file size, the file name is named after the smallest offset in the segment, and the file extension is .log. Segments correspond to indexes with the same file name as .index.
There are two Index files:
One is the Offset Index used to look up the Message by Offset.
The overall organization is as follows:
In order to reduce the size of the index file, reduce the use of space, and facilitate direct loading into memory, the index here uses a sparse matrix, not every Message records the specific location, but every certain number of bytes, and then an index is built.
The index consists of two parts:
BaseOffset: This index corresponds to the Messages in the Segment file. Doing so facilitates the use of numerical compression algorithms to save space. For example, Kafka uses Varint.
When looking for records corresponding to Offset, it will first use the dichotomy to find out which Offset is in the corresponding Segment, and then use the index to locate the approximate position of Offset in the Segment, and then traverse to find the Message.
Common configuration items
There is also a version before 0.10, the time looks at the Mtime of the log file, but this value is inaccurate, it is possible that the file is touched a bit, Mtime will change. Therefore, starting with version 0.10, the time of the latest message in the file is used instead.
Cleanup by size should also note here that Kafka tries to compare the current log volume total size in the scheduled task to see if the total size of the current log volume exceeds the threshold for at least one log segment. If more than but not more than one log segment is not deleted.
End of body
1. Please don’t use SpringMVC, it’s too low! Spring has officially announced a more awesome alternative framework!
2. Build a startup backstage technology stack from scratch
3. What platform can programmers generally pick up private jobs?
4. Spring Boot+Redis+interceptor+Custom Annotation implements automatic power of the interface
5. Why can’t the domestic 996 do the foreign 955?
6. What is the level of China’s railway booking system in the world?
7.15 pictures to understand the difference between being busy and being efficient!