Contents
1. Problems with Checkpoint
2. Unaligned Checkpoint principle
3. Greatly improving the benefits of UC
4. Greatly reducing the risk of UC
5. UC production practices and future planning in Shopee
Flink is a benchmark for big data stream computing and guarantees Exactly Once semantics through Checkpoint and State. In production, Shopee ran into many Checkpoint problems and tried to solve them with Flink’s Unaligned Checkpoint. Our investigation showed that it did not work as well as expected, so we improved it substantially in our internal version, and most of the improvements have been contributed back to the Flink community.
This article introduces the problems with Checkpoint, the principle of Unaligned Checkpoint, Shopee’s improvements to Unaligned Checkpoint, our contributions to the Flink community, and how it is used in production internally.
1. Problems with Checkpoint
Checkpoint timeout failures caused by severe backpressure in Flink jobs are a common problem in production, and sustained backpressure can leave a job without a successful Checkpoint for a long time.
For example, bottlenecks in external queries or writes, CPU bottlenecks, and data skew are common during big promotions and peak periods, and they indirectly cause Checkpoints to fail continuously.
1.2 The impact of continuous failure of Checkpoint on business
1) Suppose a job has been consuming lagging data for half an hour when the developer finds that its consumption rate is below expectations and wants to increase the parallelism and restart the job. If Checkpoints have kept failing, the job has to recover from the Checkpoint taken half an hour earlier, so the data consumed in that half hour is consumed again, which wastes resources and risks duplicating business data.
2) While a job is consuming lag, if tolerable-failed-checkpoints (the number of Checkpoint failures tolerated, 0 by default) is too low, the job may enter an endless loop: consuming lag causes severe backpressure, severe backpressure causes Checkpoints to time out, Checkpoint failures cause the job to fail, and the job failure leaves even more lag to consume. The lag can then never be fully consumed.
3) Tolerating Checkpoint failures without limit is not an elegant solution either. If the tolerance is too high:
Production problems cannot be found in time;
Some connectors commit data or files on Checkpoint. If Checkpoints keep failing, the data or files cannot be committed for a long time, which causes data delays and transaction timeouts, for example a Kafka Producer transaction that times out and fails;
Once the job restarts, a large amount of data is consumed again.
4) Business peaks and big promotions resemble consuming lag and run into the same problems.
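The endless loop in 2) can be illustrated with a toy model. The sketch below is not Flink code. It assumes that one minute of lag is consumed per minute of running time, that every Checkpoint attempted under backpressure either always fails (AC-like) or always succeeds (UC-like), and that the job restarts from its last successful Checkpoint once the number of consecutive failures exceeds the tolerated number. The class and method names are hypothetical.

```java
/** Toy model of the restart loop described in 2); an illustration, not Flink code. */
final class LagLoopSimulation {

    /**
     * Returns the minute at which the lag is fully consumed, or -1 if it is
     * still not consumed after maxMinutes.
     */
    static int minutesToDrainLag(int lagMinutes,
                                 int checkpointIntervalMinutes,
                                 int tolerableFailedCheckpoints,
                                 boolean checkpointSucceedsUnderBackpressure,
                                 int maxMinutes) {
        int consumed = 0;             // minutes of lag consumed so far
        int lastCheckpointed = 0;     // progress saved by the last successful Checkpoint
        int consecutiveFailures = 0;  // reset on success or after a restart
        int sinceLastAttempt = 0;     // minutes since the last Checkpoint attempt

        for (int minute = 1; minute <= maxMinutes; minute++) {
            consumed++;
            sinceLastAttempt++;
            if (consumed >= lagMinutes) {
                return minute;        // lag fully consumed
            }
            if (sinceLastAttempt == checkpointIntervalMinutes) {
                sinceLastAttempt = 0;
                if (checkpointSucceedsUnderBackpressure) {
                    lastCheckpointed = consumed;
                    consecutiveFailures = 0;
                } else if (++consecutiveFailures > tolerableFailedCheckpoints) {
                    // Job fails and restarts from the last successful Checkpoint.
                    consumed = lastCheckpointed;
                    consecutiveFailures = 0;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // 30 minutes of lag, a Checkpoint attempt every 10 minutes, no failures tolerated.
        System.out.println(minutesToDrainLag(30, 10, 0, false, 1000)); // Checkpoints always fail
        System.out.println(minutesToDrainLag(30, 10, 0, true, 1000));  // Checkpoints always succeed
    }
}
```

In this model, raising the tolerance also lets the job finish, but at the cost described in 3): no progress is saved until the lag is gone.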
1.3 Introduction of Unaligned Checkpoint
Against this background, many users want Checkpoints to succeed even when a Flink job has a bottleneck (severe backpressure), so the Flink community introduced Unaligned Checkpoint (UC) in FLIP-76.
2. Unaligned Checkpoint Principle
When backpressure is severe, an Aligned Checkpoint (AC) times out mainly because the Barrier is queued in the data stream: data flows slowly, so the Barrier flows slowly too, and the AC eventually times out.
The core idea of UC is that when data flows slowly, the Barrier overtakes the data, so that it can travel quickly all the way from Source to Sink.
2.2 UC flow details within a Task
Assume that the Task’s upstream parallelism is 3 and its downstream parallelism is 2. After UC starts, the Task’s three InputChannels receive the Barriers sent from upstream.
As shown in the figure, the gray boxes represent data in the buffers. InputChannel-0 receives the Barrier first, while the other InputChannels have not received it yet.
When any InputChannel receives the Barrier, the Task directly starts the first phase of UC, the UC synchronization phase. Note:
As long as any Barrier enters the input buffer of the Task’s network layer, the Task starts UC directly;
There is no need to wait for the other InputChannels to receive the Barrier, nor to process the data ahead of the Barrier in the InputChannel.
As shown in the following figure, to ensure data consistency the Task cannot process data during the UC synchronization phase. The synchronization phase does the following:
Snapshot all input and output buffers that the Barrier overtakes;
Call the operator’s snapshotState method;
The Flink engine takes a snapshot of the State inside the operator.
A few caveats:
During UC, the buffered data that the Barrier overtakes is skipped directly. To make sure no data is lost, these buffers must be written to HDFS together with the State, and they are consumed again when the job is restored from the Checkpoint;
The Task cannot process data during the synchronization phase. To keep the blocking time as short as possible, the synchronization phase only takes references to the buffers and the State data, and the actual write to HDFS is done asynchronously;
The last two steps of the UC synchronization phase are exactly the same as in AC: they snapshot the State inside the operator.
After the UC synchronization phase completes, the Task resumes processing data and the second phase of UC starts at the same time: Barrier alignment and the UC asynchronous phase. The asynchronous phase writes the State and buffers that the synchronization phase captured by shallow copy to HDFS.
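The split between a short synchronous phase and a slower asynchronous phase can be sketched as follows. This is a simplified illustration, not Flink’s implementation. The class and method names are hypothetical, byte arrays stand in for network buffers, and summing their lengths stands in for the write to HDFS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Simplified two-phase snapshot; an illustration, not Flink code. */
final class TwoPhaseSnapshotSketch {

    /** Synchronous phase: only capture references (shallow copy), no I/O. */
    static List<byte[]> captureReferences(List<byte[]> inFlightBuffers) {
        return new ArrayList<>(inFlightBuffers);
    }

    /** Asynchronous phase: persist the captured buffers off the task thread. */
    static CompletableFuture<Long> persistAsync(List<byte[]> captured) {
        return CompletableFuture.supplyAsync(() -> {
            long bytesWritten = 0;
            for (byte[] buffer : captured) {
                bytesWritten += buffer.length; // stand-in for a write to HDFS
            }
            return bytesWritten;
        });
    }

    public static void main(String[] args) {
        List<byte[]> inFlight = new ArrayList<>();
        inFlight.add(new byte[3]);
        inFlight.add(new byte[5]);

        List<byte[]> captured = captureReferences(inFlight); // task is blocked only here
        inFlight.clear();                                    // task goes on processing data
        System.out.println(persistAsync(captured).join());   // bytes persisted in the background
    }
}
```

Because the synchronous phase copies only references, its duration does not depend on how long the write takes.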
Why does UC still have Barrier alignment?
When the Task starts UC, many InputChannels have not yet received the Barrier, and there may be network buffers ahead of the Barrier in those InputChannels that also need to be snapshotted. The second phase of UC therefore has to wait for the Barriers of all InputChannels to arrive and snapshot the buffers ahead of them. In other words, UC writes three types of data to HDFS:
All input and output buffers referenced during the synchronization phase;
The State inside the operator referenced during the synchronization phase;
The buffers ahead of the Barriers of the other InputChannels, collected after the synchronization phase.
After the asynchronous phase has written all three parts, the file addresses are reported to the JobManager and the UC of the current Task ends.
Note: In theory, the Barrier alignment in the UC asynchronous phase is fast. As the Task above shows, the Barrier can quickly overtake all input and output buffers and be sent to the downstream Task first. The upstream Tasks behave the same way: the Barrier overtakes all upstream buffers and quickly reaches the current Task.
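The overtaking step can be pictured with a small queue model. The sketch below is an illustration only, not Flink’s network stack. It assumes a channel is a queue of string elements with at most one Barrier, and the class name, the method name, and the BARRIER marker are hypothetical. The Barrier moves to the head of the queue, and the elements it jumps over are returned so that they can be persisted with the Checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Queue model of a Barrier overtaking buffered data; an illustration, not Flink code. */
final class UnalignedBarrierSketch {
    static final String BARRIER = "BARRIER";

    /**
     * Moves the first Barrier to the head of the channel and returns the
     * elements it overtook, in their original order. The overtaken elements
     * stay in the channel and are still processed later.
     */
    static List<String> overtake(Deque<String> channel) {
        List<String> ahead = new ArrayList<>();
        List<String> behind = new ArrayList<>();
        boolean barrierSeen = false;
        for (String element : channel) {
            if (!barrierSeen && BARRIER.equals(element)) {
                barrierSeen = true;
            } else if (barrierSeen) {
                behind.add(element);
            } else {
                ahead.add(element);
            }
        }
        if (!barrierSeen) {
            return new ArrayList<>(); // no Barrier queued: nothing to overtake
        }
        channel.clear();
        channel.add(BARRIER);
        channel.addAll(ahead);
        channel.addAll(behind);
        return ahead;
    }

    public static void main(String[] args) {
        Deque<String> channel = new ArrayDeque<>(Arrays.asList("a", "b", BARRIER, "c"));
        List<String> toPersist = overtake(channel);
        System.out.println("persist with the Checkpoint: " + toPersist);
        System.out.println("channel after overtaking: " + channel);
    }
}
```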
2.3 Problems in UC Practice
When any Barrier enters the input buffer of the Task’s network layer, the Task starts UC directly, and the Barrier overtakes all buffers and is sent downstream quickly, so UC is not affected by backpressure. In theory, no matter how severe the backpressure is, the UC Barrier can overtake all the way and flow quickly from Source to Sink, and every Task can complete its snapshot quickly.
The theory is beautiful, but during our investigation and actual use we found that UC did not meet expectations:
In many scenarios UC still cannot succeed when backpressure is severe, which greatly reduces the expected benefit of UC;
UC significantly increases the number of files written to HDFS, which affects the stability of online services and makes large-scale adoption harder;
UC has some bugs.
The following sections describe these issues, as well as Shopee’s solutions and contributions to the community.
3. Greatly improving the benefits of UC
A Task cannot handle a Checkpoint while it is processing a record. It must finish the record it is working on and write the result to the OutputBufferPool before it checks whether an InputChannel has received a UC Barrier, and only then start UC.
If processing one record and writing the result to the OutputBufferPool takes more than 10 minutes, UC still times out. Processing a record is usually not slow, but writing the result to the OutputBufferPool can take a long time.
From the OutputBufferPool’s perspective, the upstream Task is the producer and the downstream Task is the consumer. When the downstream Task has a bottleneck, the upstream Task gets stuck waiting for a buffer while writing its result to the OutputBufferPool, and it cannot start UC.
To solve this problem, the Flink community introduced a reserved buffer mechanism in FLINK-14396. The idea is that the Task checks whether the OutputBufferPool has a free buffer before it processes data, and keeps waiting if there is none. The detailed process is shown in the following figure.
Waiting for a free buffer in the OutputBufferPool before processing data ensures that the Task can write the result to the OutputBufferPool once the data is processed, without getting stuck on data output in step 5. With this optimization, when there is no free buffer the Task waits in step 3 for a free buffer or a UC Barrier, and it can start UC quickly as soon as the UC Barrier arrives.
3.1 Scenarios where processing one record requires multiple buffers
As shown in the following figure, because only one buffer is reserved, when processing one record requires multiple buffers the Task may still get stuck in step 5 while writing results to the OutputBufferPool, so it cannot handle UC.
For example, a single large record, flatMap, window triggering, and watermark broadcasting all need multiple buffers to process one record. In these scenarios the Task gets stuck on data output in step 5 and UC performs poorly. The core of the fix is to make the Task wait in step 3 instead of getting stuck in step 5.
To address this problem, Shopee proposed the overdraft buffer in FLIP-227. The idea is that if the Task runs short of buffers while processing data and the TaskManager (TM) has free network memory, the Task’s OutputBufferPool overdrafts some buffers from the TM so that step 5 can complete.
Note: The OutputBufferPool uses overdraft buffers only when it has no free buffer. Once overdraft buffers are in use, the next time the Task reaches step 3 to wait for a Barrier or a free buffer, it sees the OutputBufferPool as having no free buffer, and it does not process more data until downstream Tasks have consumed all overdraft buffers and the OutputBufferPool has at least one free buffer again.
By default, taskmanager.network.memory.max-overdraft-buffers-per-gate=5, i.e., each OutputBufferPool of a Task can overdraft up to 5 buffers from the TM. With the overdraft buffer mechanism, as long as TM network memory is sufficient, UC does not get stuck even when processing one record needs 5 buffers. If the TM has more network memory, the parameter can be raised to cover more scenarios.
The overdraft buffer is supported starting from Flink 1.16. The related JIRA tickets are FLINK-27522, FLINK-26762, and FLINK-27789.
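The accounting behind the overdraft buffer can be sketched with a small counter model. This is an illustration of the rules described above, not Flink’s buffer pool: it assumes the TM always has enough free network memory to grant the overdraft, and the class and method names are hypothetical.

```java
/** Counter model of an output buffer pool with overdraft; an illustration, not Flink code. */
final class OverdraftBufferPoolSketch {
    private final int poolSize;             // regular buffers owned by the pool
    private final int maxOverdraftBuffers;  // extra buffers that may be borrowed from the TM
    private int buffersInUse;               // regular plus overdraft buffers currently held

    OverdraftBufferPoolSketch(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Checked before processing a record: is at least one regular buffer free? */
    boolean hasFreeBuffer() {
        return buffersInUse < poolSize;
    }

    /** Called while emitting output: falls back to overdraft when the pool is exhausted. */
    boolean requestBuffer() {
        if (buffersInUse < poolSize + maxOverdraftBuffers) {
            buffersInUse++;
            return true;
        }
        return false;
    }

    /** Called when the downstream Task has consumed a buffer. */
    void recycleBuffer() {
        if (buffersInUse > 0) {
            buffersInUse--;
        }
    }

    int overdraftBuffersInUse() {
        return Math.max(0, buffersInUse - poolSize);
    }

    public static void main(String[] args) {
        OverdraftBufferPoolSketch pool = new OverdraftBufferPoolSketch(2, 5);
        // One record whose output needs 6 buffers: 2 regular plus 4 overdraft.
        for (int i = 0; i < 6; i++) {
            pool.requestBuffer();
        }
        System.out.println("overdraft in use: " + pool.overdraftBuffersInUse());
        System.out.println("may process next record: " + pool.hasFreeBuffer());
    }
}
```

The Task processes the next record only when hasFreeBuffer() is true again, which requires that all overdraft buffers and at least one regular buffer have been recycled.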
3.2 Improvement of Legacy Source
By where their data comes from, Tasks fall into two types, SourceTask and non-SourceTask:
A non-SourceTask checks the OutputBufferPool before reading data from an InputChannel and reads only if there is a free buffer. If a SourceTask does not check the OutputBufferPool for a free buffer before reading data from an external component, UC performs poorly.
Flink has two kinds of sources, Legacy Source and New Source:
Legacy Source works in push mode: it reads data from the external component and sends it directly downstream. When the OutputBufferPool has no free buffer, Legacy Source gets stuck and cannot handle UC properly.
However, almost all of our production Flink jobs still use Legacy Source. Because the Flink community has deprecated Legacy Source and no longer maintains it, Shopee improved the commonly used Legacy Sources internally.
The improvement follows the same idea as above: a Legacy Source checks that the OutputBufferPool has a free buffer before it sends data downstream.
The FlinkKafkaConsumer, the most commonly used connector in Flink, is in fact a Legacy Source, so many Flink users in the industry still rely on Legacy Source. We shared our internally improved Legacy Source in FLINK-26759.
4. Greatly reducing the risk of UC
After the optimizations above, UC succeeds quickly both with Legacy Source and in scenarios where processing one record requires multiple buffers, which already meets the expectations of some Flink users. However, UC is still not ready for large-scale production use. The main reason is that, unlike AC, UC writes network buffers into the Checkpoint, which introduces some additional risks:
UC writes more files to HDFS, putting extra pressure on the NameNode;
After the schema of the data is upgraded, the buffered data cannot be recovered if the serialization is incompatible;
When the connection between operators changes (for example, from rebalance to forward), the buffered data between operators cannot be restored.
4.1 Unable to switch from AC to UC smoothly
Users want to avoid these risks while still enjoying the benefits of UC, so the Flink community introduced the aligned checkpoint timeout mechanism: a Checkpoint is AC by default and switches to UC if AC cannot complete within the specified time.
The AC timeout mechanism does not remove the risks of UC completely, but as long as a job has no backpressure, its Checkpoints remain AC and carry no additional risk. When backpressure is so severe that AC fails, the Checkpoint switches to UC so that it can still succeed.
Let’s assume that AC timeout = 1min and Checkpoint timeout = 5min. That is, a Checkpoint still starts as AC, switches to UC if AC has not completed within 1 minute, and fails with a timeout if its total duration exceeds 5 minutes.
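For readers who want to try these two values, the fragment below is a minimal sketch of how they might be set through the DataStream API. It assumes a Flink version in which CheckpointConfig offers setAlignedCheckpointTimeout (1.14 or later, to our knowledge), so please check the method names against the version in use. The class name and the 1-minute checkpoint interval are assumptions made for illustration and do not come from this article.

```java
import java.time.Duration;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedTimeoutConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval: an assumed value, chosen only for this example.
        env.enableCheckpointing(Duration.ofMinutes(1).toMillis());

        CheckpointConfig config = env.getCheckpointConfig();
        // UC must be enabled for the AC-to-UC switch to be possible.
        config.enableUnalignedCheckpoints();
        // AC timeout = 1min: start aligned, switch to unaligned after 1 minute.
        config.setAlignedCheckpointTimeout(Duration.ofMinutes(1));
        // Checkpoint timeout = 5min: the whole Checkpoint fails after 5 minutes.
        config.setCheckpointTimeout(Duration.ofMinutes(5).toMillis());

        // Define sources, operators, and sinks here, then call env.execute(...).
    }
}
```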
AC timeout was developed in three stages. The first two did not reach the intended goal: after 1 minute the job still could not switch from AC to UC, and sometimes could not switch even after 5 minutes, so the Checkpoint eventually timed out. Keeping this goal in mind makes the three stages easier to follow.
4.2 InputChannel supports switching from AC to UC
FLINK-19680 supported the AC timeout mechanism for the first time. In this first stage, each Task starts timing when it receives its first Barrier, and if the AC Barrier alignment inside the Task takes longer than the AC timeout, the Task switches from AC to UC.
The problem with this mechanism shows up when a job has many Tasks. Suppose there are 10 Tasks between Source and Sink and Barrier alignment inside each Task takes 59 seconds. No Task switches to UC, yet all 10 Tasks have to align one after another, so the Checkpoint takes at least 590 seconds in total (more than 5 minutes) and still fails with a timeout.
To fix the problem of the first stage, FLINK-23041 introduced the second stage. The Barrier carries the timestamp at which the Checkpoint started. When an InputChannel receives the Barrier, the current system time minus the Checkpoint start time gives the time elapsed since the Checkpoint started:
If more than 1 minute has elapsed, switch to UC directly;
If less than 1 minute has elapsed, subtract the elapsed AC time from 1 minute to get the remaining time before the switch, set a timer for it, and switch to UC when the timer fires.
Compared with the first stage, the second stage solves the problem of time accumulating across multiple Tasks: once an InputChannel has received the Barrier, a timer switches AC to UC if AC does not complete within the specified time.
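The difference between the two stages comes down to simple arithmetic, sketched below. This is an illustration under the assumptions of the example above (AC timeout of 1 minute, Checkpoint timeout of 5 minutes), not Flink code, and the class and method names are hypothetical.

```java
import java.time.Duration;

/** Arithmetic behind stages one and two of AC timeout; an illustration, not Flink code. */
final class AlignedTimeoutSketch {

    /**
     * Stage one: every Task times its own alignment, so alignments that each
     * stay below the AC timeout simply add up along the pipeline.
     */
    static Duration stageOneTotalAlignment(int tasks, Duration alignmentPerTask) {
        return alignmentPerTask.multipliedBy(tasks);
    }

    /**
     * Stage two: the Barrier carries the Checkpoint start time, so the delay
     * before switching to UC is the AC timeout minus the time already elapsed.
     * A zero result means "switch to UC immediately".
     */
    static Duration delayBeforeSwitchingToUnaligned(long checkpointStartMillis,
                                                    long nowMillis,
                                                    Duration alignedTimeout) {
        long elapsedMillis = nowMillis - checkpointStartMillis;
        long remainingMillis = alignedTimeout.toMillis() - elapsedMillis;
        return Duration.ofMillis(Math.max(0, remainingMillis));
    }

    public static void main(String[] args) {
        Duration checkpointTimeout = Duration.ofMinutes(5);
        Duration total = stageOneTotalAlignment(10, Duration.ofSeconds(59));
        System.out.println("stage one total: " + total.getSeconds() + "s, times out: "
                + (total.compareTo(checkpointTimeout) > 0));

        // Barrier arrives 20 seconds after the Checkpoint started.
        System.out.println("stage two timer: "
                + delayBeforeSwitchingToUnaligned(0L, 20_000L, Duration.ofMinutes(1)).getSeconds() + "s");
    }
}
```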
4.3 Output buffer supports switching from AC to UC
After the second stage, InputChannel supported switching from AC to UC well. But one problem remained obvious: the output buffer did not support switching from AC to UC.
If backpressure is severe and the Barrier is queued in the output buffer, the Checkpoint still times out when the Barrier cannot reach the downstream Task’s InputChannel within 5 minutes.
To address this problem, Shopee worked on FLINK-27251 and
The community designed a Checkpoint benchmark early on to evaluate Checkpoint performance. As shown in the following figure, UC performance improved 11x after the change was merged into the Flink master branch.
4.4 UC small file merging
After the AC timeout mechanism is enabled, the additional risk of UC is greatly reduced, and the benefits of UC are still available when backpressure is severe. But in large-scale production there are still risks.
By default, Flink writes one file per Subtask for buffers. Assuming a job has 10 Tasks and each Task has a parallelism of 1000, UC may write an additional 10,000 small files. Now assume that a Kafka cluster fails or hits a bottleneck and a large number of Flink jobs write to Kafka slowly, so that many Flink jobs switch from AC to UC. In that case many jobs instantly write hundreds of thousands of small files to HDFS, which can cause a NameNode avalanche.
To solve the small file problem, Shopee addressed it in FLINK-26803, and deadlock issues in FLINK-22946.
With the AC timeout mechanism and small file merging, Flink can use AC when backpressure is not severe and switch smoothly to UC when it is.
5. UC production practices and future planning in Shopee
To avoid the additional risks brought by UC, Shopee internally sets aligned-checkpoint-timeout to 1 minute: when backpressure is not severe and AC can complete within 1 minute, AC is used; when backpressure is severe and AC cannot complete within 1 minute, the Checkpoint switches to UC.
The development page of the Shopee Flink platform also has a UC switch, so users can choose whether to enable Unaligned Checkpoint for a job. Hundreds of Flink jobs currently have UC enabled, UC is performing well, and Checkpoints succeed even under backpressure.
5.2 Future planning
We will continue to follow the problems users encounter with UC, and after several months of stable operation we may consider enabling UC for all jobs, on the premise that AC timeout is enabled.
The Shopee build has major changes to the Flink scheduling and network memory modules and can calculate the network memory required by a TM accurately. In the future it will reserve separate memory for the UC overdraft buffer.
This article was written by Rui and Guichao from the Shopee Data Infrastructure team.
Team Profile
The Shopee Data Infrastructure team focuses on building a stable, efficient, secure, and easy-to-use big data infrastructure and platform for the company.
Our work includes: real-time data pipeline support and development around Kafka and Flink; development and maintenance of Hadoop ecosystem components such as HDFS and Spark; operation and maintenance of the Linux operating system and of big data components; development and business support for OLAP components such as Presto, Druid, Trino, Elasticsearch, and ClickHouse; and development of the big data platform, resource management, task scheduling, and other platform systems.