- Overview of Flink SQL 1.13
- Core feature interpretation
- Important improvements interpretation
- Flink SQL 1.14 future planning
- Summary


I. Overview of Flink SQL 1.13
II. Core feature interpretation
1. FLIP-145: Support Window TVF
Before version 1.13, a tumbling window aggregation was written like this:

SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE)
In version 1.13, we normalized this syntax using a table-valued function (TVF):

SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end
Comparing the two, the TVF syntax is more flexible: it does not have to be tied to a GROUP BY clause, and because Window TVF is based on relational algebra it is also more standard. When you only need to divide data into windows, you can use the TVF alone without a GROUP BY aggregation, which makes Window TVF more extensible and expressive and leaves room for custom window TVFs (such as a TVF that implements Top-N).
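For example, applying the window TVF without a GROUP BY simply assigns each row to a window (a minimal sketch using the MyTable schema above):

SELECT bidtime, price, window_start, window_end
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))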
The example in the figure above is a tumbling-window division done with the TVF, which only assigns data to windows without aggregating; if aggregation is needed afterwards, a GROUP BY suffices. This is also very natural for users familiar with batch SQL, and we no longer need a special SqlGroupedWindowFunction to bind window division and aggregation together, as before version 1.13. Window TVF currently supports the tumble window and the hop window, and adds the cumulate window; the session window is expected to follow in version 1.14.

■ 1.2 Cumulate Window
The cumulate window is a cumulative window. Simply put, each interval on the timeline in the figure above is one window step:
- the first window counts the data of the first interval;
- the second window counts the data of the first and second intervals;
- the third window counts the data of the first, second, and third intervals.
INSERT INTO cumulative_UV
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) AS UV
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') AS date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS time_str,
    user_id
  FROM user_behavior
)
GROUP BY date_str
This query first builds, by string concatenation, the time-window field each record belongs to, and then aggregates all records with GROUP BY on that concatenated field, achieving an approximate cumulative computation.
- The pre-1.13 approach has several drawbacks: first, the aggregation is triggered once for every record; second, when catching up on backlogged data, the UV dashboard curve jumps.
- With the TVF syntax supported in version 1.13, the query can be rewritten on top of the cumulate window as below. Each record is precisely assigned to its windows by event time, and each window's result is triggered by the watermark, so the curve no longer jumps even when catching up on data.
INSERT INTO cumulative_UV
SELECT window_end, COUNT(DISTINCT user_id) AS UV
FROM TABLE(
  CUMULATE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end
The UV dashboard curve effect is shown in the figure below:
■ 1.3 Window performance optimization

In Flink 1.13, the community has made a series of performance optimizations for Window TVF, including:
- Memory optimization: window data is buffered in pre-allocated memory, computations are triggered by the window watermark, and high-frequency state accesses are avoided through memory buffers;
- Slice optimization: windows are sliced and computed results are reused wherever possible, e.g. for the hop window and the cumulate window; slices that have already been computed are not recomputed, only their results are reused;
- Operator optimization: the window operator supports local-global optimization, as well as automatic count(distinct) hot-spot optimization;
- Late data: late records can be computed into subsequent slices to ensure data accuracy.
■ 1.4 Multi-dimensional data analysis

Window Top-N: in addition to multi-dimensional analysis, Window TVF also supports the Top-N syntax, which makes window Top-N queries much easier to write.
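A minimal sketch of a window Top-N query (the Bid table and its columns are illustrative): rank the rows inside each 10-minute tumbling window by price and keep the top three.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end
      ORDER BY price DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
)
WHERE rownum <= 3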
2. FLIP-162: Time zone and time functions
■ 2.1 Time zone problem analysis
- The PROCTIME() function should take the local time zone into account, but does not;
- The CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE / NOW() functions do not take the local time zone into account;
- Flink's time attributes can only be defined on the TIMESTAMP data type, which has no time zone, yet users expect times in their local time zone.
The two timestamp types can be understood as follows:

- A TIMESTAMP value can be thought of as a string: whether a user observes it from the UK time zone or the China time zone, it reads the same;
- A TIMESTAMP_LTZ value originates from a Long value that represents the time elapsed since the epoch. Since the elapsed time from the epoch is the same in every time zone, this Long value is a notion of absolute time. Observed from different time zones, it is interpreted in the local time zone into a readable "yyyy-MM-dd HH:mm:ss" format. This is the TIMESTAMP_LTZ type, and it matches users' habits across different time zones better.
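A small sketch of the difference as seen from the SQL Client (the rendered values are illustrative): the same absolute instant is displayed according to the configured session time zone.

SET table.local-time-zone = UTC;
SELECT NOW();  -- e.g. 2021-04-15 06:00:00.000

SET table.local-time-zone = Asia/Shanghai;
SELECT NOW();  -- the same instant, now rendered as 2021-04-15 14:00:00.000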
■ 2.2 Time function corrections

- Up to version 1.13, PROCTIME() returned the TIMESTAMP type, and the value was always the UTC time;
- now its return type has been changed to TIMESTAMP_LTZ.
Besides being a function, PROCTIME can also serve as the marker of a time attribute.
- Before version 1.13, to run a daily window you had to handle the time zone manually, applying an 8-hour offset and then subtracting it back;
- FLIP-162 solves this problem, and usage is now very simple: just declare the proctime attribute. Because the PROCTIME() function now returns TIMESTAMP_LTZ, the result takes the local time zone into account. The illustration below shows that, in different time zones, window aggregation on the proctime attribute is performed in the local time zone.
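A minimal sketch of the 1.13 usage (the connector and field names are illustrative): declare the proctime attribute and window on it directly, with no manual offset, since the attribute is of type TIMESTAMP_LTZ.

CREATE TABLE user_behavior (
  user_id BIGINT,
  price DECIMAL(10, 2),
  proc_time AS PROCTIME()  -- processing-time attribute, TIMESTAMP_LTZ since 1.13
) WITH ('connector' = 'datagen');

-- this daily window is aligned to the configured local time zone
SELECT window_start, window_end, SUM(price)
FROM TABLE(
  TUMBLE(TABLE user_behavior, DESCRIPTOR(proc_time), INTERVAL '1' DAY))
GROUP BY window_start, window_end;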
The time functions also behave differently in streaming and batch modes:

- in streaming mode they are evaluated per record, i.e. once for each piece of data;
- in batch mode they are evaluated at query start, i.e. once before the job starts. Commonly used batch engines such as Hive do the same, evaluating them once before each batch starts.
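A one-line illustration of the difference (using the user_behavior table from above):

SELECT user_id, CURRENT_TIMESTAMP FROM user_behavior;
-- streaming mode: CURRENT_TIMESTAMP is evaluated for every row, so it differs per row
-- batch mode: it is evaluated once at query start, so every row carries the same value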
■ 2.3 Time type usage
- When the upstream source provides time as a string (such as 2021-4-15 14:00:00), simply declare the column as TIMESTAMP and define the event time on it; the windows will be divided based on the time string, and the result will match what you actually expect;
- When the upstream source reports time as a long value, it carries the meaning of an absolute time. In version 1.13 you can define the event time on top of TIMESTAMP_LTZ; all window aggregations defined on the TIMESTAMP_LTZ type then automatically resolve the 8-hour time-zone offset, without the manual time-zone adjustments required by the earlier SQL.
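A minimal sketch of the second case (the connector and field names are illustrative): convert an epoch-millisecond long field to TIMESTAMP_LTZ and define the event time and watermark on it.

CREATE TABLE user_actions (
  user_id BIGINT,
  ts_ms BIGINT,                          -- epoch milliseconds, i.e. absolute time
  ts_ltz AS TO_TIMESTAMP_LTZ(ts_ms, 3),  -- convert to TIMESTAMP_LTZ(3)
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');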
■ 2.4 Daylight saving time support
III. Important improvements interpretation
1. FLIP-152: Improve Hive syntax compatibility

FLIP-152 mainly enhances compatibility with Hive syntax.
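As a small illustration (assuming the pre-existing table.sql-dialect option, which FLIP-152 builds on), Hive-style statements are enabled by switching the dialect in the SQL Client:

SET table.sql-dialect = hive;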
2. FLIP-163: Improvements to SQL Client
- SET sql-client.verbose = true enables verbose mode and prints the full message, which makes errors far easier to track down than the single-line output printed before;
- SET execution.runtime-mode = streaming / batch sets the job to streaming or batch mode;
- SET pipeline.name = my_Flink_job sets the job name;
- SET execution.savepoint.path = /tmp/Flink-savepoints/savepoint-bb0dab sets the job's savepoint path;
- for multiple jobs with dependencies, SET table.dml-sync = true selects synchronous execution; for offline jobs where job B may only run after job A finishes, setting it to true executes them as a dependent pipeline.
The STATEMENT SET syntax is also supported:
- before version 1.13, two queries had to be launched to complete such a job;
- in version 1.13, we can put them into one statement set and execute them as a single job, which enables node reuse and saves resources.
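A minimal sketch of the statement set syntax in the SQL Client (the sink and source names are illustrative): both INSERTs are compiled into one job, so the shared source is read only once.

BEGIN STATEMENT SET;
INSERT INTO sink_kafka SELECT user_id, price FROM user_behavior;
INSERT INTO sink_mysql SELECT user_id, price FROM user_behavior;
END;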
3. FLIP-136: Enhanced conversion between DataStream and Table
- Event time and watermarks are now carried through when converting between DataStream and Table:
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());
- Changelog data streams can now be converted between Table and DataStream in both directions:
// DataStream to Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>, Schema): Table

// Table to DataStream
StreamTableEnvironment.toChangelogStream(Table): DataStream<Row>
StreamTableEnvironment.toChangelogStream(Table, Schema): DataStream<Row>
IV. Flink SQL 1.14 future planning

The main plans for the 1.14 release are as follows:
- Remove the Legacy Planner: starting with Flink 1.9, after Alibaba contributed the Blink planner, many new features have been developed on top of it, and the old Legacy planner will be removed completely;
- Improve Window TVF: support the session window, support allow-lateness for window TVFs, etc.;
- Improve schema handling: schema processing capability across the whole pipeline, and improved key validation;
- Enhance Flink CDC support: strengthen the integration with upstream CDC systems, and have more Flink SQL operators support CDC data streams.
V. Summary
This article explained in detail the core features and important improvements of Flink SQL 1.13:

- support for Window TVF;
- a systematic solution to the time zone and time function problems;
- improved compatibility between Hive and Flink;
- improvements to the SQL Client;
- enhanced conversion between DataStream and Table.