An analysis based on the Flink 1.14 source code.

Many business teams in our company use our Flink SQL platform for TopN calculations, and today a colleague suddenly asked me: how does Flink SQL actually implement TopN?

I was caught off guard: I had never read this part of the source code. But when the business asks, what can you do? Time to open the source and catch up.

First, let's calmly narrow down the scope of the problem.

It must belong to the Flink SQL module, so the source code must be in the flink-table-planner package. And TopN is just ROW_NUMBER, which is a function, so let's start from Flink's system functions as the clue and track it down.
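For context, a TopN in Flink SQL is written with the ROW_NUMBER() window function plus an outer filter on the rank; this is the pattern from the Flink documentation (the table and column names below are made up for illustration):

```sql
SELECT product_id, category, sales, row_num
FROM (
  SELECT product_id, category, sales,
         ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
  FROM ShopSales
) WHERE row_num <= 5
```

The planner recognizes this ROW_NUMBER-plus-filter shape and rewrites it into a rank node instead of executing it as a real over-aggregation.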

Sure enough, once you find the function, there must be a place in Calcite's rules that recognizes it. Keep following the call chain.

As expected, FlinkLogicalRankRuleBase, a Calcite rule, determines the rank type according to the type of this function. Now look at the rule's matching conditions.

This part is easy to understand: for an over-aggregate (overAgg), the rule checks whether it is a rank and determines the corresponding rank type.

This step just does some simple extraction on the rank: pulling out the rank field, the predicate, and the expressions to collect the needed information.

Then it generates a new RelNode called FlinkLogicalRank and returns this equivalent node via transformTo.

Since it is a RelNode, there must be another Calcite rule that handles it. Let's go find it.

The batch variant doesn't matter here; from the names you can tell the class we want is the streaming one, which returns StreamPhysicalRank.

This class is a FlinkPhysicalRel that can be converted into an ExecNode.

One more remark here:

Here the partitionKey is passed in, which is the PARTITION BY in the SQL. It will be used later to build the transformation's KeySelector, which routes rows with the same key to the same task.
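To make that concrete, here is a toy sketch (not Flink's actual classes; the names and the Object[] row representation are invented for illustration) of how a key selector derived from the PARTITION BY columns sends rows with equal keys to the same parallel subtask:

```java
// Illustrative sketch only: a key built from the PARTITION BY column indices,
// then hashed to pick a parallel subtask, so equal keys land on the same task.
public class KeySelectorSketch {
    // Pretend a row is an Object[] and the partition columns are field indices.
    static String selectKey(Object[] row, int[] partitionKeyIndices) {
        StringBuilder sb = new StringBuilder();
        for (int idx : partitionKeyIndices) {
            sb.append(row[idx]).append('|');
        }
        return sb.toString();
    }

    // Rows with equal keys hash to the same subtask index.
    static int targetSubtask(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

Two rows that share the PARTITION BY value produce the same key and therefore the same subtask, which is what makes per-key TopN state possible.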

The returned StreamExecRank is a node that can be translated into a concrete Flink operator, and the concrete logic lives inside it.

Next, to see the concrete logic of ROW_NUMBER, find the method translateToPlanInternal.

Depending on the strategy, there are three main cases:

AppendFastStrategy (the input contains only inserts)

RetractStrategy (the input contains updates and deletes)

UpdateFastStrategy (the input contains no deletes, and the input has the given primary keys and is sorted by the sort field)

Let’s take a look at RetractStrategy

First it obtains a ComparableRecordComparator, a comparator for sorting RowData built from the sort fields, and then creates a RetractableTopNFunction based on that comparator.

This class has two main state data structures:

dataState: a map used to store the rows; all rows with the same sort key are placed in the same list.

treeMap: a sortable map that orders the data by the ORDER BY defined in our SQL above; the Long value records how many rows share that same sort key.

!!!!!!!!!! So the sorting is simply done with Java's TreeMap!
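A toy model of those two state structures (this is not Flink's code; the class and field names mirror the description above but are invented): dataState maps a sort key to all rows sharing it, and the TreeMap, ordered by the ORDER BY comparator, maps a sort key to how many rows carry it. The rank of a row falls out of the counts of all sort keys that precede it:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of RetractableTopNFunction's state (illustrative only):
// - dataState: sort key -> all rows sharing that sort key
// - treeMap:   sort key -> row count, ordered by the SQL ORDER BY comparator
public class TopNStateModel {
    // ORDER BY sales DESC: larger values sort first.
    static final Comparator<Integer> DESC = Comparator.reverseOrder();

    final Map<Integer, List<String>> dataState = new HashMap<>();
    final TreeMap<Integer, Long> treeMap = new TreeMap<>(DESC);

    void insert(int sortKey, String row) {
        dataState.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(row);
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    // Rank (1-based) of the first row with this sort key: one plus the
    // number of rows whose sort key strictly precedes it in the ordering.
    long rankOf(int sortKey) {
        long rank = 1;
        for (long count : treeMap.headMap(sortKey, false).values()) {
            rank += count;
        }
        return rank;
    }
}
```

The TreeMap keeps only one entry per distinct sort key, so traversing it visits groups of rows in rank order, which is exactly what the emit and retract loops below rely on.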

Continuing to look down:

The main logic is this: each time a record arrives, it is dispatched according to its RowKind.

When the record is INSERT or UPDATE_AFTER, it goes into the emitRecordsWithRowNumber() method.

When the record is UPDATE_BEFORE or DELETE, it goes into the retractRecordWithRowNumber() method.

Let's look at the concrete logic, starting with INSERT: it iterates over the treeMap.

To interpret it: when the record is an INSERT, it is first put into the treeMap (a DELETE is not). Then the treeMap is traversed in order.

When the traversal reaches a key equal to the current record's sort key, all rows with that same sort key (the list in dataState) are retracted and re-emitted with their rowNumber+1.

The traversal then continues through the treeMap: every subsequent row is retracted with UPDATE_BEFORE and re-sent downstream with UPDATE_AFTER and rowNumber+1, until the loop reaches the end of the Top N.

When the record is of type DELETE, the process is the reverse of INSERT: all rows ranked after the current key are retracted and re-emitted with rowNumber-1.

That is roughly the whole processing flow. You can see that when N is large and the ordering changes frequently, the performance cost is significant; in extreme cases a single input record is amplified into many records downstream.
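The emit and retract flows described above can be sketched as a toy changelog generator. This is not Flink's actual code: the "+I/-U/+U/-D value@rank" strings and the plain sorted list standing in for the treeMap/dataState pair are invented, but the rank-shifting behavior matches the description:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the retractable TopN changelog (illustrative only).
// Emits strings: "+I" insert, "-D" delete, "-U"/"+U" retract/update pairs,
// each tagged "value@rank".
public class RetractTopNSketch {
    final List<Integer> sorted = new ArrayList<>(); // ORDER BY value DESC
    final int topN;

    RetractTopNSketch(int topN) { this.topN = topN; }

    // INSERT path: emit the new row, push down every row below it inside
    // the Top N (rowNumber+1), and delete the row that fell out.
    List<String> onInsert(int value) {
        List<String> out = new ArrayList<>();
        int pos = 0;
        while (pos < sorted.size() && sorted.get(pos) >= value) pos++;
        sorted.add(pos, value);
        if (pos < topN) {
            out.add("+I " + value + "@" + (pos + 1));
            for (int i = pos + 1; i < sorted.size() && i < topN; i++) {
                out.add("-U " + sorted.get(i) + "@" + i);       // old rank
                out.add("+U " + sorted.get(i) + "@" + (i + 1)); // new rank
            }
            if (sorted.size() > topN) {
                out.add("-D " + sorted.get(topN) + "@" + topN); // fell out
            }
        }
        return out;
    }

    // DELETE path: the reverse. Rows after the deleted one move up one rank
    // (rowNumber-1), and a row just below the boundary re-enters the Top N.
    List<String> onDelete(int value) {
        List<String> out = new ArrayList<>();
        int pos = sorted.indexOf(value);
        sorted.remove(pos);
        if (pos < topN) {
            out.add("-D " + value + "@" + (pos + 1));
            for (int i = pos; i < sorted.size() && i < topN; i++) {
                if (i + 2 > topN) {
                    out.add("+I " + sorted.get(i) + "@" + (i + 1)); // re-enters
                } else {
                    out.add("-U " + sorted.get(i) + "@" + (i + 2)); // old rank
                    out.add("+U " + sorted.get(i) + "@" + (i + 1)); // new rank
                }
            }
        }
        return out;
    }
}
```

Note how a single input record near the top of the ranking fans out into a retract/emit pair for every row below it, which is exactly the write amplification mentioned above.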

Note that the other two strategies also use a parameter, table.exec.topn.cache-size, which controls the size of a local LRU cache. Increasing it reduces accesses to state; tune it as needed (the option is described in the Flink configuration documentation).
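The idea behind that cache can be shown with a minimal LRU built on LinkedHashMap. This is only a sketch of the concept, not Flink's actual cache class: entries beyond the configured size are evicted least-recently-used first, so hot keys stay local and cold keys fall back to state reads:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU cache in the spirit of the TopN operator's local cache
// (illustrative only; Flink's real implementation differs).
public class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    public LruCacheSketch(int maxSize) {
        super(16, 0.75f, true); // access-order iteration = LRU behavior
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // evict the least-recently-used entry
    }
}
```

With a cap of 2, touching "a" before inserting a third entry makes "b" the eviction victim, which is the access pattern the cache-size option is trading state reads against.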


From the WeChat public account: zhisheng.
