Abstract: This article was compiled by community volunteer Miao Wenting, based on the talk "Flink Runtime and DataStream API Optimizations for Stream-Batch Unification" given by Alibaba technical expert Gao Yun (Yunqian) at the Flink Meetup in Beijing on May 22. The content includes:
  1. A review of the design of Flink's stream-batch unification

  2. Optimizations in the runtime

  3. Optimizations in the DataStream API

  4. Follow-up plans


1. Stream-Batch Unification in Flink

1.1 Architecture Introduction

First, let's look at the overall logic of Flink's stream-batch unification. In the early days, although Flink was a framework that could support both stream processing and batch processing, its stream and batch implementations were two separate stacks, in the API layer as well as in the underlying shuffle, scheduling, and operator layers. The two implementations were completely independent and not particularly closely related.

Guided by the goal of stream-batch unification, Flink has now unified the abstractions for operators, scheduling, and shuffle at the bottom layer, and supports the DataStream API and Table API on top of them in a unified way. The DataStream API is a more physical-level interface, while the Table API is a declarative interface, and both sets of interfaces are unified for streams and batches.

1.2 Advantages


  • Based on the DataStream API and Table API, users can write one set of code to process both historical data and real-time data, for example in data backfill scenarios.

  • Unified connector and operator implementations reduce development and maintenance costs.

  • Learning costs are lower, since there is no need to learn two similar sets of interfaces.

  • Maintenance costs are lower, since one system supports both streaming and batch jobs.

1.3 Data processing process

The following briefly introduces how Flink abstracts stream-batch unification. Flink splits jobs into two types:

  • Jobs that process infinite data streams. This is what we usually think of as a streaming job. For these, Flink adopts the standard stream execution mode, which needs to take record time into account: watermarks advance the time of the whole system in an aligned way to drive data aggregation and output, and intermediate results are maintained in state.

  • Jobs that process finite data sets, for example data saved in files or otherwise prepared in advance. A finite data set can be regarded as a special case of an infinite one, so such jobs can naturally run on top of the stream execution mode and are supported directly without code changes.

However, running a finite data set this way overlooks the fact that the data is bounded. The interface still has to handle fine-grained time, watermark, and other semantics, which introduces extra complexity. In terms of performance, because the data is processed in a streaming way, all tasks have to be started at the beginning, which may require more resources. And with a RocksDB state backend, which behaves like one large hash table, a large number of keys may lead to random IO access problems.

In the batch execution mode, by contrast, the whole data processing flow can be implemented in an IO-friendly way based on sorting. So, under the premise of finite data, batch mode gives us much more room for choice in scheduling, shuffle, and operator implementations.

Finally, for finite data streams, regardless of which execution mode is used, we want the final result to be consistent.

1.4 Recent evolution


In recent releases, Flink has put a lot of effort into the goal of stream-batch unification at both the API and implementation layers.

Flink unified the Table/SQL API and introduced the unified Blink planner, which translates both stream and batch jobs into DataStream operators. It also introduced a unified shuffle architecture for both streams and batches.

A new sort-merge-based shuffle mode was introduced for batch shuffle; it greatly improves performance over Flink's previous built-in hash shuffle. In terms of scheduling, Flink introduced a stream-batch unified scheduler based on Pipeline Regions.

The sort-merge shuffle was then improved, and the performance of the Pipeline Region scheduler was optimized for large-scale jobs. Also, as mentioned earlier, we expect the two execution modes of a finite stream to produce consistent results. However, Flink still has some problems at the end of job execution that keep the results from being fully consistent.

So 1.13 also includes work on finite-data jobs: making their results in both stream and batch modes, especially in stream mode, consistent with the expected results.

Follow-up work includes completing the consistency guarantee for finite jobs, a Source that can switch between batch and stream, and gradually deprecating the DataSet API.

2. Runtime Optimization

2.1 Large-scale job scheduling optimization

■ 1. The Complexity Problem of Execution Edges

When Flink submits a job, it generates a DAG of the job consisting of multiple vertices, where each vertex corresponds to an actual processing node, such as a Map. Each processing node has a parallelism. In the previous Flink implementation, when a job was submitted to the JobManager (JM), the JM expanded it into an Execution Graph.

As shown in the following figure, the job has two nodes with parallelism 2 and 3. In the data structure actually maintained in the JM there are 2 tasks and 3 tasks respectively, connected by 6 execution edges, and Flink maintains the topology information of the whole job based on this structure. Based on this topology, Flink can maintain the state of each task individually and identify which tasks need to be restarted when a task fails.

With this kind of all-to-all communication, that is, an edge between every pair of upstream and downstream tasks, the number of edges is upstream parallelism times downstream parallelism, an O(N^2) data structure. The memory footprint in this case is staggering: with 10k x 10k edges, the JM's memory footprint reaches 4.18 GB. In addition, much of a job's computational complexity is tied to the number of edges; with O(N^2) or O(N^3) complexity, the initial scheduling time of such a job reaches 62 s.

Beyond initial scheduling, batch jobs also keep scheduling downstream tasks as upstream tasks finish, and the scheduling complexity in between is likewise O(N^2) or O(N^3), which causes large performance overhead. Furthermore, a large memory footprint degrades GC performance.
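To make the growth concrete, here is a small illustrative sketch (plain Python, not Flink code) of how the naive per-edge representation scales: it simply counts one execution edge per upstream/downstream task pair.

```python
# Illustrative sketch (not Flink code): with an all-to-all connection,
# the pre-1.13 representation stores one execution edge per
# (upstream task, downstream task) pair, so edge count grows as O(N^2).

def naive_edge_count(upstream_parallelism: int, downstream_parallelism: int) -> int:
    """Number of execution edges in the naive all-to-all representation."""
    return upstream_parallelism * downstream_parallelism

# The 2 x 3 example from the text yields 6 edges.
print(naive_edge_count(2, 3))  # 6

# At 10k x 10k parallelism the structure holds 100 million edges,
# which is what drives the memory and scheduling-time numbers above.
print(naive_edge_count(10_000, 10_000))  # 100000000
```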

■ 2. Symmetry of the Execution Graph


Aiming at Flink's memory and performance problems on large-scale jobs, in-depth analysis shows that there is a certain symmetry between the upstream and downstream nodes in the example above.

The "edges" in Flink can be divided into two types:

  • Pointwise: upstream and downstream correspond one-to-one, or one upstream task corresponds to several downstream tasks, rather than being fully connected. In this case, the number of edges is basically linear, O(N), on the same order as the number of operators.

  • All-to-all: every upstream task is connected to every downstream task. In this case, the data set produced by each upstream task is consumed by all downstream tasks, which is a symmetric relationship. It is enough to remember that an upstream data set is consumed by all downstream tasks; there is no need to store each individual edge.

Therefore, in 1.13 Flink introduced the concepts of ResultPartitionGroup and VertexGroup for upstream data sets and downstream vertices. In particular, for all-to-all edges, since upstream and downstream are symmetric, all data sets produced upstream can be put into one group and all downstream vertices into another group. There is no need to store the individual edges between them; it is enough to know which group of downstream vertices consumes which group of upstream data sets, and vice versa. In this way, the memory footprint is reduced.

In addition, this helps scheduling-related computations. For example, in a batch job where all edges are blocking edges, each vertex belongs to its own region. Previously, computing the upstream and downstream relationships between regions required traversing all downstream vertices for each upstream vertex, an O(N^2) operation. With the introduction of ConsumerGroups, this becomes a linear O(N) operation.
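The grouping idea can be sketched as follows (a hypothetical simplification in plain Python, not Flink's actual ResultPartitionGroup/VertexGroup classes): an all-to-all connection is stored as a single link between two groups, so storage is O(N + M) instead of O(N x M) edges.

```python
# Illustrative sketch (not Flink's actual classes): one all-to-all
# connection is represented by two groups and a single link between them,
# instead of one object per (partition, vertex) edge.

class AllToAllConnection:
    """All upstream result partitions form one group, all downstream
    vertices form another; the connection itself is a single object."""

    def __init__(self, upstream_partitions, downstream_vertices):
        self.partition_group = list(upstream_partitions)  # O(N) storage
        self.vertex_group = list(downstream_vertices)     # O(M) storage

    def consumers_of(self, partition):
        # Every downstream vertex consumes every partition in the group,
        # so no per-edge lookup structure is needed.
        assert partition in self.partition_group
        return self.vertex_group

conn = AllToAllConnection(["p0", "p1"], ["v0", "v1", "v2"])
# Stored objects: 2 + 3, instead of 2 * 3 individual edges.
print(len(conn.partition_group) + len(conn.vertex_group))  # 5
print(conn.consumers_of("p0"))  # ['v0', 'v1', 'v2']
```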

■ 3. Optimization Results

After the data-structure optimization above, with 10k x 10k edges, the JM's memory footprint drops from 4.18 GB to 12.08 MB, and the initial scheduling time drops from 62 s to 12 s. This optimization is quite significant; users only need to upgrade to Flink 1.13 to benefit from it, with no additional configuration.

2.2 Sort-Merge Shuffle

Another optimization concerns the data shuffle of batch jobs. Normally, after an upstream task of a batch job finishes, it first writes its results to intermediate files, and downstream tasks then pull data from those intermediate files for processing.

The advantage of this approach is that it saves resources: upstream and downstream do not need to be up at the same time, and on failure, execution does not need to restart from scratch. This is a common way to execute batch jobs.

■ Hash Shuffle

So, during the shuffle, how are intermediate results saved to intermediate files and then read by downstream tasks?

Previously, Flink used hash shuffle. Taking the all-to-all edge as an example, the data set produced by an upstream task is written to a separate file for each downstream task, so the system may produce a large number of small files. And whether file IO or mmap is used, writing each file takes at least one buffer, which wastes memory. Downstream tasks reading upstream data files at random also generates a large amount of random IO.

Therefore, Flink's previous hash shuffle was only workable for batch production jobs at relatively small scale or on SSDs; at larger scale, or on SATA disks, it was problematic.

■ Sort Shuffle

So, over Flink 1.12 and Flink 1.13, a new shuffle based on sort-merge was introduced. The sort here does not refer to sorting the data itself, but to sorting by the downstream task the data is written to.

The general principle is that when the upstream outputs data, it uses a fixed-size buffer, so the buffer does not grow with the job's scale. All data is written into the buffer; when the buffer is full, it is sorted once and written out, appended to the end of a single file as a new segment, and subsequent data continues to be collected in the same buffer. In the end, each upstream task produces one intermediate file made up of many segments, each of which is internally ordered.
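The write path just described can be sketched roughly as follows (a hypothetical simplification in plain Python, not Flink's SortBuffer implementation): each record carries the index of the downstream subpartition it targets; the fixed-size buffer is filled, sorted by target on flush, and appended as one segment.

```python
# Illustrative sketch of the sort-merge write path (hypothetical
# simplification, not Flink's SortBuffer): records carry the index of the
# downstream subpartition they target; a fixed-size buffer is filled, then
# sorted by target subpartition and appended to one file as a new segment.

def write_with_sort_buffer(records, buffer_capacity):
    """records: iterable of (target_subpartition, payload) pairs.
    Returns the list of segments appended to the single intermediate file."""
    segments, buffer = [], []
    for record in records:
        buffer.append(record)
        if len(buffer) == buffer_capacity:
            # Sort by target so each segment groups data per downstream task.
            segments.append(sorted(buffer, key=lambda r: r[0]))
            buffer = []
    if buffer:  # flush the final, partially filled buffer
        segments.append(sorted(buffer, key=lambda r: r[0]))
    return segments

segs = write_with_sort_buffer(
    [(2, "a"), (0, "b"), (1, "c"), (0, "d"), (2, "e")], buffer_capacity=3)
# Two segments, each internally ordered by target subpartition.
print(segs)  # [[(0, 'b'), (1, 'c'), (2, 'a')], [(0, 'd'), (2, 'e')]]
```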

Unlike other batch frameworks, the approach here is not based on ordinary external sorting. Ordinary external sorting would merge these segments again into a single fully ordered file, which gives better IO continuity for downstream reads and avoids each task reading very small data segments. However, the merge itself consumes a lot of IO resources, and its time overhead may far exceed the benefit brought by sequential downstream reads.

Therefore, another method is used here: when downstream tasks request data, for example when the 3 downstream tasks in the figure below read the upstream intermediate file, a scheduler sorts the file positions requested by the downstream tasks, achieving IO continuity across the whole file by adding IO scheduling at the upper layer, and thus preventing massive random IO on SATA disks.

On SATA disks, sort shuffle improves IO performance by 2 to 8 times compared with hash shuffle. With sort shuffle, Flink batch processing basically reaches production availability: disk IO throughput on a SATA disk can exceed 100 MB/s, while such a disk tops out at roughly 200 MB/s for reads and writes.

To maintain compatibility, sort shuffle is not enabled by default. Users can control the downstream parallelism threshold at which sort-merge shuffle is enabled, and can further improve batch performance by enabling compression. Sort-merge shuffle does not take up extra memory; the read/write cache it uses is cut from the framework off-heap memory.
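As a reference, in Flink 1.13 the relevant settings look roughly like the following `flink-conf.yaml` fragment; the exact keys and defaults should be verified against the documentation of the version in use:

```yaml
# Enable sort-merge shuffle once downstream parallelism reaches this
# threshold (very large by default, i.e. effectively disabled).
taskmanager.network.sort-shuffle.min-parallelism: 1

# Optional: compress shuffle data to further improve batch performance.
taskmanager.network.blocking-shuffle.compression.enabled: true
```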

3. DataStream API Optimization

3.1 2PC & End-to-End Consistency

To ensure end-to-end consistency for Flink streaming jobs, a two-phase commit mechanism is used, which combines Flink's checkpoint and failover mechanisms with the characteristics of external systems.

The general logic is: to achieve end-to-end consistency, for example reading from Kafka and then writing to Kafka, data is first written into a Kafka transaction during normal processing and pre-committed when a checkpoint is taken; from then on, the data can no longer be lost.

If the checkpoint succeeds, a formal commit is performed. This keeps the transactions of the external system consistent with Flink's internal failover: if Flink fails over and rolls back to the previous checkpoint, the corresponding transactions in the external system are aborted; if a checkpoint succeeds, the commit of the external transaction succeeds as well.
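The two-phase commit pattern described above can be sketched as a small state machine (a hypothetical sink in plain Python, not Flink's TwoPhaseCommitSinkFunction API): records go into an open transaction, a checkpoint pre-commits it, the checkpoint-complete notification performs the formal commit, and a failover aborts anything not yet committed.

```python
# Illustrative sketch of the two-phase commit pattern (hypothetical sink,
# not Flink's actual API): pre-commit on checkpoint, formal commit on
# checkpoint completion, abort on failover.

class TwoPhaseCommitSink:
    def __init__(self):
        self.open_txn = []        # records in the current open transaction
        self.pre_committed = []   # transactions durable but not yet visible
        self.committed = []       # transactions visible to consumers

    def write(self, record):
        self.open_txn.append(record)

    def on_checkpoint(self):
        # Phase 1: pre-commit; from this point the data cannot be lost.
        self.pre_committed.append(self.open_txn)
        self.open_txn = []

    def on_checkpoint_complete(self):
        # Phase 2: formal commit makes the data visible externally.
        self.committed.extend(self.pre_committed)
        self.pre_committed = []

    def on_failover(self):
        # Roll back to the last successful checkpoint: abort everything
        # that was not formally committed.
        self.open_txn, self.pre_committed = [], []

sink = TwoPhaseCommitSink()
sink.write("a"); sink.write("b")
sink.on_checkpoint()
sink.on_checkpoint_complete()
sink.write("c")
sink.on_failover()        # "c" is aborted; "a" and "b" stay committed
print(sink.committed)     # [['a', 'b']]
```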

Flink's end-to-end consistency relies on the checkpoint mechanism. However, finite streams pose some problems:

  • For jobs over finite streams, Flink does not support checkpoints after a task has finished. For example, in a mixed stream-batch job, some tasks will finish; after that, Flink can no longer take checkpoints, so the remaining data is never committed.

  • At the end of a finite stream, because checkpoints are triggered on a timer, there is no guarantee that a final checkpoint runs after all data has been processed, which may cause the last part of the data to fail to commit.

These issues can make the results of a finite-stream job in stream execution mode inconsistent with those in batch mode.

3.2 Supporting checkpoints after some tasks finish (in progress)

Starting from Flink 1.13, checkpoints are supported after some tasks have finished. A checkpoint actually maintains, for each operator, a list of the states of all its tasks.

After some tasks finish (the dashed part of the figure below), Flink divides the operators into two types:

  • If all subtasks of an operator have finished, a finished flag is stored for that operator.

  • If only some of an operator's tasks have finished, only the states of the unfinished tasks are stored.

Based on such a checkpoint, all operators are still started after a failover. If an operator is recognized as having finished in the previous execution, i.e. finished = true, its execution is skipped. In particular, a source operator that has finished will not re-run and send data again. In this way, the consistency of the whole state is guaranteed, and checkpoints continue to be taken even after some tasks have finished.
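The snapshot-and-restore logic just described can be sketched like this (a hypothetical data model in plain Python, not Flink's internal checkpoint format): per operator, the checkpoint keeps either a finished flag or the states of the unfinished tasks, and recovery skips operators marked finished.

```python
# Illustrative sketch of checkpoints after partial task termination
# (hypothetical data model, not Flink's internal format).

def take_checkpoint(operators):
    """operators: {operator_name: {task_id: state_or_'FINISHED'}}."""
    snapshot = {}
    for name, tasks in operators.items():
        unfinished = {t: s for t, s in tasks.items() if s != "FINISHED"}
        if not unfinished:
            # All subtasks done: store only a finished flag.
            snapshot[name] = {"finished": True}
        else:
            # Store only the states of the unfinished tasks.
            snapshot[name] = {"finished": False, "states": unfinished}
    return snapshot

def operators_to_run_after_failover(snapshot):
    # Operators with finished = True are skipped on restore.
    return [name for name, entry in snapshot.items() if not entry["finished"]]

ckpt = take_checkpoint({
    "source": {0: "FINISHED", 1: "FINISHED"},
    "map":    {0: "FINISHED", 1: "state-1"},
})
print(ckpt["source"])                         # {'finished': True}
print(operators_to_run_after_failover(ckpt))  # ['map']
```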

Flink has also reworked the end-of-job semantics. A Flink job can now end in several ways:


  • the data is finite and the finite-stream job ends normally;

  • stop-with-savepoint, taking a savepoint and ending;

  • stop-with-savepoint --drain, taking a savepoint and ending, and advancing the watermark to positive infinity.

Previously, these followed two different implementation paths, and both had the problem that the last part of the data could not be committed.

  • For the two semantics of normal job end and stop-with-savepoint --drain, the job is not expected to restart; endOfInput() is called on the operators, and then a checkpoint is taken in a unified way.

  • For the stop-with-savepoint semantics, the job is expected to restart from the savepoint later, so endOfInput() is not called on the operators; a checkpoint is still taken afterwards. This way, for jobs that end and will not restart, the last part of the data can be committed to the external system.

4. Summary

One of Flink's overall goals is to build a unified platform for efficiently processing finite and infinite data sets. There is now basically a preliminary prototype of this, in terms of both API and runtime. Here is an example to illustrate the benefits of stream-batch unification.

Suppose a user wants to change the job logic one day. The stream can be stopped with stop-with-savepoint, but the changed logic also needs to reprocess the data of the previous two months to ensure result consistency. At this point, a batch job can be started: the job code is unmodified, it runs on the input data cached in advance, and the previous two months of data are revised as quickly as possible in batch mode. In addition, based on the new logic and the savepoint saved earlier, a new streaming job can be restarted.

As can be seen, if stream and batch were separate in the whole process above, a dedicated job would have to be developed for the data revision. With stream-batch unification, the data can be revised naturally based on the streaming job itself, without any additional development by the user.

In subsequent versions, Flink will consider more scenarios combining stream and batch, for example a user first running batch processing to initialize state and then switching to an infinite stream. Stream-only and batch-only functionality will also be further optimized and improved, making Flink a competitive compute framework for both stream and batch.