
First, the status quo and challenges

1. Current situation

Kafka is an open-source stream-processing platform. Let's first look at how Kafka is currently used on the Meituan data platform.

Figure 1-1 Current status of Kafka in Meituan’s data platform

As shown in Figure 1-1, the blue section depicts Kafka's position as the streaming storage layer of the data platform. Its main responsibility is to cache and distribute data: it collects logs from system logs, client logs, and business databases and distributes them to different downstream data systems. The downstream consumers include offline computing (warehousing into ODS), direct use by real-time computing, synchronization to the log center through DataLink, and OLAP analysis.

Kafka's total machine count at Meituan has exceeded 15,000, and the largest single cluster has reached 2,000+ machines. In terms of data scale, the daily message volume has exceeded 30+ PB, and the peak message rate has reached 400+ million messages per second. However, as cluster size and data volume grow, the challenges Kafka faces become more and more serious. The specific challenges are described below.

2. Challenges

Figure 1-2 Challenges faced by Kafka in Meituan’s data platform

As shown in Figure 1-2, the specific challenges can be summarized in two parts:

1) Slow nodes affect reads and writes. "Slow node" here borrows a concept from HDFS; we define it as a broker whose read/write latency TP99 exceeds 300 ms. There are three causes of slow nodes:

Cluster load imbalance leads to local hotspots: the cluster as a whole has abundant disk space and low ioutil, yet some individual disks are nearly full or have their ioutil saturated.

PageCache capacity is limited. For example, an 80 GB PageCache can only hold about 8 minutes of data at a write rate of 170 MB/s, so a consumer reading data older than 8 minutes is likely to trigger slow disk access (see the quick calculation after this list).

Flaws in the Consumer client's threading model distort the end-to-end latency metric. For example, when the partitions consumed by a Consumer are all on the same broker, TP90 may be under 100 ms; when those partitions are spread across different brokers, TP90 may exceed 1000 ms.
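As a quick check of the PageCache example above, using the figures quoted there:

```latex
\frac{80\ \text{GB}}{170\ \text{MB/s}} \approx \frac{80{,}000\ \text{MB}}{170\ \text{MB/s}} \approx 470\ \text{s} \approx 8\ \text{minutes}
```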

2) The complexity of large-scale cluster management manifests in four types of problems:

Different topics affect each other: a sudden traffic spike on an individual topic, or catch-up reads by an individual consumer, can affect the stability of the whole cluster.

Kafka’s native Broker granularity metrics are not robust enough, making problem localization and root cause analysis difficult.

Faults are not sensed promptly, and handling them is costly.

A Rack-level failure can make some partitions unavailable.

Second, read and write delay optimization

Next, let's look at the optimizations the Meituan data platform has made for read and write latency. At the macro level, we divide the contributing factors into the application layer and the system layer, describe the problems at each layer in detail, and give the corresponding solutions, including pipeline acceleration, Fetcher isolation, migration cancellation, and Cgroup resource isolation. The implementation of each optimization is detailed below.

1. Overview

Figure 2-1 Kafka read-write delay optimization overview

Figure 2-1 shows an overview of the problems encountered in read and write latency and the corresponding optimization schemes. We divide the affected factors into the application layer and the system layer.

The application layer mainly includes 3 types of problems:

1) Load imbalance on the broker side, such as unbalanced disk usage and unbalanced ioutil. A load increase on an individual disk affects requests across the entire broker.

2) Broker data migration has efficiency problems and resource competition problems. Specifically, there are the following three issues:

Migrations can only be submitted serially, batch by batch; a few slow partitions in a batch block submission of the next batch, hurting migration efficiency.

Migrations are typically performed at night; if a migration has not finished by the daytime peak hours, read and write requests can be significantly affected.

Migration requests and real-time pulls share Fetcher threads, so partition migration requests affect real-time consumption requests.

3) The Consumer side has a defect in its single-threaded model, which distorts operations metrics. Moreover, the number of partitions a single Consumer consumes is not limited; when that number grows, consumption capacity may fall behind the latest real-time data and trigger catch-up reads.

The system layer also mainly includes 3 types of problems:

1) PageCache pollution. Kafka uses the kernel's zero-copy mechanism to improve performance, but the kernel cannot distinguish real-time read/write requests from catch-up read requests, so disk reads can pollute the PageCache and affect real-time reads and writes.

2) HDD performance is poor under random read and write loads. HDDs are friendly to sequential reads and writes, but performance is significantly reduced in the face of random reads and writes in mixed-load scenarios.

3) Competition for system resources such as CPU and memory in co-location scenarios. To improve resource utilization, Meituan's big data platform co-locates IO-intensive services (such as Kafka) with CPU-intensive services (such as real-time computing jobs), and this co-location produces resource competition that affects read and write latency.

For the issues above, we adopted targeted strategies: disk balancing, migration pipeline acceleration, support for migration cancellation, and consumer asynchrony at the application layer; RAID card acceleration and Cgroup isolation optimization at the system layer. In addition, to address the insufficient random read/write performance of HDDs, we designed and implemented an SSD-based cache architecture.

2. Application layer

1) Disk balancing

Figure 2-2 Kafka application layer disk balancing

Disk hotspots cause two issues:

Real-time read/write latency rises; for example, TP99 request processing time exceeds 300 ms, which can cause consumption lag for real-time jobs and congestion in data collection.

Overall cluster utilization is low: although total capacity is abundant, some disks are written full, which can even cause some partitions to stop serving.

For these two issues, we adopted a partition migration scheme that prioritizes idle disks. The scheme has 3 steps and is managed by a component called Rebalancer:

Generate a migration plan. Rebalancer continuously generates concrete partition migration plans from the target disk utilization and the current disk utilization (reported by Kafka Monitor).

Submit the migration plan. Rebalancer submits the newly generated plan to the reassignment node in Zookeeper; after receiving this Reassign event, the Kafka Controller submits it to the entire Kafka broker cluster.

Check the migration progress. Kafka brokers carry out the actual data migration tasks, while Rebalancer checks the progress of those tasks.

As shown in Figure 2-2, a state in which every disk holds 3 partitions is relatively balanced. If some disks hold 4 partitions (such as Broker1-Disk1 and Broker4-Disk4) while others hold only 2 (such as Broker2-Disk2 and Broker3-Disk3), Rebalancer migrates the excess partitions from Broker1-Disk1 and Broker4-Disk4 to Broker2-Disk2 and Broker3-Disk3 respectively, so that disk utilization ends up as balanced as possible.
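To make step one concrete, here is a minimal, hypothetical sketch of how a plan generator might pick moves from over-utilized disks to the idlest ones. The class and method names (DiskStat, planOneRound, targetUtil) are illustrative assumptions, not the actual Rebalancer implementation:

```java
import java.util.*;

// Hypothetical sketch of idle-disk-first plan generation (illustrative only).
public class RebalancePlanner {

    record DiskStat(String brokerDisk, double utilization, Deque<String> partitions) {}
    record Move(String partition, String fromDisk, String toDisk) {}

    // One balancing round: every disk above the target utilization hands one
    // partition to the currently least-utilized disk. Rebalancer would run such
    // rounds continuously as Kafka Monitor reports fresh utilization numbers.
    static List<Move> planOneRound(List<DiskStat> disks, double targetUtil) {
        List<Move> plan = new ArrayList<>();
        for (DiskStat hot : disks) {
            if (hot.utilization() <= targetUtil || hot.partitions().isEmpty()) continue;
            DiskStat idlest = Collections.min(
                    disks, Comparator.comparingDouble(DiskStat::utilization));
            if (idlest == hot) continue;                       // nothing idler to move to
            plan.add(new Move(hot.partitions().pollFirst(),    // pick any partition to move
                              hot.brokerDisk(), idlest.brokerDisk()));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<DiskStat> disks = List.of(
                new DiskStat("Broker1-Disk1", 0.9, new ArrayDeque<>(List.of("tp1", "tp2", "tp3", "tp4"))),
                new DiskStat("Broker2-Disk2", 0.4, new ArrayDeque<>(List.of("tp5", "tp6"))));
        planOneRound(disks, 0.7).forEach(System.out::println);
    }
}
```

A real planner would also weigh partition sizes and ioutil, but the greedy "hottest gives to idlest" round captures the idea.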

2) Migration optimization

Although partition migrations based on idle disk prioritization achieve disk balancing, the migration itself still has efficiency issues and resource competition problems. Next, we will describe in detail the targeted strategies we have adopted.

Adopt a pipeline acceleration strategy to address the efficiency loss caused by slow partitions blocking batch submission.

Support migration cancellation to resolve the impact on read/write requests caused by slow migration of long-tail partitions.

Use Fetcher isolation to alleviate the problem of data migration requests sharing Fetcher threads with real-time read/write requests.

Optimization 1: Pipeline acceleration

Figure 2-3 Pipeline acceleration

As shown in Figure 2-3, native Kafka (above the arrow) only supports batch submission: for example, a batch of four partitions is submitted, and when partition TP4 gets stuck, none of the subsequent partitions can proceed. With pipeline acceleration, new partitions can continue to be submitted even if TP4 has not yet finished. In the same amount of time, the original scheme is blocked at TP4 with all subsequent partitions unfinished, while the new scheme has progressed from TP4 all the way to TP11. The dotted line in the figure represents an unordered time window, used mainly to control concurrency so that it stays consistent with the original number of partitions submitted per batch, avoiding excessive migration load that would affect read/write requests.
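The sliding "unordered time window" can be illustrated with a small, hypothetical sketch: a semaphore keeps a fixed number of partition migrations in flight and submits the next partition as soon as any slot frees up, so one stuck partition holds only one slot. Names such as migratePartition and the window size are assumptions for illustration:

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical sketch of pipelined migration submission with a bounded in-flight window.
public class PipelinedMigration {

    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Semaphore window;                 // limits concurrent migrations,
                                                    // matching the old per-batch size
    public PipelinedMigration(int windowSize) {
        this.window = new Semaphore(windowSize);
    }

    // Placeholder for the real reassignment call (blocks until the partition finishes).
    private void migratePartition(String topicPartition) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(100);           // simulate data copy
    }

    // Submit partitions one by one; a slow partition only occupies one window slot,
    // so the remaining slots keep making progress (TP5..TP11 in the article's example).
    public void migrateAll(List<String> partitions) throws InterruptedException {
        for (String tp : partitions) {
            window.acquire();                       // wait for a free slot, not a whole batch
            pool.submit(() -> {
                try {
                    migratePartition(tp);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    window.release();               // free the slot as soon as this one is done
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}
```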

Optimization 2: Migration cancellation

Figure 2-4-1 Migration issues

As shown in Figure 2-4-1, the left side of the arrow describes three types of problems that migration causes in production. First, migration triggers reads of the oldest data and synchronizes a large amount of it; in the process, that data is written back into the PageCache, polluting it, which causes Cache Misses for real-time-read partitions, triggers disk reads, and in turn affects read/write requests. Second, when abnormal nodes cause the migration to hang, certain operational actions cannot be performed, such as the automatic topic partition expansion triggered by rising traffic, because such operations are forbidden while a Kafka migration is in progress. The third type is similar to the second: when the destination node crashes, the topic expansion cannot complete, and users may have to endure degraded read/write performance for a long time.

Figure 2-4-2 Migration canceled

For the three problems above, we added support for migration cancellation. An administrator can invoke a migration-cancel command to interrupt the partitions being migrated. In the first scenario, the PageCache is no longer polluted and real-time reads are guaranteed; in the second and third scenarios, partition expansion can complete because the migration is cancelled. Migration cancellation deletes the partitions whose migration has not finished, and this deletion can create a disk IO bottleneck that affects reads and writes, so we support smooth (throttled) deletion to avoid performance problems caused by deleting a large amount of data at once (see the sketch below).
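A minimal sketch of what smooth deletion could look like, assuming each cancelled partition directory simply contains segment files: a few files are deleted per batch with a pause in between, so deletion IO never saturates the disk. The batch size and pause are illustrative values:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical sketch of throttled ("smooth") deletion of a cancelled migration's data.
public class SmoothDeleter {

    private static final int FILES_PER_BATCH = 2;            // delete a few segment files at a time
    private static final long PAUSE_MS = 500;                // breathing room for foreground IO

    public static void deletePartitionDir(Path partitionDir) throws IOException, InterruptedException {
        List<Path> files;
        try (Stream<Path> s = Files.list(partitionDir)) {
            files = s.sorted().toList();
        }
        int deletedInBatch = 0;
        for (Path f : files) {
            Files.deleteIfExists(f);                         // drop one .log/.index file
            if (++deletedInBatch == FILES_PER_BATCH) {
                deletedInBatch = 0;
                TimeUnit.MILLISECONDS.sleep(PAUSE_MS);       // throttle so reads/writes aren't starved
            }
        }
        Files.deleteIfExists(partitionDir);                  // remove the now-empty directory
    }
}
```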

Optimization 3: Fetcher isolation

Figure 2-5 Fetcher isolation

As shown in Figure 2-5, green represents real-time reads and red represents lagging (catch-up) reads. When a follower's real-time reads and lagging reads share the same Fetcher, the lagging reads affect the real-time reads: each lagging read fetches significantly more data than a real-time read, and lagging reads easily trigger disk reads because the data may no longer be in the PageCache, which significantly slows down the Fetcher's pull efficiency.

Our strategy for this problem is Fetcher isolation: all in-ISR followers share one set of Fetchers, and all out-of-ISR followers share another set, which ensures that real-time reads within the ISR are not affected by out-of-ISR catch-up reads.
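Conceptually, Fetcher isolation routes a follower's fetch to one of two thread pools depending on whether the replica is currently in the ISR. Below is a hedged sketch with assumed names (IsolatedFetcherManager, submitFetch); it is not Kafka's actual fetcher code:

```java
import java.util.Map;
import java.util.concurrent.*;

// Hypothetical sketch of ISR / non-ISR Fetcher isolation (illustrative, not Kafka's code).
public class IsolatedFetcherManager {

    private final ExecutorService isrFetchers    = Executors.newFixedThreadPool(4); // real-time replication
    private final ExecutorService nonIsrFetchers = Executors.newFixedThreadPool(2); // catch-up replication
    private final Map<String, Boolean> inIsr = new ConcurrentHashMap<>();           // partition -> ISR membership

    public void markIsr(String partition, boolean isInIsr) {
        inIsr.put(partition, isInIsr);
    }

    // Route the fetch: catch-up reads (likely disk-bound) can no longer slow down ISR fetchers.
    public void submitFetch(String partition, Runnable fetchTask) {
        boolean isr = inIsr.getOrDefault(partition, true);
        (isr ? isrFetchers : nonIsrFetchers).submit(fetchTask);
    }
}
```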

3) Consumer asynchrony

Figure 2-6 Kafka-Broker phased delay statistical model

Before describing consumer asynchrony, we need to explain the Kafka-Broker phased delay statistical model shown in Figure 2-6. The Kafka broker is a typical event-driven architecture in which components communicate through queues. As a request passes between components, timestamps are recorded in turn, so the time a request spends in each stage can be calculated.

Specifically, when a request from a Kafka Producer or Consumer enters the broker, the Processor component writes it to the RequestQueue, and a RequestHandler pulls it from the RequestQueue for processing; the time spent waiting in the RequestQueue is RequestQueueTime. The time the RequestHandler spends executing is LocalTime. When the RequestHandler finishes, the request is handed to the DelayedPurgatory component, which is a delay queue.

When the delay condition is met, the request is written to the ResponseQueue; the time spent in DelayedPurgatory is RemoteTime. The Processor then pulls the data from the ResponseQueue and sends it to the client. The ResponseTime shown in red can be affected by the client: if the client's receiving capacity is insufficient, ResponseTime keeps growing. From the broker's point of view, the total time per request, RequestTotalTime, is the sum of all these phase timings.
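Stated as a formula using the stage names above (a simplified restatement; it does not further split the response path):

```latex
\text{RequestTotalTime} = \text{RequestQueueTime} + \text{LocalTime} + \text{RemoteTime} + \text{ResponseTime}
```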

Figure 2-7 Consumer asynchrony

The continued growth of ResponseTime is mainly caused by a flaw in the NIO-based single-threaded model of Kafka's native Consumer. As shown in Figure 2-7, in Phase 1 the user issues a poll request and the Kafka client sends fetch requests to Broker1, Broker2, and Broker3 simultaneously. When Broker1's data is ready, the Kafka client writes that data to the CompleteQueue and returns immediately, instead of continuing to pull data from Broker2 and Broker3. Subsequent poll requests read data directly from the CompleteQueue and return at once until the CompleteQueue is emptied; only then is data pulled again, so even if the data on Broker2 and Broker3 is already ready, it is not pulled in time. As shown in Phase 2, because of this single-threaded model, the WaitFetch portion grows, which causes the broker's ResponseTime latency metric to keep increasing and makes it impossible to accurately monitor and isolate the server-side processing bottleneck.

Figure 2-8 Introducing an asynchronous pull thread

Our improvement for this problem is to introduce an asynchronous pull thread. The asynchronous thread pulls ready data promptly, so the server-side latency metrics are no longer distorted. Native Kafka does not limit how many partitions are pulled at the same time, so we also added rate limiting here to avoid GC pressure and OOM. The asynchronous thread continuously pulls data in the background and puts it into the CompleteQueue.
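A hedged sketch of the idea: a background thread keeps draining ready data into the CompleteQueue, and a semaphore caps how much partition data may be buffered at once to avoid GC pressure and OOM. All names here (fetchReady, maxBufferedPartitions) are illustrative assumptions, not the actual client change:

```java
import java.util.List;
import java.util.concurrent.*;

// Hypothetical sketch of an asynchronous pull thread feeding the consumer's CompleteQueue.
public class AsyncFetcher {

    private final BlockingQueue<byte[]> completeQueue = new LinkedBlockingQueue<>();
    private final Semaphore bufferedPartitions;       // caps in-flight partition data (GC/OOM guard)
    private volatile boolean running = true;

    public AsyncFetcher(int maxBufferedPartitions) {
        this.bufferedPartitions = new Semaphore(maxBufferedPartitions);
        Thread t = new Thread(this::fetchLoop, "async-fetcher");
        t.setDaemon(true);
        t.start();
    }

    // Placeholder for "pull whatever data the brokers already have ready".
    private List<byte[]> fetchReady() {
        return List.of();                             // illustrative stub
    }

    private void fetchLoop() {
        while (running) {
            List<byte[]> ready = fetchReady();
            try {
                if (ready.isEmpty()) {
                    TimeUnit.MILLISECONDS.sleep(10);  // avoid a busy loop in this stub
                    continue;
                }
                for (byte[] records : ready) {
                    bufferedPartitions.acquire();     // rate-limit how much we buffer
                    completeQueue.put(records);       // ready data is enqueued promptly,
                                                      // so broker ResponseTime is not inflated
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // poll() now only drains the CompleteQueue; the user thread never blocks the fetch path.
    public byte[] poll(long timeoutMs) throws InterruptedException {
        byte[] records = completeQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (records != null) {
            bufferedPartitions.release();             // consumed, free a buffer slot
        }
        return records;
    }

    public void close() {
        running = false;
    }
}
```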

3. System layer

1) RAID card acceleration

Figure 2-9 RAID card acceleration

HDDs suffer from insufficient random write performance, which shows up as increased latency and reduced throughput. For this problem we introduced RAID card acceleration. The RAID card has its own cache, similar to the PageCache: at the RAID layer, data is merged into larger blocks before being written to disk, which makes fuller use of the HDD's sequential write bandwidth and relies on the RAID card to guarantee random write performance.

2) Cgroup isolation optimization

Figure 2-10 Cgroup isolation

To improve resource utilization, the Meituan data platform co-locates IO-intensive applications with CPU-intensive applications; here the IO-intensive application is Kafka and the CPU-intensive applications are Flink and Storm. However, the original isolation strategy has two problems. First, there is resource competition within a physical core: on the same physical core, the shared L1 and L2 caches are contended, so Kafka's read/write latency is affected when the real-time platform's CPU usage spikes. Second, Kafka's hyper-threads span NUMA nodes, which increases memory access time; as shown in Figure 2-10, remote access across NUMA nodes goes through QPI, and this remote access takes about 40 ns.

For these two problems we improved the isolation strategy. For physical-core contention, the new co-location strategy guarantees that Kafka has exclusive use of its physical cores, i.e. Kafka and Flink never share the same physical core. It also ensures that all of Kafka's hyper-threads stay on the same NUMA node, avoiding the access latency Kafka would otherwise incur across NUMA. With the new isolation strategy, Kafka's read/write latency is no longer affected by Flink CPU spikes.

4. Hybrid layer – SSD new cache architecture

Figure 2-11 Performance problems caused by Page contamination

1) Background and challenges

Kafka uses the operating system's zero-copy mechanism to serve read requests: when the requested data is present in the PageCache, it is copied directly from the PageCache to the network card, effectively reducing read latency. In reality, however, PageCache capacity is often insufficient, because it cannot exceed the machine's memory. When capacity is insufficient, zero-copy triggers disk reads, which are not only significantly slower but also pollute the PageCache and affect other reads and writes.

As shown in the left half of Figure 2-11, when a lagging consumer pulls data and the data it wants is not in the PageCache, a disk read is triggered. After the disk read, the data is written back to the PageCache, polluting it: the lagging consumer's own consumption slows down further, and a concurrent real-time consumer is also affected, even though real-time consumption always reads the latest data and should normally never trigger a disk read.

2) Selection and decision-making

For this problem, we considered two solutions during technology selection:

Solution 1: avoid writing data back into the PageCache on disk reads, for example by using Direct IO; however, Java does not support this.

Solution 2: introduce an intermediate layer between memory and the HDD, namely an SSD. SSDs have much better random read/write performance than HDDs, which fits our use case well. For the SSD approach there are again two candidate schemes:

Scheme 2 is implemented at Kafka's application layer: Kafka's data is stored on different devices according to its age, with near-real-time data placed directly on the SSD and older data placed on the HDD; the Leader then reads data from the corresponding device based on the requested offset. The advantage of this scheme is that its caching strategy fully accounts for Kafka's read/write characteristics: all near-real-time consumption requests land on the SSD, guaranteeing low latency for that traffic, while data read from the HDD is not flushed back to the SSD, preventing cache pollution. Because each log segment has a unique, well-defined state, the target device of every request is clear, and there is no extra performance overhead from Cache Misses. The disadvantage is also obvious: it requires changes to the server-side code, involving a large amount of development and testing work.

Figure 2-13 KafkaSSD new cache architecture

3) Specific implementation

Let’s introduce the specific implementation of the new SSD cache architecture.

First, the new cache architecture stores the multiple segments of a log on different storage devices according to their age.

As shown in red circle 1 in Figure 2-14, data in the new cache architecture has three typical states. The first, OnlyCache, means the data has just been written to the SSD and has not yet been synchronized to the HDD; the second, Cached, means the data has been synchronized to the HDD and part of it is still cached on the SSD; the third, WithoutCache, means the data has been synchronized to the HDD and is no longer cached on the SSD. A sketch of these states and the SSD eviction logic follows this list.

Then the background asynchronous thread continuously synchronizes the SSD data to the HDD.

Because SSD space is limited, as writes continue and used space reaches a threshold, the data farthest from the current time is deleted first, in chronological order.

Replicas can flexibly enable or disable writing to the SSD according to availability requirements.

Data read from the HDD is not written back to the SSD, preventing cache pollution.
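A minimal sketch of the three segment states and the age-based SSD eviction described above; the state names mirror the article, while the fields and thresholds are illustrative assumptions:

```java
import java.util.*;

// Hypothetical sketch of the SSD cache's segment states and age-based eviction.
public class SsdCache {

    enum State { ONLY_CACHE, CACHED, WITHOUT_CACHE }   // on SSD only / on both / on HDD only

    static final class Segment {
        final String name;
        final long createdMs;
        final long sizeBytes;
        State state = State.ONLY_CACHE;                // freshly written segments live on the SSD
        Segment(String name, long createdMs, long sizeBytes) {
            this.name = name; this.createdMs = createdMs; this.sizeBytes = sizeBytes;
        }
    }

    private final long ssdCapacityBytes;
    private long ssdUsedBytes;
    private final Deque<Segment> byAge = new ArrayDeque<>();   // oldest segment at the head

    SsdCache(long ssdCapacityBytes) { this.ssdCapacityBytes = ssdCapacityBytes; }

    void write(Segment s) {
        byAge.addLast(s);
        ssdUsedBytes += s.sizeBytes;
        evictIfNeeded();
    }

    // The background sync thread would call this after copying a segment to the HDD.
    void markSynced(Segment s) {
        if (s.state == State.ONLY_CACHE) s.state = State.CACHED;
    }

    // When SSD usage crosses the threshold, drop the oldest already-synced segments first.
    private void evictIfNeeded() {
        while (ssdUsedBytes > ssdCapacityBytes && !byAge.isEmpty()
                && byAge.peekFirst().state == State.CACHED) {
            Segment oldest = byAge.pollFirst();
            oldest.state = State.WITHOUT_CACHE;        // data now lives only on the HDD
            ssdUsedBytes -= oldest.sizeBytes;
        }
    }
}
```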

Figure 2-14 Optimization of the new cache architecture of SSD

4) Detail optimization

After introducing the specific implementation, let’s look at the details of optimization.

The first concerns log segment synchronization: only inactive log segments (segments that are no longer being written to) are synchronized, which solves the data consistency problem at low cost.

The second is rate limiting of synchronization: copying from the SSD to the HDD is throttled, which protects both devices and avoids affecting the processing of other IO requests (a minimal sketch follows).
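The rate-limited synchronization can be sketched as a throttled copy loop: after each chunk is copied from the SSD path to the HDD path, the thread sleeps just long enough to stay under a configured bytes-per-second budget. The paths, chunk size, and budget are illustrative:

```java
import java.io.*;
import java.nio.file.*;

// Hypothetical sketch of throttled SSD -> HDD segment synchronization.
public class ThrottledSync {

    // Copy src to dst, never exceeding maxBytesPerSec on average.
    public static void copyThrottled(Path src, Path dst, long maxBytesPerSec)
            throws IOException, InterruptedException {
        byte[] buf = new byte[1 << 20];                         // 1 MiB chunks
        long start = System.nanoTime();
        long copied = 0;
        try (InputStream in = Files.newInputStream(src);
             OutputStream out = Files.newOutputStream(dst)) {
            int n;
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);
                copied += n;
                // How long *should* copying this much have taken at the budgeted rate?
                long expectedMs = copied * 1000 / maxBytesPerSec;
                long actualMs = (System.nanoTime() - start) / 1_000_000;
                if (actualMs < expectedMs) {
                    Thread.sleep(expectedMs - actualMs);        // pause so both devices keep headroom
                }
            }
        }
    }
}
```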

Third, large-scale cluster management optimization

1. Isolation strategy

Meituan's big data platform Kafka serves multiple lines of business. If the topics of these businesses are mixed in one cluster, topics of different businesses are likely to affect each other. In addition, if the Controller node also serves data read/write requests, then when its load rises sharply, the Controller may be unable to handle control-plane requests (such as metadata change requests) in time, which can ultimately bring down the whole cluster.

In response to these interrelated issues, we do isolation optimization from three dimensions: business, role, and priority.

Figure 3-1 Isolation optimization

The first is business isolation: as shown in Figure 3-1, each major business line, such as food delivery, in-store, and Meituan Select, has its own Kafka cluster.

The second is role isolation: Kafka's Brokers and Controller, and the Zookeeper they depend on, are deployed on separate machines to avoid interfering with each other.

The third is priority isolation: some business topics have particularly high availability requirements, so we place them in VIP clusters and give them extra resource redundancy to guarantee their availability.

2. Full link monitoring

With the growth of cluster scale, cluster management has encountered a series of problems, mainly including two aspects:

1) Broker-side latency metrics cannot reflect user problems in a timely manner.

As request volume grows, the TP99 and even TP999 latency metrics that Kafka currently provides may fail to reflect long-tail latency.

Broker-side latency metrics are not end-to-end metrics and may not reflect users' real problems.

2) Fault awareness and processing are not timely.

Figure 3-2 Full link monitoring

For these two problems, the strategy we adopted is full-link monitoring. Full-link monitoring collects and monitors metrics and logs for Kafka's core components; its architecture is shown in Figure 3-2. When a client read or write request becomes slow, full-link monitoring lets us quickly locate which stage is slow; the full-link latency metrics are shown in Figure 3-3.

Figure 3-3 Monitor the full link metrics

Figure 3-4 is an example of locating a request bottleneck based on full-link metrics: RemoteTime accounts for the highest proportion on the server side, which indicates that most of the time is spent on data replication. The log and metric parsing service can automatically detect faults and slow nodes in real time. Most faults (memory, disk, RAID card, network card, etc.) and slow nodes are already handled automatically; what remains is a class of unplanned faults that still require manual handling, such as unavailability caused by the failure of multiple replicas of a partition, hung migrations, and unexpected error logs.

Figure 3-4 Example of full-link monitoring metrics

3. Service life cycle management

Figure 3-5 Service lifecycle management

Meituan's online Kafka deployment runs at a scale of tens of thousands of servers, and as the service grows, our management of both services and machines keeps iterating. Our automated operations system can handle most machine failures and slow service nodes, but machine management and service management are fragmented, which causes two types of problems:

The state semantics are ambiguous and do not truly reflect the system's state; we often need to dig into logs and metrics to find out whether the system is really healthy or abnormal.

The states are not comprehensive; abnormal cases require manual intervention, and the risk of mis-operation is extremely high.

To address these two types of issues, we introduced a lifecycle management mechanism to ensure that system state truly reflects reality. Lifecycle management covers the whole process from the moment a service starts running to the moment its machine is decommissioned; service state and machine state are linked, with no need to synchronize changes manually. Moreover, state transitions in the new lifecycle mechanism are triggered only by specific automated operations, and manual changes are prohibited.
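As an illustration of the mechanism (the state names here are hypothetical, not Meituan's actual lifecycle states), the lifecycle can be modeled as a state machine whose transitions are reachable only through automated-operations calls:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a service/machine lifecycle state machine driven only by automated ops.
public class Lifecycle {

    enum State { IN_SERVICE, DRAINING, UNDER_REPAIR, DECOMMISSIONED }   // illustrative states

    // Allowed transitions; anything else is rejected.
    private static final Map<State, Set<State>> ALLOWED = Map.of(
            State.IN_SERVICE,     Set.of(State.DRAINING, State.UNDER_REPAIR),
            State.DRAINING,       Set.of(State.UNDER_REPAIR, State.DECOMMISSIONED),
            State.UNDER_REPAIR,   Set.of(State.IN_SERVICE, State.DECOMMISSIONED),
            State.DECOMMISSIONED, Set.of());

    private State current = State.IN_SERVICE;

    // Only the automation system calls this; there is no "set state" backdoor for humans.
    synchronized void transition(State next, String automationTicket) {
        if (automationTicket == null || automationTicket.isBlank()) {
            throw new IllegalStateException("manual state changes are prohibited");
        }
        if (!ALLOWED.get(current).contains(next)) {
            throw new IllegalStateException(current + " -> " + next + " is not a legal transition");
        }
        current = next;                // machine state and service state change together here
    }
}
```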

4. TOR disaster recovery

Figure 3-6 TOR disaster recovery challenge


Figure 3-7 TOR disaster recovery

TOR (top-of-rack switch) disaster recovery guarantees that different replicas of the same partition are never placed under the same rack. As shown in Figure 3-7, even if Rack1 fails entirely, all partitions remain available.
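The invariant can be checked, or enforced at assignment time, with a simple rack-awareness rule: no two replicas of a partition may map to the same rack. Here is a hedged sketch with an assumed broker-to-rack mapping:

```java
import java.util.*;

// Hypothetical sketch of the TOR disaster-recovery invariant: replicas of a partition
// must land on brokers under different racks.
public class RackAwareCheck {

    // Returns true if every replica of the partition sits under a distinct rack.
    static boolean replicasSpanDistinctRacks(List<Integer> replicaBrokerIds,
                                             Map<Integer, String> brokerToRack) {
        Set<String> racks = new HashSet<>();
        for (int brokerId : replicaBrokerIds) {
            String rack = brokerToRack.get(brokerId);
            if (rack == null || !racks.add(rack)) {
                return false;                     // unknown rack or two replicas share a rack
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> brokerToRack = Map.of(1, "Rack1", 2, "Rack1", 3, "Rack2", 4, "Rack3");
        System.out.println(replicasSpanDistinctRacks(List.of(1, 3, 4), brokerToRack)); // true
        System.out.println(replicasSpanDistinctRacks(List.of(1, 2, 3), brokerToRack)); // false: Rack1 twice
    }
}
```

Native Kafka's broker.rack configuration and rack-aware replica assignment provide a similar guarantee when topics are created.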

Fourth, the future outlook

Over the past period we have made many optimizations to reduce server-side read and write latency, but there is still work to do on high service availability. Going forward, we will focus on improving robustness and narrowing fault domains through isolation mechanisms at various granularities, for example: having clients actively avoid failed nodes, isolating abnormal requests on the server side with multiple queues, supporting hot disk removal on the server side, and applying active backpressure and rate limiting at the network layer.

In addition, with the overall growth of Meituan's real-time computing business, the co-located deployment of real-time computing engines (typically Flink) and streaming storage engines (typically Kafka) increasingly fails to meet business needs. We therefore need to deploy Kafka independently while keeping costs unchanged, which means carrying the same traffic with fewer machines (in our business model, one quarter of the original machines). How to carry business requests with fewer machines while guaranteeing service stability is another challenge we face.

Finally, with the advent of cloud-native trends, we are also exploring the path to the cloud for streaming storage services.
