In this release, one of Flink’s main goals is to make stream processing applications as simple and natural to use as normal applications. The new passive scaling introduced in Flink 1.13 makes scaling a streaming job as easy as scaling any other application: users only need to modify the parallelism.

This release also includes a number of important changes that make it easier to understand the performance of streaming jobs. When a streaming job does not perform as expected, these changes help users analyze the cause. They include load and backpressure visualization to identify bottleneck nodes, CPU flame graphs to analyze hotspot code in operators, and state access performance metrics to analyze the State Backend.

In addition to these features, the Flink community has added a number of other optimizations, some of which are discussed later in this article. We hope users enjoy the benefits of the new version and its features; at the end of this article, we also cover some changes to be aware of when upgrading Flink.

We encourage users to download and try out the new version of Flink[1] and to report problems through the mailing list[2] and JIRA[3].

First, Important Features

Passive Scaling

One of the initial goals of the Flink project was to make stream processing applications as simple and natural as normal applications, and passive scaling is Flink’s latest step toward this goal.

When it comes to resource management and deployment, Flink supports two possible modes. Users can deploy Flink applications onto resource management systems such as K8s and YARN, with Flink actively managing resources and allocating and releasing them as needed. This mode is useful for jobs and applications whose resource requirements change frequently, such as batch jobs and real-time SQL queries. In this mode, the number of workers Flink starts is determined by the parallelism set by the application. In Flink we call this mode active scaling.

For long-running stream processing applications, a more suitable mode is for users to simply start the job like any other long-running service, regardless of whether it is deployed on K8s, YARN, or another resource management platform, and without having to think about how many resources to request. Instead, the job’s scale is determined by the number of workers allocated to it: when the number of workers changes, Flink automatically adjusts the parallelism of the application. In Flink we call this mode passive scaling.

Flink’s Application deployment mode[4] began the effort to make Flink jobs behave more like normal applications (i.e., starting a Flink job no longer requires the two separate steps of starting a cluster and submitting the application), and passive scaling completes this goal: users no longer need additional tools (e.g. scripts or a K8s operator) to keep the number of workers consistent with the application’s parallelism setting.

Users can now apply autoscaling tools to Flink applications, just like normal applications, as long as they understand the cost of scaling: stateful streaming applications need to redistribute state when scaling.
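For example, on Kubernetes a standard HorizontalPodAutoscaler can drive the number of TaskManagers. The sketch below assumes a standalone, reactive-mode deployment whose TaskManagers run as a Deployment named flink-taskmanager; the name, thresholds, and API version are illustrative and depend on your setup:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager       # illustrative name of the TaskManager Deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 80  # scale out when average CPU exceeds 80%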

If you want to try passive scaling, you can add the scheduler-mode: reactive configuration item and start an application cluster (Standalone[5] or K8s[6]). See the documentation [7] for more details.
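As a minimal sketch of the standalone case (the job class name is a placeholder), this amounts to one configuration entry plus the standard standalone scripts:

# flink-conf.yaml -- run the job with the reactive (passive scaling) scheduler
scheduler-mode: reactive

# Start the application cluster and as many TaskManagers as desired:
#   ./bin/standalone-job.sh start --job-classname com.example.MyStreamingJob
#   ./bin/taskmanager.sh start
# Adding or stopping TaskManagers later rescales the running job automatically.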

Analyzing Application Performance

The ability to easily analyze and understand application performance is a critical feature for all applications. This feature is even more important for Flink, which is typically data-intensive (i.e., requires processing large amounts of data) and needs to deliver results with (near) real-time latency.

When Flink applications can’t keep up with data input, or when an app is consuming more resources than expected, these tools can help you analyze why.

■ Bottleneck detection and backpressure monitoring

The first problem to be solved by Flink performance analysis is often: which operator is the bottleneck?

To answer this question, Flink introduced metrics that describe how busy a job is (i.e., how much time it spends processing data) and how backpressured it is (unable to emit output because downstream operators cannot process results in time). Likely bottlenecks are operators that are themselves busy while their upstream operators are backpressured.

Flink 1.13 optimizes the logic for backpressure detection (using task-based mailbox timing instead of stack sampling) and re-implements the UI presentation of job graphs: Flink now shows the degree of busyness and backpressure on the UI by color and value.

■ CPU Flame Graph in Web UI

Another question Flink frequently needs to answer about performance is: which part of the computation logic in the bottleneck operator is the most expensive?

An effective visualization tool for this problem is the flame graph. It helps answer questions such as:

  • Which methods are currently consuming CPU?

  • How much CPU does each method consume compared to the others?

  • What do the call stacks leading to a method look like?

A flame graph is built by repeatedly sampling thread stack traces. In the flame graph, each method call is represented as a rectangle whose length is proportional to how often the method appears in the samples. An example of a flame graph in the UI is shown in the following image.

The flame graph documentation[8] includes more details and instructions for enabling this feature.
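A minimal sketch of how this is typically switched on (the flame graph is computed on demand while its tab in the Web UI is open; see the documentation[8] for details):

# flink-conf.yaml -- expose flame graphs in the Web UI
rest.flamegraph.enabled: true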

■ State Access Latency Metric

Another possible performance bottleneck is the State Backend, especially when a job’s state exceeds what fits in memory and the RocksDB State Backend[9] must be used.

This is not to say that RocksDB is not good enough (we love RocksDB!), but it needs certain conditions to achieve its best performance. For example, users can easily and unintentionally fail to meet RocksDB’s IO performance requirements in the cloud by using the wrong disk resource type[10].

Complementing the CPU flame graph, the new State Backend latency metrics can help users determine whether performance falls short because of the State Backend. For example, if a user sees that a single access to RocksDB takes several milliseconds, they should look at the memory and I/O configuration. These metrics can be enabled by setting the state.backend.rocksdb.latency-track-enabled option. The metrics are collected by sampling, so their performance impact on the RocksDB State Backend is negligible.
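A minimal configuration sketch using the option quoted above; the exact key and its sampling-related companion options should be verified against the documentation for your Flink version:

# flink-conf.yaml -- enable sampled state access latency tracking
state.backend.rocksdb.latency-track-enabled: true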

Switching the State Backend via Savepoint

Users can now switch the State Backend of a Flink application when restarting from a Savepoint. This allows Flink applications to no longer be restricted to using only the State Backend selected when the application first runs.

Based on this feature, users can now start with a HashMap State Backend and switch to the RocksDB State Backend if the state later grows too large.

At the implementation level, Flink now unifies the Savepoint format of all State Backends for this functionality.
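As an illustrative sketch (the savepoint path, bucket, and job jar are placeholders), switching from the HashMap State Backend to RocksDB amounts to taking a Savepoint, changing the configured backend, and restoring:

# 1. Take a savepoint of the running job:
#      ./bin/flink savepoint <jobId> s3://my-bucket/savepoints
#
# 2. flink-conf.yaml -- configure the RocksDB State Backend for the restart
state.backend: rocksdb
state.checkpoints.dir: s3://my-bucket/checkpoints
#
# 3. Restore the job from the savepoint with the new backend:
#      ./bin/flink run -s s3://my-bucket/savepoints/savepoint-xxxx my-job.jar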

K8s Deployments with User-specified Pod Templates

When using the native Kubernetes deployment[11] (where Flink actively asks K8s to start pods), custom pod templates can now be used.

With these templates, users can set up the JobManager and TaskManager pods in a more Kubernetes-native way, with more flexibility than the configuration options built into Flink’s K8s integration.
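A minimal sketch of such a template, referenced through the kubernetes.pod-template-file option described in the native-K8s documentation[11]; the label and resource values here are purely illustrative:

# pod-template.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    team: data-platform            # illustrative custom label
spec:
  containers:
    - name: flink-main-container   # Flink merges its settings into the container with this name
      resources:
        requests:
          ephemeral-storage: 2Gi   # example of a setting not covered by built-in Flink options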

Unaligned Checkpoint

Unaligned Checkpoint is now in production-ready status, and users are encouraged to try out this feature when backpressure is present.

Specifically, the following improvements in Flink 1.13 make Unaligned Checkpoint easier to use:

  • Users can now scale an application up and down while using Unaligned Checkpoint. This is handy when users have to rely on retained checkpoints because they cannot use Savepoints for performance reasons.

  • For applications without backpressure, enabling Unaligned Checkpoint is now less expensive. Unaligned Checkpoints can now be triggered automatically by a timeout, i.e. an application uses Aligned Checkpoint by default (no in-flight data is stored) and automatically switches to Unaligned Checkpoint (in-flight data is stored) only when barrier alignment takes longer than a configured time.

For information on how to enable Unaligned Checkpoint, refer to the related documentation [12].
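A configuration sketch combining both points above (option names follow the Flink 1.13 checkpointing configuration; double-check them against the documentation[12]):

# flink-conf.yaml -- unaligned checkpoints with an alignment timeout
execution.checkpointing.interval: 1min
execution.checkpointing.unaligned: true
# Stay on aligned checkpoints and only switch to unaligned ones
# when barrier alignment takes longer than this timeout:
execution.checkpointing.alignment-timeout: 30s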

Machine learning moved to a separate repository

In order to accelerate progress on Flink machine learning (unified stream and batch machine learning), a new flink-ml[13] repository has been opened. We manage the Stateful Functions project in a similar way: using a separate repository simplifies the process of merging code and allows separate version releases, which improves development efficiency.

Users can follow Flink’s progress in machine learning there, such as interoperability with Alink[14] (a suite of common machine-learning algorithms built on Flink) and Flink’s integration with TensorFlow[15].

Second, Progress on the SQL / Table API

As in previous releases, SQL and the Table API still account for a large proportion of all development.

Defining a Time Window Through the Table-valued Function

In streaming SQL queries, one of the most commonly used features is defining time windows. Flink 1.13 introduces a new way to define windows: via table-valued functions. This approach is not only more expressive (allowing users to define new window types), but also more consistent with the SQL standard.
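For instance, a tumbling window in the new syntax is written as an ordinary table-valued function call (a minimal sketch using a Bid table with a bidtime time attribute, matching the example below):

SELECT window_start, window_end, SUM(price) AS total_price
  FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
 GROUP BY window_start, window_end;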

Flink 1.13 supports TUMBLE and HOP windows in the new syntax, and SESSION windows will follow in a subsequent release. Let’s demonstrate the expressiveness of this approach with the following two examples:

  • Example 1: the newly introduced CUMULATE window function, which supports windows that expand in fixed steps until the maximum window size is reached:

SELECT window_time, window_start, window_end, SUM(price) AS total_price
  FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
 GROUP BY window_start, window_end, window_time;