> included in Hehe Set #Kafka
id=”js_article-tag-card__right” class=”article-tag-card__right”> 4
> object is very high. Usually 2 times the actual data size (or more) As
amount of data on the heap increases, Java’s GC performance will be worse
Therefore, using a file system and relying on the operating system memory page cache is better than maintaining a memory cache or other structure in a program. At least the available memory for the operating system memory page cache has doubled. In addition, if you cache data using a compact byte structure, the available memory may be doubled compared to using objects. On a machine with 32GB of memory, the cache is available to 20-30GB, and it will not have any bad impact on the GC. Moreover, even if the service is restarted, this cache space is hot (unless the machine is restarted), and the memory cache in the user’s process is rebuilt after the service restart (10GB of data cache may take about 10 minutes).
This also simplifies the code logic, because consistency between the cache and the file system is guaranteed by the operating system.
With this analysis, the design is simple: we do the opposite, all data is written directly to the persistent log file on the file system, and there is no need to use an in-memory cache in the program or ensure that the data is flushed to disk. This effectively means that the data is transferred to the kernel’s memory page cache.
The O(log N) time complexity of the B-tree is not equivalent to constant time complexity for disk operations.
Kafka uses a log file to ensure that the time complexity of read and write operations is O(1).
Kafka doesn’t delete messages as soon as they’re consumed, but keeps them for a period of time, which is more flexible for consumers.
For systems like Kafka, even if poor disk access patterns are eliminated as described above, there are two data inefficiencies: too many small I/O operations, and too many byte copies.
Small I/O problems occur between the client and the server, as well as in data persistence operations within the server. In this regard, the Kafka protocol is based on the abstraction of “message sets” (that is, a batch of messages), so that network requests are read and written in batches of messages, reducing the time overhead of network round trips (note: the real-time nature of message processing will be relatively poor). The server also writes a batch of messages to the log file at a time, and the consumer also obtains a batch of messages at a time in order. This simple optimization can increase throughput by orders of magnitude.
For the problem of excessive byte copying, the impact is obvious when the message volume is large. Kafka uses a standardized binary message format that is used by producers, brokers, and consumers so that blocks of data do not need to change during transmission.
The message log maintained by the broker is just a bunch of files in a directory, and the file content is a sequential message set, and the data format of the message set is the same as that used by producer and consumer. Sharing a common data format facilitates an important operational optimization: network transmission of persistent log blocks. For data transfer operations from the pagecache to a network socket, modern UNIX operating systems provide a highly optimized path to code execution. Linux using sendfile system calls can take advantage of this optimization.
To understand the benefits of sendfile, you need to understand the general code execution path for transferring data from a file to a socket:
> The operating system reads data from disk to the kernel-space pagecacheThe application reads data from
This code execution path, involving 4 copies of data and 2 system calls, is clearly inefficient. With sendfile, unnecessary data copies between kernel space and user space are avoided, and the operating system can send data directly from the in-memory page cache to the network.
Learn more about sendfiles and how the Java platform supports zero-copy in this article (https://developer.ibm.com/articles/j-zerocopy/).
kernel space to the user-space
bufferThe application reads data
from the user-space buffer to the socket buffer in the kernel-space
The operating system reads data from the socket buffer to the NIC buffer, and the NIC reads data from the NIC buffer and sends it out over the network
The partition to which the message should be sent is determined by the
client based on the hashing algorithm (or random), and the message is sent directly by the producer to the leader broker of the destination partition, without any intermediate routing layer.
All Kafka nodes can respond to metadata requests – telling the client (producer or consumer) which service nodes are still alive and which node the leader of each partition of a topic is (wonder: if a partition leader node is down, how will the client know?). When will I find out? Message
delivery semantics between producer and consumer, divided into 3:
consumed at most once –
messages may be lost, but not consumed twice at
least once – Messages are not lost, but may
be consumed only once – the problem that
each message is consumed and consumed only once
can be divided into two phases: the persistence guarantee when the producer publishes a message to the broker and the durability guarantees for publishing a message and the guarantees when consuming a message.
When producer sends a message to a Kafka cluster, it provides a request parameter
acks=0: means that the producer does not need to wait for the partition leader broker to return any response, and storing the message in the socket buffer is considered to have been sent successfully. So reliability is not guaranteed.
In the second case, if the partition leader broker hangs up/does not live, the replica will lose messages that have time to synchronize in the future.
In the third case, as long as there is a synchronous copy to synchronize messages normally, then even if the leader is hung up, no data will be lost.
If the leader is judged by the system to be alive, a new leader is elected from the (synchronous) copy, so how does Kafka determine whether a node is alive? Survival decisions depend on 2 conditions:
acks=1: means that the partition leader broker writes the message to its own local log file, and responds successfully to the producer, without waiting for the partition replica broker to synchronize the good news.
acks=-1 or acks=all: means that the partition leader broker needs to wait for all synchronous replica brokers to synchronize good news and respond successfully before responding successfully to the producer
If the producer has a network problem while sending a message, it cannot determine whether the partition leader received the message. Prior to version 0.11.0.0, the producer could only resend messages and had no other way, so it could only provide “least once” delivery semantics. After version 0.11.0.0, Kafka producer supports an idempotent delivery option that ensures that message retransmissions do not result in duplicate entries in Kafka’s message log: the broker assigns an ID to each producer and then recycles based on the message sequence number.
Also starting with version 0.11.0.0, Producer supports sending messages to multiple topic partitions with transaction-like semantics: either all messages are sent successfully or none are successful. This capability is mainly used to implement only once-to-once semantics between Kafka topics.
From the consumer point of view, all replicas of the same partition have the same log data and the same consumption progress. Consumer can control where it consumes partition log data.
node must maintain a session connection to Zookeeper (via Zookeeper’s heartbeat mechanism).
If it is a follower, the message data must be constantly synchronized from the leader node, and the synchronization progress is not too far behind
How do I implement the “process once” semantics? Take advantage of the transaction power of Producer.
The granularity/unit of replication is the topic partition. In a Kafka cluster, each partition has one leader broker node and 0 or more followers. Partition reads and writes are handled by the leader broker.
Just like a normal consumer, pull messages from the leader broker from the node and write them to its own message log file. Let the slave node get the leader’s message data in the form of pull, the advantage is batch reading and writing.
For follower nodes, “alive” actually means “whether messages were successfully synchronized from the leader”, and the leader node tracks the set of “synchronizing” nodes (ISRs). If a follower hangs up/gets stuck/is too far behind synchronization, remove it from this ISRs. Whether follow is stuck or lagging behind too much depends on the
if the consumer reads the message, submits the consumption location to kafka before processing the message; If the consumer hangs up or restarts, it can result in dropped messages that only satisfy the “process at most” delivery semantics.
If the consumer reads the message, it is processed first and then the consumption location is submitted; If the consumer hangs up or restarts, it can result in duplicate consumption of messages, which only satisfies the “at least once” delivery semantics.
replica.lag.time.max.ms configuration parameters.
If a message is written to a partition,
if all synchronous replicas of the partition have written the message to their own message log files, the message can be considered committed, that is, the real write is successful.
Only write submitted messages are distributed to consumers.
The producer can choose whether to wait for message write operations to commit, making a trade-off between latency and durability.
After the leader node
of a partition is hung up, the Kafka cluster will quickly perform a short fail-over period and elect a new partition leader node without affecting availability. However, if network partitions occur, availability is not guaranteed. CAP – C (Consistency): Consistency, A (Availability): Availability, P (Partition Tolerance): Partition Tolerance – Partition Fault Tolerance is abandoned.
(Note: I don’t understand this section very well.)
A common type of distributed system is the master-slave model, where the master node determines the order of a series of values. Synchronize state data from nodes through replicated log. For commit decision and leader election, it is usually based on majority voting. Assuming that the number of replicas (note: personal understanding includes the primary node) is 2f+1, then only when f+1 replicas are successfully written, the master node will mark this write operation as committed. When the master node is down, a new primary node can be elected based on the f most recent replica nodes without any loss of state.
The majority voting method has an advantage: the latency depends on the fast node, not the slow one. The disadvantage is that for the actual production system, the anti-risk ability is not enough, and it is not flexible enough for users to make trade-offs.
Kafka chooses quorum sets differently, not based on majority votes, but dynamically maintains a set of in-sync replicas (ISRs) that are synchronized with the primary node. Only members of this set of replicas are eligible to be elected as the master node. ISR sets are persisted to Zookeeper when they change.
Based on the ISR model, if a topic partition has f+1 replicas, it can tolerate f nodes hanging up without losing any committed messages.
The academic paper that most closely matches the actual implementation of the Kafka ISR model is Microsoft’s PacificA(http://research.microsoft.com/apps/pubs/default.aspx?id=66814).
Availability and Durability Guarantee
Note: The producer sets
acks=all when sending messages Instead of requiring all replicas to confirm a successful write, the partition leader responds successfully to producer when all replicas (ISRs) in the current synchronization confirm a successful write. For example, if a topic is set to 2 replicas and one of the replica nodes is hooked, the write operation requiring
acks=all will also succeed. If the remaining replica nodes are also hung, then the message will be lost.
To facilitate the trade-off between availability and persistence, Kafka provides two topic-level configurations for cases where persistence is more important than availability:
disable dirty leader elections
Specify a minimum ISR set size
min.insync.replicas parameter setting): The partition [leader] will accept message writes only if the ISR set size is greater than the set minimum. This setting only takes effect if the producer uses
acks=all. (Note: In our production environment, the number of partition replicas is usually requested to be 3 (including leader), then
min.insync.replicas should be set to 2, but the default is 1.) With 1, when the partition has only one copy (that is, leader), the producer can write successfully, but if this copy is hung again, the data will be lost. Replica
A Kafka cluster generally has multiple topics, and each topic has multiple partitions, and for load balancing between nodes, partitions and partitions are usually distributed across all nodes in a round-robin fashion Leader role.
In addition, there is
an unavailable window of time before the leader is re-elected after partitioning the leader node, and in order to shorten this window, Kafka selects one of all brokers as a “controller”, which detects broker-level failures when a broker is found After hanging up, it is responsible for assigning a new leader to the affected partition, rather than each partition being responsible for re-selecting the owner, which is a lighter and faster selection process. If the controller node hangs, one of the surviving brokers becomes the new controller.
Consumer Spending Progress Tracking
Kafka designates a broker for each consumer group to store the offsets of each partition of the target topic, called the group coordinator 。 Any consumer instance in this consumer group should either submit the consumption progress to the group coordinator or get the last consumption progress from this group coordinator before starting. Kafka assigns coordinators to consumer groups based on their name. Consumers can send a FindCoordinatorRequest request to any broker to find their coordinator and get the coordinator’s details from the FindCoordinatorResponse response.
After the group coordinator receives an OffsetCommitRequest request, it writes the request data to a special compacted (http://kafka.apache.org/documentation/#compaction) Kafka topic – __ consumer_offsets。 All replicas in the target partition acknowledge receipt before the coordinator sends a successful response to the consumer for progress submission. The message log data for this topic is compacted periodically, because only the latest consumption progress needs to be maintained for each partition. The coordinator also caches the consumption progress in memory, which is convenient to quickly respond to consumption progress query requests.
Note: If there are a large number of consumer/consumer groups (for example: our advertising engine service, reading the front-line message topic, a machine instance is a consumer group, the number ranges from hundreds to thousands), then the pressure of the group coordinator will be relatively large, then it is more critical to ensure that the role of the group coordinator is evenly distributed to all brokers in the cluster. In addition, __consumer_offsets the number of partitions of this topic should not be too small, preferably the same number as the number of brokers or an integer multiple.
public number (zhisheng) reply to Face, ClickHouse, ES, Flink, Spring, Java, Kafka, Monitoring < keywords such as span class="js_darkmode__148"> to view more articles corresponding to keywords.
like + Looking, less bugs 👇