Apache Flink continues to grow rapidly and has one of the most active communities in Apache. Flink 1.16 had over 230 contributors enthusiastically participating, completing 19 FLIPs and 900+ issues and bringing many exciting features to the community.
Flink has become the de facto standard for stream processing, and the concept of unified stream and batch processing is gaining acceptance and being successfully implemented in more and more companies. Previously, unified stream-batch processing emphasized a unified API and a unified computing framework. This year, building on that foundation, Flink proposed the next development direction, the Flink Streaming Warehouse (Streamhouse), which further upgrades the scope of stream-batch unification: it achieves not only unified computation but also unified storage, and thus unified real-time analytics.
In 1.16, the Flink community has made many improvements to both batch and stream processing:
- Ease of use: introduced join hints, which let Flink SQL users manually specify join strategies and avoid unreasonable execution plans. Compatibility with Hive SQL has reached 94%, so users can migrate from Hive to Flink at a fraction of the cost.
- Stability: proposed a speculative execution mechanism to mitigate long-tail subtasks and improve job stability. Improved HashJoin and introduced a failure-rollback mechanism to avoid join failures.
- Performance: introduced dynamic partition pruning to reduce scan I/O and improved join processing for star-schema queries, yielding a 30% gain on the TPC-DS benchmark. Hybrid shuffle mode can be used to improve resource utilization and processing performance.
For stream processing, there are a number of significant improvements:
- Lookup joins are widely used in stream processing. Problems such as slow lookups, low throughput, and delayed updates are addressed through a common cache mechanism, asynchronous I/O, and retryable lookups. These features are very practical, solve pain points users often complain about, and support richer scenarios.
- From the first day of Flink SQL, there have been non-deterministic operations that can lead to incorrect results or exceptions, causing great distress to users. In 1.16, we put a lot of effort into solving most of these problems, and we will continue to improve in the future.
- For batch processing, there have been improvements in ease of use, stability, and performance. 1.16 is a milestone release for Flink batch processing and an important step toward maturity.
- Ease of use: with the introduction of SQL Gateway and full compatibility with HiveServer2, users can easily submit Flink SQL and Hive SQL jobs, and can easily connect to the existing Hive ecosystem.
- Changelog State Backend provides users with second-level or even millisecond-level checkpoints, dramatically improving the fault-tolerance experience while providing lower end-to-end latency for transactional sinks.
With the further refinement of stream-batch unification and the continuous iteration of the Flink Table Store (0.2 has been released [1]), the Flink community is gradually pushing the streaming warehouse from concept to reality and maturity.
Understanding Streaming Warehouses
To be precise, a streaming warehouse makes the data warehouse streaming, allowing data at every layer of the warehouse to flow in real time. The goal is to implement a streaming service with end-to-end real-time performance through a unified API and computing framework. See article [2] for more details.
Batch Processing
Flink is a unified stream and batch processing engine, and due to our long-term investment, stream processing has taken the leading role. We are also working to improve batch processing and make it an excellent compute engine, which makes the overall experience of stream-batch unification smoother.
SQL Gateway
Feedback from various channels indicates that SQL Gateway [3] is a feature users have long been looking forward to, especially batch users. The feature was finally completed in 1.16 (see FLIP-91 for the design). SQL Gateway is an extension and enhancement of SQL Client that supports multi-tenancy and pluggable API protocols (endpoints), solving the problem that SQL Client can only serve a single user and cannot be integrated with external services or components. SQL Gateway currently supports the REST API and the HiveServer2 protocol; users can connect to SQL Gateway via cURL, Postman, or HTTP clients in various programming languages to submit streaming jobs, batch jobs, and even OLAP jobs. For the HiveServer2 endpoint, see Hive Compatibility for more details.
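As a minimal sketch of the REST flow (the default port 8083 and the `/v1/sessions` path are assumptions based on the SQL Gateway REST endpoint docs [3]; verify them for your deployment), opening a session from Java might look like this:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SqlGatewayRestSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String gateway = "http://localhost:8083"; // assumed default REST port

        // Open a session; the response JSON contains a sessionHandle that
        // subsequent statement-submission requests must reference.
        HttpRequest openSession = HttpRequest.newBuilder()
                .uri(URI.create(gateway + "/v1/sessions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
        HttpResponse<String> response =
                client.send(openSession, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```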
Hive Compatibility
To reduce the cost of migrating from Hive to Flink, we introduced the HiveServer2 endpoint and improved Hive syntax compatibility in this release:
HiveServer2 Endpoint [4] allows users to interact with SQL Gateway through Hive JDBC/Beeline and brings Flink into the Hive ecosystem (DBeaver, Apache Superset, Apache DolphinScheduler, Apache Zeppelin, and so on). When a user connects to the HiveServer2 endpoint, SQL Gateway registers the Hive catalog, switches to the Hive dialect, and executes jobs in batch mode, so users get the same experience as with HiveServer2.
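For instance, any plain Hive JDBC client can talk to the gateway. A minimal sketch (host and port are placeholders for wherever the HiveServer2 endpoint listens; the standard `hive-jdbc` driver must be on the classpath):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveServer2EndpointSketch {
    public static void main(String[] args) throws Exception {
        // Standard Hive JDBC URL; replace host/port with the SQL Gateway's
        // HiveServer2 endpoint address.
        String url = "jdbc:hive2://localhost:10000/default";
        try (Connection conn = DriverManager.getConnection(url);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```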
Hive syntax [5] is already the de facto standard for big-data SQL. Flink has improved its compatibility with Hive syntax and added support for several Hive syntax features commonly used in production. Hive syntax compatibility helps users migrate existing Hive SQL tasks to Flink and makes it easier for users familiar with Hive syntax to write SQL queries against tables registered in Flink. Compatibility is measured using the Hive qtest suite, which contains more than 12K SQL cases. So far, overall compatibility with Hive 2.3 queries has reached 94.1%, or 97.3% if ACID queries are excluded.
Join Hints in Flink SQL
Join hints are a common industry solution for compensating for optimizer shortcomings by influencing the execution plan. Join is the most widely used operator in batch jobs, and Flink supports a variety of join strategies. Missing statistics or a flawed cost model can lead the optimizer to pick the wrong join strategy, resulting in slow execution or even job failures. By specifying a join hint, the user makes the optimizer choose the specified join strategy whenever possible. This avoids various optimizer shortcomings and ensures the production usability of batch jobs.
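For example, Flink 1.16 supports hints such as BROADCAST, SHUFFLE_HASH, SHUFFLE_MERGE, and NEST_LOOP. A minimal sketch (the `orders` and `customers` tables are hypothetical and must be registered beforehand):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JoinHintSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Ask the optimizer to broadcast the small `customers` table instead
        // of shuffling both join inputs.
        tEnv.executeSql(
                "SELECT /*+ BROADCAST(customers) */ o.id, c.name "
                        + "FROM orders o JOIN customers c ON o.customer_id = c.id")
            .print();
    }
}
```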
Adaptive Hash Join
For batch jobs, the hash join operator may fail if the input data is heavily skewed, which is a very poor experience for users. To solve this problem, we introduced adaptive hash join, which automatically falls back to sort-merge join if the hash join fails at runtime. This mechanism ensures that hash joins always succeed and improves stability, completely transparently to the user.
Speculative Execution for Batch Jobs
Flink 1.16 introduces speculative execution to mitigate slow batch jobs caused by problematic nodes. A problematic node might have hardware issues, unexpectedly busy I/O, or high CPU load. These issues can make the tasks running on it much slower than tasks on other nodes and affect the overall execution time of a batch job.
When speculative execution is enabled, Flink keeps detecting slow tasks. Once a slow task is detected, the node it runs on is identified as problematic and blocked via a blocklist mechanism (FLIP-224). The scheduler creates new attempts for slow tasks and deploys them to nodes that are not blocked, while the existing attempts keep running. The new attempts process the same input data and produce the same output as the original attempts. Whichever attempt finishes first is treated as the task's only completed attempt, and the remaining attempts of that task are canceled.
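A minimal sketch of enabling it (speculative execution is built on the adaptive batch scheduler; the exact configuration key names are assumptions based on the 1.16 docs, so double-check them):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SpeculativeExecutionSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Speculative execution requires the adaptive batch scheduler.
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        conf.setString("jobmanager.adaptive-batch-scheduler.speculative.enabled", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define and execute the batch job as usual.
    }
}
```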
Most existing sources can use speculative execution (FLIP-245). Only if a source uses SourceEvent must it implement SupportsHandleExecutionAttemptSourceEvent to support speculative execution. Sinks do not support speculative execution yet, so speculative execution currently does not happen on sinks.
The Web UI and REST API have also been improved (FLIP-249) to display the concurrent attempts of tasks and the blocked TaskManagers.
Hybrid Shuffle Mode
We introduced a new hybrid shuffle [6] mode for batch execution. It combines the advantages of blocking shuffle and pipelined shuffle (as used in streaming mode):
- Like blocking shuffle, it does not require upstream and downstream tasks to run simultaneously, which allows a job to execute with few resources.
- Like pipelined shuffle, it does not require downstream tasks to wait until upstream tasks finish, which reduces the overall execution time of the job when resources are sufficient.
- It adapts to custom preferences between persisting less data and restarting fewer tasks on failure by providing different spill strategies.
Note: this feature is experimental and not enabled by default.
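To experiment with it, set the batch shuffle mode; a minimal sketch (the option values follow the batch shuffle docs [6]):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridShuffleSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // ALL_EXCHANGES_HYBRID_FULL spills all shuffle data for fast restarts;
        // ALL_EXCHANGES_HYBRID_SELECTIVE spills only what is needed.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // ... define the batch pipeline here.
    }
}
```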
Further Improvements to Blocking Shuffle
We further improved the usability and performance of blocking shuffle in this release, including adaptive network buffer allocation, sequential I/O optimization, and result-partition reuse, which allows multiple consumer vertices to reuse the same physical result partition to reduce disk I/O and storage space. These optimizations achieve an overall performance improvement of 7% on 10 TB scale TPC-DS tests. In addition, two compression algorithms with higher compression ratios (LZO and ZSTD) were introduced to further reduce storage space, at the cost of some CPU overhead compared to the default LZ4 compression.
Dynamic Partition Pruning
For batch jobs, partitioned tables are more widely used in production than non-partitioned tables. Flink already supports static partition pruning: during the optimization phase, the optimizer pushes partition-field filter conditions from the WHERE clause down into the source connector, reducing unnecessary partition scan I/O. The star schema is the simplest and most commonly used data mart pattern, and we found that many user jobs cannot use static partition pruning because the pruning information can only be determined at execution time. Dynamic partition pruning collects the pruning information at runtime from the data of related (dimension) tables, thereby reducing unnecessary partition scan I/O on partitioned fact tables. Dynamic partition pruning has been validated on the 10 TB TPC-DS dataset, improving performance by up to 30%.
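As an illustration (hypothetical star-schema tables), dynamic partition pruning kicks in automatically for queries like the following, where the fact-table partitions to scan are only known after the dimension table has been filtered:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DynamicPartitionPruningSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // `sales` is a fact table partitioned by `sold_date_sk`; `date_dim` is
        // a dimension table. Which `sales` partitions to read is only known at
        // runtime, after `date_dim` has been filtered by `d.year = 2022`.
        tEnv.executeSql(
                "SELECT s.item_sk, SUM(s.price) "
                        + "FROM sales s JOIN date_dim d ON s.sold_date_sk = d.date_sk "
                        + "WHERE d.year = 2022 "
                        + "GROUP BY s.item_sk")
            .print();
    }
}
```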
Stream Processing
In 1.16, we made improvements to checkpoints, SQL, connectors, and more, to keep Flink's stream processing ahead of the curve.
Generic Incremental Checkpoints (Changelog State Backend)
The changelog state backend is designed to make checkpoint intervals shorter and more predictable. This release makes it production-ready, adapts it to existing state backends, and improves its usability:
- Support local recovery
- Introduce a file cache to optimize recovery
- Support checkpoint-based switching
- Support state migration
- Improve the monitoring experience of the changelog state backend:
  - Expose the changelog configuration to the web UI
  - Expose changelog metrics
| Percentile | End-to-End Duration | Checkpointed Data Size* | Full Checkpoint Data Size* |
| --- | --- | --- | --- |
| 50% | 311ms / 5s | 14.8MB / 3.05GB | 24.2GB / 18.5GB |
| 90% | 664ms / 6s | 23.5MB / 4.52GB | 25.2GB / 19.3GB |
| 99% | 1s / 7s | 36.6MB / 5.19GB | 25.6GB / 19.6GB |
| 99.9% | 1s / 10s | 52.8MB / 6.49GB | 25.7GB / 19.8GB |
Table 1: Changelog Enabled / Changelog Disabled Comparison in Value States (see this blog [7] for more details)
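Enabling the changelog is a configuration layered on top of an existing state backend; a minimal sketch (the `state.backend.changelog.*` and `dstl.*` keys follow the 1.16 docs, and the storage path is a placeholder):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogStateBackendSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb"); // keep the existing backend
        conf.setString("state.backend.changelog.enabled", "true");
        conf.setString("state.backend.changelog.storage", "filesystem");
        // Placeholder path for durable changelog storage.
        conf.setString("dstl.dfs.base-path", "s3://my-bucket/changelog");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... job definition.
    }
}
```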
RocksDB Rescaling Improvements and Rescaling Benchmarks
Rescaling is a frequent operation for cloud services built on Apache Flink, and this release leverages RocksDB's deleteRange to optimize rescaling of the incremental RocksDB state backend. deleteRange avoids massive scan-and-delete operations; when scaling up with a large amount of state to delete, recovery can be almost 2 to 10 times faster.
Improved State Backend Monitoring Experience and Usability
This release also improves the monitoring experience and usability of state backends. Previously, RocksDB's log was located in its own DB folder, which made debugging RocksDB not so easy. This release keeps RocksDB's log in Flink's log directory by default. RocksDB statistics-based metrics are introduced and exposed at the database level, such as the total block cache hit count within the database.
Support for Overdraft Buffers
This release introduces a new concept of overdraft network buffers [8] to mitigate the effects of uninterruptible blocking of a subtask thread under backpressure, enabled via taskmanager.network.memory.max-overdraft-buffers-per-gate. Starting with 1.16.0, a Flink subtask can request up to 5 extra (overdraft) buffers beyond the regular configuration by default. This change may slightly increase the memory consumption of a Flink job but greatly reduces the checkpoint duration of unaligned checkpoints. Overdraft buffers come into play when a subtask is backpressured by a downstream subtask and needs more than one network buffer to finish what it is currently doing. Read more about this in the documentation [9].
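A minimal sketch of tuning it (the key is the one named above; the value is illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OverdraftBufferSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Allow each gate to borrow up to 10 extra buffers under backpressure;
        // the default is 5, and 0 disables the overdraft behavior.
        conf.setString("taskmanager.network.memory.max-overdraft-buffers-per-gate", "10");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... job definition.
    }
}
```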
Timing Out Aligned Checkpoint Barriers in the Output Buffers of Upstream Subtasks
This release updates the point at which a checkpoint switches from aligned checkpoints (AC) to unaligned checkpoints (UC). With UC enabled, if execution.checkpointing.aligned-checkpoint-timeout [10] is configured, each checkpoint still starts as an AC, but when the global checkpoint duration exceeds the aligned-checkpoint-timeout and the AC has not yet completed, the checkpoint switches to unaligned.
Previously, the switch within a subtask required waiting for all barriers from upstream. Under backpressure, a downstream subtask might not receive all barriers within the checkpointing-timeout [11], causing the checkpoint to fail.
In this release, if a barrier cannot be sent from the output buffer to the downstream task within execution.checkpointing.aligned-checkpoint-timeout, Flink lets the upstream subtask switch to UC first to send the barrier downstream, thereby reducing the probability of checkpoint timeouts under backpressure. More details can be found in the documentation [12].
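A minimal sketch of the corresponding job-level settings (method names from `CheckpointConfig` as of recent releases; treat the details as assumptions and consult the 1.16 docs):

```java
import java.time.Duration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedCheckpointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // trigger a checkpoint every 60s

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints();
        // Start each checkpoint aligned; switch to unaligned if alignment has
        // not completed within 30 seconds.
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
        // ... job definition.
    }
}
```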
Non-Determinism in Stream Processing
Users often complain about the high cost of understanding stream processing. One pain point is non-determinism in stream processing (often non-intuitive), which can lead to incorrect results or errors and has existed since the first day of Flink SQL.
For complex streaming jobs, potential correctness issues can now be detected and resolved before running. If a problem cannot be resolved completely, a detailed message prompts the user to adjust the SQL so as not to introduce non-deterministic behavior. More details can be found in the documentation [13].
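A minimal sketch of turning the check on (the option name follows the determinism docs [13]; treat it as an assumption):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NonDeterminismCheckSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // TRY_RESOLVE makes the planner detect non-deterministic update (NDU)
        // problems, try to resolve them, and otherwise raise a detailed error.
        tEnv.getConfig().set(
                "table.optimizer.non-deterministic-update.strategy", "TRY_RESOLVE");
        // ... register tables and run queries as usual.
    }
}
```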
Enhanced Lookup Join
Lookup joins are widely used in stream processing, and we have introduced several improvements:
- Introduced a configurable asynchronous mode (ALLOW_UNORDERED) via job configuration [15] or lookup hints [16] to significantly improve query throughput without compromising correctness.
- A retryable lookup mechanism [16] gives users more tools to resolve the problem of delayed updates in external systems.
- Added a unified abstraction for lookup source caches and related metrics [14] to speed up lookup queries.
An example combining these options is sketched below.
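A sketch of a lookup join combining asynchronous mode and the retry options through the LOOKUP hint (table and column names are hypothetical; the option names follow the lookup hint docs [16]):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinHintSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // `orders` is a stream with a processing-time attribute `proc_time`;
        // `dim_customers` is a lookup (dimension) table. The hint enables
        // unordered async lookups and retries lookup misses with a fixed
        // 10s delay, at most 3 attempts.
        tEnv.executeSql(
                "SELECT /*+ LOOKUP('table'='dim_customers', 'async'='true', "
                        + "'output-mode'='allow_unordered', "
                        + "'retry-predicate'='lookup_miss', "
                        + "'retry-strategy'='fixed_delay', "
                        + "'fixed-delay'='10s', 'max-attempts'='3') */ "
                        + "o.order_id, c.name "
                        + "FROM orders AS o "
                        + "JOIN dim_customers FOR SYSTEM_TIME AS OF o.proc_time AS c "
                        + "ON o.customer_id = c.id")
            .print();
    }
}
```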
Retry Support for Asynchronous I/O
Flink 1.16 introduces a built-in retry mechanism for asynchronous I/O [17] that is transparent to the user's existing code, allowing flexible retry and exception-handling policies.
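A minimal sketch using the retry-enabled API (class and method names follow the async I/O docs [17]; the demo function stands in for a real async lookup):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

public class AsyncRetrySketch {

    /** Stand-in for a real async lookup, e.g. a database or HTTP call. */
    public static class DemoAsyncFunction extends RichAsyncFunction<Integer, String> {
        @Override
        public void asyncInvoke(Integer key, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> "value-" + key)
                    .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static DataStream<String> withRetry(DataStream<Integer> input) {
        // Retry up to 3 times with a fixed 100ms delay whenever the async call
        // returns an empty result or completes exceptionally.
        AsyncRetryStrategy<String> retryStrategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L)
                        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                        .build();

        // Same shape as AsyncDataStream.unorderedWait, plus the retry strategy:
        // 10s timeout, at most 100 in-flight requests.
        return AsyncDataStream.unorderedWaitWithRetry(
                input, new DemoAsyncFunction(), 10_000L, TimeUnit.MILLISECONDS, 100, retryStrategy);
    }
}
```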
PyFlink
In Flink 1.15, we introduced a new execution mode, "thread" mode, in which user-defined Python functions are executed in the JVM via JNI instead of in a separate Python process. In Flink 1.15, however, it only supported Python scalar functions in the Table API and SQL. In this release it is supported more comprehensively: it now also covers the Python DataStream API and Python table functions in the Table API and SQL.
We are also continuing to fill in the last few missing features in the Python API. In this release, we provide more comprehensive support for the Python DataStream API, including side outputs, broadcast state, and other features, and completed support for windows. We also added support for more connectors and formats in the Python DataStream API, such as the elasticsearch, kinesis, pulsar, and hybrid source connectors, as well as the orc and parquet formats. With all these features added, the Python API now covers most of the notable features of the Java and Scala APIs, and users should be able to develop most Flink jobs fluently in Python.
SQL Syntax Enhancements
In 1.16, we extended more DDL syntax to help users make better use of SQL:
- USING JAR [18] supports dynamic loading of UDF JARs, helping platform developers easily manage UDFs.
- CREATE TABLE AS SELECT (CTAS) allows users to create new tables based on existing tables and queries.
- ANALYZE TABLE [19] allows users to manually generate table statistics so that the optimizer can produce better execution plans.
All three statements are sketched below.
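A sketch of the three statements (paths, table names, and the UDF class are hypothetical):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlSyntaxSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Dynamically load a UDF from a JAR (path and class are placeholders).
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpper' "
                        + "USING JAR '/opt/udfs/my-udfs.jar'");

        // CTAS: create and populate a new table from a query.
        tEnv.executeSql(
                "CREATE TABLE top_orders AS SELECT * FROM orders WHERE price > 100");

        // Generate statistics for the optimizer.
        tEnv.executeSql("ANALYZE TABLE top_orders COMPUTE STATISTICS");
    }
}
```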
Caching in DataStream for Interactive Programming
This release supports caching the result of a transformation via DataStream#cache. The cached intermediate result is generated lazily, the first time it is computed, so that it can be reused by later jobs. If the cache is lost, the result is recomputed using the original transformation. Currently only batch mode is supported. This feature is very useful for ML and interactive programming in Python.
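A minimal sketch (API per FLIP-205; the assumption here is that `cache()` is available on `SingleOutputStreamOperator` and returns a `CachedDataStream`):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamCacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // cache is batch-only

        // Stands in for an expensive transformation worth reusing.
        SingleOutputStreamOperator<Integer> expensive =
                env.fromElements(1, 2, 3).map(x -> x * x).returns(Integer.class);
        CachedDataStream<Integer> cached = expensive.cache();

        cached.print();
        env.execute(); // first job computes and caches the intermediate result

        cached.map(x -> x + 1).returns(Integer.class).print();
        env.execute(); // second job reuses the cache instead of recomputing
    }
}
```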
History Server and Completed Job Information Enhancements
We have enhanced the experience of viewing information about completed jobs in this release:
- The JobManager / HistoryServer web UI now provides aggregations of key SubTask metrics, grouped by Task or TaskManager.
- The JobManager / HistoryServer web UI now provides more environment information, including environment variables, JVM options, and the classpath.
- The HistoryServer now supports browsing logs from external log-archiving services [6].
- The JobManager / HistoryServer web UI now provides detailed execution-time metrics, including the duration spent in each execution state and the accumulated idle / busy / backpressured time while running.
Protobuf Format
Flink now supports the Protocol Buffers (Protobuf) format, which can be used directly in Table API or SQL applications.
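A sketch of a table using the format (connector, topic, and message class are hypothetical; the option names follow the protobuf format docs):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ProtobufFormatSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The generated Protobuf class must be on the classpath.
        tEnv.executeSql(
                "CREATE TABLE user_events ("
                        + "  user_id BIGINT,"
                        + "  event_type STRING"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'user-events',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'format' = 'protobuf',"
                        + "  'protobuf.message-class-name' = 'com.example.proto.UserEvent'"
                        + ")");
    }
}
```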
Configurable RateLimitingStrategy for the Async Sink
The Async Sink was implemented in 1.15, allowing users to easily implement their own custom asynchronous sinks. We have now extended it to support a configurable RateLimitingStrategy. This means sink implementers can customize how their asynchronous sink reacts when requests fail, depending on the specific sink. If no RateLimitingStrategy is specified, it defaults to the current AIMDRateLimitingStrategy.
Upgrade Notes
Our goal is to make upgrading as smooth as possible, but some changes require users to adjust parts of their programs when upgrading to Apache Flink 1.16. Please review the release notes for the adjustments to make and the issues to check during the upgrade.
List of contributors
The Apache Flink community would like to thank every contributor who made this release possible:
1996fanrui, Ada Wang, Ada Wong, Ahmed Hamdy, Aitozi, Alexander Fedulov, Alexander Preuß, Alexander Trushev, Andriy Redko, Anton Kalashnikov, Arvid Heise, Ben Augarten, Benchao Li, BiGsuw, Biao Geng, Bobby Richard, Brayno, CPS794, Cheng Pan, Chengkai Yang, Chesnay Schepler, Danny Cranmer, David N Perkins, Dawid Wysakowicz, Dian Fu, DingGeGe, EchoLee5, Etienne Chauchot, Fabian Paul, Ferenc Csaky, Francesco Guardiani, Gabor Somogyi, Gen Luo, Gyula Fora, Haizhou Zhao, Hangxiang Yu, Hao Wang, Hong Teoh, Hongbo Miao, HuangXingBo, Ingo Bürk, Jacky Lau, Jane Chan, Jark Wu, Jay Li, Jia Liu, Jie Wang, Jing Ge, Jing Zhang, Jingsong Lee, Jinhu Wu, Joe Moser, Joey Pereira, JunRuiLee, Juntao Hu, JustDoDT, Kai Chen, Krzysztof Dziolak, Kyle Dong, LeoZhang, Levani Kokhreidze , Lihe Ma, Lijie Wang, Liu Jiangang, LuNing (Lucas) Wang, LuNing Wang, Luning (Lucas) Wang, Luning Wang, Marios Trivyzas, Martijn Visser, MartijnVisser, Mason Chen, Matthias Pohl, Metehan Yıldırım, Michael, Mingde Peng, Mingliang Liu, Mulavar, Nie yingping, Niklas Semmler, Paul Lam, Paul Lin, Paul Zhang, PengYuan, Piotr Nowojski, Qingsheng Ren, Qishang Zhong, Ran Tao, Robert Metzger, Roc Marshal, Roman Boyko, Roman Khachatryan, Ron, Ron Cohen, Ruanshubin, Rudi Kershaw, Rufus Refactor, Ryan Skraba, Sebastian Mattheis, Sergey, Sergey Nuyanzin, Shengkai, Shubham Bansal, SmirAlex, Smirnov Alexander, SteNicholas, Steven van Rossum, Suhan Mao, Tan Yuxin, Tartarus0zm, TennyZhuang, Terry Wang, Thesharing, Thomas Weise, Timo Walther, Tom, Tony Wei, Weijie Guo, Wencong Liu, WencongLiu, Xintong Song, Xuyang, Yangze Guo, Yi Tang, Yu Chen, Yuan Huang, Yubin Li, Yufan Sheng, Yufei Zhang, Yun Gao, Yun Tang, Yuxin Tan, Zakelly, Zhu Zhu, Zichen Liu, Zili Sun, acquachen, bgeng777, billyrrr, bzhao, caoyu, chenlei677, chenzihao, chenzihao5, coderap, cphe, davidliu, dependabot[bot], dkkb, dusukang, empcl, eyys, fanrui, fengjiankun, fengli, fredia, gabor.g.somogyi, godfreyhe, gongzhongqiang, hongli, huangxingbo, huweihua, jayce, jaydonzhou, jiabao.sun, kevin.cyj, kurt, lidefu, lijiewang.wlj, liliwei, lincoln lee, lincoln.lil, littleeleventhwolf, liufangqi, liujia10, liujiangang, liujingmao, liuyongvs, liuzhuang2017, longwang, lovewin99, luoyuxia, mans2singh, maosuhan, mayue.fight, mayuehappy, nieyingping, pengmide, pengmingde, polaris6, pvary, qinjunjerry, realdengziqi, root, shammon, shihong90, shuiqiangchen, slinkydeveloper, snailHumming, snuyanzin, suxinglee, sxnan, trushev, tsreaper, unknown, wangfeifan, wangyang0918, wangzhiwu, wenbingshen, xiangqiao123, xuyang, yangjf2019, yangjunhan, yangsanity, yangxin, ylchou, yuchengxin, yunfengzhou-hub, yuxia Luo, yuzelin, zhangchaoming, zhangjingcun, zhangmang, zhangzhengqi3, zhaoweinan, zhengyunhong.zyh, zhenyu xing, zhouli, zhuanshenbsj1, zhuzhu.zz, zoucao, zp, zhou lei , Rao Zixuan, Bao Jianxin, Yu Li, Empire A San
Citation Links
[1] Flink Table Store 0.2 released: https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html
[2] Article: https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821
[3] SQL Gateway: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/
[4] HiveServer2 Endpoint: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/hive-compatibility/hiveserver2/
[5] Hive syntax: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/hive-compatibility/hive-dialect/overview/
[6] Hybrid shuffle: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/batch/batch_shuffle
[7] This blog: https://flink.apache.org/2022/05/30/changelog-state-backend.html
[8] Overdraft network buffers: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/memory/network_mem_tuning/#overdraft-buffers
[9] Documentation: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/memory/network_mem_tuning/#overdraft-buffers
[10] execution.checkpointing.aligned-checkpoint-timeout: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
[11] checkpointing-timeout: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#execution-checkpointing-timeout
[12] This document: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
[13] Documentation: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/determinism
[14] Lookup source cache and metrics: https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric
[15] Job configuration: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/config/#table-exec-async-lookup-output-mode
[16] Lookup hints: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#lookup
[17] Asynchronous I/O: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/asyncio/#retry-support
[18] USING JAR: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#create-function
[19] ANALYZE TABLE: https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/analyze