Development status and scale

Our current architecture is divided into five layers, with data flowing from the bottom up, as shown in the figure above.

The data collection layer collects many kinds of data, which come from two main sources. The first is tracking events, business logs, and service logs, which are collected by the LCS Agent. The second is database data, which is ingested into the message queue through Binlog or Source data integration. The computing layer, dominated by Flink and Spark, processes this data and finally writes it to various storage and query services for business use. Flink plays an increasingly important role in real-time and near-real-time processing at the computing layer; in particular, Flink + Iceberg data lake technology is making unified stream and batch processing a reality.

Our clusters currently run more than 3,000 jobs. The main Flink version is 1.12, and version 1.14 has also been rolled out to production. On average we process over 10 trillion messages per day, amounting to petabytes of data, with a peak of 200 million messages per second. The jobs run on more than 10 clusters in China and abroad, using more than 45,000 CPU cores and over 200 TB of memory.

Processing data at this scale, we ran into many problems.

Job memory consumption is hard to control. In Yarn mode, containers are very prone to being OOM-killed by Yarn, which shows up as container lost and causes frequent job restarts, including restarts within the framework.

Yarn mode does not support automatic, smooth job restarts. When machines go out of warranty, are taken offline, or are moved during a data center migration, the only option is to trigger a failover.

Real-time jobs are sensitive to load, so machine performance must be guaranteed during startup and operation to avoid interference from co-located offline and online workloads.

Checkpoint, which guarantees data consistency for Flink's stateful computation, has stability issues.

The HistoryServer's default cleanup policy is poorly designed, resulting in high disk usage and slow access.

When a job fails, it is hard to determine the cause and the node where the exception occurred, and troubleshooting requires reading through a large volume of job logs.


Stability optimization and practice

The first optimization addresses Yarn container lost. The Flink JobManager first requests resources from the Yarn ResourceManager, which allocates resources to the application and returns the allocation information to the JobManager. The JobManager then starts TaskManagers based on that information, and each TaskManager keeps a heartbeat with the JobManager.

The JobManager contains the JobMaster and the ResourceManager, which proactively send heartbeat requests to check whether each TaskManager is alive. If a TaskExecutor is unexpectedly killed for some reason, the JobManager log reports container lost.

The figure above shows one form of the container lost message, which mostly appears in older versions of Flink.

The figure above shows another form of the container lost message.

When a container is lost, the Yarn NodeManager or JobManager logs from around the time of the exception generally contain a message such as "beyond physical memory limits", which indicates that Yarn killed the container because its physical memory usage exceeded the limit.

The Yarn NodeManager starts a ContainersMonitor thread that periodically scans the memory usage of the containers on the node and kills those that exceed their limits, thereby enforcing memory isolation.
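As a rough illustration of what each scan decides, the sketch below applies a hard per-container limit. It is a simplification: the `Container` type and `containers_to_kill` function are hypothetical, and Yarn's real ContainersMonitorImpl also deals with process-tree accounting, virtual-memory checks, and other details omitted here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Container:
    container_id: str
    rss_bytes: int    # physical memory currently in use
    limit_bytes: int  # physical memory granted to this container


def containers_to_kill(containers: List[Container]) -> List[str]:
    """One monitoring pass with hard per-container limits (simplified)."""
    # Each container is judged only against its own limit; free memory
    # elsewhere on the node does not save a container that exceeds it.
    return [c.container_id for c in containers if c.rss_bytes > c.limit_bytes]
```

This strict per-container behavior is what the Cgroup soft limit relaxes.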

To address these problems, we combined three measures: Cgroup, a JDK upgrade, and Jemalloc. Why is a JDK upgrade needed? Older JDK versions can hit thread deadlocks when used with Jemalloc, and upgrading to the latest JDK also avoids other JDK bugs, which are usually hard to find and reproduce.

The Cgroup approach mainly turns on the memory soft limit: a container's memory is no longer capped by its own requested amount but by the memory of the entire NodeManager. As long as the NodeManager still has spare memory, a container that exceeds its request can keep using that spare memory. If several containers on one node exceed their requests at the same time and the node as a whole reaches its memory limit, an OOM event is triggered. An OOM listener watches for this event; once it confirms that the node's total memory limit has been reached, it selects, among the containers whose actual memory usage exceeds their requested amount, the one with the lowest priority and the shortest running time, and triggers an OOM kill on it.
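The victim-selection rule can be sketched as follows. This is a minimal sketch, not the actual OOM listener: the field names are hypothetical, and since the text does not say which criterion takes precedence, the sketch assumes priority is compared first and start time breaks ties (a later start time means a shorter running time).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ContainerUsage:
    container_id: str
    used_bytes: int
    requested_bytes: int
    priority: int      # assumption: a smaller number means a lower priority
    start_time: float  # epoch seconds; larger means started more recently


def pick_oom_victim(containers: List[ContainerUsage], node_limit_bytes: int) -> Optional[str]:
    """Choose which container to OOM-kill under a node-level soft limit."""
    # Soft limit: as long as the node as a whole has memory left, kill nothing.
    if sum(c.used_bytes for c in containers) < node_limit_bytes:
        return None
    # Only containers using more than they requested are candidates.
    over = [c for c in containers if c.used_bytes > c.requested_bytes]
    if not over:
        return None
    # Lowest priority first; ties go to the most recently started container.
    victim = min(over, key=lambda c: (c.priority, -c.start_time))
    return victim.container_id
```

Containers that stay within their request are never chosen, even when they are the newest or lowest-priority ones on the node.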

However, Cgroup only partly solves the problem of containers frequently being lost to Yarn OOM kills. In practice, the memory usage of some containers still kept rising until they were killed by the cgroup OOM killer. We then found that the problem may be related to a glibc memory allocation bug: long-running processes accumulate many consecutive anonymous (anon) memory blocks of 65536 KB (64 MB). Our final solution is therefore as follows:

First, use Cgroup to absorb temporary memory overuse, such as problems caused by RocksDB's memory limits not being strictly enforced, or by inexperienced users configuring and using memory incorrectly. Then, upgrade the JDK to fix the thread deadlock bug during Jemalloc allocation. Finally, switch to Jemalloc to avoid the 64 MB anon allocation bug on Linux.

As the figure above shows, after this series of optimizations the number of container lost events dropped from nearly 5,000 per month to fewer than 100, and abnormal job restarts caused by Yarn OOM kills fell by more than 90%, a remarkable result.
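To check whether a long-running TaskManager shows the 64 MB anon pattern mentioned above, one option is to inspect the output of `pmap -x <pid>`. The helper below is a hypothetical diagnostic sketch that assumes the common procps column layout (Address, Kbytes, RSS, Dirty, Mode, Mapping), which can vary between versions. Such blocks are commonly attributed to glibc's per-thread malloc arenas, which may also appear as adjacent pairs of mappings adding up to 64 MB; this simple counter does not aggregate pairs.

```python
def count_64mb_anon_blocks(pmap_x_output: str) -> int:
    """Count anonymous mappings of exactly 65536 KB in `pmap -x <pid>` output."""
    count = 0
    for line in pmap_x_output.splitlines():
        parts = line.split()
        # Data rows: Address Kbytes RSS Dirty Mode Mapping...
        # Skip the title, header, separator, and "total" lines.
        if len(parts) < 6 or not parts[1].isdigit():
            continue
        kbytes = int(parts[1])
        mapping = " ".join(parts[5:])  # "[ anon ]" is split into three tokens
        if mapping == "[ anon ]" and kbytes == 65536:
            count += 1
    return count
```

Sampling this count periodically for the same process and watching it grow would be one way to spot the pattern before the container is killed.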

The second optimization is smooth job restart when nodes go offline. Streaming jobs are long-running, and because most of them run on inexpensive machines, events such as machines going out of warranty, hardware failures, maintenance decommissioning, and data center migrations are common. To eliminate potential risks in advance, avoid framework-triggered restarts, improve job stability in the cloud environment, and avoid the long recovery times of Yarn mode, we developed a smooth restart feature for jobs.

After a node is added to the exclusion (decommission) list, the Flink ResourceManager receives the decommission information, parses it to identify the affected node, and checks whether any container currently running the job's tasks is on that node. If so, it stops the job by calling the JobManager's stop-with-savepoint interface. The platform automatically monitors job status: if a job was stopped other than through the platform, the platform automatically brings it back up, and the job resumes from the savepoint. This process is triggered periodically, and decommission messages are merged and processed in batches, avoiding the load spikes that frequent triggering would cause. Both nodes and containers are also deduplicated, so that the same job is not triggered multiple times, which would hurt stability. Finally, the trigger period is much shorter than the decommission window that SREs set when taking a node offline, which also eases the O&M burden.
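The batching and deduplication step can be sketched as a pure function over one trigger period. The function name and input shapes are hypothetical; the actual stop-with-savepoint call and the platform's restart logic are left out.

```python
from typing import Dict, Iterable, List, Set, Tuple


def plan_smooth_restarts(
    decommission_events: Iterable[str],
    container_locations: Dict[str, Tuple[str, str]],
) -> List[str]:
    """Return the job IDs to stop with savepoint for one trigger period.

    decommission_events: hostnames reported as decommissioning in this period
        (duplicates allowed, since messages are merged per period).
    container_locations: container_id -> (node, job_id) for running containers.
    """
    nodes: Set[str] = set(decommission_events)  # deduplicate nodes
    jobs: Set[str] = set()                       # deduplicate jobs
    for node, job_id in container_locations.values():
        if node in nodes:
            jobs.add(job_id)
    return sorted(jobs)
```

Because nodes and jobs are collected into sets, a job with several containers on one or more decommissioning nodes is stopped only once per period.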

The JobManager starts a metric collection thread that periodically collects each node's CPU, memory, disk I/O, and network I/O metrics and aggregates them into a metric set. These metrics are evaluated against dynamic metric rules, and a node that meets the rule conditions is added to the blacklist, so the application's containers no longer run on it. If a node is blacklisted by multiple applications, the node itself probably has a problem: a smooth job restart is triggered automatically, and monitoring and alerting are used to discover potentially abnormal nodes automatically.

The diagram above shows the general flow of a Flink Checkpoint. The CheckpointCoordinator triggers the Source Operators to Checkpoint; each Source generates a Checkpoint Barrier, broadcasts it downstream, and snapshots its state. After completing its Checkpoint, the Source Operator sends an ACK. A downstream node that receives the Checkpoint Barrier handles it according to whether alignment is required and then enters the Checkpoint logic. Once the CheckpointCoordinator has received ACKs from all nodes, the Checkpoint is complete; the coordinator then sends a completion notification to all Operators participating in the Checkpoint, and finally each Operator performs its final commit.
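The ACK-collection and notification part of this flow resembles a two-phase commit. A minimal sketch, with a hypothetical class name (this is not Flink's CheckpointCoordinator code) and a callback standing in for the completion notification:

```python
from typing import Callable, Iterable, Set


class CheckpointAckTracker:
    """Collects ACKs for one Checkpoint, then notifies every participant."""

    def __init__(self, checkpoint_id: int, tasks: Iterable[str],
                 notify_complete: Callable[[str, int], None]) -> None:
        self.checkpoint_id = checkpoint_id
        self._tasks: Set[str] = set(tasks)
        self._waiting: Set[str] = set(self._tasks)
        self._notify = notify_complete
        self.completed = False

    def acknowledge(self, task: str) -> None:
        # Ignore duplicate ACKs, unknown tasks, and ACKs after completion.
        if self.completed or task not in self._waiting:
            return
        self._waiting.remove(task)
        if not self._waiting:
            # All tasks have acknowledged: the Checkpoint is complete, so
            # tell every participant it may perform its final commit.
            self.completed = True
            for t in sorted(self._tasks):
                self._notify(t, self.checkpoint_id)
```

Operators commit only after the notification arrives, so no side effect becomes visible for a Checkpoint that some task failed to acknowledge.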

Problems encountered with Checkpoint include the following:

A full disk or other I/O exceptions can keep Checkpoints from being triggered for a long time, but the exception information only appears in the JobManager's logs and does not interrupt normal job execution, so the hidden risk goes unnoticed.

When a job is restarted after logic changes, parallelism adjustments, rescheduling, and so on, it does not resume from a Checkpoint by default, resulting in state loss or message backlog.

At high parallelism, Checkpoints produce too many small files, which puts heavy RPC load on HDFS.

Recovery conflicts caused by users misconfiguring the Checkpoint directory are hard to control and troubleshoot.

In response to the above problems, we have also made some optimizations.

For disk-full errors, I/O exceptions, and Kerberos file corruption, we catch the exception stack, decide whether to retry based on it, and increment the Checkpoint failure count when a Checkpoint fails. Once the count exceeds a threshold, the job is restarted within the framework or an alert is sent to the user. This ensures that a job never goes a long time without a successful Checkpoint and then has to resume from a very old one.
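The retry-then-escalate policy might look like the sketch below. The marker strings and action names are assumptions chosen for illustration; a real classifier would more likely inspect exception types than substrings. For comparison, Flink's built-in `execution.checkpointing.tolerable-failed-checkpoints` option provides a related, simpler failure threshold.

```python
class CheckpointFailureTracker:
    """Retry transient Checkpoint failures; escalate after too many in a row."""

    # Assumed substrings identifying environment problems worth retrying.
    RETRYABLE_MARKERS = ("No space left on device", "IOException", "Kerberos")

    def __init__(self, max_consecutive_failures: int) -> None:
        self.max_failures = max_consecutive_failures
        self.consecutive_failures = 0

    def on_success(self) -> None:
        self.consecutive_failures = 0

    def on_failure(self, stack_trace: str) -> str:
        self.consecutive_failures += 1
        if self.consecutive_failures > self.max_failures:
            return "escalate"  # e.g. restart within the framework or alert the user
        if any(marker in stack_trace for marker in self.RETRYABLE_MARKERS):
            return "retry"     # transient environment problem: try again
        return "record"        # counted toward the threshold but not retried
```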

To address jobs not recovering from a Checkpoint after a restart, we set a default number of retained Checkpoints for each job. When a Checkpoint is taken, its Metadata file is first written as a temporary file and is renamed to the official file name only at finalization. On restart, all Checkpoint directories are sorted by last-modified time in descending order and searched for an official Checkpoint Metadata file; the first one found belongs to a complete Checkpoint that can be used for recovery.
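A minimal sketch of the temporary-then-rename scheme and the newest-complete search, using the local filesystem in place of HDFS. `_metadata` is the file name Flink uses for Checkpoint metadata; the temporary name `_metadata.inprogress` and all function names are hypothetical.

```python
import os
from typing import Optional

FINAL_METADATA = "_metadata"            # file name Flink uses for Checkpoint metadata
TEMP_METADATA = "_metadata.inprogress"  # hypothetical temporary name


def write_temp_metadata(chk_dir: str, payload: bytes) -> None:
    # While the Checkpoint is in progress, only the temporary file exists.
    with open(os.path.join(chk_dir, TEMP_METADATA), "wb") as f:
        f.write(payload)


def finalize_metadata(chk_dir: str) -> None:
    # On finalize, rename to the official name (atomic on POSIX filesystems).
    os.replace(os.path.join(chk_dir, TEMP_METADATA), os.path.join(chk_dir, FINAL_METADATA))


def latest_complete_checkpoint(job_root: str) -> Optional[str]:
    # Sort Checkpoint directories by last-modified time, newest first.
    dirs = [os.path.join(job_root, d) for d in os.listdir(job_root)]
    dirs = [d for d in dirs if os.path.isdir(d)]
    dirs.sort(key=os.path.getmtime, reverse=True)
    # The first directory holding an official metadata file is complete.
    for d in dirs:
        if os.path.isfile(os.path.join(d, FINAL_METADATA)):
            return d
    return None
```

Because the official name appears only through a single rename, a Checkpoint that crashed mid-write is skipped rather than mistaken for a complete one.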

This approach requires the last-modified times of the files to be correct. To ensure this, by default a finished job does not delete its Checkpoint files, and taking a Savepoint does not discard the job's latest Checkpoint, so the last-modified times of both kinds of files remain correct. The job is then automatically restored from the most recent complete state, minimizing the data and state that must be reprocessed. Moreover, once a job has found a recent, complete Checkpoint usable for recovery, the earlier Savepoints and Checkpoints can be cleaned up to reduce storage usage.

We therefore clean up full Savepoints by giving them a lifecycle. For incremental Checkpoints, to avoid deleting state that is still in use, we first read the contents of the Metadata file, retain the parent folders of the state files it references, and clean up the rest. This minimizes the number of files and the space used without affecting state recovery.
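Both cleanup rules reduce to set operations once the inputs are known. In the sketch below, parsing Flink's binary Metadata format is out of scope: the list of referenced state files is assumed to have been extracted already, and all names and paths are hypothetical.

```python
import posixpath
from typing import Dict, Iterable, Set


def expired_savepoints(savepoint_mtimes: Dict[str, float], now: float, ttl_s: float) -> Set[str]:
    # Full Savepoints: anything older than its lifecycle (TTL) can be deleted.
    return {path for path, mtime in savepoint_mtimes.items() if now - mtime > ttl_s}


def dirs_to_delete(state_dirs: Iterable[str], referenced_files: Iterable[str]) -> Set[str]:
    # Incremental Checkpoints: keep the parent folder of every state file the
    # retained Metadata still references, and delete the other folders.
    keep = {posixpath.dirname(f) for f in referenced_files}
    return {d for d in state_dirs if d not in keep}
```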

To handle the state recovery conflicts and the resulting load caused by users configuring Checkpoint directories arbitrarily, we add the job name and a timestamp to the Metadata file. If the current job name differs from the stored job name, an alert is raised; an alert is also raised if the timestamp of the Checkpoint being restored differs significantly from the current time.
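The two sanity checks can be expressed as a small function that returns alert messages. The parameter names and the age threshold are assumptions; the text does not say how large a time difference counts as "significant".

```python
from typing import List


def recovery_warnings(stored_job_name: str, stored_ts: float,
                      current_job_name: str, now: float, max_age_s: float) -> List[str]:
    """Compare the job name and timestamp stored in the Metadata with the current job."""
    warnings: List[str] = []
    if stored_job_name != current_job_name:
        warnings.append(f"Checkpoint was written by job {stored_job_name!r}, "
                        f"but the current job is {current_job_name!r}")
    if now - stored_ts > max_age_s:
        warnings.append(f"Checkpoint is {now - stored_ts:.0f}s old (threshold {max_age_s:.0f}s)")
    return warnings
```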

Small files are a common problem with HDFS. Because HDFS is designed to store large files, small files must be optimized to improve performance and stability. Our approach is to merge small-file writes during Checkpoint, for example by writing multiple small files into one SequenceFile to form a large file. This may waste some space, but it noticeably reduces the load on the HDFS NameNode.
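The idea of merging many small writes into one file can be illustrated with a simple concatenate-plus-index scheme. This is a stand-in for illustration, not Hadoop's SequenceFile format: the point is that many small blobs become one file, plus an offset index used to read each blob back.

```python
import io
from typing import Dict, Tuple

Index = Dict[str, Tuple[int, int]]  # name -> (offset, length)


def pack(blobs: Dict[str, bytes]) -> Tuple[bytes, Index]:
    """Concatenate many small blobs into one buffer and record where each starts."""
    buf = io.BytesIO()
    index: Index = {}
    for name, data in blobs.items():
        index[name] = (buf.tell(), len(data))
        buf.write(data)
    return buf.getvalue(), index


def read(packed: bytes, index: Index, name: str) -> bytes:
    # Reading one logical file is just a slice of the large file.
    offset, length = index[name]
    return packed[offset:offset + length]
```

The NameNode then tracks one file instead of one per blob, which is where the RPC savings come from.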

In addition, we use HDFS federation to balance the RPC load across multiple NameNodes, each of which is a relatively independent service. We standardize the Checkpoint directories of user jobs so that their accesses are balanced across the NameNodes, read old HDFS files through a mount table, and gradually migrate them automatically to the new standardized directories.
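Two pieces of this scheme can be sketched: assigning each job a standardized Checkpoint directory on one of several namespaces, and resolving legacy paths through a mount table by longest-prefix match, similar in spirit to an HDFS federation mount table. The directory layout, namespace names, and hash choice are all hypothetical.

```python
import zlib
from typing import Dict, List


def standard_checkpoint_dir(job_name: str, namespaces: List[str]) -> str:
    # A stable hash spreads jobs evenly and deterministically across NameNodes.
    ns = namespaces[zlib.crc32(job_name.encode("utf-8")) % len(namespaces)]
    return f"hdfs://{ns}/flink/checkpoints/{job_name}"


def resolve(path: str, mount_table: Dict[str, str]) -> str:
    # Route a legacy path to its target by the longest matching mount point.
    matches = [p for p in mount_table if path == p or path.startswith(p + "/")]
    if not matches:
        return path
    best = max(matches, key=len)
    return mount_table[best] + path[len(best):]
```

Matching on whole path components (the `p + "/"` check) keeps `/user/flinkx` from being routed by a `/user/flink` mount point.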

Next, let's look at a case from Xiaomi's data collection service. Its architecture diagram is very simple: tracking events and data from multiple source-side SDKs are collected into a message queue, Flink performs ETL, and the results are finally stored in Doris and displayed on dashboards.

The service currently serves more than 750 domestic and international businesses and processes more than 160 billion messages per day on average. With the Checkpoint-related optimizations, RPC latency dropped by about 40% and the number of small files decreased. In addition, starting and stopping jobs via stop-with-savepoint guarantees correct recovery and preserves exactly-once semantics.


Operational optimization practices

Flink HistoryServer is very useful for job operations, especially because it lets you view job statistics after the job has stopped. If a job exits abnormally or produces problematic results, and the relevant logs could not be checked in time for some reason, they can still be viewed later through the HistoryServer.

During each scheduled cleanup, the Flink HistoryServer gets the IDs of the jobs cached in the previous round and the list of history logs currently packaged, then checks whether the number of history logs exceeds the configured maximum. If it does, the history logs beyond the maximum are cleaned up directly; otherwise, it checks whether each previously cached job still exists in the current history logs and cleans up any that do not.
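Read literally, the default policy described above amounts to the following. This is a simplified paraphrase of the description, not Flink's HistoryServer source; `max_retained` plays the role of the configured maximum (in Flink, `historyserver.archive.retained-jobs`).

```python
from typing import List, Set


def jobs_to_clean(cached: Set[str], archived_newest_first: List[str], max_retained: int) -> Set[str]:
    if len(archived_newest_first) > max_retained:
        # Too many archives: clean up everything beyond the newest `max_retained`.
        return set(archived_newest_first[max_retained:])
    # Otherwise clean up cached jobs whose archive no longer exists.
    return cached - set(archived_newest_first)
```

If `cached` is lost on restart (an empty set) while the archive count is under the maximum, jobs whose archives disappeared during the restart are never returned by either branch, which is the floating-cache problem discussed next.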

However, this process has several problems. First, restarting the service loses the cache of downloaded job information; if a job's history log is also removed during the restart, the job becomes a floating cached job that stays in the local cache indefinitely and can never be cleaned up. Second, the packaged history logs do not support expiration, so large numbers of logs remain on HDFS and local disks for a long time, which slows access and wastes a great deal of disk space. Third, the maximum number of cached job history logs is hard to choose, and if an underlying service such as HDFS fails, many jobs fail at the same time and flush out the valid logs. Finally, TaskManager logs are not recorded by default, which makes troubleshooting very difficult.

We have also optimized for the above problems.

First, we read the job history cached on local disk and compare it with the history log records, avoiding floating cached jobs. Second, we support a maximum retention time for history logs, which are cleaned up once they exceed their lifecycle; this is more reasonable than the current limit on the number of retained logs. Third, we also package and clean up TaskManager and container history data, recording more complete information when a job exits abnormally to make troubleshooting easier.
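The improved policy can be sketched as follows, assuming the local cache and the archive listing are available as job-ID collections with archive modification times. The function and field names are hypothetical; `ttl_s` is the maximum retention time mentioned above.

```python
from typing import Dict, Set


def reconcile_and_expire(local_jobs: Set[str], archive_mtimes: Dict[str, float],
                         now: float, ttl_s: float) -> Dict[str, Set[str]]:
    # Archives older than the retention time are expired.
    expired = {job for job, mtime in archive_mtimes.items() if now - mtime > ttl_s}
    # Local copies are removed if their archive is expired or gone ("floating" jobs);
    # comparing against what is actually on disk, not an in-memory cache, survives restarts.
    delete_local = {job for job in local_jobs if job not in archive_mtimes or job in expired}
    return {"delete_archives": expired, "delete_local": delete_local}
```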

The full-link heartbeat monitoring feature mainly monitors latency along the job's link. It works by inserting a special marker at the stream Source. The marker carries the job name, the current time, and a name built from "op" plus the operator's index in the whole link and the subtask's index within the operator; each non-Source node updates the name when it receives the marker. The node then subtracts the Source insertion time from its current time to obtain the delay from the Source to that subtask, reports it to the Metrics Reporter, and these metrics are finally aggregated. This makes it possible to find abnormal nodes in the link and to detect abnormal data loss in the job, whose impact can also be estimated from the heartbeat insertion frequency.
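The per-hop handling of a heartbeat marker might look like this. The marker fields follow the description (job name, insertion time, and a name built from "op", the operator index, and the subtask index), but the exact name format and metric key are assumptions. Flink's built-in latency markers (enabled via `metrics.latency.interval`) work in a broadly similar way.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class HeartbeatMarker:
    job_name: str
    emit_time_ms: int  # when the Source inserted the marker
    name: str          # hop identity, e.g. "op2_3" (assumed format)


def on_marker(marker: HeartbeatMarker, operator_index: int, subtask_index: int,
              now_ms: int) -> Tuple[HeartbeatMarker, int]:
    # A non-Source node renames the marker to its own "op<operator>_<subtask>" tag
    renamed = HeartbeatMarker(marker.job_name, marker.emit_time_ms,
                              f"op{operator_index}_{subtask_index}")
    # and measures the delay from the Source to this subtask.
    return renamed, now_ms - marker.emit_time_ms


def report(metrics: Dict[str, int], marker: HeartbeatMarker, latency_ms: int) -> None:
    # Stand-in for a Metrics Reporter: one gauge per job and hop.
    metrics[f"{marker.job_name}.{marker.name}.source_latency_ms"] = latency_ms
```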

When a heartbeat marker reaches multiple downstream links, it is not sent to one randomly selected link but broadcast to all of them at once. As a result, there may be so many heartbeat markers that they hurt the processing performance of normal jobs.

We implemented dynamic starting and stopping of heartbeat monitoring through a RESTful interface, then found that other scenarios could benefit from the same capability and extended it. With a simple code change, it now also supports dynamic tuning of other configuration, including Checkpoint settings such as the Checkpoint interval and timeout, the log level, and so on.

When a job has a performance or Checkpoint problem, you can enable heartbeat monitoring dynamically through the RESTful interface and disable it once the problem is located, which solves the problem of excessive heartbeat markers. When load bursts or short-term data skew cause Checkpoints to time out, dynamically increasing the Checkpoint timeout prevents the job from failing because of the timeout. It also avoids a vicious cycle in which prolonged Checkpoint failures lead to more data backlog and worse data skew. The same mechanism helps determine the timeout itself: users can keep adjusting it dynamically until they find the value best suited to the job, reducing the number of job stops and restarts during stress testing. Other settings, such as the log level, can be adjusted in the same way. Note, however, that adjusted settings are not persisted and are lost when the framework or the job restarts.
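A minimal in-memory sketch of such a runtime-adjustable configuration, with hypothetical key names and no HTTP layer: a request body is validated in full before any override is applied, and because overrides live only in memory, a fresh instance after a restart falls back to the defaults, matching the non-persistence caveat above.

```python
from typing import Any, Dict


class DynamicConfig:
    """In-memory runtime overrides; nothing is persisted, so a restart loses them."""

    # Hypothetical adjustable keys and the type each must have.
    ADJUSTABLE = {
        "heartbeat.enabled": bool,
        "checkpoint.interval.ms": int,
        "checkpoint.timeout.ms": int,
        "log.level": str,
    }

    def __init__(self, defaults: Dict[str, Any]) -> None:
        self._defaults = dict(defaults)
        self._overrides: Dict[str, Any] = {}

    def handle_put(self, body: Dict[str, Any]) -> Dict[str, Any]:
        # Validate the whole request first so a bad key leaves nothing half-applied.
        for key, value in body.items():
            expected = self.ADJUSTABLE.get(key)
            if expected is None or type(value) is not expected:
                raise ValueError(f"cannot set {key!r} to {value!r}")
        self._overrides.update(body)
        return self.effective()

    def effective(self) -> Dict[str, Any]:
        return {**self._defaults, **self._overrides}
```

The exact type check (`type(value) is not expected`) is deliberate: in Python `True` is also an `int`, so `isinstance` would let a boolean through as a timeout.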


Future planning
