Flink source code, job submission process, job scheduling process, job internal conversion flowchart
1. Have you used Flink SQL?
2. Flink is said to unify stream and batch processing; from which version is this truly realized?
3. Which parser does Flink SQL use?
4. What are the main functions of Calcite?
5. What is the Flink SQL processing flow?
6. What optimization rules does Flink SQL contain?
7. What operations are involved in Flink SQL?
8. Have you used Flink with Hive?
9. What did Flink do to integrate with Hive?
10. What methods does the HiveCatalog class contain?
11. Flink SQL 1.11 added real-time data warehouse functionality; can you introduce it?
12. Introduce Flink-Hive real-time data writing.
13. Introduce Flink-Hive real-time data reading.
14. When Flink writes to Hive in real time, how do we ensure that data already written to a partition is visible downstream?
15. Introduce the PartitionCommitTrigger used for partition commits in the source code.
16. How does PartitionTimeCommitTigger know which partitions to commit? (source code analysis)
17. How is the flag that makes data already written to a partition visible downstream handled? (source code analysis)
18. Have you worked with Flink SQL CEP?
19. What parameters does Flink SQL CEP provide?
20. Write a CEP SQL case, such as bank card fraud detection.
21. Do you understand Flink CDC? What are the Flink SQL CDC Connectors?
22. Introduce the principle of Flink CDC.
23. Design a real-time data warehouse that integrates acquisition + computation + transmission (ETL) with Flink SQL CDC.
24. How does Flink SQL CDC achieve consistency guarantees? (source code analysis)
25. Do you understand Flink SQL Gateway?
26. Explain how Flink SQL Gateway creates a session.
27. How does Flink SQL Gateway handle concurrent requests? How are multiple submissions handled?
28. How is the correlation between multiple SQL statements maintained?
29. How is an SQL string submitted to the cluster as code?
1. Have you used Flink SQL?
In Flink there are four levels of abstraction, and Flink SQL, as the top level, is a first-class citizen of the Flink API.
Standard SQL statements fall into four categories:
DML (Data Manipulation Language): data manipulation language, used to manipulate database records (insert, update, delete data).
DCL (Data Control Language): Data control language used to define access rights and security levels.
DQL (Data Query Language): Data query language, used to query records (data).
DDL (Data Definition Language): Data definition language, used to define database objects (libraries, tables, columns, etc.).
Flink SQL supports DML (data manipulation), DDL (data definition), and DQL (data query) statements, but not DCL.
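For illustration only, a minimal sketch of the three supported categories (the table names and fields are made up, and the datagen connector is just a convenient example source):
-- DDL: define a table
CREATE TABLE orders (order_id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen');
-- DQL: query records
SELECT order_id, amount FROM orders WHERE amount > 100;
-- DML: manipulate records (assuming a sink table big_orders has been defined)
INSERT INTO big_orders SELECT order_id, amount FROM orders WHERE amount > 100;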
2. Flink is said to unify stream and batch processing; from which version is this truly realized?
Starting from version 1.9.0, Alibaba's Blink was contributed and the Flink Table & SQL module was significantly refactored: the original Flink Planner was retained and the Blink Planner was introduced. Before that, Flink did not unify stream and batch jobs and maintained two separate code paths at the bottom layer. With the Blink Planner, the operators were redesigned around the stream-first concept of unified stream/batch processing, so both streaming jobs and batch jobs are ultimately converted into Transformations.
3. Which parser does Flink SQL use?
Flink SQL uses Apache Calcite as a parser and optimizer.
Calcite is a dynamic data management framework that provides many typical database management functions, such as SQL parsing, SQL validation, SQL query optimization, SQL generation, and federated data queries, but it deliberately omits some key functions: for example, Calcite does not store metadata or base data itself, and does not fully include the algorithms for processing data.
4. What are the main functions of Calcite?
Calcite mainly consists of the following five parts:
1. SQL parsing (Parser)
Calcite's SQL parsing is implemented with JavaCC: a SQL grammar description file is written for JavaCC, which parses the SQL into an unvalidated AST (abstract syntax tree).
2. SQL validation (Validator)
Validation is divided into two parts:
Stateless validation: checking whether the SQL statement conforms to the specification.
Stateful validation: checking, against the metadata, whether the schemas, fields, and functions in the SQL exist and whether input and output types match.
3. SQL query optimization
Optimizes the output of the previous step (RelNode, the logical plan tree) and produces an optimized physical execution plan. There are two kinds of optimization, rule-based optimization and cost-based optimization, which are described in detail later.
4. SQL generation
Generates an executable program for a specific platform/engine from the physical execution plan, for example SQL query statements that conform to the rules of MySQL, Oracle, or other platforms.
5. Data connection and execution
Executes the query on the various execution platforms and obtains the output results.
In Flink and other big data engines that use Calcite, the process generally stops after SQL query optimization: each platform combines Calcite's SQL code generation with its own code generation to turn the optimized physical execution plan into executable code, which is then compiled and executed in memory.
5. What is the Flink SQL processing flow?
The following example describes the processing flow of Flink SQL in detail, as shown in the figure below:
Suppose we write a source table backed by Kafka; when we execute create table log_kafka, Flink SQL does the following:
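As a minimal sketch (the schema and connector options below are assumptions, not the article's original example), such a statement might look like this:
CREATE TABLE log_kafka (
  user_id STRING,
  action  STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);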
(1) First, the underlying layer of Flink SQL uses the Apache Calcite engine to process the SQL statement. Calcite uses JavaCC for SQL parsing: JavaCC generates a series of Java classes according to the Parser.jj file defined in Calcite, and the generated code converts the SQL into an AST abstract syntax tree (i.e. a SqlNode).
(2) The generated SqlNode abstract syntax tree is still unvalidated. The SQL Validator then fetches metadata from the Flink Catalog to validate the SQL; the metadata checks cover table names, field names, function names, data types, and so on. This produces a validated SqlNode.
(3) At this point the SQL has only been parsed into the fixed nodes of a Java data structure; the relationships between the nodes and the type information of each node have not yet been established. Therefore the SqlNode must still be converted into a logical plan (LogicalPlan). During this conversion, the SqlToOperationConverter class converts the SqlNode into an Operation, which performs operations such as creating or dropping tables according to the SQL, while the FlinkPlannerImpl.rel() method converts the SqlNode into a RelNode tree and returns a RelRoot.
(4) Step 4 performs the Optimize operation, optimizing the logical plan according to the predefined optimization rules (RelOptRule). Calcite has two kinds of optimizers (RelOptPlanner): the rule-based (RBO) HepPlanner and the cost-based (CBO) VolcanoPlanner. This yields an optimized RelNode, and the optimized logical plan is then converted into a physical plan according to Flink's rules.
(5) Step 5 performs the execute operation, which generates Transformations from the generated code: it recursively traverses each node and converts DataStreamRelNode into DataStream, during which the translateToPlan method of classes such as DataStreamUnion, DataStreamCalc, and DataStreamScan is called recursively. Recursively calling translateToPlan on each node actually uses CodeGen to generate Flink's various operators, which is equivalent to developing the program directly with Flink's DataSet or DataStream API.
(6) Finally, it is further compiled into an executable JobGraph for submission and running.
6. What optimization rules does Flink SQL contain?
The following figure shows the execution flowchart.
Summary:
First parse, then validate; the SqlNode is converted into an Operation to create the table, then the rel() method turns the SqlNode into a logical plan (RelNode tree), and the logical plan is then optimized.
Before optimization, four rule sets are applied as preprocessing by Calcite's rule-based (RBO) HepPlanner, which yields a Logical RelNode; then the cost-based VolcanoPlanner applies Logical_Opt_Rules (logical plan optimization) to find the optimal plan and converts it into a FlinkLogical RelNode.
Finally, the optimization rules included in Flink, such as DataStream_Opt_Rules (stream computing optimization) and DataStream_Deco_Rules (decorating stream computing optimization), convert the optimized logical plan into a physical plan.
The optimization rule sets include the following:
Table_subquery_rules: subquery optimization
Expand_plan_rules: plan expansion optimization
Post_expand_clean_up_rules: cleanup after plan expansion
Datastream_norm_rules: stream processing normalization
Logical_Opt_Rules: logical plan optimization
DataStream_Opt_Rules: stream computing optimization
DataStream_Deco_Rules: decorating stream computing optimization
7. What operations are involved in Flink SQL?
First of all, what is an Operation? In Flink SQL, the DDL, DML, and DQL statements involved are all represented as Operations; inside Flink, an Operation corresponds to a SqlNode.
Before optimization, Operations are executed by the executeOperation method; the figure below shows all the operations that are executed.
8. Have you used Flink with Hive?
The Flink community made significant changes to Hive integration in Flink version 1.11, as shown in the following image:
9. What did Flink do to integrate with Hive?
The following is the execution diagram of Flink connecting to Hive:
(1) Flink 1.11 introduces the Hive dialect, so you can write Hive syntax (Hive Dialect) directly in Flink SQL.
(2) After the Hive SQL is written, the Flink SQL Planner parses and validates it, converts it into a logical plan and then a physical plan, and finally produces a JobGraph.
(3) HiveCatalog, as the persistence medium for Flink and Hive table metadata, stores Flink metadata from different sessions in the Hive Metastore. HiveCatalog allows users to store Hive tables or Kafka tables in the Hive Metastore.
The Blink Planner is a mechanism newly introduced in Flink 1.9: Blink's query processor unifies the stream and batch job interfaces, with Transformation as the underlying API. It truly unifies stream and batch processing and replaces the original Flink Planner, which handled streams and batches separately. Since version 1.11, the Blink Planner is the default.
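A hedged sketch of how these pieces are typically used from SQL (the catalog name and the hive-conf-dir path are assumptions):
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'  -- assumed directory containing hive-site.xml
);
USE CATALOG myhive;
-- switch to the Hive dialect for Hive-syntax DDL, and back to the default dialect afterwards
SET table.sql-dialect=hive;
SET table.sql-dialect=default;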
10. What methods does the HiveCatalog class contain?
The key method is as follows:
HiveCatalog mainly persists metadata, so its methods are generally about creating and handling objects such as databases, tables, views, functions, and partitions, plus the is_generic flag check.
11. Flink SQL 1.11 added real-time data warehouse functionality; can you introduce it?
A major feature added in Flink 1.11 is the real-time data warehouse, which can insert data from Kafka into Hive in real time. The traditional real-time data warehouse is based on Kafka + Flink Streaming, defining the whole pipeline as stream computing jobs with second-level or even millisecond-level latency; but one problem with such a real-time data warehouse is that it only keeps 3 to 15 days of history, and ad-hoc queries cannot be run on it.
To address this, Flink 1.11 reworked the FileSystem streaming sink, adding partition commit and rolling policy mechanisms, and let the Hive streaming sink reuse the file system streaming sink.
In the Table/SQL API of Flink 1.11, the FileSystemConnector is implemented by the enhanced StreamingFileSink component, named StreamingFileWriter in the source code. Only when the checkpoint succeeds will the file written by StreamingFileSink change from the Pending state to the Finished state, so that it can be safely read downstream. So, be sure to turn on Checkpointing and set a reasonable interval.
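As a minimal sketch of that last point (the one-minute interval is only an example, and the exact SET quoting differs slightly between SQL client versions), checkpointing can be enabled from the SQL client like this:
SET execution.checkpointing.interval = 1min;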
12. Introduce Flink-Hive real-time data writing.
StreamingWrite: fetch data from Kafka in real time, write it into a Hive table using partition commits, and then run a batch query to read that data back.
It can be written either with Flink SQL or with the Table API; a Flink SQL sketch is shown below.
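A minimal Flink SQL sketch of the write path, closely following the pattern in the Flink 1.11 documentation (the table names, fields, and Kafka options here are assumptions):
-- Hive-dialect DDL for the target table, with partition-commit properties
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- default-dialect DDL for the Kafka source table
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- streaming insert from Kafka into the partitioned Hive table
INSERT INTO hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;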
13. Introduce Flink-Hive real-time data reading.
As shown in the following figure: first look at how data lands in the Hive data warehouse in real time. To fit the Flink-Hive integration environment, the biggest change to the FileSystemConnector is partition commit; according to the official documentation, the partition time can be extracted with a date + hour pattern, or down to hours, minutes, and seconds.
14. When Flink writes to Hive in real time, how do we ensure that data already written to a partition is visible downstream?
This is related to the trigger mechanism, which involves process-time and partition-time triggers plus a delay.
partition-time: partitions are triggered according to the event time extracted from the partition. When 'watermark' > 'partition-time' + 'delay', the partition-time data can be committed successfully.
process-time: the trigger is based on the system processing time plus a delay; a partition can be committed when 'current processing time' > 'partition creation time' + 'delay'.
However, the process-time trigger has a flaw: when data arrives late or the job is restarted after a failure, the data cannot be assigned to the correct partition according to event time. Therefore, partition-time is generally chosen.
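A hedged sketch of how these choices surface as configuration (the option values are examples only): the trigger and delay are table properties on the Hive sink, and the same table can be read as a stream with dynamic table options:
-- partition-commit options on the Hive sink table (see the write example in question 12)
--   'sink.partition-commit.trigger' = 'partition-time'
--   'sink.partition-commit.delay' = '1 h'
--   'sink.partition-commit.policy.kind' = 'metastore,success-file'

-- streaming read of the Hive table with dynamic table options
SELECT *
FROM hive_table
/*+ OPTIONS(
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '1 min',
  'streaming-source.consume-start-offset' = '2021-01-01'
) */;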
15. Introduce the PartitionCommitTrigger used for partition commits in the source code.
In the source code, the PartitionCommitTrigger class diagram is shown below.
This class maintains two pairs of essential information:
1. pendingPartitions / pendingPartitionsState: the partitions waiting to be committed and the corresponding state;
2. watermarks / watermarksState: the watermarks (stored in a TreeMap to keep them ordered) and the corresponding state.
16. How does PartitionTimeCommitTigger know which partitions to commit? (source code analysis)
1. Check whether the checkpoint ID is valid;
2. Take the watermark corresponding to the current checkpoint ID, and call the headMap() and clear() methods of the TreeMap to delete the (now useless) watermark data earlier than the current checkpoint ID;
3. Traverse the partitions waiting to be committed and call the previously defined PartitionTimeExtractor (e.g. ${year}-${month}-${day} ${hour}:00:00) to extract the partition time. If watermark > partition-time + delay, the partition can be committed, and these partitions are returned.
17. How is the flag that makes data already written to a partition visible downstream handled? (source code analysis)
In the source code this mainly involves the PartitionCommitPolicy class, as shown in the following figure:
18. Have you worked with Flink SQL CEP?
The concept of CEP: Complex Event Processing, used to identify events in the input stream that match specified rules and output them in a specified way.
For example, get up -> wash -> eat -> go to work is a series of connected events that forms a pattern; browse goods -> add to cart -> create an order -> pay -> ship -> receive is another pattern formed by an event stream.
As the concept suggests, CEP mainly identifies, in the input stream, events that satisfy basic rules specified by the user, and then outputs those events in a specified way.
As shown in the following figure: we specify the 'square, circle' events as the basic rule, and output those events from the raw input stream as a result stream.
CEP usage scenarios:
User anomaly detection: specify anomalous action events as the result stream to output;
Strategic marketing: designate events that meet the requirements as the result stream;
O&M monitoring: specify metrics within a certain range as the result stream;
Bank card fraud: specify two swipes in two different places at (almost) the same time as the anomalous result stream.
Flink CEP SQL is complex event processing expressed in SQL; its syntax differs from ordinary Flink SQL and contains many rules.
19. What parameters does Flink SQL CEP provide?
CEP includes the following parameters:
Parameter introduction:
Output mode (how many rows to output for each match found):
one row per match: each time a complete match is detected, the match is summarized into a single output row;
all rows per match (not supported by Flink): each time a complete match is detected, every record involved in the match is output.
running vs final semantics:
These decide which of the matched events a calculation uses: running refers to the events matched so far, final to the events of the completed match. In the define clause only running can be used, while in measures both can be used. For one row per match there is no difference in the output; for all rows per match the output differs.
Jump mode after a match (where the next match starts):
after match skip past last row: the next match starts from the event after the last event of the successful match;
after match skip to next row: the next match starts from the event after the first event of the successful match;
after match skip to first patternItem: the next match starts from the first event matched by patternItem in the successful match;
after match skip to last patternItem: the next match starts from the last event matched by patternItem in the successful match.
Note: skip to first / last patternItem can easily cause circular matching problems and must be used with care.
20. Write a CEP SQL case, such as bank card fraud detection.
Below is a bank card fraud case for a financial scenario, written with Flink CEP SQL.
Case introduction: in financial scenarios, bank card fraud sometimes occurs. Criminals use the Internet and other technologies so that, within an interval of 10 minutes or less, the same bank card produces multiple swipe records in two different places. Under normal operation, a user cannot make card transactions in two cities within such a short interval, so a trigger/alarm mechanism is needed in the backend for this problem.
Requirement: When the same cardId occurs from two different locations within ten minutes, an alarm mechanism is triggered to detect credit card fraud.
(1) There are many techniques for writing CEP SQL. First, write the most basic query statement to select the required fields from a table:
select starttime, endtime, cardId, event from dataStream
(2) match_recognize():
This clause is the prerequisite of CEP SQL; it produces an append table, and all of the CEP SQL is written inside it.
(3) Partitioning and sorting: because matching is done per card, partition by cardId with partition by, and sort by time with order by.
(4) Understand the writing order of the CEP SQL core clauses, as shown in the figure above.
The core of CEP SQL is the pattern. To detect card swipes in two places within 10 minutes, define two events:
pattern (e1 e2+) within interval '10' minute
Define the judgment statements needed in the pattern with define:
define
  e1 as e1.action = '...',
  e2 as e2.action = '...' and e2.location <> e1.location
Based on the input conditions above, construct the output columns with measures:
measures
  e2.action as event,
  e1.timestamp as starttime,
  last(e2.timestamp) as endtime
Output condition: when a match succeeds, output one row (this clause is written differently depending on the rules):
one row per match
Jump mode after a match (also written differently depending on the rules):
after match skip to next row
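Putting the pieces together, a minimal sketch of the full query (the event-time column ts and the action value 'consume' are assumptions; the original fragments leave the action value blank):
SELECT *
FROM dataStream
MATCH_RECOGNIZE (
  PARTITION BY cardId
  ORDER BY ts                            -- assumed event-time attribute
  MEASURES
    e2.action   AS event,
    e1.ts       AS starttime,
    LAST(e2.ts) AS endtime
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE
  DEFINE
    e1 AS e1.action = 'consume',                              -- assumed action value
    e2 AS e2.action = 'consume' AND e2.location <> e1.location
) AS T;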
21. Do you understand Flink CDC? What are the Flink SQL CDC Connectors?
Flink 1.11 introduced the CDC mechanism. CDC stands for Change Data Capture; it is a very mature solution for synchronizing database changes that captures inserts, updates, and deletes on database tables.
Flink CDC Connectors is a set of source connectors for Apache Flink that can read full and incremental data directly from databases such as MySQL and PostgreSQL: https://github.com/ververica/flink-cdc-connectors.
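A hedged sketch of registering a MySQL CDC source table (the host, credentials, database, and table names are placeholders):
CREATE TABLE mysql_orders (
  order_id INT,
  customer_id INT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);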
Version 1.13 supports the following connectors:
In addition, it supports parsing change logs in debezium-json and canal-json formats from Kafka, processing them with Flink or writing them directly to other external storage systems (such as Elasticsearch), and it can also write Flink data to Kafka in changelog-json format:
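For the Kafka path, a minimal sketch of consuming a Debezium changelog topic as a table (the topic and fields are assumptions):
CREATE TABLE kafka_orders (
  order_id INT,
  customer_id INT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydb.orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'   -- 'canal-json' works the same way
);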
Version mapping between Flink CDC Connectors and Flink:
22. Introduce the principle of Flink CDC.
According to recent CDC research, Debezium and Canal are currently the most popular CDC tools, and the core principle of these tools is to extract database logs to capture changes. After a series of investigations, Debezium (which supports full and incremental synchronization and databases such as MySQL, PostgreSQL, and Oracle) is currently the most widely used.
Flink SQL CDC has a built-in Debezium engine: it uses Debezium's ability to extract changes from the logs and converts the changelog into RowData, the data structure Flink SQL understands. (Debezium's data format is on the right below, and Flink's RowData format is on the left.)
RowData represents a row of data and carries a piece of metadata called RowKind, which covers insert (+I), update-before (-U), update-after (+U), and delete (-D); this is very similar to the binlog concept in databases.
The data collected through Debezium contains the old row (before), the new row (after), and the source metadata (source). An op value of u marks an update operation (the op values c, u, d, r correspond to create, update, delete, and read respectively), and ts_ms is the timestamp of the synchronization.
23. Design a real-time data warehouse that integrates acquisition + computation + transmission (ETL) with Flink SQL CDC.
The design diagram is as follows:
Flink CDC connectors replace the Debezium + Kafka data acquisition module, so that Flink SQL handles acquisition + computation + transmission (ETL) in a single job: MySQL is the source, the Flink CDC connector is the middleware, and ES, Kafka, or other systems are the sink. The advantages of this design are as follows (a sketch follows the list):
Out of the box, easy to use;
Fewer components to maintain, simpler real-time links, lower deployment costs;
Lower end-to-end latency;
Flink supports exactly-once reads and computation;
Data does not need to land in intermediate storage, which reduces storage costs;
Supports full and incremental streaming reads;
The binlog collection offset can be traced back (rewound).
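A minimal end-to-end sketch under these assumptions, reusing the mysql-cdc source table mysql_orders from the sketch in question 21 and writing into Elasticsearch (the index name and host are placeholders):
CREATE TABLE orders_es (
  order_id INT,
  customer_id INT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'orders'
);

-- this single job covers acquisition (CDC read), computation, and transmission (ES write)
INSERT INTO orders_es
SELECT order_id, customer_id, amount
FROM mysql_orders;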
24. How does Flink SQL CDC achieve consistency guarantees? (source code analysis)
The Source function that Flink SQL CDC uses to obtain the database change log is DebeziumSourceFunction, and its final return type is RowData. It implements CheckpointedFunction, which means the Checkpoint mechanism guarantees that no data is lost when a failure occurs and that exactly-once semantics are achieved; this is explained clearly in the function's comments.
To implement CheckpointedFunction, the following two methods must be implemented:
public interface CheckpointedFunction {
    // Take a snapshot and save the in-memory data into the checkpoint state
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    // Restore data from the checkpoint state after the job recovers from a failure
    void initializeState(FunctionInitializationContext context) throws Exception;
}
Let’s see what states are recorded in the DebeziumSourceFunction.
/**
 * Accessor for state in the operator state backend.
 * offsetState records the binlog file and offset (displacement) that have been read.
 */
private transient ListState<byte[]> offsetState;

/**
 * State to store the history records, i.e. schema changes.
 * historyRecordsState records information such as schema changes.
 *
 * @see FlinkDatabaseHistory
 */
private transient ListState<String> historyRecordsState;
We can see that Flink SQL CDC is a relatively simple scenario with no intermediate operators: exactly-once is achieved by persisting the binlog consumption offset and the schema-change information in snapshots taken at checkpoints.
25. Do you understand Flink SQL Gateway?
The concept of Flink SQL Gateway: the Flink SQL Gateway is the "task gateway" of a Flink cluster. It supports submitting queries, inserts, deletes, and other tasks in the form of REST APIs, as shown in the following figure.
The overall architecture is shown in the following figure:
26. Explain how Flink SQL Gateway creates a session.
The flowchart for creating a session is as follows: