Abstract: This article was compiled by community volunteer Chen Zhengyu from Xu Bangjiang's talk "In-depth Interpretation of Flink SQL 1.13" at the Flink Meetup in Beijing on May 22. The Apache Flink community released version 1.13 in May, bringing many new changes. The article covers:
  1. Core feature interpretation
  2. Important improvements interpretation
  3. Flink SQL 1.14 future planning
  4. Summary
GitHub address: https://github.com/apache/flink
Welcome everyone to give Flink a like and a star~

I. Overview of Flink SQL 1.13

Flink 1.13 is a large community release that resolved more than 1,000 issues. As the figure above shows, most of the resolved issues concern the Table/SQL module: more than 400 issues, accounting for about 37% of the total. These issues mainly revolve around 5 FLIPs, which this article will also introduce. They are:

  • FLIP-145: Support Window TVF
  • FLIP-162: Time zone and time functions
  • FLIP-152: Improved Hive syntax compatibility
  • FLIP-163: Improvements to SQL Client
  • FLIP-136: Enhanced conversion between DataStream and Table

Let's break down these FLIPs in detail.

II. Core feature interpretation

1. FLIP-145: Support Window TVF

Community members may be aware that internal branches at companies such as Tencent, Alibaba, and ByteDance had already developed basic versions of this feature. This time, the Flink community also introduced TVF-related support and optimizations in Flink 1.13. The following interprets this new feature from four aspects: Window TVF syntax, near-real-time cumulative computation scenarios, window performance optimizations, and multi-dimensional data analysis.

■ 1.1 Window TVF syntax

Before version 1.13, windows were implemented via a special SqlGroupedWindowFunction:

SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE)

In version 1.13, we normalized this with table-valued function (TVF) syntax:

 
SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end

Comparing the two syntaxes, we can see that the TVF syntax is more flexible and does not need to be followed by a GROUP BY clause, and Window TVF is based on relational algebra, making it more standard. When you only need to assign rows to windows, you can use the TVF alone without a GROUP BY aggregation, which makes the TVF more extensible and expressive and also allows custom TVFs in the future (such as a TVF that implements Top-N).

The example in the figure above shows a tumbling-window assignment done with the TVF, which only assigns data to windows without aggregating it; if aggregation is needed later, a GROUP BY on top is sufficient (see the sketch below). This operation is very natural for users familiar with batch SQL, and we no longer need to use a special SqlGroupedWindowFunction to bind window assignment and aggregation together as we did before version 1.13.
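As a hedged illustration of this point (reusing the hypothetical MyTable/bidtime/price schema from the example above), the following sketch only assigns each row to a tumbling window, without any aggregation; the window columns are simply appended to every row:

-- Window assignment only, no GROUP BY: each input row is returned
-- together with its window_start / window_end / window_time columns.
SELECT bidtime, price, window_start, window_end
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));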

At present, Window TVF supports the tumble window, the hop window, and the newly added cumulate window; the session window is expected to be supported in version 1.14.

■ 1.2 Cumulate Window

The cumulate window is a cumulative window. Simply put, each interval on the timeline in the figure above is one window step:

  • The first window counts the data of the first interval;

  • The second window counts the data of the first and second intervals;

  • The third window counts the data of the first, second, and third intervals.

Cumulative calculations are very common in business scenarios, such as cumulative UV. For a UV dashboard curve, we report the cumulative UV of the current day every 10 minutes.

Before version 1.13, when we needed to do this kind of calculation, the SQL was generally written as follows:

INSERT INTO cumulative_UV
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) AS UV
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') AS date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS time_str,
    user_id
  FROM user_behavior
)
GROUP BY date_str

The time window field to which each record belongs is first concatenated, and then all records are aggregated with GROUP BY on that concatenated time window field, achieving the effect of an approximate cumulative calculation.

  • The pre-1.13 approach has several drawbacks. First, this aggregation is triggered once for every record. Second, when catching up on backlogged data, the UV dashboard curve jumps.

  • Version 1.13 supports the TVF syntax. Based on the cumulate window, we can rewrite it as follows: each piece of data is accurately assigned to its windows according to its event time, and the computation of each window is triggered by the watermark, so the curve does not jump even when catching up on data.

INSERT INTO cumulative_UV
SELECT window_end, COUNT(DISTINCT user_id) AS UV
FROM TABLE(
  CUMULATE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end

The UV dashboard curve effect is shown in the figure below:

■ 1.3 Window performance optimization

In Flink 1.13, community developers made a series of performance optimizations for Window TVF, including:

  • Memory optimization: cache window data through pre-allocated memory, trigger computation via window watermarks, and avoid high-frequency state access by using memory buffers;

  • Slice optimization: slice the windows and reuse computed results as much as possible, e.g. for the hop window and the cumulate window. Slices that have already been computed do not need to be computed again; only their results need to be reused;

  • Operator optimization: the window operator supports local-global optimization, as well as automatic hot-key optimization for count(distinct);

  • Late data: late data can be computed into subsequent slices to ensure data accuracy.

Based on these optimizations, we ran performance tests with the open-source benchmark Nexmark. The results show that general window performance improved by 2x, with even better improvements in count(distinct) scenarios.

■ 1.4 Multi-dimensional data analysis

The standardized syntax brings more flexibility and extensibility, and users can perform multi-dimensional analysis directly on the window table-valued function. As shown in the figure below, GROUPING SETS, ROLLUP, and CUBE analysis can be performed directly. Before version 1.13, we would have needed a separate SQL aggregation for each grouping and then a UNION of the aggregation results to achieve a similar effect. Now, such multi-dimensional analysis scenarios are supported directly on Window TVF.
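As a hedged sketch of what this can look like (assuming a hypothetical MyTable source with bidtime, price, and supplier_id columns; these names are not from the talk):

-- Multi-dimensional aggregation on a window TVF: one pass produces both
-- the per-supplier totals and the overall total for each 10-minute window.
SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());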

Support for Window Top-N

In addition to multi-dimensional analysis, Window TVF also supports Top-N syntax, making it easier to write a Top-N on a window.
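A hedged sketch of the pattern (again using the hypothetical MyTable/bidtime/price columns): the inner query assigns rows to 10-minute tumbling windows, and ROW_NUMBER() partitioned by the window keys keeps the top 3 rows per window.

SELECT *
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY window_start, window_end
           ORDER BY price DESC) AS rownum
  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
)
WHERE rownum <= 3;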

2. FLIP-162: Time zone and time functions

■ 2.1 Time zone problem analysis

When using Flink SQL, users have reported many time zone-related problems. The causes of these problems can be summarized into three:

  • The PROCTIME() function should take the local time zone into account, but it does not;

  • The CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE / NOW() functions do not take the time zone into account;

  • Flink's time attributes can only be defined on the TIMESTAMP data type, which has no time zone; the TIMESTAMP type does not consider the time zone, but users want times in their local time zone.

To address the problem that the TIMESTAMP type does not consider the time zone, the community introduced support for the TIMESTAMP_LTZ type (TIMESTAMP_LTZ is short for TIMESTAMP WITH LOCAL TIME ZONE). It can be compared with TIMESTAMP in the following table:

TIMESTAMP_LTZ differs from the TIMESTAMP we used before: it represents absolute time. Comparing the two:

  • If we use TIMESTAMP, its value can be given as a string. Whether a user observes it from the UK time zone or the China time zone, the value is the same;

  • But for TIMESTAMP_LTZ, its source is a Long value representing the time elapsed since the epoch. The elapsed time since the epoch is the same in all time zones, so this Long value is a notion of absolute time. When we observe this value in different time zones, the local time zone is used to interpret it into a readable "year-month-day hour:minute:second" format. This is the TIMESTAMP_LTZ type, which also better matches users' habits in different time zones.

The following example shows the difference between the TIMESTAMP and TIMESTAMP_LTZ types.
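Since the original example is shown as a figure, here is a hedged sketch of the same idea using the TO_TIMESTAMP_LTZ function and the table.local-time-zone option (the literal values are only illustrative):

-- ntz is a plain TIMESTAMP literal; ltz wraps epoch milliseconds into a TIMESTAMP_LTZ.
CREATE VIEW MyView AS
SELECT TIMESTAMP '1970-01-01 00:00:04.001' AS ntz,
       TO_TIMESTAMP_LTZ(4001, 3)           AS ltz;

SET table.local-time-zone=UTC;
SELECT * FROM MyView;
-- ntz: 1970-01-01 00:00:04.001   ltz: 1970-01-01 00:00:04.001

SET table.local-time-zone=Asia/Shanghai;
SELECT * FROM MyView;
-- ntz is unchanged; ltz is now rendered in UTC+8: 1970-01-01 08:00:04.001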

■ 2.2 Time function corrections

Correcting the PROCTIME() function

With the TIMESTAMP_LTZ type in place, we also corrected the PROCTIME() function:

  • Before version 1.13, it always returned a TIMESTAMP in UTC;

  • Now, its return type has been changed to TIMESTAMP_LTZ.

In addition to being used as a function, PROCTIME can also be used as a marker for a time attribute, as sketched below.
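A hedged sketch of both uses (the table name clicks and the datagen connector are placeholders chosen for illustration):

-- As a computed column, PROCTIME() declares a processing-time attribute;
-- as a function in a query, it returns the current processing time.
CREATE TABLE clicks (
  user_id BIGINT,
  item_id BIGINT,
  proc_time AS PROCTIME()          -- processing-time attribute column
) WITH (
  'connector' = 'datagen'          -- placeholder connector for the sketch
);

SELECT PROCTIME(), user_id FROM clicks;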

Correcting the CURRENT_TIMESTAMP / CURRENT_TIME / CURRENT_DATE / NOW() functions

The values returned by these functions now change with the configured time zone. For example, when it is 2 AM in the UTC time zone in the UK, setting the time zone to UTC+8 makes the returned time 10 AM. The actual values in different time zones differ as follows:
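A hedged sketch of this behavior in the SQL Client (the configured zones and the timestamps in the comments are illustrative):

SET table.local-time-zone=UTC;
SELECT CURRENT_TIMESTAMP;          -- e.g. 2021-04-15 02:00:00.000
SET table.local-time-zone=Asia/Shanghai;
SELECT CURRENT_TIMESTAMP;          -- the same instant shown as 2021-04-15 10:00:00.000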

Solving the processing-time window time zone problem

As everyone knows, proctime can be used as a time attribute. For window operations on proctime:

  • Before version 1.13, if you needed a daily window, you had to work around the time zone manually, applying an 8-hour offset and then subtracting it back;

  • FLIP-162 solves this problem. Now it is very simple for users: just declare the proctime attribute. Because the return value of the PROCTIME() function is TIMESTAMP_LTZ, the result takes the local time zone into account (see the sketch below). The illustration below shows that, in different time zones, window aggregation on the proctime attribute is done in the local time zone.
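A hedged sketch of a daily window on a processing-time attribute (reusing the hypothetical clicks table with the proc_time column from the earlier sketch); because PROCTIME() returns TIMESTAMP_LTZ, the day boundaries follow table.local-time-zone without any manual offset:

SELECT window_start, window_end, COUNT(user_id) AS cnt
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(proc_time), INTERVAL '1' DAY))
GROUP BY window_start, window_end;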

Time function behavior in streaming and batch mode

Time functions actually behave differently in streaming and batch mode; this correction is mainly to make them better match users' actual usage habits. For the following functions:

  • In streaming mode, they are evaluated per record, i.e. computed once for each piece of data;

  • In batch mode, they are evaluated at query start, i.e. computed once before the job starts. Commonly used batch compute engines such as Hive also evaluate them once before each batch starts.

■ 2.3 Time type usage

Version 1.13 also supports defining event time on TIMESTAMP_LTZ columns, which means event time can now be defined on both TIMESTAMP and TIMESTAMP_LTZ columns. So, as a user, which type should be used in which scenario?

  • When the upstream source data contains time as strings (such as 2021-04-15 14:00:00), declare it as TIMESTAMP and define the event time on it. The windows will be divided based on the time string during calculation, and the result will match what you actually expect;

  • When the timestamps in the upstream data source are Long values, they represent absolute time. In version 1.13, you can define event time on a TIMESTAMP_LTZ column (see the sketch below). All window aggregates defined on the TIMESTAMP_LTZ type then automatically handle the 8-hour time zone offset, without the manual time zone adjustments needed in the previous SQL.
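A hedged DDL sketch of the second case (the table and column names are placeholders): the raw epoch-millisecond field is converted to TIMESTAMP_LTZ(3) and used as the event-time attribute:

CREATE TABLE user_events (
  user_id BIGINT,
  ts BIGINT,                                  -- epoch milliseconds from the source
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),          -- absolute time as TIMESTAMP_LTZ(3)
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'                       -- placeholder; remaining options omitted
);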

Tip: These improvements to time functions and time zone support in Flink SQL are not backward compatible across versions. When upgrading, you need to check whether the job logic uses such functions in order to avoid business impact after the upgrade.

■ 2.4 Daylight saving time support

Before Flink 1.13, it was very difficult for users in daylight-saving time zones abroad to do window-related calculations, because of the transition between daylight saving time and standard time.

Flink 1.13 supports daylight saving time elegantly by allowing time attributes to be defined on TIMESTAMP_LTZ columns and by cleverly combining the TIMESTAMP and TIMESTAMP_LTZ types in window processing. This is useful for users in daylight-saving time zones abroad and for companies with overseas business.

III. Important improvements interpretation

1. FLIP-152: Improved Hive syntax compatibility

FLIP-152 mainly enhances Hive syntax compatibility, supporting Hive's common DML and DQL syntax, including:

The Hive dialect supports Hive's common syntax. Hive has many built-in functions, and the Hive dialect needs to be used together with HiveCatalog and the Hive Module. The Hive Module provides all of Hive's built-in functions, which can be accessed directly once loaded.

At the same time, we can also create/drop catalog functions and some custom functions through the Hive dialect, which greatly improves the compatibility between Flink SQL and Hive and makes things more convenient for users who are familiar with Hive (see the sketch below).
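A hedged sketch of how the pieces fit together in the SQL Client (the catalog name myhive and the Hive version are assumptions, not from the talk):

-- Load Hive built-in functions via the Hive module, switch to the Hive catalog,
-- and enable the Hive dialect so Hive DML/DQL syntax can be used directly.
LOAD MODULE hive WITH ('hive-version' = '2.3.6');
USE CATALOG myhive;
SET table.sql-dialect=hive;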

2. FLIP-163: Improvements to SQL Client

Before version 1.13, the Flink SQL Client was widely regarded as a peripheral little tool. However, FLIP-163 brings important improvements in version 1.13:

1. The -i parameter loads and initializes DDL in advance, which makes it convenient to initialize multiple DDL statements for tables without executing commands repeatedly to create them, and replaces the previous YAML-file-based table creation;

2. The -f parameter is supported, and SQL files support DML (INSERT INTO) statements;

3. Support for more useful configurations:

  • SET sql-client.verbose = true enables verbose mode and prints the entire exception stack, which makes it easier to track down errors than the previous single-line output;
  • SET execution.runtime-mode = streaming / batch sets streaming or batch execution mode;
  • SET pipeline.name = my_Flink_job sets the job name;
  • SET execution.savepoint.path = /tmp/Flink-savepoints/savepoint-bb0dab sets the savepoint path from which the job is restored;
  • For multiple jobs with dependencies, SET table.dml-sync = true selects synchronous execution; for example, with offline jobs where job B can only run after job A finishes, setting it to true makes the dependent pipeline execute in order.

4. STATEMENT SET syntax is also supported:

It is possible that a query does not write to just one sink, but needs to output to multiple sinks, for example one sink writing to JDBC and another writing to HBase.

  • Before version 1.13, 2 queries had to be launched to complete this job;
  • In version 1.13, we can put these statements into one STATEMENT SET and execute them as a single job, which enables node reuse and saves resources (see the sketch below).
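A hedged sketch of the STATEMENT SET syntax in the SQL Client (the sink table names jdbc_sink and hbase_sink and the source view are placeholders):

BEGIN STATEMENT SET;

INSERT INTO jdbc_sink  SELECT user_id, cnt FROM aggregated_view;
INSERT INTO hbase_sink SELECT user_id, cnt FROM aggregated_view;

END;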

3. FLIP-136: Enhanced conversion between DataStream and Table

While Flink SQL greatly lowers the barriers to using real-time computing, the high-level encapsulation of Table/SQL also shields some low-level implementations, such as timers and state. Many advanced users want the flexibility to manipulate DataStream directly, which requires converting between Table and DataStream. FLIP-136 enhances the conversion between Table and DataStream, making it easier for users to switch between the two.

  • Event time and watermarks can now be passed along when converting between DataStream and Table;

Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());

  • Changelog data streams can now be converted to and from Table and DataStream.

// DataStream to Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>, Schema): Table
// Table to DataStream
StreamTableEnvironment.toChangelogStream(Table): DataStream<Row>
StreamTableEnvironment.toChangelogStream(Table, Schema): DataStream<Row>

IV. Flink SQL 1.14 future planning

The main plans for the 1.14 release are as follows:

  • Remove the Legacy Planner: starting from Flink 1.9, after Alibaba contributed the Blink Planner, many new features have been developed based on the Blink Planner, and the old Legacy Planner will be completely removed;

  • Improve Window TVF: support the session window, support allow-lateness for Window TVF, etc.;

  • Improve schema handling: improve schema processing capabilities across the whole pipeline, as well as key validation;

  • Enhanced Flink CDC support: enhance the integration with upstream CDC systems, and let more Flink SQL operators support CDC data streams.

V. Summary

This article explained in detail the core features and important improvements of Flink SQL 1.13:

  • Support for Window TVF;

  • Systematic solution of time zone and time function problems;

  • Improved compatibility between Hive and Flink;

  • Improvements to SQL Client;

  • Enhanced conversion between DataStream and Table.

It also shared the community's future plans for Flink SQL 1.14. I believe that after reading this article, you will have a better understanding of the changes Flink SQL brings in this version. In practice, you can pay attention to these new changes and feel the convenience they bring at the business level.