With performance as the beginning of Kafka’s journey, let’s dive into the inner secrets of Kafka’s “fast”. Not only can you learn about Kafka’s various methods of performance optimization, but you can also extract various performance optimization methodologies that can also be applied to our own projects to help us write high-performance projects.
65: Redis and Kafka are completely different middleware, is there a comparison?
Yes, so this article is not about “Distributed Cache Selection”, nor is it “Distributed Middleware Comparison”. We focus on the performance optimization of these two different areas of the project, look at the common means of performance optimization of excellent projects, and the optimization methods for different scenarios.
Many people learn a lot and understand a lot of frameworks, but when they encounter practical problems, they often feel that they lack knowledge. This is the failure to systematize the learned knowledge and not abstract the methodology that works from concrete implementations.
It is important to learn about open source projects by induction, summarizing the methodology of the excellent implementation of different projects, and then interpreting it into my own practice.
From a highly abstract point of view, performance issues do not escape the following three aspects:
For networked distributed queues like Kafka, network and disk are top priorities for optimization. For the abstract problems raised above, the solution is also very simple to be highly abstract:
Knowing the problems and ideas, let’s take a look at what roles are available in Kafka, and these roles are the points that can be optimized:
Yes, all the problems, ideas, optimization points have been listed, we can refine as much as possible, all three directions can be refined, so that all the implementation is clear at a glance, even if we do not look at the implementation of Kafka, we can also think of one or two points where we can optimize.
That’s the way of thinking. Ask a question > List problem points > List optimization methods > List specific points that can be cut > tradeoff and refine implementations.
Now, you can also try to think about the optimization points and methods yourself, without perfection, regardless of whether it is achieved or not, think a little is a little.
65 Brother: No, I’m stupid and lazy, you better tell me directly, I’m a white prostitute.
65 Brother: People Redis is a pure memory-based system, you kafka also read and write disks, can you compare?
Why is it slow to write to disk?
We can’t just know the conclusion, but we don’t know why. To answer this question, we have to go back to the operating system course we took when we were in school. 65 Does Brother still have textbooks? Now, turn to the chapter on disks, and let’s review how disks work.
65 Brother: The ghost is still there, oh, the course is not half of the book is gone. If it weren’t for the good look in the exam, it is estimated that I have not graduated yet.
Look at the classic big picture:
To complete a disk IO, you go through three steps: seeking, spinning, and data transfer.
The factors that affect disk IO performance also occur in the above three steps, so the main time spent is:
Seek time: Tseek refers to the time it takes to move the read/write head to the correct track. The shorter the seek time, the faster the I/O operation, and the average seek time of the disk is generally 3-15ms.
Rotation Delay: Trotation is the time it takes for the platter to rotate and move the sector where the requested data is located below the read and write disks. The rotation delay depends on the disk speed and is usually expressed as 1/2 of the time it takes for the disk to spin one week. For example, a disk with an average rotational latency of 7200rpm is about 60*1000/7200/2 = 4.17ms, while a disk with a rotational speed of 15000rpm has an average rotational delay of 2ms.
Data transfer time: Ttransfer refers to the time required to complete the transfer of the requested data, which depends on the data transfer rate, and its value is equal to the data size divided by the data transfer rate. Currently, the IDE/ATA can reach 133MB/s and the SATA II can achieve an interface data transfer rate of 300MB/s, which is usually much less than the time consumed by the first two parts. Ignorable in simple calculations.
Therefore, if you omit seek and rotate when writing to the disk, you can greatly improve the performance of disk reads and writes.
Kafka improves disk write performance by writing files sequentially. Sequential file writes basically reduces the number of disk seek and rotation. The magnetic head no longer has to dance on the track, but sps forward.
Each partition in Kafka is an orderly, immutable sequence of messages, new messages are constantly appended to the end of the partition, in Kafka Partition is just a logical concept, Kafka divides the partition into multiple segments, each Segment corresponds to a physical file, Kafka appends to the segment file, which is the sequential write file.
65 Brother: Why can Kafka use appendix?
This has to do with the nature of Kafka, let’s look at Kafka and Redis, to put it bluntly, Kafka is a Queue, and Redis is a HashMap. What is the difference between Queue and Map?
The queue is FIFO and the data is orderly; HashMap data is unordered and is read and written randomly. Kafka’s immutability, ordering allows Kafka to write files using append writes.
In fact, many data systems that meet the above characteristics can use append and write methods to optimize disk performance. Typical are Redis AOF files, WAL (Write ahead log) mechanisms for various databases, and so on.
Therefore, if you clearly understand the characteristics of your own business, you can optimize it accordingly.
65 Brother: Haha, I was asked about this interview. Unfortunately, the answer is average, alas.
What is Zero Copy?
Let’s look at the Kafka scenario, Kafka Consumer consumes data stored on the Broker disk, from reading the Broker disk to the network to the consumer, what system interactions are involved in the process. Kafka Consumer consumes data from the broker, and the broker reads the log and uses sendfile. If you use the traditional IO model, the pseudocode logic looks like this:
As shown in the figure, if you adopt the traditional IO process, first read the network IO, and then write the disk IO, you actually need to copy the data four times.
First: read the disk file to the operating system kernel buffer;
The second time: copy the kernel buffer data to the application’s buffer;
Step 3: copy the data in the application buffer to the socket network send buffer;
The fourth time: copy the data of socket buffer to the network card, and the network card will transmit it on the network.
65 Brother: Ah, is the operating system so stupid? copy to copy to go.
It is not that the operating system is stupid, the design of the operating system is that each application has its own user memory, user memory and kernel memory isolation, this is for the sake of program and system security considerations, otherwise each application memory is flying all over the sky, read and write at will.
However, there is also zero-copy technology, in English – Zero-Copy. Zero copy is to minimize the number of copies of the above data, thereby reducing the CPU overhead of the copy, reducing the number of context switches in the user-state kernel state, and thus optimizing the performance of data transfer.
There are three common zero-copy ideas:
Direct I/O: Data directly crosses the core, passes between the user address space and the I/O device, and the core only performs the necessary virtual storage configuration and other auxiliary work;
Avoid copying data between the kernel and user space: When the application does not need to access the data, copying data from kernel space to user space can be avoided;
Copy-on-write: Data does not need to be copied in advance, but is partially copied when modifications are needed.
Kafka uses mmap and sendfile to implement zero copies. MappedByteBuffer and FileChannel.transferTo for Java, respectively.
Implement zero copies using Java NIO, as follows:
Under this model, the number of context switches is reduced to one. Specifically, the transferTo() method instructs the block device to read data into the read buffer through the DMA engine. The buffer is then copied to another kernel buffer to be staged to the socket. Finally, the socket buffer is copied to the NIC buffer via DMA.
We reduced the number of replicas from four to three, and only one of those replicas involved CPU. We also reduced the number of context switches from four to two. This is a big improvement, but zero copies have not been queried. When running Linux kernel 2.4 and later and network interface cards that support collection operations, the latter can be implemented as a further optimization. This is shown below.
Based on the previous example, calling the transferTo() method causes the device to read data through the DMA engine into the kernel read buffer. However, when using the gather operation, there is no replication between the read buffer and the socket buffer. Instead, give the NIC a pointer to the read buffer along with the offset and length, which are cleared by DMA. The CPU absolutely does not participate in copying buffers.
For details on zero-copy, you can read this article on Zero-copy and its application.
When the producer produces a message to the broker, the broker uses the pwrite() system call [FileChannel.write() API corresponding to Java NIO to write the data by offset, in which case the data is written to the page cache first. When consumer consumes messages, the broker uses the sendfile() system to call [corresponding to the FileChannel.transferTo() API] to transmit the data from the page cache to the broker’s Socket buffer in zero copies, and then over the network.
The synchronization between leader and follower is the same as the process of consumer consumption data above.
The data in the page cache is written back to disk with the scheduling of the flusher thread in the kernel and the call to sync()/fsync(), so you don’t have to worry about data loss even if the process crashes. In addition, if the message to be consumed by consumer is not in the page cache, it will go to the disk to read, and some adjacent blocks will be read out and put into the page cache by the way to facilitate the next read.
So if the production rate of the Kafka producer is not much different from the consumption rate of the consumer, then the entire production-consumption process can be completed almost exclusively by reading and writing to the broker page cache, with very few disk accesses.
65 Brother: Well, as a Java programmer, Netty
Yes, Netty is an excellent networking framework in the JVM space, providing high-performance network services. When most Java programmers mention web frameworks, the first thing that comes to mind is Netty. Great frameworks like Dubbo, Avro-RPC, and more use Netty as the underlying network communications framework.
Kafka implements the network model itself to do RPC. The underlying layer is based on Java NIO, using the same Reactor threading model as Netty.
The Reacotr model is divided into three main roles
Reactor: Assigns IO events to the corresponding handler handler for processing
Acceptor: Handles client connection events
Handler: Handles non-blocking tasks
In the traditional blocking IO model, each connection needs independent thread processing, and when the number of concurrency is large, the number of creation threads is large, and the resource is consumed; Using the blocking IO model, after the connection is established, if the current thread has no data to read, the thread will block the read operation, resulting in waste of resources
In view of the two problems of the traditional blocking IO model, the Reactor model is based on the idea of pooling, avoiding the creation of threads for each connection, and handing over the business processing to the thread pool after the connection is completed. Based on the IO multiplexing model, multiple connections share the same blocking object without waiting for all connections. When traversing to the point where new data can be processed, the operating system notifies the program and the thread jumps out of the blocking state for business logic processing
Kafka implements multiplexing and processing thread pools based on the Reactor model. Its design is as follows:
It contains an Acceptor thread to process new connections, Acceptor has N Processor threads select and read socket requests, and N Handler threads process requests and correspondingly, that is, process the business logic.
I/O multiplexing allows the system to process multiple client requests simultaneously by multiplexing multiple I/O blocking onto the same select block. Its biggest advantage is that the system overhead is small, and there is no need to create new processes or threads, which reduces the resource overhead of the system.
Summary: Kafka Broker’s KafkaServer design is an excellent network architecture, and students who want to understand Java network programming or need to use this technology may wish to read the source code. Subsequent Kafka articles in the “Code Brother” series will also cover the interpretation of this source code.
Kafka Producer sends a message to a broker not a message sent by a message. Students who have used Kafka should know that Producer has two important parameters: batch.size and linger.ms. These two parameters are related to the bulk sending of Producers.
The execution flow of Kafka Producer is shown in the following figure:
Messages are sent through the following processors in turn:
Serialize: Both the key and the value are serialized according to the serializer passed. Excellent serialization can improve the efficiency of network transmission.
Partition: Determines which partition of the subject to write the message to, following the murmur2 algorithm by default. A custom partitioner can also be passed to producers to control which partition the message should be written to.
Compress: By default, compression is not enabled in Kafka producers. Compression not only transfers faster from producer to agent, but also faster during the replication process. Compression helps increase throughput, reduce latency, and improve disk utilization.
Accumulate: Accumulate, as the name suggests, is a message accumulator. It maintains a Deque double-ended queue internally for each partition, which holds the batch data to be sent, and Accumulate accumulates the data to a certain amount, or sends the data in batches within a certain expiration time. Records are accumulated in a buffer for each partition of the topic. Records are grouped according to the producer batch size attribute. Each partition in the topic has a separate accumulator/buffer.
Group Send: The batches of partitions in the record accumulator are grouped by the agent to which they are sent. Records in the batch are sent to the proxy based on batch.size and linger.ms properties. Records are sent by the producer based on two conditions. When a defined batch size is reached or a defined delay time is reached.
Kafka supports several compression algorithms: lz4, snappy, gzip. Kafka 2.1.0 officially supports ZStandard – ZStandard is Facebook’s open-source compression algorithm designed to provide ultra-high compression ratios, see zstd for details.
Producer, Broker, and Consumer use the same compression algorithm to save a lot of network and disk overhead when the producer writes data to the broker, and Consumer reads the data to the broker without even decompressing, and finally unzips the Consumer Poll when the message arrives.
Kafka’s Topic can be divided into partitions, each Paritition resembling a queue, keeping the data in order. Different Consumer Concurrent Consumption Parititions under the same Group, partitioning is actually the smallest unit to tune Kafka’s parallelism, so it can be said that each additional Paritition adds a consumption concurrency.
Kafka has an excellent partition allocation algorithm, StickyAssignor, which ensures that the partition distribution is as balanced as possible, and that the result of each redistribution is as consistent as possible with the previous allocation. In this way, the partitions of the entire cluster are as balanced as possible, and the processing of the individual brokers and consumers is not too skewed.
65 Brother: Isn’t the more partitions the better?
Of course not.
The more partitions you need to open more file handles
In kafka’s broker, each partition is compared to a directory of the file system. In the kafka’s data log file directory, each log data segment is assigned two files, an index file and a data file. Therefore, as partitions increase, the number of file handles required increases dramatically, and if necessary, you need to adjust the number of file handles that the operating system allows to open.
The more memory the client/server side needs to use
The client producer has a parameter, batch.size, which defaults to 16KB. It caches messages for each partition and sends them out in bulk as soon as they are full. It looks like this is a performance-enhancing design. Obviously, however, because this parameter is partition-level, the more partitions there are, the more memory footprint this part of the cache will require.
Reduce high availability
The more partitions there are, the more partitions are allocated on each broker, and when a broker goes down, the recovery time will be long.
Kafka messages are categorized in Topics, and each topic is independent of each other and does not affect each other. Each topic can in turn be divided into one or more partitions. Each partition has a log file that records message data.
Kafka Each partition log is physically physically divided into multiple segments by size.
Segment file composition: composed of 2 parts, respectively index file and data file, these 2 files correspond to one by one, appear in pairs, suffix “.index” and “.log” are represented as segment index files, data files, respectively.
segment file naming rules: The first segment of the partion global starts at 0, and each subsequent segment file name is the offset value of the last message of the previous segment file. Numeric values are up to 64 bits long in size, 19 digits in character length, and no numbers are padded with 0.
The index uses sparse indexes so that each index file size is limited, and Kafka uses mmap to map the index file directly to memory, so that operations on index do not require disk IO. The Java implementation of mmap corresponds to MappedByteBuffer.
65 Brother Note: mmap is a method of memory-mapping files. That is, map a file or other object to the address space of the process, and realize the one-to-one relationship between the file disk address and a virtual address in the virtual address space of the process. After implementing such a mapping relationship, the process can use the pointer to read and write this piece of memory, and the system will automatically write back the dirty page to the corresponding file disk, that is, complete the operation on the file without having to call read, write and other system call functions. Conversely, kernel space modifications to this area also directly reflect user space, allowing for file sharing between different processes.
Kafka makes full use of the dichotomous method to find the message location corresponding to offset:
Follow the dichotomy to find .log and .index of segments smaller than offset
Subtract offset from offset in the file name to get the offset of the message in this segment.
Again, use dichotomies to find the corresponding index in the index file.
to the log file, look in order until you find the message corresponding to offset.
Kafka is an excellent open source project. Its performance optimization is done vividly, and it is a project that is worth our in-depth study. Whether it is thought or realized, we should seriously look at it and think about it.
Kafka Performance Optimization:
Zero copies of network and disk
Excellent networking model, based on Java NIO
Efficient file data structure design
Parition is parallel and extensible
Bulk transfer of data
Sequentially read and write disks
Lockless lightweight offset