This release also includes several important changes that make it easier to understand the performance of streaming jobs. When a streaming job does not perform as expected, these changes help users analyze the cause. They include load and backpressure visualization to identify bottleneck operators, CPU flame graphs to find hot spots in operator code, and state access latency metrics to analyze the behavior of the state backend.
In addition to these features, the Flink community has made many other improvements, some of which are discussed later in this article. We hope users enjoy the new version and its features. At the end of this article, we also cover some changes to be aware of when upgrading Flink.
We encourage users to download and try out the new version of Flink and to report problems through the mailing list and JIRA.
First, important features
Passive scaling (reactive mode)
One of the initial goals of the Flink project was to make stream processing applications as simple and natural as ordinary applications, and passive scaling is Flink's latest step toward this goal.
When it comes to resource management and deployment, Flink supports two possible modes. In the first, users deploy Flink applications to a resource management system such as K8s or YARN, and Flink actively manages resources, requesting and releasing them as needed. This mode is useful for jobs and applications whose resource requirements change frequently, such as batch jobs and real-time SQL queries. In this mode, the number of workers Flink starts is determined by the parallelism configured for the application. In Flink we call this mode active scaling.
For long-running stream processing applications, a better-suited model is one in which users simply start the job like any other long-running service, whether it is deployed on K8s, YARN, or another resource management platform, without having to think about how many resources to request. Instead, the application's size is determined by the number of workers allocated to it: when the number of workers changes, Flink automatically adjusts the parallelism of the application. In Flink we call this mode passive scaling.
Flink's Application deployment mode began the effort to bring Flink jobs closer to ordinary applications (starting a Flink job no longer requires two separate steps, one to start the cluster and one to submit the application), and passive scaling completes this goal: users no longer need additional tools (e.g., scripts or K8s operators) to keep the number of workers consistent with the application's parallelism setting.
Users can now apply autoscaling tools to Flink applications, just like normal applications, as long as they understand the cost of scaling: stateful streaming applications need to redistribute state when scaling.
To try passive scaling, add the configuration option scheduler-mode: reactive and start an application cluster (Standalone or K8s). See the reactive mode documentation for more details.
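Because stateful applications must redistribute state when they rescale, it may help to see roughly how keyed state moves. The sketch below is a simplified model, not Flink code. It assumes that in reactive mode each operator's parallelism equals the total number of available slots, capped at the maximum parallelism, and it follows the key-group assignment formulas found in Flink's KeyGroupRangeAssignment class (treat it as an approximation). All function names are hypothetical.

```python
def reactive_parallelism(num_workers: int, slots_per_worker: int, max_parallelism: int) -> int:
    """Simplified model: use every available slot, capped at the max parallelism."""
    return min(num_workers * slots_per_worker, max_parallelism)


def key_group_range(max_parallelism: int, parallelism: int, subtask: int) -> range:
    """Contiguous range of key groups owned by one parallel subtask."""
    start = (subtask * max_parallelism + parallelism - 1) // parallelism
    end = ((subtask + 1) * max_parallelism - 1) // parallelism
    return range(start, end + 1)


def owner(max_parallelism: int, parallelism: int, key_group: int) -> int:
    """Index of the subtask that owns a given key group."""
    return key_group * parallelism // max_parallelism


def moved_key_groups(max_parallelism: int, old_parallelism: int, new_parallelism: int) -> int:
    """Number of key groups whose state must move to another subtask after rescaling."""
    return sum(
        owner(max_parallelism, old_parallelism, kg) != owner(max_parallelism, new_parallelism, kg)
        for kg in range(max_parallelism)
    )


if __name__ == "__main__":
    # Two workers with two slots each are running, then a third worker joins.
    old_p = reactive_parallelism(2, 2, max_parallelism=128)
    new_p = reactive_parallelism(3, 2, max_parallelism=128)
    print(f"parallelism {old_p} -> {new_p}: "
          f"{moved_key_groups(128, old_p, new_p)} of 128 key groups move")
```

Keyed state is moved in whole key groups, so the maximum parallelism bounds how far a job can scale out; every key group that changes owner represents state that has to be loaded by a different subtask, which is the cost mentioned above.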
Analyzing Application Performance
The ability to easily analyze and understand application performance is critical for any application. It is even more important for Flink, because Flink applications are typically data-intensive (i.e., they process large amounts of data) and must deliver results with (near) real-time latency.
When Flink applications can’t keep up with data input, or when an app is consuming more resources than expected, these tools can help you analyze why.
■ Bottleneck detection and backpressure monitoring
The first problem to be solved by Flink performance analysis is often: which operator is the bottleneck?
To answer this question, Flink introduced metrics that describe how busy a task is (i.e., how much time it spends processing data) and how backpressured it is (i.e., how much time it cannot emit output because downstream operators cannot process results in time). Likely bottlenecks are operators that are busy while their upstream operators are backpressured.
Flink 1.13 optimizes the logic for backpressure detection (using task-based mailbox timing instead of stack sampling) and re-implements the UI presentation of job graphs: Flink now shows the degree of busyness and backpressure on the UI by color and value.
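The rule of thumb for spotting a bottleneck (a busy operator whose upstream is backpressured) can be written down as a small heuristic. This sketch assumes a simple linear pipeline and takes, for each operator in upstream-to-downstream order, the fraction of time it is busy and the fraction of time it is backpressured, for example values derived from Flink's busyTimeMsPerSecond and backPressuredTimeMsPerSecond task metrics divided by 1000. The 0.8 thresholds and the function name are illustrative assumptions, not values Flink uses.

```python
from typing import List, Optional, Tuple

# (operator name, busy ratio in [0, 1], backpressured ratio in [0, 1]),
# ordered from the source to the sink of a linear pipeline.
OperatorLoad = Tuple[str, float, float]


def find_bottleneck(
    pipeline: List[OperatorLoad],
    busy_threshold: float = 0.8,
    backpressure_threshold: float = 0.8,
) -> Optional[str]:
    """Return the first operator that is busy, not itself backpressured,
    and whose upstream neighbour is backpressured."""
    for (_, _, upstream_bp), (name, busy, bp) in zip(pipeline, pipeline[1:]):
        if (
            upstream_bp >= backpressure_threshold
            and busy >= busy_threshold
            and bp < backpressure_threshold
        ):
            return name
    return None


if __name__ == "__main__":
    job = [
        ("source", 0.05, 0.95),  # mostly waiting for downstream
        ("parse", 0.10, 0.90),   # also backpressured
        ("enrich", 0.97, 0.00),  # busy and free to emit: likely the bottleneck
        ("sink", 0.20, 0.00),
    ]
    print(find_bottleneck(job))  # enrich
```

The Web UI's colors encode the same two signals; the heuristic only makes the reasoning explicit and would need refinement for branching job graphs.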
■ CPU Flame Graph in Web UI
Another frequently asked question in Flink performance analysis is: which part of the computation logic in the bottleneck operator is the most expensive?
An effective visualization tool for this problem is the flame graph. It can help answer questions such as:
What percentage of CPU time is spent in different methods?
What does the call stack of a method look like?
Which method is currently consuming CPU?
A flame graph is built by repeatedly sampling thread stacks. In a flame graph, each method call is represented as a rectangle whose width is proportional to the number of times the method appears in the samples. An example of a flame graph on the UI is shown in the following image.
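To make the construction concrete, here is a minimal sketch (not Flink's implementation) of how repeated stack samples turn into flame graph data: identical stacks are collapsed into the one-line-per-stack "folded" form commonly used by flame graph tools, and the width of each rectangle is the number of samples that pass through that call path. The method names in the sample data are made up for illustration.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple

Stack = Tuple[str, ...]  # frames ordered from the outermost call to the innermost


def fold_stacks(samples: Iterable[Stack]) -> Dict[str, int]:
    """Collapse identical sampled stacks into 'outer;...;inner' -> sample count."""
    return dict(Counter(";".join(stack) for stack in samples))


def frame_widths(samples: Iterable[Stack]) -> Dict[Stack, int]:
    """Count the samples passing through each call path (stack prefix).

    In a flame graph the width of a rectangle is proportional to this count."""
    widths: Counter = Counter()
    for stack in samples:
        for depth in range(1, len(stack) + 1):
            widths[stack[:depth]] += 1
    return dict(widths)


if __name__ == "__main__":
    samples: List[Stack] = [
        ("run", "processElement", "parseJson"),
        ("run", "processElement", "parseJson"),
        ("run", "processElement", "lookup"),
        ("run", "emit"),
    ]
    print(fold_stacks(samples))
    print(frame_widths(samples)[("run", "processElement")])  # 3
```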
The flame graph documentation includes more details and instructions for enabling this feature.
■ State Access Latency Metric
Another possible performance bottleneck is the state backend, especially when the job's state exceeds memory capacity and the RocksDB state backend must be used.
This is not to say that RocksDB is not good enough (we love RocksDB!), but it requires certain conditions to achieve its best performance. For example, when running in the cloud, users can easily fail to meet RocksDB's I/O performance requirements by unintentionally choosing the wrong disk type.
In addition to the CPU flame graph, the new state backend latency metrics help users determine whether poor performance is caused by the state backend. For example, if a single access to RocksDB takes milliseconds, the user should look into the memory and I/O configuration. These metrics can be enabled by setting the state.backend.rocksdb.latency-track-enabled option. Because they are collected by sampling, their performance impact on the RocksDB state backend is negligible.
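The reason sampling keeps the overhead low can be shown with a toy wrapper around an in-memory dictionary standing in for RocksDB. This is an assumption-laden sketch, not Flink's implementation: the class name, the default sampling interval, and the history size are hypothetical. Only one access in every sample_interval pays for the timer calls, and the latency history is bounded so its memory use stays constant.

```python
import time
from collections import deque
from typing import Any, Deque, Dict, Hashable, Optional


class SampledLatencyState:
    """Hypothetical key/value state that times only every `sample_interval`-th access."""

    def __init__(self, sample_interval: int = 100, history_size: int = 128) -> None:
        self._store: Dict[Hashable, Any] = {}
        self._sample_interval = sample_interval
        self._accesses = 0
        self.latencies_ns: Deque[int] = deque(maxlen=history_size)  # bounded history

    def _should_sample(self) -> bool:
        self._accesses += 1
        return self._accesses % self._sample_interval == 0

    def get(self, key: Hashable) -> Optional[Any]:
        if not self._should_sample():
            return self._store.get(key)  # fast path: no timing overhead
        start = time.perf_counter_ns()
        value = self._store.get(key)
        self.latencies_ns.append(time.perf_counter_ns() - start)
        return value

    def put(self, key: Hashable, value: Any) -> None:
        if not self._should_sample():
            self._store[key] = value
            return
        start = time.perf_counter_ns()
        self._store[key] = value
        self.latencies_ns.append(time.perf_counter_ns() - start)

    def mean_latency_ns(self) -> float:
        if not self.latencies_ns:
            return 0.0
        return sum(self.latencies_ns) / len(self.latencies_ns)


if __name__ == "__main__":
    state = SampledLatencyState(sample_interval=100)
    for i in range(1000):
        state.put(i % 10, i)
    print(len(state.latencies_ns), "of 1000 accesses were timed")
```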
Switching the State Backend via Savepoint
Users can now switch the State Backend of a Flink application when restarting from a Savepoint. This allows Flink applications to no longer be restricted to using only the State Backend selected when the application first runs.
With this feature, users can start with the HashMap State Backend and switch to the RocksDB State Backend if the state later grows too large.
At the implementation level, Flink now unifies the Savepoint format of all State Backends for this functionality.
K8s deployments with user-specified pod templates
Native Kubernetes deployments (where Flink actively asks K8s to start pods) can now use custom pod templates.
With these templates, users can configure JobManager and TaskManager pods in a Kubernetes-native way that is more flexible than the built-in configuration options of Flink's K8s integration.
Production-ready Unaligned Checkpoint
Unaligned Checkpoint is now production-ready, and users are encouraged to try this feature when their jobs experience backpressure.
Specifically, these features, introduced in Flink 1.13, make Unaligned Checkpoint easier to use:
For applications without backpressure, enabling Unaligned Checkpoint is now less expensive. Unaligned checkpoints can now be triggered automatically by a timeout: an application uses aligned checkpoints by default (no in-flight data is stored) and switches to unaligned checkpoints (in-flight data is stored) only when alignment takes longer than a configured time.
Users can now rescale an application while using Unaligned Checkpoint. This is handy for users who must restore from retained checkpoints because they cannot use savepoints for performance reasons.
For information on how to enable Unaligned Checkpoint, refer to the related documentation.
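The timeout-based switch from aligned to unaligned checkpoints can be modeled as a small decision function. This is a simplified model rather than Flink's code: it assumes we already know when the checkpoint barrier arrives on each input channel, and the function and field names are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CheckpointDecision:
    mode: str               # "aligned" or "unaligned"
    alignment_time_ms: int  # how long the task waited for barriers before deciding


def choose_checkpoint_mode(barrier_arrivals_ms: List[int], alignment_timeout_ms: int) -> CheckpointDecision:
    """Start aligned; fall back to unaligned once alignment exceeds the timeout.

    barrier_arrivals_ms holds, per input channel, the time at which that
    channel's checkpoint barrier arrived."""
    waited = max(barrier_arrivals_ms) - min(barrier_arrivals_ms)
    if waited <= alignment_timeout_ms:
        # All barriers arrived in time: no in-flight data needs to be stored.
        return CheckpointDecision("aligned", waited)
    # Alignment took too long (typically because of backpressure): switch to an
    # unaligned checkpoint and persist the in-flight buffers along with the state.
    return CheckpointDecision("unaligned", alignment_timeout_ms)


if __name__ == "__main__":
    print(choose_checkpoint_mode([0, 5, 10], alignment_timeout_ms=30))
    print(choose_checkpoint_mode([0, 500], alignment_timeout_ms=30))
```

The design choice this illustrates is that jobs without backpressure keep paying the low cost of aligned checkpoints, while only slow alignments pay the extra storage cost of in-flight data.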
Machine learning moved to a separate repository
To accelerate progress on Flink machine learning (unified stream and batch machine learning), Flink machine learning now lives in a new flink-ml repository. We manage it in a similar way to the Stateful Functions project: a separate repository simplifies merging code and allows independent releases, which improves development efficiency.
Users can follow Flink's progress in machine learning there, such as interoperability with Alink (a suite of common machine learning algorithms on Flink) and Flink's integration with TensorFlow.
Second, SQL/Table API progress
As in previous versions, SQL and the Table API still account for a large share of all development work.
Defining a Time Window Through the Table-valued Function
In streaming SQL queries, one of the most common operations is defining a time window. Flink 1.13 introduces a new way to define windows: via table-valued functions. This approach is not only more expressive (it allows new window types to be defined) but also more consistent with the SQL standard.
Flink 1.13 supports TUMBLE and HOP windows in the new syntax, and SESSION windows will follow in a subsequent release. The following two examples demonstrate the expressive power of this approach:
Example 1: The newly introduced CUMULATE window function, which expands a window by a fixed step until the maximum window size is reached:
SELECT window_time, window_start, window_end, SUM(price) AS total_price
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
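To clarify which windows a single row falls into, here is a small sketch of CUMULATE window assignment. It assumes timestamps and sizes share one integer unit (minutes here) and ignores the optional window offset; the function name is hypothetical. For the query above (step 2 minutes, maximum size 10 minutes), a bid at minute 3 is counted in the windows ending at minutes 4, 6, 8, and 10, so each result row reports the running total since the start of the 10-minute cycle.

```python
from typing import List, Tuple


def cumulate_windows(ts: int, step: int, max_size: int) -> List[Tuple[int, int]]:
    """Return the [start, end) windows that a record with timestamp `ts` belongs to.

    All windows of one cycle share the same start; their ends grow by `step`
    until the window reaches `max_size`."""
    if max_size % step != 0:
        raise ValueError("max_size must be an integral multiple of step")
    start = ts - ts % max_size  # start of the current max-size cycle
    return [
        (start, end)
        for end in range(start + step, start + max_size + 1, step)
        if end > ts  # the window must still cover ts
    ]


if __name__ == "__main__":
    print(cumulate_windows(3, step=2, max_size=10))  # [(0, 4), (0, 6), (0, 8), (0, 10)]
```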
Example 2: Users can access the window start and end times in table-valued window functions, which enables new functionality. For example, in addition to regular window-based aggregations and joins, users can now implement window-based Top-K aggregations:
SELECT window_time, ...
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) AS rownum
  FROM t  -- t: placeholder for a windowed aggregation result such as Example 1
) WHERE rownum <= 100;
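The semantics of the window Top-K query can be mirrored in plain Python: partition the windowed aggregation results by window start and end, order each partition by total_price in descending order, and keep the first k rows. The dictionary-based row layout and the function name are assumptions for illustration only.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def window_top_k(rows: List[Row], k: int) -> List[Row]:
    """Keep the k rows with the highest total_price per (window_start, window_end),
    adding a 1-based "rownum" field as ROW_NUMBER() OVER (...) would."""
    by_window: Dict[Tuple[Any, Any], List[Row]] = defaultdict(list)
    for row in rows:  # PARTITION BY window_start, window_end
        by_window[(row["window_start"], row["window_end"])].append(row)
    result: List[Row] = []
    for window in sorted(by_window):
        # ORDER BY total_price DESC
        ranked = sorted(by_window[window], key=lambda r: r["total_price"], reverse=True)
        for rownum, row in enumerate(ranked[:k], start=1):  # WHERE rownum <= k
            result.append({**row, "rownum": rownum})
    return result
```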
Mixing the DataStream API with the Table API/SQL
This release greatly simplifies mixing the DataStream API with the Table API/SQL.
The Table API is a very convenient interface for application development because it supports writing expressions directly in programs and provides a large number of built-in functions. But sometimes users need to switch back to the DataStream API, for example when they need more expressiveness, flexibility, or access to state.
Flink's new StreamTableEnvironment.toDataStream() and .fromDataStream() methods can use a source or sink declared with the DataStream API as a source or sink for a Table. Major improvements include:
Automatic conversion between the DataStream and Table API type systems.
Seamless integration of event time configuration, with highly consistent watermark behavior.
Greatly enhanced support for the Row type, which represents data in the Table API, including improved toString()/hashCode()/equals() methods, access to field values by name, and support for sparse representation.
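// Convert a DataStream into a Table, exposing the record timestamp as "rowtime" and reusing the DataStream watermarks.
Table table = tableEnv.fromDataStream(dataStream, Schema.newBuilder()
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)").watermark("rowtime", "SOURCE_WATERMARK()").build());
// Convert the Table back into a DataStream<Row> and key it by the "user" field.
DataStream<Row> rows = tableEnv.toDataStream(table)
    .keyBy(r -> r.getField("user"));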
SQL Client: Init Scripts and Statement Sets
The SQL Client is an easy way to run and deploy SQL streaming and batch jobs directly: users can invoke SQL from the command line without writing code, or as part of a CI/CD process.
This release greatly improves the functionality of the SQL Client. Almost all operations that Java programs can perform by calling TableEnvironment programmatically to launch queries are now supported in the SQL Client and in SQL scripts. This means that SQL users no longer need to write glue code to deploy their SQL jobs.
■ Configuration simplification and code sharing
Configuring the SQL Client via YAML files is now deprecated (it still works but is marked as obsolete). Instead, the SQL Client now supports configuring the environment with an init script that runs before the main SQL script.
These init scripts can often be shared across teams and deployments. They can be used to load commonly used catalogs, apply common configurations, or define standard views.
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
With new configuration options and improved SET/RESET commands, users can more conveniently control the execution flow from within the SQL Client and SQL scripts.
■ Support for multiple queries via statement sets
Multiple queries allow users to execute several SQL queries (or statements) in one Flink job. This is useful for long-running streaming SQL queries.
Statement sets can be used to group a set of queries so that they are executed together.
The following is an example of a SQL script that can be executed via SQL Client. It initializes and configures the environment for executing multiple queries. This script includes all queries and all environment initialization and configuration, making it a self-contained deployment component.
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
  user_id BIGINT,
  page_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode = streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync = false;
-- set the job's parallelism (example value)
SET parallelism.default = 10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path (placeholder path)
SET execution.savepoint.path = /path/to/savepoint;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
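A practical effect of a statement set is that both INSERT INTO statements run in one job, so the planner can let them share the scan of the clicks table instead of reading the source twice. The sketch below imitates that idea in plain Python with a one-shot iterator: page views (count(1)) and unique visitors (count(distinct user_id)) are both computed from a single pass over the same input. It illustrates the semantics only and is not how Flink executes the plan; the function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

Click = Tuple[int, int]  # (user_id, page_id)


def run_statement_set(clicks: Iterable[Click]) -> Tuple[Dict[int, int], Dict[int, int]]:
    """Compute both INSERT INTO results from a single pass over `clicks`."""
    pv: Dict[int, int] = defaultdict(int)             # pageview_pv_sink: count(1)
    visitors: Dict[int, Set[int]] = defaultdict(set)  # pageview_uv_sink: distinct users
    for user_id, page_id in clicks:                   # the source is read only once
        pv[page_id] += 1
        visitors[page_id].add(user_id)
    uv = {page: len(users) for page, users in visitors.items()}
    return dict(pv), uv
```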
Hive query syntax compatibility
Users can now also use Hive SQL syntax on Flink. In addition to the Hive DDL dialect, Flink now supports the commonly used Hive DML and DQL dialects.
To use the Hive SQL dialect, set table.sql-dialect to hive and load the HiveModule. The latter is important because Hive's built-in functions must be loaded for proper compatibility with Hive syntax and semantics. For example:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- set up HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- set up HiveModule
USE MODULES hive, core;
SET table.sql-dialect = hive; -- enable the Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
Note that DML and DQL statements in Flink syntax are no longer supported under the Hive dialect. To use Flink syntax, switch the dialect back to default.
Optimized SQL time functions
Time processing is an important task in data processing. But at the same time, dealing with different time zones, dates, and times is an increasingly complex task.
In Flink 1.13, we put a lot of effort into simplifying the use of time functions. First, we adjusted the return types of time-related functions, such as PROCTIME(), CURRENT_TIMESTAMP, and NOW(), to be more precise.
Second, users can now define event time attributes based on a column of type TIMESTAMP_LTZ, allowing window processing to handle daylight saving time gracefully.
Users can refer to the Release Note to see the full changes to this section.
Third, PyFlink core improvements
The improvements to PyFlink in this release mainly aim to make the Python DataStream API and Table API more consistent with the corresponding functionality of the Java/Scala APIs.
Stateful operators in the Python DataStream API
In Flink 1.13, Python programmers can use all the capabilities of Flink's state handling API. The Python DataStream API, which was refactored in Flink 1.12, now has full state access, allowing users to record data in state and access it later.
Stateful processing is the foundation of many complex data processing scenarios that rely on sharing state across records, such as window operators.
The following example shows the implementation of a custom count window:
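from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountWindowAverage(FlatMapFunction):
    def __init__(self, window_size):
        self.window_size = window_size

    def open(self, runtime_context: RuntimeContext):
        # keyed state holding (count, sum) for the current key
        descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)
        # update the count and the running sum
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
        # if the count reaches window_size, emit the average and clear the state
        if current_sum[0] >= self.window_size:
            self.sum.clear()
            yield value[0], current_sum[1] // current_sum[0]
        else:
            self.sum.update(current_sum)

ds = ...  # type: DataStream
ds.key_by(lambda row: row[0]) \
    .flat_map(CountWindowAverage(5))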
User-defined windows in the PyFlink DataStream API
The PyFlink DataStream API adds support for user-defined windows in Flink 1.13. Users can now define windows beyond the standard window types.
Since windows are the core mechanism for handling infinite data streams (by dividing streams into finite “buckets”), this feature greatly improves the expressiveness of the API.
Row-based operations in the PyFlink Table API
The Python Table API now supports row-based operations, such as applying user-defined functions to whole rows. This allows users to apply data processing functions that are not built in.
An example of using the map() operation in the Python Table API is as follows:
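from pyflink.table import DataTypes, Row
from pyflink.table.udf import udf
# the UDF returns a two-field row; the field names and types here are illustrative
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("c1", DataTypes.BIGINT()), DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
    return Row(r[0] + 1, r[1])  # increment the first column, keep the second
table = ...  # type: Table
mapped_result = table.map(increment_column)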
In addition to map(), this API supports flat_map(), aggregate(), flat_aggregate() and other row-based operations. This brings the functionality of the Python Table API closer to that of the Java Table API.
Batch execution mode for bounded streams in the PyFlink DataStream API
The PyFlink DataStream API now supports the Batch execution mode that was introduced in the Flink 1.12 DataStream API.
By exploiting the boundedness of the data to skip state backends and checkpoints, the Batch execution mode simplifies operations and improves the performance of bounded stream processing.
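One way to see why bounded input allows this: when the input is known to be finite, the runtime can sort records by key and then process each key to completion before moving on, so only one key's state needs to be held at a time, and failed work can be recomputed from its input rather than restored from checkpoints. The sketch below shows that idea in plain Python; it is a simplified model of the approach, not PyFlink code, and the function name is hypothetical.

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, List, Tuple


def batch_keyed_sum(records: Iterable[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Sum values per key the way a batch runtime can: sort the bounded input
    by key, then process one key at a time with a single local accumulator."""
    results = []
    for key, group in groupby(sorted(records, key=itemgetter(0)), key=itemgetter(0)):
        total = 0  # the only "state", discarded once this key is finished
        for _, value in group:
            total += value
        results.append((key, total))  # one final result per key
    return results


if __name__ == "__main__":
    print(batch_keyed_sum([("b", 1), ("a", 2), ("b", 3)]))  # [('a', 2), ('b', 4)]
```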
Flink documentation based on Hugo
The Flink documentation has been migrated from Jekyll to Hugo. If you notice a problem, please let us know; we are very much looking forward to hearing how users feel about the new interface.
Web UI support for exception history
The Flink Web UI can now display the n most recent exceptions that caused a job to fail. This improves the debugging experience in scenarios where one exception causes multiple subsequent exceptions: users can find the root-cause exception in the exception history.
Improved reporting of failed checkpoint exceptions and failure reasons
Flink now provides statistics of failed or canceled Checkpoints, making it easier for users to determine the cause of Checkpoint failures without having to look at the log.
Previous versions of Flink only reported metrics (e.g. size of persisted data, trigger time, etc.) when the checkpoint was successful.
Exactly-once JDBC sink
Starting with 1.13, the JDBC sink can provide exactly-once consistency for databases that support XA transactions, by committing data transactionally. This feature requires that the target database have (or be linked to) an XA transaction manager.
This sink is currently available only in the DataStream API. Users can create it via JdbcSink.exactlyOnceSink(…) (or by explicitly instantiating a JdbcXaSinkFunction).
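The transactional commit follows the two-phase commit pattern that Flink sinks use for exactly-once delivery: writes go into a transaction, the transaction is prepared when a checkpoint is taken, and it is committed only after that checkpoint completes. The toy class below models this flow with in-memory lists instead of a real XA-capable database; its name and methods are hypothetical and are not the JdbcSink API.

```python
from typing import Dict, List


class TwoPhaseCommitSink:
    """Hypothetical sink illustrating checkpoint-aligned two-phase commit."""

    def __init__(self) -> None:
        self.committed: List[str] = []            # stands in for the database table
        self.prepared: Dict[int, List[str]] = {}  # checkpoint id -> prepared transaction
        self._open_txn: List[str] = []

    def write(self, record: str) -> None:
        self._open_txn.append(record)

    def snapshot(self, checkpoint_id: int) -> None:
        # Phase 1: prepare the current transaction and start a new one.
        self.prepared[checkpoint_id] = self._open_txn
        self._open_txn = []

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Phase 2: commit every transaction prepared up to this checkpoint.
        for cid in sorted(c for c in self.prepared if c <= checkpoint_id):
            self.committed.extend(self.prepared.pop(cid))

    def fail(self) -> None:
        # Roll back the open transaction; the source replays these records.
        self._open_txn = []
```

Because uncommitted writes are never visible and replayed records go into a fresh transaction, each record ends up in the table exactly once even if it was written twice.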
PyFlink Table API support for user-defined aggregate functions on group windows
The PyFlink Table API now supports both Python-based user-defined aggregate functions (UDAFs) and Pandas UDAFs on group windows. These functions are important for many data analysis and machine learning training programs.
Prior to Flink 1.13, these functions could only be used in unbounded group-by aggregations. Flink 1.13 removes this limitation.
Sort-merge shuffle improvements in batch execution mode
Flink 1.13 improves the performance and memory footprint of the sort-merge blocking shuffle for batch programs. This shuffle mode was introduced in Flink 1.12 by FLIP-148.
These improvements avoid the recurring OutOfMemoryError: Direct Memory problem in large-scale jobs and improve performance through I/O scheduling and broadcast optimizations, especially on HDDs.
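The idea behind a sort-based blocking shuffle, compared with writing one file per downstream partition, can be sketched as follows: each upstream task orders its output by target partition, writes it as one file, and keeps an index so that each downstream task reads a single contiguous region. This is a simplified in-memory model with hypothetical function names; Flink's actual implementation works with bounded memory buffers and writes multiple sorted regions to disk, which the sketch ignores.

```python
from typing import List, Tuple

Record = Tuple[int, str]  # (target downstream partition, payload)


def write_sorted_output(records: List[Record], num_partitions: int) -> Tuple[List[str], List[Tuple[int, int]]]:
    """Model one upstream task's output: a single data 'file' ordered by target
    partition, plus an index of (offset, length) per partition."""
    # sorted() is stable, so records keep their original order within a partition
    data = [payload for _, payload in sorted(records, key=lambda r: r[0])]
    counts = [0] * num_partitions
    for partition, _ in records:
        counts[partition] += 1
    index: List[Tuple[int, int]] = []
    offset = 0
    for count in counts:
        index.append((offset, count))
        offset += count
    return data, index


def read_partition(data: List[str], index: List[Tuple[int, int]], partition: int) -> List[str]:
    """A downstream task reads one contiguous region instead of its own file."""
    offset, length = index[partition]
    return data[offset:offset + length]
```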
HBase connector supports asynchronous dimension table lookups and query caching
The HBase lookup table source now supports asynchronous lookups and query caching. This greatly improves the performance of Table/SQL dimension table joins that use this source and, in some typical cases, reduces the number of I/O requests to HBase.
In previous releases, HBase Lookup Source only supported synchronous communication, resulting in lower job throughput and resource utilization.
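Caching reduces I/O because repeated join keys within the cache lifetime are answered locally. The class below is a minimal, hedged model of such a cache (bounded row count, time-to-live, least-recently-used eviction) in front of a slow lookup function that stands in for an HBase Get; the class and parameter names are hypothetical and do not correspond to the connector's configuration options. The asynchronous part of the feature, which lets many lookups be in flight at once, is not modeled.

```python
import time
from collections import OrderedDict
from typing import Callable, Generic, Hashable, Tuple, TypeVar

V = TypeVar("V")


class CachingLookup(Generic[V]):
    """Hypothetical lookup cache with a row limit and a TTL."""

    def __init__(self, lookup: Callable[[Hashable], V], max_rows: int, ttl_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._lookup = lookup
        self._max_rows = max_rows
        self._ttl_s = ttl_s
        self._clock = clock
        self._cache: "OrderedDict[Hashable, Tuple[float, V]]" = OrderedDict()
        self.backend_calls = 0  # how many requests reached the backing store

    def get(self, key: Hashable) -> V:
        now = self._clock()
        hit = self._cache.get(key)
        if hit is not None and now - hit[0] < self._ttl_s:
            self._cache.move_to_end(key)  # mark as recently used
            return hit[1]
        self.backend_calls += 1
        value = self._lookup(key)  # cache miss or expired entry
        self._cache[key] = (now, value)
        self._cache.move_to_end(key)
        if len(self._cache) > self._max_rows:
            self._cache.popitem(last=False)  # evict the least recently used row
        return value
```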
Changes to be aware of when upgrading to Flink 1.13:
FLINK-21709 – The old Table & SQL API planner has been deprecated and will be removed in Flink 1.14. The Blink planner became the default planner several versions ago and will be the only planner in future releases. This means that BatchTableEnvironment and DataSet API interoperability will no longer be supported. Users need to switch to the unified TableEnvironment to write streaming or batch jobs.
FLINK-22352 – The Flink community has decided to deprecate support for Apache Mesos, which may be removed entirely in the future. Users are advised to switch to another resource management system.
FLINK-21935 – The state.backend.async option has been deprecated, because Flink now always takes snapshots asynchronously (as it did by default before), and there is no longer any snapshot implementation that supports synchronous snapshots.
FLINK-17012 – The RUNNING state of a task has been split into two states: INITIALIZING and RUNNING. The INITIALIZING phase covers loading the state and, when unaligned checkpoints are enabled, restoring in-flight data. By explicitly distinguishing between the two states, monitoring systems can better tell whether a task is actually processing data yet.
FLINK-21698 – Direct casting between numeric types and TIMESTAMP types, such as CAST(numeric AS TIMESTAMP(3)), was problematic and is now disabled. Users should use TO_TIMESTAMP(FROM_UNIXTIME(numeric)) instead.
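The recommended replacement goes through a string, which makes the time zone handling explicit. The following Python sketch mimics the two SQL functions under an assumed session time zone (UTC by default): FROM_UNIXTIME formats epoch seconds as 'yyyy-MM-dd HH:mm:ss' in the session time zone, and TO_TIMESTAMP parses that string into a timestamp without time zone. It shows that the result depends on the session time zone and that the input is interpreted as whole seconds. This models the documented function semantics and is not Flink code; the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def to_timestamp_from_unixtime(epoch_seconds: int, session_tz: timezone = timezone.utc) -> datetime:
    """Mimic TO_TIMESTAMP(FROM_UNIXTIME(n)) under an assumed session time zone."""
    # FROM_UNIXTIME: epoch seconds -> 'yyyy-MM-dd HH:mm:ss' string in the session time zone
    text = datetime.fromtimestamp(epoch_seconds, tz=session_tz).strftime("%Y-%m-%d %H:%M:%S")
    # TO_TIMESTAMP: parse the string into a naive datetime (a TIMESTAMP without time zone)
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    print(to_timestamp_from_unixtime(0))                                 # 1970-01-01 00:00:00
    print(to_timestamp_from_unixtime(0, timezone(timedelta(hours=8))))  # 1970-01-01 08:00:00
```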
FLINK-22133 – The new Source interface has a small incompatible change: the SplitEnumerator.snapshotState() method now takes an additional checkpoint ID parameter, which identifies the checkpoint that the snapshot being taken belongs to.
FLINK-19463 – The old StateBackend interfaces have been deprecated because they carried too many semantics and were confusing. This is a pure API-level change and does not affect application runtime behavior. For how to upgrade existing jobs, refer to the job migration guide.
5. Other resources
Binaries and code are available from the download page of Flink’s official website, and the latest PyFlink release is available from PyPI.
If you want to upgrade to Flink 1.13, please refer to the release notes. This version is API-compatible with previous 1.x versions for interfaces annotated with @Public.
Users can also view the complete list of changes and the updated documentation for details on all changes and new features.
Original link: https://flink.apache.org/news/2021/05/03/release-1.13.0.html