Based on Flink 1.14 source code analysis.

Many business teams in our company use our Flink SQL platform for TopN calculations, and today a colleague suddenly asked me: how does Flink SQL actually implement TopN? I was caught off guard; I had never read this part of the source code. But the business is asking, so let's open up the source code and catch up.
Faced with this problem, let's first calmly narrow down the scope. It must belong to the Flink SQL module, so the source code should be in the flink-table-planner package. And TopN in Flink SQL is really just ROW_NUMBER, which is a function, so let's use Flink's system functions as the starting clue. That leads us to the org.apache.calcite.sql.fun.SqlStdOperatorTable class.
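Before diving into the planner, it helps to recall what the TopN pattern computes: ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) wrapped in a filter like rownum <= N. Ignoring the streaming aspect entirely, the batch semantics can be sketched in plain Java; the class and method names here are made up for illustration:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical batch sketch of the TopN pattern:
//   SELECT key, val FROM (
//     SELECT key, val,
//            ROW_NUMBER() OVER (PARTITION BY key ORDER BY val DESC) AS rownum
//     FROM t) WHERE rownum <= N
public class TopNSketch {
    public static Map<String, List<Integer>> topN(List<Map.Entry<String, Integer>> rows, int n) {
        return rows.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,                        // PARTITION BY key
                Collectors.collectingAndThen(
                        Collectors.mapping(Map.Entry::getValue, Collectors.toList()),
                        (List<Integer> vals) -> {
                            vals.sort(Comparator.reverseOrder());             // ORDER BY val DESC
                            return vals.subList(0, Math.min(n, vals.size())); // rownum <= n
                        })));
    }
}
```

The hard part, which the rest of this article covers, is keeping this result incrementally correct as the stream changes.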
Sure enough, we find it there. Then there must be a place in Calcite's rules that checks for this function, so let's keep following the call chain. As expected, FlinkLogicalRankRuleBase, a Calcite rule, determines the rank type according to the type of this function. Looking at the rule's matching conditions, it is also easy to understand: it inspects the over-aggregate and determines the rank and its corresponding type. After that, it just does some simple extraction work, pulling out the rank field, the predicate, and the expressions to gather information, and then directly generates a new RelNode called FlinkLogicalRank via transformTo, returning this equivalent node.
Since it is a RelNode, there will definitely be a Calcite rule to handle it; let's go find it. The batch one doesn't matter here; from the names you can tell we are looking for the class without "window", which returns a StreamPhysicalRank. This class is a FlinkPhysicalRel, meaning it can be converted into an ExecNode.
One more note here: the partitionKey passed in is the PARTITION BY from the SQL, and it will later be used to create the transformation's KeySelector that partitions (keys) the data. The returned StreamExecRank is the node that can be translated into a concrete Flink operator, and the specific logic lives inside it.
Next, let's look at the concrete logic of row_number by finding the method translateToPlanInternal. Depending on the chosen strategy, there are three main types:
AppendFastStrategy (the input contains only inserts),
UpdateFastStrategy (the input contains no deletes, and has given primaryKeys and is sorted by the sort field),
RetractStrategy (the input contains updates and deletes).
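The choice between the three strategies can be pictured roughly as follows. This is a hypothetical simplification of the planner's actual decision (the enum and method names are made up), just restating the three conditions above as code:

```java
// Hypothetical sketch of how the planner picks a rank processing strategy,
// based only on the three conditions described in the text.
public class RankStrategyChooser {
    public enum Strategy { APPEND_FAST, UPDATE_FAST, RETRACT }

    public static Strategy choose(boolean insertOnly, boolean noDeletes,
                                  boolean hasPrimaryKeys, boolean sortedBySortField) {
        if (insertOnly) {
            return Strategy.APPEND_FAST;   // input contains only inserts
        }
        if (noDeletes && hasPrimaryKeys && sortedBySortField) {
            return Strategy.UPDATE_FAST;   // keyed updates on already-sorted input
        }
        return Strategy.RETRACT;           // general case: updates and deletes
    }
}
```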
Let's take a look at RetractStrategy. First, a comparator for sorting RowData, ComparableRecordComparator, is built from the sort fields, and then a RetractableTopNFunction is created based on that comparator. This class has two main state data structures:
dataState: a map where all rows with the same sort key are placed in the same list;
treeMap: a sortable map that orders the data by the ORDER BY defined in our SQL above; the Long value records how many rows exist for that same key.
That's right: the sorting is simply done with Java's TreeMap!
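The two state structures can be mimicked with plain Java collections. Here is a minimal, in-memory stand-in (in real Flink these live in MapState/ValueState, keyed by the PARTITION BY key; the class here is hypothetical):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal in-memory stand-in for RetractableTopNFunction's state:
//   dataState: sortKey -> list of rows sharing that sort key
//   treeMap:   sortKey -> count of rows with that key, ordered by the comparator
public class TopNState {
    final Map<Integer, List<String>> dataState = new HashMap<>();
    final TreeMap<Integer, Long> treeMap;

    public TopNState(Comparator<Integer> sortOrder) {
        this.treeMap = new TreeMap<>(sortOrder); // the ORDER BY from the SQL
    }

    public void insert(int sortKey, String row) {
        dataState.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(row);
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    public void delete(int sortKey, String row) {
        List<String> rows = dataState.get(sortKey);
        if (rows != null && rows.remove(row)) {
            treeMap.computeIfPresent(sortKey, (k, c) -> c == 1 ? null : c - 1);
            if (rows.isEmpty()) dataState.remove(sortKey);
        }
    }
}
```

Iterating treeMap.entrySet() visits sort keys in ORDER BY order, which is exactly what emitRecordsWithRowNumber relies on.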
Continuing down, the main logic is this: each time a record comes in, it is dispatched according to its row kind. When the record is INSERT or UPDATE_AFTER, it goes to the emitRecordsWithRowNumber() method; when it is UPDATE_BEFORE or DELETE, it goes to the retractRecordWithRowNumber() method.
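That dispatch can be pictured as a switch on the row kind. The four kinds are borrowed from Flink's RowKind enum; the routing sketch itself is hypothetical and only restates the two paths above:

```java
// Sketch of RetractableTopNFunction's dispatch: which code path a record takes,
// based on its changelog row kind (names mirror Flink's RowKind enum).
public class RowKindDispatch {
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public static String route(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "emitRecordsWithRowNumber";    // additions: emit and shift ranks up
            case UPDATE_BEFORE:
            case DELETE:
                return "retractRecordWithRowNumber";  // retractions: remove and shift ranks down
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }
}
```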
Let's look at the concrete logic, starting with INSERT. The INSERT record is first put into the treeMap (a DELETE would not be), and then the treeMap is traversed in sorted order. When the traversal reaches the key that equals the current record's sort key, all rows with that same key (the list in dataState) are retracted and re-emitted with rowNumber+1. The traversal then continues through the treeMap: every row after that point is retracted with an UPDATE_BEFORE and re-sent downstream as an UPDATE_AFTER with rowNumber+1, until the loop reaches the end of the TopN range. When the record is of type DELETE, the process is the reverse of INSERT: all rows ranked after the current key are retracted and re-emitted with rowNumber-1.
That is pretty much the whole processing flow. You can see that when N is large and the ordering changes frequently, the performance cost is considerable; in extreme cases the data sent downstream is amplified many times over.
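The retract-and-re-emit dance, and why it amplifies downstream traffic, can be sketched on a plain sorted list. This hypothetical helper (not Flink code) shows that a single insert near the top of the ranking produces an UPDATE_BEFORE/UPDATE_AFTER pair for every row whose rank shifts:

```java
import java.util.ArrayList;
import java.util.List;

public class RankShiftSketch {
    // Given the rows currently in the TopN (already sorted) and the position where
    // a new row is inserted, produce the changelog sent downstream, in order.
    // "-rank" entries stand in for UPDATE_BEFORE/DELETE, "+rank" for INSERT/UPDATE_AFTER.
    public static List<String> emitOnInsert(List<String> sorted, int insertPos,
                                            String row, int topN) {
        List<String> out = new ArrayList<>();
        out.add("+" + (insertPos + 1) + ":" + row);           // the new row at its rank
        for (int i = insertPos; i < Math.min(sorted.size(), topN - 1); i++) {
            out.add("-" + (i + 1) + ":" + sorted.get(i));     // retract old rank
            out.add("+" + (i + 2) + ":" + sorted.get(i));     // re-emit with rank+1
        }
        if (sorted.size() >= topN) {                          // someone falls out of the TopN
            out.add("-" + topN + ":" + sorted.get(topN - 1));
        }
        return out;
    }
}
```

Inserting one row at rank 2 of a full top-3 list already emits four changelog records; with a large N the amplification grows accordingly.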
Note that the other two strategies also have a parameter, table.exec.topn.cache-size, which controls the size of a local LRU cache; increasing it reduces accesses to state, so you can tune it as needed (see the official documentation for details).
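A per-key LRU cache like the one bounded by table.exec.topn.cache-size can be built on Java's LinkedHashMap. This is a generic stand-in to show the mechanism, not Flink's actual cache class:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simple LRU cache sketch: an access-ordered LinkedHashMap that evicts the
// least-recently-used entry once it exceeds cacheSize
// (cf. the role of table.exec.topn.cache-size).
public class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int cacheSize;

    public LruCacheSketch(int cacheSize) {
        super(16, 0.75f, true);    // accessOrder = true: get() refreshes recency
        this.cacheSize = cacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > cacheSize; // evicted entries must be re-read from (slower) state
    }
}
```

The larger the cache, the more reads are served locally instead of hitting the state backend, at the cost of heap memory.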
end
Reply with keywords such as Face, ClickHouse, ES, Flink, Spring, Java, Kafka, or Monitoring on the public account (zhisheng) to view more articles on those topics.