This article takes about 30 minutes to read. It is packed with practical content, so I hope you can read it patiently.

Starting from this article, I will conduct an in-depth analysis of Kafka's internals. Today I will talk about the design of Kafka's storage architecture. Speaking of storage systems, you are probably familiar with MySQL and know that MySQL uses a B+ tree as its index data structure.

What mechanism does Kafka use for storage? Why is it designed this way? What problems does it solve, and how? What sophisticated techniques are used in it?

With these questions in mind, let's talk about the deep thinking and implementation principles behind the design of Kafka's storage architecture.

After reading this article carefully, I believe you will have a deeper understanding of the Kafka storage architecture, and gain ideas you can apply to the architecture of other storage systems.

To sum up, Kafka's storage requirements must guarantee the following points:

1. The main thing stored is the message stream (it can be plain text or any other format; for broker storage, the broker does not care about the content of the data itself).

2. It must support efficient storage and high durability of massive amounts of data (data must not be lost after a restart).

3. It must support efficient retrieval of massive amounts of data (messages can be queried and processed efficiently by offset or timestamp when consuming).

4. It must ensure data security, stability, and failover fault tolerance.

With the above requirements analysis in mind, let's see what mechanism Kafka uses for storage. Could it be implemented directly with the relational databases we already know? Let's dig in.

1

Storage basics

Let's first go over some basic knowledge, or common sense, about storage. In our usual understanding, the speed of each storage medium is roughly as shown in the figure below: the higher the level, the faster the speed. The disk clearly sits in a rather awkward position; however, whether the disk turns out faster or slower than we expect depends entirely on how we use it.

Figure 2: Comparative distribution of storage media (from the network)

Regarding the I/O speed of disk and memory, we can see from the performance test results in the figure below that the sequential I/O throughput of an ordinary mechanical disk is 53.2M values/s, while the random I/O throughput of memory is 36.7M values/s. From this, it appears that sequential disk I/O can outperform random memory I/O.

Figure 3: IO speed comparison of disk and memory (from the network)

In addition, looking at overall read and write performance, different storage implementations optimize in different directions: some improve read speed, others improve write speed.

1. Improve read speed: use indexes to speed up queries. However, with indexes in place, a large number of write operations must also maintain the index, which reduces write efficiency. Relational databases such as MySQL are typical examples.

2. Improve write speed: this generally uses log-structured storage, improving write speed through sequential appends. Because there is no index, data cannot be queried quickly; in the worst case it can only be read record by record. Most big data systems are based on this approach.
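To make this trade-off concrete, here is a minimal, hypothetical sketch (plain Java, not Kafka code) of a log-structured store: writes are sequential appends to the end of a file, and because there is no index, a read can only scan from the beginning.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.List;

// Minimal append-only log sketch: fast sequential writes, slow scan-based reads.
public class AppendOnlyLog {
    private final Path file;

    public AppendOnlyLog(Path file) { this.file = file; }

    // Write path: always append to the end of the file (sequential I/O).
    public void append(String record) throws IOException {
        Files.write(file, (record + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Read path: no index, so we can only scan record by record until we reach the target.
    public String readNth(int n) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        return n < lines.size() ? lines.get(n) : null;
    }

    public static void main(String[] args) throws IOException {
        AppendOnlyLog log = new AppendOnlyLog(Paths.get("demo.log")); // hypothetical file name
        log.append("order-created");
        log.append("order-paid");
        System.out.println(log.readNth(1)); // order-paid
    }
}
```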

2

Anatomy of a Kafka storage scheme

The above analyzed storage systems from the angles of storage basics, the I/O speed of storage media, and read/write performance. Now let's see which approach Kafka uses for its storage.

Kafka is mainly used to process massive data streams, and this scenario has the following main characteristics:

1. Write operations: the write concurrency requirement is very high, basically reaching millions of TPS; logs can be appended sequentially, and update operations do not need to be considered.

2. Read operations: compared with writes, reads are relatively simple, as long as messages can be queried efficiently according to certain rules (offset or timestamp).

Based on these two points, for write operations, sequentially appending to a log directly satisfies Kafka's requirement of millions of writes per second. But how do we query these logs efficiently? Could we simply store the data in MySQL, using its B+ tree structure? Let's analyze the options one by one:

If a B+ tree index structure were used for storage, the index would have to be maintained on every write, extra space would be needed to store the index, and operations such as "page splits", common in relational databases, would occur. This is too heavy for a high-concurrency system like Kafka, so it is not suitable.

However, among database indexes there does seem to be one that fits this scenario: the hash index (implemented on top of a hash table). To improve read speed, we would only need to maintain a mapping in memory: every time a message is queried by offset, the file position is obtained from the hash table, and the file is then read to quickly locate the data. However, hash indexes usually need to reside in memory, which is unrealistic for Kafka, which writes millions of messages per second; memory would quickly be exhausted and cause OOM.

At this point, we can design the message offset as an ordered field, so that messages are stored in order in the log file. Then there is no need to introduce an additional hash table: the messages can simply be divided into blocks, and for each block we only index the offset of its first message, somewhat like binary search. That is, the corresponding block is located first by offset, and then the block is scanned sequentially. As shown in the following figure:

Figure 4: Schematic diagram of Kafka sparse index query

This allows you to quickly locate the message you are looking for; in Kafka, this index structure is called a "sparse index".
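To illustrate the idea, here is a simplified sketch of a sparse-index lookup: the index holds only the first offset of each block, a binary search picks the candidate block, and the block is then scanned sequentially. This is a toy model of the concept, not Kafka's actual index implementation.

```java
import java.util.*;

// Simplified sparse index: one entry per block, each entry is the first offset in that block.
public class SparseIndexDemo {
    // Sorted base offsets of each block, e.g. block 0 starts at offset 0, block 1 at 100, ...
    private final long[] blockBaseOffsets = {0, 100, 200, 300};
    // In-memory stand-in for the blocks of messages that would live on disk.
    private final Map<Long, List<Long>> blocks = new HashMap<>();

    public SparseIndexDemo() {
        for (long base : blockBaseOffsets) {
            List<Long> block = new ArrayList<>();
            for (long off = base; off < base + 100; off++) block.add(off);
            blocks.put(base, block);
        }
    }

    public long lookup(long targetOffset) {
        // Step 1: binary search the sparse index for the last block whose base offset <= target.
        int idx = Arrays.binarySearch(blockBaseOffsets, targetOffset);
        if (idx < 0) idx = -idx - 2;                // insertion point minus one
        long base = blockBaseOffsets[idx];

        // Step 2: scan the chosen block sequentially until the target offset is found.
        for (long offset : blocks.get(base)) {
            if (offset == targetOffset) return offset;
        }
        throw new NoSuchElementException("offset not found: " + targetOffset);
    }

    public static void main(String[] args) {
        System.out.println(new SparseIndexDemo().lookup(256)); // falls in the block starting at 200
    }
}
```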

3

Kafka storage architecture design

From the background of Kafka's birth, the storage scenario analysis, the I/O comparison of storage media, and the selection of Kafka's storage scheme, we arrive at Kafka's final storage implementation: a sequentially appended write log plus a sparse index.

Let’s take a look at the Kafka log storage structure:

Figure 5: Kafka log storage structure

As can be seen from the figure above, Kafka's storage is organized as "topic + partition + replica + segment + index":

1. Messages in Kafka are categorized with the topic as the basic unit. The topic is a logical concept; physically, disk storage is organized by partition, that is, each topic is divided into multiple partitions, and the number of partitions can be specified when the topic is created.

2. Partitions are mainly designed to solve the horizontal scaling problem of Kafka storage. If all messages of a topic were stored on only one Kafka broker, then for a system writing millions of messages per second this broker would certainly become a bottleneck, and recovery from failure would be difficult. So Kafka divides a topic's messages into multiple partitions and distributes them evenly across the brokers in the cluster.

3. Each message in a partition is assigned a unique message ID, which is what we usually call the offset. Therefore Kafka can only guarantee ordering within each partition, not global ordering.

4. Each partition is further divided into multiple LogSegments. To prevent the log from growing too large, Kafka introduced the concept of log segmentation (LogSegment), which splits the log into multiple LogSegments. This is equivalent to splitting one huge file into relatively small files, which makes it easier to find, maintain, and clean up messages. When cleaning up historical data, old LogSegment files can simply be deleted.

5. A Log exists physically only as a folder; each LogSegment corresponds to one log file and two index files on disk, plus possibly other files (such as snapshot files with the ".snapshot" suffix, etc.).

You can also refer to the "Storage mechanisms" section of my earlier article on Kafka fundamentals, where this is explained in detail.

4

Kafka log system architecture design

After understanding Kafka's storage selection and storage architecture design, let's take a closer look at the architecture design of the Kafka log system.

Based on the storage architecture analysis above, we know that Kafka messages are classified with the topic as the basic unit. Each topic is logically independent, and each topic can be divided into one or more partitions. When a message is sent, it is appended to the specified partition according to the partitioning rules, as shown in the following figure:

Figure 6: Logical structure of a topic with four partitions

1

Log directory layout

So what does the layout of the log directory look like when Kafka writes messages to disk? Experienced Kafka users generally know that each Log corresponds to a folder named after the topic and partition. For example, if there is a topic named "topic-order" with 4 partitions, the actual physical storage consists of 4 folders: "topic-order-0", "topic-order-1", "topic-order-2", and "topic-order-3".
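As a hedged illustration (the broker address localhost:9092 is an assumption), the following sketch uses Kafka's Java AdminClient to create the "topic-order" topic with 4 partitions; after creation, the broker's log directory contains the four partition folders listed above.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicOrder {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; adjust to your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, replication factor 1 -> folders topic-order-0 ... topic-order-3
            NewTopic topic = new NewTopic("topic-order", 4, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```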

Looking at the figure above, we know that messages written to the Log are appended sequentially. However, only the last LogSegment can be written to; all earlier LogSegments can no longer accept writes. To express this concept, we call the last LogSegment the "activeSegment", meaning the currently active log segment. As messages are written, once the activeSegment meets certain conditions, a new activeSegment is created, and subsequent messages are appended to the new activeSegment.

Figure 7: Schematic diagram of the activeSegment

To retrieve messages more efficiently, each log file in a LogSegment (with the ".log" suffix) has several corresponding index files: an offset index file (suffix ".index"), a timestamp index file (suffix ".timeindex"), and a snapshot file (suffix ".snapshot"). Each LogSegment has a base offset (baseOffset), which is the offset of the first message in that LogSegment. The offset is a 64-bit long value, and the log file and the index files are all named after the base offset, padded to a fixed 20 digits with leading zeros. For example, the base offset of the first LogSegment is 0, and the corresponding log file is 00000000000000000000.log.
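As a small illustration of this naming rule (plain Java, not Kafka source code), the sketch below pads a base offset to 20 digits to form the segment file names.

```java
// Illustrative only: build segment file names from a base offset, padded to 20 digits.
public class SegmentFileName {
    static String name(long baseOffset, String suffix) {
        return String.format("%020d%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        System.out.println(name(0L, ".log"));           // 00000000000000000000.log
        System.out.println(name(12768089L, ".index"));  // 00000000000012768089.index
    }
}
```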

Let's take an example: after writing a certain number of messages to the topic topic-order, the layout of the topic-order-0 directory at a certain moment looks like this:

Figure 8: Log directory layout diagram

The base offset of the LogSegment in the example above is 12768089, which means the offset of the first message in this LogSegment is 12768089, and it also indicates that there are already 12768089 messages before this LogSegment (messages with offsets 0 through 12768088).

Note that each LogSegment contains not only ".log", ".index", and ".timeindex" files, but may also contain ".snapshot", ".txnindex", "leader-epoch-checkpoint" and other files, as well as temporary files such as ".deleted", ".cleaned", and ".swap".

In addition, when consumers consume messages, they save their committed offsets in Kafka's internal topic __consumer_offsets. If you are not familiar with this, you can read the offset-commit part of my earlier article on the Kafka Consumer. Now let's look at the overall log directory structure:

Figure 9: Schematic diagram of the overall log directory layout

2

Log format evolution

For mature messaging middleware, the log format affects not only the extension of functionality but also performance optimization. Therefore, as Kafka has evolved rapidly, its log format has been continuously upgraded and improved, going through three major versions: V0, V1, and V2.

We know that a Kafka partition is made up of individual messages; if the log format is not carefully designed, both functionality and performance will be greatly compromised.

V0 version

Versions of Kafka prior to 0.10.0 used this log format. In this version, each message has a corresponding offset and message size: the offset represents its position in the partition, and message size indicates the size of the message. Together these take 12 bytes and are known as the log header. The log header plus the Record are treated as a single message. As shown in the following figure, for a message with key "hello" and value "world":

• crc: the CRC value computed over the message;
• magic: 0;
• attributes: 0x00 (no compression used);
• key length: 5;
• key: hello;
• value length: 5;
• value: world.

The message length is: 4 + 1 + 1 + 4 + 5 + 4 + 5 = 24 bytes.
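To make the arithmetic above concrete, here is a rough sketch (not Kafka's actual serialization code) that lays out the V0 record fields for key "hello" and value "world" in a ByteBuffer and confirms the 24-byte size (excluding the 12-byte log header of offset + message size).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Rough model of a V0 record body: crc(4) + magic(1) + attributes(1)
// + key length(4) + key + value length(4) + value. Illustrative only.
public class V0RecordSize {
    public static void main(String[] args) {
        byte[] key = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] value = "world".getBytes(StandardCharsets.UTF_8);

        int size = 4 + 1 + 1 + 4 + key.length + 4 + value.length; // = 24 bytes
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(0);            // crc placeholder (the real format carries a CRC32 here)
        buf.put((byte) 0);        // magic = 0 (V0)
        buf.put((byte) 0x00);     // attributes: no compression
        buf.putInt(key.length);   // key length = 5
        buf.put(key);
        buf.putInt(value.length); // value length = 5
        buf.put(value);

        System.out.println("record body size = " + buf.position()); // 24
    }
}
```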

    V1 version

As Kafka continued to evolve, users found that the V0 log format, lacking timestamp information, did not allow Kafka to judge messages by time; when cleaning up logs, it could only rely on the modification time of the log file, which could lead to mistaken deletions.

The log format used from Kafka 0.10.0 to 0.11.0 is V1, which has one more timestamp field than V0, representing the timestamp of the message. As shown in the following figure:

V2 version

Figure 12: V2 version log format

As can be seen from the figure above, the V2 message batch (RecordBatch) has the following changes compared with V0 and V1:

1. The CRC value is removed from the individual message and moved up into the message batch.

2. Fields such as producer id, producer epoch, and sequence number were added, mainly to support idempotence and transactional messages.

3. Timestamps and offsets are stored in incremental (delta) form.

4. The minimum message batch is 61 bytes, much larger than in V0 and V1; but in batch-sending scenarios it improves sending efficiency and reduces the space used.

In summary, the V2 log format mainly reduces the space usage of the message format through variable-length fields, and moves some fields up into the message batch (RecordBatch). A batch can contain multiple messages, so messages can be sent in batches, which greatly saves disk space.
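The space savings from "variable length" come from varint-style encoding: small numbers, such as the offset and timestamp deltas mentioned above, need only a byte or two. Below is a generic zigzag + varint sketch to show the idea; it is illustrative and not code taken from Kafka.

```java
import java.io.ByteArrayOutputStream;

// Generic zigzag + varint encoding sketch: small absolute values use few bytes,
// which is why storing deltas (offset delta, timestamp delta) saves space.
public class VarintDemo {
    static byte[] encode(long v) {
        long zigzag = (v << 1) ^ (v >> 63);          // zigzag: map signed to unsigned
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7FL) != 0) {             // emit 7 bits at a time, high bit = "more follows"
            out.write((int) ((zigzag & 0x7F) | 0x80));
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encode(3).length);                   // 1 byte for a small delta
        System.out.println(encode(1_700_000_000_000L).length);  // ~6 bytes for an absolute timestamp
    }
}
```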

    3

    Log cleanup mechanism

Kafka stores messages on disk, and as the amount of written data grows, so does the disk footprint. To control the footprint, messages need to be cleaned up. From the storage structure analyzed above, each partition replica corresponds to a Log, and a Log can be divided into multiple log segments (LogSegments), which makes it convenient for Kafka to clean up logs.

    Kafka provides two log cleanup strategies:

1. Log Retention (log deletion): directly delete log segments that do not meet the retention conditions, according to certain retention policies.

2. Log Compaction (log compaction): consolidate messages by key, retaining only the last value for each key.

We can set the log cleanup policy with the broker-side parameter log.cleanup.policy. The default value is "delete", meaning the log deletion policy is used. If you want to use the log compaction policy, you need to set log.cleanup.policy to "compact"; this alone is not enough, you must also ensure log.cleaner.enable (default true) is set to true.

If you want to support both cleanup strategies at the same time, you can set the log.cleanup.policy parameter to "delete,compact".
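As a hedged example (the topic name "user-profile-latest" and the broker address are assumptions for illustration), cleanup.policy can also be overridden per topic when creating it with the Java AdminClient; the broker-level default comes from log.cleanup.policy as described above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("user-profile-latest", 1, (short) 1)   // hypothetical topic
                    .configs(Map.of(
                            "cleanup.policy", "compact",                         // or "delete" / "delete,compact"
                            "segment.bytes", String.valueOf(1024 * 1024 * 1024))); // 1 GB segments
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```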

    3.1 Log Deletion

Kafka's log manager has a dedicated log cleanup task that periodically detects and deletes log segment files (LogSegments) that do not meet the retention conditions. The check interval can be configured with the broker-side parameter log.retention.check.interval.ms; the default value is 300000, that is, 5 minutes.

    There are 3 retention policies in Kafka:

Time-based retention policy

The log deletion task periodically checks whether the retention time of the current log file exceeds the set threshold (retentionMs), in order to find the set of log segment files that can be deleted (deletableSegments).

Here, retentionMs is determined by the broker-side parameters in priority order log.retention.ms > log.retention.minutes > log.retention.hours. By default, only the log.retention.hours parameter is configured, with a value of 168, i.e. 7 days.

Note that determining whether a log segment file has expired is not simply based on the file's modification time, but on the largest timestamp (largestTimestamp) in the log segment: first the timestamp index file corresponding to the log segment is queried and its last index entry is read; if the timestamp value is greater than 0, that value is used, otherwise the last modified time (lastModifiedTime) is used.
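A simplified sketch of that decision (illustrative only, not the broker's actual code): use the segment's largest timestamp when it is positive, otherwise fall back to the file's last-modified time, and treat the segment as deletable once it is older than retentionMs.

```java
import java.io.File;

// Illustrative model of the time-based retention check described above.
public class TimeBasedRetention {
    // Pick the timestamp used for expiry: largestTimestamp if valid, else lastModifiedTime.
    static long effectiveTimestamp(long largestTimestamp, File segmentFile) {
        return largestTimestamp > 0 ? largestTimestamp : segmentFile.lastModified();
    }

    static boolean isDeletable(long largestTimestamp, File segmentFile, long retentionMs) {
        long now = System.currentTimeMillis();
        return now - effectiveTimestamp(largestTimestamp, segmentFile) > retentionMs;
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;           // default log.retention.hours=168
        File segment = new File("00000000000000000000.log");   // hypothetical segment file
        long largestTimestamp = System.currentTimeMillis() - 8L * 24 * 60 * 60 * 1000;
        System.out.println(isDeletable(largestTimestamp, segment, sevenDaysMs)); // true: older than 7 days
    }
}
```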

    【Delete steps】:

1. The log segments to be deleted are first removed from the skip list of log segments maintained by the Log object, to ensure that no threads read them anymore.

2. The suffix ".deleted" is added to all files corresponding to the log segment, including its index files.

3. Finally, a delayed task named "delete-file" deletes the files with the ".deleted" suffix; the delay defaults to 1 minute and can be configured with file.delete.delay.ms.

    Figure 13: Time-based retention policy diagram

Log-size-based retention policy

    The log deletion task periodically checks whether the current log size exceeds the set threshold value (retentionSize) to find a collection of log segment files (deletableSegments) that can be deleted.

Here, retentionSize can be set with the broker-side parameter log.retention.bytes; the default value is -1, meaning no limit.

Note that log.retention.bytes sets the total size of all log files in the Log, not the size of a single log segment. The size of a single log segment can be set with the parameter log.segment.bytes; the default is 1 GB.

    【Delete steps】:

1. First, calculate the difference between the total log size (Size) and retentionSize, i.e. the total size of logs that need to be deleted.

2. Then, starting from the first log segment in the Log, find the set of log segments that can be deleted (deletableSegments).

    3.  Once found, you can delete it.

    Figure 14: Log size-based retention policy diagram

Log-start-offset-based retention policy

This policy checks whether the base offset (baseOffset) of the next log segment after a given log segment is less than or equal to logStartOffset; if so, that log segment can be deleted.

    【Delete steps as shown in the following figure】:

1. First iterate through the log segments from the beginning. The next segment after log segment 1 has a base offset of 20, which is less than logStartOffset, so log segment 1 is added to deletableSegments.

2. The next segment after log segment 2 starts at offset 35, which is also less than logStartOffset, so log segment 2 is added to deletableSegments.

3. The next segment after log segment 3 starts at offset 50, which is also less than logStartOffset, so log segment 3 is added to deletableSegments.

4. The base offset of the next segment after log segment 4 is to the right of logStartOffset, so log segment 4 and all segments after it are not added to deletableSegments.

5. After collecting all the deletable log segments, they can be deleted directly.

    Figure 15: Schematic diagram of retention policy based on log start offset
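Below is a small sketch (illustrative only) of the rule applied in the steps above: a segment is deletable when the base offset of the segment that follows it is less than or equal to logStartOffset.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of log-start-offset based retention: segments whose *next* segment
// starts at or before logStartOffset can be dropped entirely.
public class StartOffsetRetention {
    static List<Long> deletableSegments(long[] segmentBaseOffsets, long logStartOffset) {
        List<Long> deletable = new ArrayList<>();
        for (int i = 0; i < segmentBaseOffsets.length - 1; i++) {
            long nextBaseOffset = segmentBaseOffsets[i + 1];
            if (nextBaseOffset <= logStartOffset) {
                deletable.add(segmentBaseOffsets[i]);  // whole segment i sits below logStartOffset
            } else {
                break;                                  // later segments still contain live data
            }
        }
        return deletable;
    }

    public static void main(String[] args) {
        // Base offsets chosen so the next offsets are 20, 35, 50, ... as in the example above.
        long[] baseOffsets = {0, 20, 35, 50, 80};
        System.out.println(deletableSegments(baseOffsets, 50)); // [0, 20, 35]
    }
}
```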
3.2 Log Compaction


Log Compaction retains only the last value for each key among messages with the same key. If your application only cares about the latest value for each key, you can enable Kafka's log compaction feature; Kafka will periodically merge messages with the same key and retain only the latest value.

Log Compaction can be compared to Redis's RDB persistence mode. Imagine Kafka storing every change of a message: if Kafka crashes abnormally at some point and you want to recover quickly, the log compaction strategy means only the latest data needs to be restored, which speeds up recovery.
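Here is a minimal sketch of the compaction semantics, i.e. the "keep only the latest value per key" rule; it models the outcome, not Kafka's actual cleaner implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Model of log compaction semantics: replaying the log and keeping only the last value per key.
public class CompactionDemo {
    record Record(String key, String value) {}

    static Map<String, String> compact(List<Record> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Record r : log) {
            latest.put(r.key(), r.value());   // a later write for the same key overwrites the old value
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Record> log = List.of(
                new Record("user-1", "balance=10"),
                new Record("user-2", "balance=5"),
                new Record("user-1", "balance=30"));
        System.out.println(compact(log)); // {user-1=balance=30, user-2=balance=5}
    }
}
```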

Figure 16: Schematic diagram of the log compaction policy

4

Disk data storage

We know that Kafka relies on the file system to store and cache messages, along with the typical sequential append-only log writes described above. It also uses the operating system's PageCache to reduce disk I/O operations: disk data is cached in memory, turning disk access into memory access.

Kafka uses PageCache extensively, and this is one of the important factors behind its high throughput. When a process is about to read a file from disk, the operating system first checks whether the data page to be read is already in PageCache. If it is, the data is returned directly, avoiding disk I/O; if not, the operating system issues a read request to the disk, stores the read page in PageCache, and then returns the data to the process. Similarly, when a process writes data to disk, the operating system checks whether the corresponding page is in PageCache; if not, it adds the page to PageCache and then writes the data into it. The modified page becomes a dirty page, and the operating system flushes dirty pages to disk at an appropriate time to keep the data consistent.

In addition to sequential append-only log writes and PageCache, Kafka also uses zero-copy technology to further improve system performance, as shown in the following figure:

Figure 17: Kafka zero-copy schematic diagram

You can also refer to the high-performance section of my earlier article dissecting Kafka's "three highs" architecture design.
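At the JVM level, the zero-copy path Kafka relies on is FileChannel.transferTo, which lets the kernel move bytes from the file (already in PageCache) straight to the socket (sendfile on Linux) without copying them through user space. A minimal sketch, assuming a hypothetical segment file and peer address:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Minimal zero-copy sketch: transferTo lets the kernel move file bytes straight to the socket,
// instead of copying them into and out of a user-space buffer.
public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("00000000000000000000.log"), // hypothetical segment
                                                 StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) { // assumed peer
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket); // zero-copy transfer
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```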

    The overall process of messages from production to disk is shown in the following figure:

    Figure 18: Schematic diagram of the log message writing to disk


5

Summary

Starting from the scenario analysis of Kafka storage, through the analysis and comparison of Kafka's storage scheme selection, then the design of Kafka's storage architecture, and finally an in-depth look at the details of the Kafka log system architecture, this article has taken you step by step behind the mystery of Kafka's storage architecture.

If this article is helpful to you, please follow, like, share, and retweet. Thank you very much!

I will keep summarizing and producing high-quality articles. Follow me: Hua Tsai chats about technology.

Featured article recommended: Kafka Core Principles Stage Summary

If you have any questions, leave a message to discuss with Huazai.
