CBO has been one of the most successful features in recent years in optimizing Spark SQL.
CBO calculates some statistics related to business data to optimize the query, such as the number of rows, the number of rows after deduplication, null values, maximum and minimum values, etc. Based on this data, Spark automatically selects BHJ or SMJ for cost-based Join Order in multi-Join scenarios to achieve the purpose of optimizing the execution plan. However, since these statistics need to be processed in advance and will become outdated, we use outdated data to judge, and in some cases it will become a negative effect, which will reduce the efficiency of SQL execution.
AQE solves this problem by making statistics during execution and dynamically adjusting the execution plan.
most important question for AQE is when to recalculate the optimization execution plan. If the operators of Spark tasks are arranged in pipelines, they are executed in parallel in sequence. However, shuffle or broadcast exchange interrupts the permutation execution of operators, which we call materialization points, and uses “query stages” to represent small fragments that are divided by materialization points. Each Query Stage produces intermediate results, and downstream Query Stages can be executed if and only if the stage and all its parallel stages have been executed. So when the upstream stage execution is completed, the statistics of partitions are also obtained, and the downstream has not yet started executing, which provides AQE with the opportunity to reoptimize.
At the beginning of the query, after the execution plan is generated, the AQE framework first finds and executes those stages that do not have an upstream stage. Once one or more of these stages are complete, the AQE framework marks them as complete in the physical plan and updates the entire logical plan based on the execution data provided by the completed stages. Based on these new output statistics, the AQE framework will execute an optimizer to optimize according to a series of optimization rules; The AQE framework also executes optimizers that generate ordinary physical plans and adaptively executes optimization rules that are specific, such as partition merging and data skew processing. As a result, we get the latest optimized execution plan and some stages that have been executed, and this is a loop. Then we just need to repeat the above steps until the entire query is finished.
In Spark 3.0, the AQE framework has three major features:
selection join strategy
process of dynamic folding shuffle
dynamic optimization of joins with data skew
Next, let’s take a look at these three characteristics specifically.
(1) Dynamically merge
When we process very large amounts of data, shuffle is usually the most performance-impacting. Because shuffle is a very time-consuming operator, it needs to move data over the network and distribute it to downstream operators.
In shuffle, the number of partitions is critical. The optimal
number of partitions depends on the data, and the size of the data will vary greatly in different levels of query, so it is difficult to determine a specific number:
if there are too few partitions, the amount of data per partition will be too much, which may cause a large amount of data to fall to disk, thus slowing down the query.
If there are too many partitions, the amount of data per partition will be small, which will incur a lot of additional network overhead and affect the Spark task scheduler, which will slow down queries.
To solve this problem, we initially set a relatively large number of shuffle partitions to merge adjacent small partitions with the data of the shuffle file during execution.
For example, suppose we perform
SELECT max(i) FROM tbl GROUP BY j, the table tbl has only 2 partitions and the amount of data is very small. We set the initial shuffle partition to 5, so there will be 5 partitions after grouping. Without AQE optimization, 5 tasks will be generated for aggregation results, in fact, 3 partitions have a very small amount of data.
in this case, however, AQE will only generate 3 reduce tasks.
many joins supported by Spark, broadcast hash join performance is the best. Therefore, if the estimated size of the table to be broadcast is less than the broadcast limit threshold, then we should set it to BHJ. However, improper estimation of the size of a table can lead to decision-making errors, such as a join table with a lot of filters (easy to estimate the table) or a join table with many other operators (easy to estimate the table down), not just a full scan of a table.
Since AQE has precise upstream statistics, it solves the problem. For example, in the following example, the actual size of the right table is 15M, and in this scenario, after filter filtering, the actual data size participating in the join is 8M, which is less than the default broadcast threshold of 10M, and should be broadcast.
While we convert to BHJ during execution, we can even optimize traditional shuffle to local shuffle (e.g. shuffle reads in mapper instead of reducer-based) to reduce network overhead.
(3) Dynamic optimization
skew Data skew is caused by the uneven distribution of data across partitions on the cluster, which slows down the entire query in the join scenario. AQE automatically detects skewed data based on shuffle file statistics, breaks up those skewed partitions into small subpartitions, and then joins each.
We can look at this scenario, Table A joins Table B, where Table A’s partition A0 data is much larger than the other partitions.
AQE will split partition A0 into 2 subpartitions and let them join partition B0 of Table B alone.
Without this optimization, SMJ would have 4 tasks and one of them would take much longer to execute than the others. The join is optimized to have 5 tasks, but each task takes about the same amount of time to execute, so the whole query brings better performance.
3. Use the
spark.sql.adaptive.enabled to true to enable AQE, which defaults to false in Spark 3.0 and meets the following conditions:
> non-streaming query
Contains at least one exchange (such as join, aggregate, window operator) or one subquery
AQE successfully solves a difficult trade off (overhead of generating statistics and query time) and data precision problems of Spark CBO by reducing the dependence on static statistics. Compared to the previous limited CBO, it is now very flexible – we no longer need to analyze the data in advance!