cost estimates
In a cost-based optimizer, cost estimation directly affects the selection among candidate plans. Cost estimation in Flink relies on each operator providing its own estimate data (its "budget"). In this article we analyze what costs are, how operators provide their budgets, and how costs are computed from those budgets.

what is cost
Flink represents costs with the Costs class, which encapsulates the factors of cost estimation, provides arithmetic operations (addition, subtraction, multiplication, division) on cost objects, and handles the identification and validation of unknown values for these factors.


Flink currently divides cost-estimation factors into two broad categories:

  • quantifiable cost factors: factors that can be computed by tracking a quantifiable metric, such as the number of bytes shipped over the network or read and written through disk I/O;
  • heuristic cost factors: factors that cannot be quantified, for which only qualitative, empirical values can be given.
The factors currently included in a cost estimate are:

  • network cost;
  • disk I/O cost;
  • CPU cost;
  • heuristic network cost;
  • heuristic disk cost;
  • heuristic CPU cost.
Quantifiable cost factors may often be set to unknown (UNKNOWN, represented in Costs by the literal constant -1). Once a quantifiable factor is unknown, the cost of every alternative becomes unknown, so the optimizer can no longer decide which operation to prefer during plan pruning. In that case the heuristic cost factors must take over: they must carry values that keep operators executed with different strategies comparable, even when their costs cannot be quantified.
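The "sticky unknown" behavior described above can be illustrated with a minimal sketch. This is not Flink's actual Costs class, just a simplified model showing why an unknown quantifiable factor poisons every sum it participates in, and why the heuristic factor must always stay known:

```java
// Simplified model of Flink's Costs idea: a quantifiable factor is either a
// non-negative value or UNKNOWN (-1); adding an unknown value to anything
// yields unknown, so heuristic values are needed to keep plans comparable.
class SimpleCosts {
    static final long UNKNOWN = -1;

    private long networkCost = UNKNOWN;      // quantifiable, may be unknown
    private long heuristicNetworkCost = 0;   // heuristic, always known

    void setNetworkCost(long bytes) { this.networkCost = bytes; }
    void addHeuristicNetworkCost(long cost) { this.heuristicNetworkCost += cost; }

    // unknown is "sticky": if either side is unknown, the sum is unknown
    static long addKnown(long a, long b) {
        return (a == UNKNOWN || b == UNKNOWN) ? UNKNOWN : a + b;
    }

    void addCosts(SimpleCosts other) {
        this.networkCost = addKnown(this.networkCost, other.networkCost);
        this.heuristicNetworkCost += other.heuristicNetworkCost;
    }

    long getNetworkCost() { return networkCost; }
    long getHeuristicNetworkCost() { return heuristicNetworkCost; }
}
```

Summing a known network cost with an unknown one yields UNKNOWN, while the heuristic part still accumulates, which is exactly what keeps two candidate plans comparable when quantifiable estimates are unavailable.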

how to estimate costs
CostEstimator defines a series of cost-adding methods that concrete estimators must implement. They fall into three categories:

  • adding the cost of a shipping strategy;
  • adding the cost of a local strategy;
  • adding the cost of barriers.
CostEstimator uses these methods to compute an operator's total cost. The logic is encapsulated in the method costOperator, which receives a plan node (PlanNode), then walks over the node's input channels and adds costs according to each channel's ship strategy and local strategy, plus the driver strategy of the node itself. The complete method is as follows:

public void costOperator(PlanNode n) {
    // build a cost object to accumulate the total cost
    final Costs totalCosts = new Costs();
    // the minimum guaranteed memory available to this node
    final long availableMemory = n.getGuaranteedAvailableMemory();

    // --------------------------------------
    // add the costs of the ship strategies
    // --------------------------------------

    // iterate over all input channels of this node
    for (Channel channel : n.getInputs()) {
        final Costs costs = new Costs();

        // match the ship strategy of the current channel
        switch (channel.getShipStrategy()) {
            case NONE:
                throw new CompilerException(
                    "Cannot determine costs: Shipping strategy has not been set for an input.");
            case FORWARD:
                break;
            // random repartitioning
            case PARTITION_RANDOM:
                addRandomPartitioningCost(channel, costs);
                break;
            // hash partitioning and custom partitioning add costs the same way
            case PARTITION_HASH:
            case PARTITION_CUSTOM:
                addHashPartitioningCost(channel, costs);
                break;
            // range partitioning
            case PARTITION_RANGE:
                addRangePartitionCost(channel, costs);
                break;
            // broadcast
            case BROADCAST:
                addBroadcastCost(channel, channel.getReplicationFactor(), costs);
                break;
            // forced rebalancing
            case PARTITION_FORCED_REBALANCE:
                addRandomPartitioningCost(channel, costs);
                break;
            default:
                throw new CompilerException("Unknown shipping strategy for input: "
                    + channel.getShipStrategy());
        }

        // match the local strategy of the current channel
        switch (channel.getLocalStrategy()) {
            case NONE:
                break;
            // both plain sort and combining sort add the local sort cost
            case SORT:
            case COMBININGSORT:
                addLocalSortCost(channel, costs);
                break;
            default:
                throw new CompilerException("Unsupported local strategy for input: "
                    + channel.getLocalStrategy());
        }

        // add the barrier (artificial dam) cost
        if (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE) {
            addArtificialDamCost(channel, 0, costs);
        }

        // if the channel is on a dynamic path, weight its costs accordingly
        if (channel.isOnDynamicPath()) {
            costs.multiplyWith(channel.getCostWeight());
        }

        // add the cost of the current channel to the total cost
        totalCosts.addCosts(costs);
    }

    Channel firstInput = null;
    Channel secondInput = null;
    Costs driverCosts = new Costs();
    int costWeight = 1;

    // if the node is on a dynamic path, adjust the cost weight
    if (n.isOnDynamicPath()) {
        costWeight = n.getCostWeight();
    }

    // get the input channels of the current node
    {
        Iterator<Channel> channels = n.getInputs().iterator();
        if (channels.hasNext()) {
            firstInput = channels.next();
        }
        if (channels.hasNext()) {
            secondInput = channels.next();
        }
    }

    // compute the local cost according to the driver strategy of the plan node
    switch (n.getDriverStrategy()) {
        // these driver strategies add no local cost
        case NONE:
        case UNARY_NO_OP:
        case BINARY_NO_OP:
        case MAP:
        case MAP_PARTITION:
        case FLAT_MAP:

        case ALL_GROUP_REDUCE:
        case ALL_REDUCE:

        case CO_GROUP:
        case CO_GROUP_RAW:
        case SORTED_GROUP_REDUCE:
        case SORTED_REDUCE:

        case SORTED_GROUP_COMBINE:

        case ALL_GROUP_COMBINE:

        case UNION:

            break;

        // the various flavors of merge cost
        case INNER_MERGE:
        case FULL_OUTER_MERGE:
        case LEFT_OUTER_MERGE:
        case RIGHT_OUTER_MERGE:
            addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
            break;

        // hybrid hash join cost (first input is the build side, second is the probe side)
        case HYBRIDHASH_BUILD_FIRST:
        case RIGHT_HYBRIDHASH_BUILD_FIRST:
        case LEFT_HYBRIDHASH_BUILD_FIRST:
        case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
            addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
            break;

        // hybrid hash join cost (second input is the build side, first is the probe side)
        case HYBRIDHASH_BUILD_SECOND:
        case LEFT_HYBRIDHASH_BUILD_SECOND:
        case RIGHT_HYBRIDHASH_BUILD_SECOND:
        case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
            addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
            break;

        // the remaining driver strategies
        case HYBRIDHASH_BUILD_FIRST_CACHED:
            addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
            break;
        case HYBRIDHASH_BUILD_SECOND_CACHED:
            addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
            break;
        case NESTEDLOOP_BLOCKED_OUTER_FIRST:
            addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_BLOCKED_OUTER_SECOND:
            addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_STREAMED_OUTER_FIRST:
            addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
            break;
        case NESTEDLOOP_STREAMED_OUTER_SECOND:
            addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
            break;
        default:
            throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name());
    }

    // add the driver's execution cost to the total cost,
    // then set the total cost as the cost of this node
    totalCosts.addCosts(driverCosts);
    n.setCosts(totalCosts);
}


DefaultCostEstimator extends CostEstimator and is the default (and only) cost estimator. It implements the series of addXXX methods called in the costing logic above. Most of these methods in turn rely on the budget data supplied by an estimate provider (EstimateProvider), applying a different cost-adding algorithm per case. Take the addBroadcastCost method, which adds the cost of a broadcast, as an example: broadcasting copies the data to all output channels of the current operator, so the cost depends on the replication factor. The code is as follows:

public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
    // validate the replication factor
    if (replicationFactor <= 0) {
        throw new IllegalArgumentException("The replication factor of must be larger than zero.");
    }
    if (replicationFactor > 0) {
        // the estimated size of the output data to be shipped
        final long estOutShipSize = estimates.getEstimatedOutputSize();
        // if the size is less than or equal to zero, mark the network cost as unknown
        if (estOutShipSize <= 0) {
            costs.setNetworkCost(Costs.UNKNOWN);
        }
        // otherwise the network cost is the output size times the replication factor
        else {
            costs.addNetworkCost(replicationFactor * estOutShipSize);
        }
        // add the heuristic network cost:
        // the heuristic cost base times the replication factor, scaled by ten
        costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
    } else {
        costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 1000);
    }
}
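To make the formula concrete, here is a standalone re-statement of the two branches above. This is a sketch for illustration only: the class and method names are made up, and HEURISTIC_BASE is a stand-in for Flink's HEURISTIC_COST_BASE constant, not its actual value.

```java
// Standalone sketch of the broadcast cost formulas shown above.
// HEURISTIC_BASE is a placeholder for Flink's HEURISTIC_COST_BASE.
class BroadcastCostSketch {
    static final long UNKNOWN = -1;
    static final long HEURISTIC_BASE = 1;

    // quantifiable network cost: output size times replication factor,
    // or UNKNOWN when the output size could not be estimated
    static long networkCost(long estOutShipSize, int replicationFactor) {
        return estOutShipSize <= 0 ? UNKNOWN : (long) replicationFactor * estOutShipSize;
    }

    // heuristic network cost: base value times replication factor, scaled by ten
    static long heuristicNetworkCost(int replicationFactor) {
        return HEURISTIC_BASE * 10 * replicationFactor;
    }
}
```

For an estimated output of 1,000 bytes and a replication factor of 4, the quantifiable network cost is 4,000 bytes; when the output size is unknown, only the heuristic cost remains to rank the candidate plans.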


budget provider
Earlier we saw how CostEstimator estimates costs, but CostEstimator only applies its algorithms on top of budget data it obtains elsewhere; that data comes from estimate providers (EstimateProvider). Every operator in Flink's batch API has an internal, optimizer-facing representation, which we may call optimizer operators. These are created before the optimization runs, and they all implement the EstimateProvider interface. Each optimizer operator exposes cost-estimate-related information according to its own implementation and semantics. The information currently contained in a budget is:

  • the estimated output size in bytes: provided by the interface method getEstimatedOutputSize;
  • the estimated number of output records: provided by the interface method getEstimatedNumRecords;
  • the average width in bytes of a single output record: provided by the interface method getEstimatedAvgWidthPerOutputRecord.
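The interface itself is small. The sketch below shows its shape; the three method signatures match the ones listed above, while the ConstantSource implementation is made up for illustration (a real source would derive these values from statistics):

```java
// The three estimate methods exposed by an estimate provider;
// ConstantSource is a made-up implementation for illustration.
interface EstimateProviderSketch {
    long getEstimatedOutputSize();
    long getEstimatedNumRecords();
    float getEstimatedAvgWidthPerOutputRecord();
}

class ConstantSource implements EstimateProviderSketch {
    private final long numRecords;
    private final float avgWidth;

    ConstantSource(long numRecords, float avgWidth) {
        this.numRecords = numRecords;
        this.avgWidth = avgWidth;
    }

    @Override public long getEstimatedNumRecords() { return numRecords; }
    @Override public float getEstimatedAvgWidthPerOutputRecord() { return avgWidth; }

    // the output size follows from record count and average record width;
    // -1 signals "unknown", mirroring the UNKNOWN convention in Costs
    @Override public long getEstimatedOutputSize() {
        return (numRecords < 0 || avgWidth <= 0) ? -1 : (long) (numRecords * avgWidth);
    }
}
```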
Under the dag package, the EstimateProvider interface sits at the root of the optimizer operators' inheritance hierarchy.

OptimizerNode is the base class of all optimizer operators, so every optimizer operator is a budget provider. For most optimizer operators, OptimizerNode provides a unified budget calculation method, computeOutputEstimates.

Why only most operators? Because some operators are special, such as the binary union operator BinaryUnionNode and the iteration-related operators.

All operators are traversed during optimization: Flink provides an ID-and-estimates visitor (IdAndEstimatesVisitor) that walks over the operators one by one and computes each one's budget. This is triggered by the following line in Optimizer's compile method:

rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
In IdAndEstimatesVisitor's postVisit method, computeOutputEstimates is called to calculate the budget. Let's analyze how the budget is calculated. Broadly, the logic of computeOutputEstimates splits into two parts:

  • each concrete operator computes its operator-specific budget;
  • the original budget is then overridden according to compiler hints (CompilerHints).
OptimizerNode declares the operator-specific budget computation as an abstract method, computeOperatorSpecificDefaultEstimates, which derived classes implement according to their own semantics. Then, if CompilerHints are set on the operator, they override the budget the operator computed for itself.

CompilerHints encapsulate compilation hints that describe the behavior of user functions and can be used to improve the optimizer's choice of plans. If a compilation hint is set on an operator, it overrides the operator's own estimate of its intermediate result when the budget is computed. At present, however, CompilerHints see little use in the optimizer.
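The override step can be sketched as follows. Note that this is purely illustrative: the class and method names below are hypothetical and do not reflect Flink's actual CompilerHints API; the sketch only shows the idea that a user-supplied hint, when present, wins over the operator's own estimate.

```java
// Hypothetical sketch of the hint-override step: if the user supplied an
// output-cardinality hint, it replaces the operator's own estimate.
// Names here are illustrative, not Flink's actual API.
class HintOverrideSketch {
    // a hint value < 0 means "no hint was given"
    static long applyHint(long operatorEstimate, long hintedCardinality) {
        return hintedCardinality >= 0 ? hintedCardinality : operatorEstimate;
    }
}
```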

Because CompilerHints are not widely used, budget calculation still depends on what each operator provides, so we focus on the computeOperatorSpecificDefaultEstimates method. It is implemented entirely according to the semantics of the concrete operator; let's look at a few examples.

The budget of the binary union operator is the sum of its two inputs:

protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
    long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
    // -1 (unknown) if either input is unknown, otherwise the sum
    this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 + card2;

    long size1 = getFirstPredecessorNode().getEstimatedOutputSize();
    long size2 = getSecondPredecessorNode().getEstimatedOutputSize();
    this.estimatedOutputSize = (size1 < 0 || size2 < 0) ? -1 : size1 + size2;
}


The Cross operator is handled as follows:

protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
    long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
    // the number of output records is the product of the record counts
    // of the first and second input nodes
    this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2;
    // only compute the output size if the record count is known
    if (this.estimatedNumRecords >= 0) {
        // the width of a single cross output record is the sum of the
        // average record widths of the two inputs
        float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
        float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
        float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;

        if (width > 0) {
            this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
        }
    }
}
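Plugging numbers into the cross formulas makes the behavior concrete. The sketch below is a standalone re-statement of the estimate logic above (the class name is made up) so the arithmetic can be run in isolation:

```java
// Standalone re-statement of the cross operator's estimate formulas.
class CrossEstimateSketch {
    // output records = product of the two input cardinalities, -1 if unknown
    static long numRecords(long card1, long card2) {
        return (card1 < 0 || card2 < 0) ? -1 : card1 * card2;
    }

    // output size = (width1 + width2) * numRecords, -1 if anything is unknown
    static long outputSize(long card1, long card2, float width1, float width2) {
        long records = numRecords(card1, card2);
        if (records < 0) {
            return -1;
        }
        float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
        return width > 0 ? (long) (width * records) : -1;
    }
}
```

For example, crossing 1,000 records (8 bytes wide) with 500 records (16 bytes wide) yields 500,000 output records of 24 bytes each, i.e. an estimated 12,000,000 bytes; if either input's estimate is unknown, the result is unknown as well.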


As the two operators above show, budget calculation mostly relies on the output budget of the upstream operators. The initial budget must therefore come from the source operators, because only a source can know the actual size of the data.

So let's look at DataSourceNode. As the input source of the data, it is clearly the operator best positioned to know the size of the initial dataset. For this purpose Flink defines a statistics abstraction, BaseStatistics, used to collect budget information about external data sources. Not every data source can be profiled, though; Flink currently only ships a file-based implementation, FileBaseStatistics.
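The idea behind a file-based statistic can be sketched simply: the total file size is exact (it comes from the file system), and the record count is derived from it via an average record width. This is a simplification of what FileBaseStatistics provides, with made-up names, not Flink's actual class:

```java
// Simplified model of file-based source statistics: the file size on disk is
// exact; the record count is derived from it using a sampled average width.
class FileStatsSketch {
    private final long totalFileSizeBytes;   // exact, from the file system
    private final float avgRecordWidth;      // sampled; <= 0 means unknown

    FileStatsSketch(long totalFileSizeBytes, float avgRecordWidth) {
        this.totalFileSizeBytes = totalFileSizeBytes;
        this.avgRecordWidth = avgRecordWidth;
    }

    long getTotalInputSize() { return totalFileSizeBytes; }

    // unknown (-1) when no average width could be sampled
    long getNumberOfRecords() {
        return avgRecordWidth <= 0 ? -1 : (long) (totalFileSizeBytes / avgRecordWidth);
    }
}
```

From such a source estimate, the per-operator formulas shown earlier propagate the budget downstream through the whole plan.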