Sharing guest: Dong Yifeng, ByteDance

Editing: Hu Shengda, NIO (Weilai Automobile)

Production platform: DataFunTalk

ClickHouse has become a mainstream and popular open-source engine in the industry. As business data volumes expand and scenario coverage broadens, ClickHouse is prone to query failures in complex query scenarios, affecting normal business operations. This talk shares how ByteDance solves ClickHouse's complex query problem and explains the technical implementation in detail; the capability has already been made available to developers through Volcano Engine ByteHouse.

This article is organized around the following aspects:

Project background

Technical solutions

Optimization and diagnostics

Results and outlook

01

Project background

1. ClickHouse execution mode

ClickHouse's execution mode is relatively simple, similar to Druid and Elasticsearch; a basic query runs in two stages:

In the first stage, the Coordinator receives the query and sends the request to the corresponding worker nodes;

In the second stage, the Coordinator receives the results from each worker node, merges and processes them, and returns the final result.

Take the following SQL as an example:

Select name from student_distribute where id = 5

(1) When the Coordinator receives the request, since student_distribute is a distributed table, it needs to rewrite the SQL into a query against the local table and forward the request to each shard's worker;

(2) After receiving the request, each Worker queries its local table and returns the results to the Coordinator;

(3) The Coordinator merges the data from each shard and returns the result to the client.

Select name from student_local where id = 5
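To make the two-stage flow concrete, here is a minimal, illustrative sketch in plain Python (not ClickHouse code); the shard data, the rewrite rule, and the merge step are simplified assumptions made for the example.

# Illustrative sketch of ClickHouse's two-stage execution model.
# Shards, data, and the merge step are simplified assumptions.

SHARDS = {
    "shard_1": [{"id": 5, "name": "Alice"}, {"id": 7, "name": "Bob"}],
    "shard_2": [{"id": 5, "name": "Carol"}],
}

def rewrite_to_local(distributed_sql: str) -> str:
    # Stage 1 (coordinator): rewrite the distributed table to the local table.
    return distributed_sql.replace("student_distribute", "student_local")

def execute_on_worker(shard_rows, _local_sql):
    # Each worker evaluates the local query on its own shard.
    return [row["name"] for row in shard_rows if row["id"] == 5]

def coordinator(distributed_sql: str):
    local_sql = rewrite_to_local(distributed_sql)
    # Stage 1: fan the rewritten query out to every shard's worker.
    partial_results = [execute_on_worker(rows, local_sql) for rows in SHARDS.values()]
    # Stage 2: the single coordinator merges all partial results.
    return [name for part in partial_results for name in part]

print(coordinator("select name from student_distribute where id = 5"))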

This two-stage execution model handles many common scenarios efficiently, such as queries against large wide tables, but as business scenarios grow more complex, three problems emerge:

First, when the first stage returns a large amount of data and the second-stage computation is complex, the Coordinator comes under heavy pressure and easily becomes the query bottleneck; the more shards there are, the larger the merge workload and the slower the query. For example, for heavy aggregation operators such as count distinct: if a hash table is used for deduplication, the second stage has to merge the hash tables from every worker on the single Coordinator machine, a computation that is very heavy and cannot be parallelized. Group by with high cardinality and window calculations are similar cases.

Second, join is an important SQL scenario. Since the Shuffle operation is not supported, the right table of a Join must hold the full data set. Whether it is an ordinary Join or a Global Join, when the right table is relatively large it easily runs out of memory (OOM); spilling to disk solves the memory problem, but may hurt performance because of disk IO and serialization overhead. In particular, for the most common Hash Join, building the hash table is slow when the right table is large. The community has recently optimized right-table construction by splitting on the join key within a single machine so that hash tables can be built in parallel, but the extra cost is an added split operation on both the left and right tables.

Third, support for complex queries (such as multi-table joins, nested subqueries, and window functions) is not friendly: data cannot be redistributed by shuffle, the generated pipeline cannot be fully parallel in some cases, and it is difficult to make full use of the cluster's resources.

2. Other MPP databases

At present, mainstream MPP databases basically all support stage-based execution. Taking Presto as an example, as shown in the figure below, an aggregation SQL with a two-table join can be split into 5 stages.

Among them, Stage 3 and Stage 4 read the left-table and right-table data respectively, Stage 2 performs the two-table Join and the partial aggregation, Stage 1 performs the final aggregation, and Stage 0 collects Stage 1's data, merges it, and outputs the result. In this process, Stages 3, 4, 2, and 1 can run in parallel on multiple nodes; a single complex query is split into several stages, with data transferred between stages across different workers.

3. Business background and objectives

As business complexity increases, the business no longer wants all data to be produced as large wide tables by ETL; the demand for complex queries, especially multi-round distributed joins and heavy aggregations, keeps growing, as does the overall data volume. With limited cluster resources, we hope to make full use of machine resources and efficiently support complex queries on top of ClickHouse.

ByteHouse is a version that ByteDance engineers have deeply optimized and transformed based on open-source ClickHouse, providing stronger query and data-writing performance on massive data and supporting a variety of application scenarios. As shown in the figure, ByteHouse has been fully verified and used in internal scenarios such as behavior analysis, user-profile analysis, intelligent marketing analysis, and app log analysis, and has been enhanced in many aspects with unique capabilities.

02

Technical solutions

1. Design approach

Complex query support on ClickHouse adopts stage-by-stage execution, replacing ClickHouse's current two-stage execution mode. Similar to other distributed database engines (such as Presto and Impala), a complex query is divided into multiple stages according to where data exchange happens; data is exchanged between stages through an exchange, and there is no data exchange within a single stage. There are three main forms of data exchange between stages:

(1) Shuffle by one (or more) keys

(2) Gathering data from one or more nodes onto a single node (which we call gather)

(3) Replicating the same data to multiple nodes (known as broadcast)
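The three exchange forms can be illustrated with a small routing sketch in plain Python (the node names and row format are assumptions made for the example): shuffle routes each row to a node by a hash of the key, gather sends everything to one node, and broadcast copies the same data to every node.

# Illustrative sketch of the three exchange forms between stages.
# Node lists and row format are assumptions for the example.

DOWNSTREAM_NODES = ["worker_0", "worker_1", "worker_2"]

def shuffle(rows, key):
    """Route each row to a downstream node by hashing the shuffle key."""
    buckets = {node: [] for node in DOWNSTREAM_NODES}
    for row in rows:
        node = DOWNSTREAM_NODES[hash(row[key]) % len(DOWNSTREAM_NODES)]
        buckets[node].append(row)
    return buckets

def gather(rows, target="worker_0"):
    """Send all rows to a single node (e.g. for the final merge)."""
    return {target: list(rows)}

def broadcast(rows):
    """Copy the same rows to every downstream node (e.g. a small right table)."""
    return {node: list(rows) for node in DOWNSTREAM_NODES}

rows = [{"item_id": i, "amount": i * 10} for i in range(6)]
print(shuffle(rows, "item_id"))
print(gather(rows))
print(broadcast(rows))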

The system is divided into modules by function, with the following design goals:

(1) Each module agrees on clear interfaces to minimize mutual dependency and coupling, so that a change in one module does not affect the others; for example, adjusting the stage-generation logic does not affect the scheduling logic.

(2) Modules adopt a plug-in architecture, allowing each module to flexibly support different policies through configuration.

2. Related Terms

ExchangeNode: the node in the syntax tree that represents data exchange

PlanSegment: an execution plan fragment that corresponds to a single Stage

ExchangeManager: manages data exchange and is responsible for exchanging data between nodes of different Stages

SegmentScheduler: the scheduler responsible for distributing plan fragments to workers; it is invoked by the Coordinator node

InterpreterPlanSegment: the plan-fragment executor, which executes a specific plan fragment
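To make these terms concrete, a minimal sketch of how the pieces might relate is shown below; the field names are illustrative assumptions, not ByteHouse's actual classes.

# Illustrative sketch of the relationships between the terms above.
# Field names are assumptions for the example, not ByteHouse's real classes.
from dataclasses import dataclass, field
from typing import List

@dataclass
class ExchangeInput:
    upstream_segment_id: int          # which Stage produces the data
    mode: str                         # "shuffle" | "gather" | "broadcast"
    keys: List[str] = field(default_factory=list)

@dataclass
class PlanSegment:
    segment_id: int                   # one PlanSegment corresponds to one Stage
    query_id: str
    parallelism: int                  # how many workers execute this fragment
    inputs: List[ExchangeInput]       # table scan if empty, exchange input otherwise
    outputs_to: List[int]             # downstream segment ids served via ExchangeManager

# A 2-table join split into segments: two scan segments feed a join segment.
scan_left  = PlanSegment(3, "q1", parallelism=8, inputs=[], outputs_to=[2])
scan_right = PlanSegment(4, "q1", parallelism=8, inputs=[], outputs_to=[2])
join_stage = PlanSegment(2, "q1", parallelism=8, outputs_to=[1],
                         inputs=[ExchangeInput(3, "shuffle", ["item_id"]),
                                 ExchangeInput(4, "shuffle", ["item_id"])])
print(join_stage)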

3. Execution flow

(1) After the Coordinator receives a complex query, it inserts Exchange nodes into the current ClickHouse syntax tree and generates a distributed plan based on node types and data distribution.

(2) The Coordinator splits the distributed plan according to the Exchange node types and generates an execution fragment (PlanSegment) for each Stage.

(3) The Coordinator calls the SegmentScheduler to send each Stage's PlanSegment to the worker nodes.

(4) A Worker node receives its PlanSegment, reads and processes the data through InterpreterPlanSegment, and exchanges data through the ExchangeManager.

(5) The Coordinator reads the data from the ExchangeManager of the nodes of the last Stage, processes it, and returns it to the client.

4. Plan Splitting

The following figure is an example of plan splitting: a two-table Join query whose distributed plan is divided into 4 stages according to the Exchange information.

5. Query fragment scheduler (SegmentScheduler)

The query fragment scheduler, SegmentScheduler, sends PlanSegments to different worker nodes according to a scheduling strategy, based on upstream/downstream dependencies, data distribution, Stage parallelism, and worker distribution and status information.

The two currently supported strategies are:

(1) Dependency scheduling: define the topology from the Stage dependencies, generate a DAG, and schedule stages along the DAG much like a topological sort, starting a Stage only after the Stages it depends on have started (a minimal sketch follows this list). For example, for the two-table join above, the stages that read the left and right tables are scheduled first, and the join stage is scheduled afterwards.

(2) AllAtOnce: similar to Presto's AllAtOnce strategy; the relevant information of each stage is computed first, and all stages are scheduled at once.
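The dependency-scheduling idea from strategy (1) can be sketched as topologically ordered stage dispatch over a DAG; the stage graph and the dispatch function below are illustrative assumptions.

# Illustrative sketch of dependency scheduling: a stage is dispatched only
# after every stage it depends on has been dispatched (like a topological sort).
from collections import deque

# stage -> stages it depends on (2-table join example: 3/4 scan, 2 join, 1 final agg)
DEPENDS_ON = {3: [], 4: [], 2: [3, 4], 1: [2]}

def dispatch(stage_id):
    print(f"scheduling PlanSegment of stage {stage_id}")

def dependency_schedule(depends_on):
    remaining = {s: set(deps) for s, deps in depends_on.items()}
    ready = deque(s for s, deps in remaining.items() if not deps)
    started = set()
    while ready:
        stage = ready.popleft()
        dispatch(stage)                     # ship the PlanSegment to its workers
        started.add(stage)
        for s, deps in remaining.items():   # unlock stages whose deps have all started
            if s not in started and deps <= started and s not in ready:
                ready.append(s)

dependency_schedule(DEPENDS_ON)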

These two strategies trade off fault tolerance, resource usage, and latency.

The first scheduling strategy gives better fault tolerance. Because ClickHouse can have multiple replicas, if some node of the current Stage fails to connect, the scheduler can try to switch to a replica node, and the downstream Stages that depend on it are unaware of the change. This applies to the Stages that read data, which we call Source Stages; non-Source Stages have no dependency on data placement, so their fault tolerance is even stronger: any nodes can be used as long as the required parallelism is met, and in extreme cases the Stage's parallelism can even be reduced for better fault tolerance. The disadvantage is that scheduling is dependency-ordered and cannot be fully parallel, which increases scheduling time; for queries with little data and computation but many Stages, the scheduling delay may account for a non-trivial share of the overall SQL time. We have made targeted optimizations to schedule non-dependent stages in parallel as much as possible.

The second scheduling strategy can greatly reduce scheduling latency through parallel scheduling; to prevent a large number of network IO threads, we control the number of threads through asynchronization. Its disadvantage is that fault tolerance is not as good as with dependency scheduling: the workers of each stage are determined before scheduling, and if one worker has a connection exception the entire query fails directly. In addition, some Stages may be scheduled before their upstream data is ready and have to wait a long time for data; for example, the final aggregation stage has to wait for the partial aggregation to finish before it can get data. Although we have made optimizations so that CPU resources are not wasted for long periods, some resources are still consumed, such as the threads created for execution.

6. InterpreterPlanSegment

The following describes how a plan fragment is executed. Originally, queries and inter-node execution in ClickHouse are expressed mainly as SQL; after splitting into Stages, a separate PlanSegment needs to be executable on its own. So the main job of InterpreterPlanSegment is to accept a serialized PlanSegment and run the entire PlanSegment's logic on the Worker node. The main steps are:

(1) Read data according to the input information: if the input is a concrete table, read the data locally; if the input is an exchange input, read the data from the corresponding ExchangeManager;

(2) Execute the PlanSegment's logic;

(3) Output the processed result data: on the Coordinator node, send the data to the client; on a non-Coordinator node, write the data to this instance's ExchangeManager according to the data exchange mode.
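A minimal sketch of these three steps on the worker side is shown below, in plain Python; the ExchangeManager interface and the segment fields are illustrative assumptions, not ByteHouse's real API.

# Illustrative sketch of what InterpreterPlanSegment does on a worker:
# read inputs, run the fragment's logic, write outputs. The ExchangeManager
# interface here is an assumption made for the example.

class FakeExchangeManager:
    def __init__(self):
        self.channels = {}
    def read(self, token):
        return self.channels.get(token, [])
    def write(self, token, rows):
        self.channels.setdefault(token, []).extend(rows)

def interpret_plan_segment(segment, exchange_manager, local_tables, client_sink):
    # (1) read data: local table scan or exchange input
    if segment["input"]["type"] == "table":
        rows = local_tables[segment["input"]["table"]]
    else:
        rows = exchange_manager.read(segment["input"]["token"])
    # (2) execute the fragment's logic (here: a trivial filter as a stand-in)
    rows = [r for r in rows if r["id"] == 5]
    # (3) output: to the client on the coordinator, otherwise to the ExchangeManager
    if segment["is_coordinator"]:
        client_sink.extend(rows)
    else:
        exchange_manager.write(segment["output_token"], rows)

em, sink = FakeExchangeManager(), []
tables = {"student_local": [{"id": 5, "name": "Alice"}, {"id": 6, "name": "Bob"}]}
seg = {"input": {"type": "table", "table": "student_local"},
       "is_coordinator": False, "output_token": "q1_seg0_idx0"}
interpret_plan_segment(seg, em, tables, sink)
print(em.channels)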

In the Interpreter part, we try to reuse ClickHouse's existing execution logic, such as the processor execution mode and process-list management. Compared with the InterpreterSelect logic, a PlanSegment can be regarded as containing only a single processing stage. Of course, we have also made many functional and performance enhancements; for example, one Stage can handle multiple joins, which reduces the number of Stages and unnecessary data transfer, and helps a lot in the scenario where a large table (usually a fact table) joins multiple dimension tables.

After the InterpreterPlanSegment finishes executing, it reports the corresponding status information to the Coordinator. When execution throws an exception, the exception information is reported to the query fragment scheduler, and execution on the other workers is cancelled.

7. Data Exchange (ExchangeManager)

The ExchangeManager is the medium through which PlanSegments exchange data and is an important component for balancing upstream and downstream processing capacity. Overall it adopts a push model: when upstream data is ready, it is actively pushed downstream, and backpressure is supported. The architecture is shown in the following figure:

The specific process is as follows:

(1) When a downstream PlanSegment starts executing and its input is an exchange input, it registers a data request with the upstream ExchangeManager according to certain token rules (usually composed of query_id + segment_id + index_id, etc.) and the data-source information;

(2) After the upstream ExchangeManager receives the request, it establishes the upstream-downstream data channel and pushes upstream data to the downstream; if the channel has not yet been established, upstream execution is blocked.

In this process, upstream and downstream use queues to optimize sending and reading, and when a queue is saturated, backpressure is used to throttle upstream execution. Because push and queues are used, a special scenario has to be considered: in some cases a downstream Stage does not need to read all the upstream data, with limit as a typical case. For example, with limit 100, the downstream stage only needs to read 100 rows, while the upstream may produce much larger amounts of data; in this case, once the downstream stage has read enough data, it needs to be able to actively cancel upstream execution and empty the queue. This is a scenario-specific optimization that can greatly speed up queries.
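Below is a minimal sketch of the push-with-backpressure idea, including the early-cancel path for limit; the queue capacity, timeouts, and cancel flag are illustrative assumptions made for the example.

# Illustrative sketch of push-based exchange with backpressure and early cancel.
# Queue capacity and the cancel flag are assumptions for the example.
import queue
import threading

CHANNEL = queue.Queue(maxsize=4)   # bounded queue: a full queue backpressures upstream
CANCELLED = threading.Event()      # set by the downstream when it has enough rows

def upstream_producer():
    for i in range(1000):
        if CANCELLED.is_set():     # downstream (e.g. LIMIT) no longer needs data
            break
        CHANNEL.put(i)             # blocks when the queue is full -> backpressure

def downstream_limit_consumer(limit=100):
    got = []
    while len(got) < limit:
        got.append(CHANNEL.get())
    CANCELLED.set()                # actively cancel upstream and drain the queue
    while not CHANNEL.empty():
        CHANNEL.get_nowait()
    return got

producer = threading.Thread(target=upstream_producer)
producer.start()
rows = downstream_limit_consumer(limit=100)
producer.join()
print(len(rows), "rows read; upstream cancelled early")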

Other points that ExchangeManager needs to consider and optimize are:

(1) Fine-grained memory control: memory can be controlled at the instance, query, and segment levels to avoid OOM; a longer-term consideration is supporting spill to disk to reduce memory usage. To improve transmission efficiency, small blocks of data need to be merged and large blocks need to be split. At the same time, network transmission must preserve ordering in some scenarios; for example, the network transfer in sort, partial sort, and merge sort must be ordered, otherwise the results may be wrong.

(2) Connection reuse and network optimization, including choosing in-memory exchange that bypasses the network when upstream and downstream are on the same node, which reduces network overhead as well as data serialization and deserialization costs. In addition, because ClickHouse is already well optimized on the compute side, in some scenarios even memory bandwidth becomes the bottleneck, so we have also applied techniques such as zero copy in parts of the ExchangeManager to reduce memory copies.

(3) Exception handling and monitoring: compared with single-machine execution, abnormal situations in a distributed setting are more complex and harder to perceive. Retrying can shield temporary high load or anomalies on some nodes; when problems occur, we need to sense them quickly, troubleshoot, and apply targeted fixes and optimizations. There is a lot of engineering practice here.

03

Optimization and diagnostics

1. Multiple Join implementations

Depending on the size and distribution of the data, we support a variety of Join implementations, including the following:

(1) Shuffle Join, the most versatile Join;

(2) Broadcast Join: for the large-table-join-small-table scenario, the right table is broadcast to all worker nodes holding the left table, which avoids transferring the left table;

(3) Colocate Join: for scenarios where the left and right tables are already distributed the same way by the Join key, which avoids transferring data of both tables.
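A minimal sketch of how one might choose among these implementations based on table size and distribution is shown below; the threshold and the statistics fields are illustrative assumptions, not ByteHouse's planner logic.

# Illustrative join-strategy selection based on table statistics.
# The threshold and the statistics fields are assumptions for the example.

BROADCAST_THRESHOLD_ROWS = 1_000_000

def choose_join(left_stats, right_stats, join_keys):
    # Colocate Join: both sides are already distributed by the join key,
    # so neither side needs to be moved.
    if (left_stats.get("bucket_keys") == join_keys
            and right_stats.get("bucket_keys") == join_keys):
        return "colocate_join"
    # Broadcast Join: the right table is small enough to copy to every
    # worker holding the left table, avoiding any transfer of the left table.
    if right_stats["rows"] <= BROADCAST_THRESHOLD_ROWS:
        return "broadcast_join"
    # Shuffle Join: the general case; both sides are shuffled by the join key.
    return "shuffle_join"

fact = {"rows": 5_000_000_000, "bucket_keys": None}
dim  = {"rows": 200_000, "bucket_keys": None}
print(choose_join(fact, dim, ["item_id"]))          # -> broadcast_join
print(choose_join({"rows": 10**9, "bucket_keys": ["item_id"]},
                  {"rows": 10**9, "bucket_keys": ["item_id"]},
                  ["item_id"]))                      # -> colocate_join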

2. Network connection optimization

The core of network connection optimization is to reduce the number of connections. In particular, when a Shuffle needs data, every node of the next Stage has to pull data from every node of the previous Stage. When the cluster has many nodes and queries are complex (more stages, higher parallelism / more nodes), the cluster's worker nodes establish a very large number of connections; as shown in the figure below, the number of connections established by a single node is proportional to the number of cluster nodes and the number of concurrently executing stages.

The ClickHouse clusters inside ByteDance are very large (the largest single cluster has several thousand nodes), and under ClickHouse's current execution mode a single machine may establish tens of thousands of network connections. Therefore, if complex query execution is to be supported, network connections must be optimized as the number of stages grows, especially through connection reuse. By reusing connections as much as possible, only a fixed number of connections are established between any two nodes, and different queries reuse these connections, so the number does not grow with the size of queries and the number of stages.
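The connection-reuse idea can be sketched as keeping a fixed set of long-lived channels per peer node and letting all queries and stages share them; the pool size and the Connection class below are illustrative assumptions.

# Illustrative sketch of connection reuse between worker nodes: a fixed number
# of connections per peer, shared by all queries and stages instead of growing
# with them. Pool size and the Connection class are assumptions for the example.
import itertools

class Connection:
    def __init__(self, peer, index):
        self.peer, self.index = peer, index
    def send(self, payload):
        return f"sent {payload!r} to {self.peer} over conn #{self.index}"

class ConnectionPool:
    def __init__(self, connections_per_peer=4):
        self.connections_per_peer = connections_per_peer
        self._pools = {}      # peer -> list of long-lived connections
        self._cursor = {}     # peer -> round-robin iterator

    def get(self, peer):
        if peer not in self._pools:
            self._pools[peer] = [Connection(peer, i)
                                 for i in range(self.connections_per_peer)]
            self._cursor[peer] = itertools.cycle(self._pools[peer])
        return next(self._cursor[peer])   # reuse an existing connection

pool = ConnectionPool()
# Many queries/stages talking to the same peer reuse the same 4 connections.
for query_id in range(10):
    print(pool.get("worker-17:9010").send({"query": query_id, "stage": 2}))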

3. Network transmission optimization

In the data-center field, remote direct memory access (RDMA) is a technique for accessing the memory of a remote host while bypassing the remote host's operating-system kernel. Because it does not go through the operating system, it saves a lot of CPU resources, increases system throughput, and reduces network latency, which makes it especially suitable for massively parallel computer clusters.

Since ClickHouse is already heavily optimized at the compute level and network bandwidth is far smaller than memory bandwidth, network transmission can become a bottleneck in scenarios with particularly large data transfers. To improve transmission efficiency and data-exchange throughput, we introduce compression to reduce the amount of data transferred on the one hand, and RDMA to cut overhead on the other. Tests show considerable gains in scenarios with large data transfer volumes.

4. Runtime Filter

The Join operator is usually the most time-consuming operator in an OLAP engine. There are two directions for optimizing it: on the one hand, improve the performance of the Join operator itself, for example with a better hash table implementation, a better hash algorithm, and better parallelism; on the other hand, minimize the amount of data involved in the Join computation.

The Runtime Filter has a relatively large effect in some scenarios, especially the star-schema scenario where a fact table joins dimension tables. In this case the fact table is usually large and most filter conditions are on the dimension tables, so the fact table ends up joining the dimension tables with its full data. The Runtime Filter works by filtering out, before the Join, the input data that will not hit the Join, thereby drastically reducing the data transfer and computation in the Join and the overall execution time. The following figure shows an example:

The left table has no direct filter, while the right table has the filter item.price > 1000. When the right-table query completes, the range and set of item.id values can be determined, and from the join type (inner join) and the join condition sales.item_id = item.id, the range and set of sales.item_id can be inferred. We can use this range and set of sales.item_id as a filter condition to filter the sales data before the join.

We support the Runtime Filter in complex queries; the variants currently supported are mainly minmax and bloomfilter.
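A minimal sketch of the minmax variant (with a plain set standing in for the bloom filter) built from the filtered right table and applied to the left-table rows is shown below; the column names follow the figure above, everything else is an illustrative assumption.

# Illustrative sketch of a runtime filter: build minmax (and a set, standing in
# for a bloom filter) from the filtered right table, then prune left-table rows
# before the join. Data values are assumptions for the example.

item = [{"id": i, "price": p} for i, p in [(1, 500), (2, 1500), (3, 2200), (4, 900)]]
sales = [{"item_id": i, "amount": a} for i, a in [(1, 10), (2, 20), (3, 5), (4, 7), (9, 3)]]

# Right side: apply item.price > 1000, then build the runtime filter on item.id.
build_side = [r for r in item if r["price"] > 1000]
rf_min = min(r["id"] for r in build_side)
rf_max = max(r["id"] for r in build_side)
rf_set = {r["id"] for r in build_side}     # bloom filter stand-in (no false positives here)

# Left side: prune sales rows by the runtime filter before the join.
probe_side = [r for r in sales
              if rf_min <= r["item_id"] <= rf_max and r["item_id"] in rf_set]

joined = [(s, i) for s in probe_side for i in build_side if s["item_id"] == i["id"]]
print(f"sales rows after runtime filter: {len(probe_side)} of {len(sales)}")
print(joined)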

The overall execution process is as follows:

(1) The build plan segment workers (right table) send the locally generated runtime filters to the Coordinator node;

(2) The Coordinator waits for the runtime filters from all workers, performs a merge, and distributes the merged runtime filter to each execute plan segment worker (left table) node;

(3) While the runtime filter is being built, the execute plan segment (left table) waits for a certain amount of time; if the runtime filter arrives before the timeout, filtering is applied through the runtime filter.

One question to consider here is whether the runtime filter's column has an index built on it (primary key, skip index, etc.) and whether it hits prewhere. If the runtime filter's column (the join column) has an index, the pipeline needs to be rebuilt, because hitting the index may reduce the amount of data read, so both the pipeline's parallelism and the data ranges being processed may change. If the runtime filter's column is unrelated to any index, the filter condition can be added to the plan in advance as an empty placeholder when the plan is generated, and the placeholder is replaced with the real filter once the runtime filter arrives. This way, even if the runtime filter arrives after the timeout and the query fragment has already started executing, the remaining data can still be filtered as long as the fragment has not finished.

It should be noted that the runtime filter is an optimization for a specific scenario: the right table's data volume is not large, and the runtime filter built from it filters the left table strongly. If the right table is relatively large, building the runtime filter is slow; if its filtering effect on the left table is weak or non-existent, the runtime filter actually increases the query time. Therefore, whether to enable it should be decided based on the characteristics and size of the data.

5. Diagnosis and analysis

With the introduction of the multi-stage execution model for complex queries, the execution pattern of SQL becomes more complex. In particular, when users run very complex queries, a query of hundreds of lines of SQL can generate a very large number of stages, and it takes a long time just to read through the stages and understand what the SQL means. As a digression: we have been running the full TPC-DS query set for a long time, and some of those queries generate dozens of stages. In this situation, locating the bottleneck of a SQL query and optimizing it becomes a difficult problem.

We have made the following two optimizations:

First, the most common practice: adding comprehensive metrics of all kinds, including the execution time of the entire query and of each stage, the amount of IO data, the rows processed and execution time of each operator, operator-level metrics and profile events, and so on.

Second, we record backpressure information and the lengths of upstream and downstream queues to infer the execution status and bottleneck of each stage.

Frankly speaking, SQL scenarios are endlessly varied, and many very complex scenarios still require engineers familiar with the engine to diagnose and analyze the SQL and make optimization recommendations. While accumulating experience, we hope that by continuously improving the metrics and analysis paths we can keep reducing the oncall burden, and in some scenarios give optimization hints more intelligently, which also benefits users.

04

Results and outlook

1. Complex query results

Corresponding to the three shortcomings of the original execution model described above, the following three scenarios were tested:

(1) The second-stage computation is complex

(2) The right table of Hash Join is a large table

(3) Multi-table Join

SSB 1 TB data is used as the dataset, on a cluster of 8 nodes.

2. The second-stage computation is complex

The SQL for this case is shown in the following figure.

uniqExact is the default algorithm for count distinct and uses a hash table to deduplicate data. With complex query mode enabled, the query execution time drops from 8.514 s to 2.198 s. The merge of the second-stage uniqExact aggregation, which was originally performed at a single point on the Coordinator, can now be done in parallel by multiple nodes after a shuffle by the group by key, which relieves the pressure of the Coordinator's aggregation merge.
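A minimal sketch of why the shuffle helps here: instead of merging every worker's deduplication hash table at the single coordinator, rows are first shuffled by the group by key so each node merges only its own groups. The data, workers, and hash routing below are illustrative assumptions.

# Illustrative sketch: count distinct (uniqExact-style) merged at a single
# coordinator vs. merged in parallel after a shuffle by the group-by key.
# Data, workers and the hash routing are assumptions for the example.

workers = [
    [("g1", 1), ("g1", 2), ("g2", 5)],     # (group_key, user_id) rows on worker 0
    [("g1", 2), ("g2", 5), ("g2", 6)],     # worker 1
]

# Old mode: each worker builds per-group sets, the coordinator merges ALL of them.
def coordinator_merge(partials):
    merged = {}
    for partial in partials:
        for key, users in partial.items():
            merged.setdefault(key, set()).update(users)   # single-point, not parallel
    return {k: len(v) for k, v in merged.items()}

partials = []
for rows in workers:
    local = {}
    for key, user in rows:
        local.setdefault(key, set()).add(user)
    partials.append(local)
print("coordinator merge:", coordinator_merge(partials))

# New mode: shuffle rows by group key first, so each node owns whole groups
# and can finish its merge independently and in parallel.
shuffled = {0: [], 1: []}
for rows in workers:
    for key, user in rows:
        shuffled[hash(key) % 2].append((key, user))
per_node_counts = {}
for node, rows in shuffled.items():
    groups = {}
    for key, user in rows:
        groups.setdefault(key, set()).add(user)
    per_node_counts.update({k: len(v) for k, v in groups.items()})
print("shuffled merge:  ", per_node_counts)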

3. The right table of Hash Join is a large table

This case demonstrates the scenario where the right table is large. Because ClickHouse's optimization for multi-table queries is limited, a subquery is used here to push the filter conditions down.

In this case, with complex query mode enabled, the query execution time drops from 17.210 s to 1.749 s. lineorder is a large table; through shuffle, its data can be distributed to the worker nodes by the join key, reducing the pressure of building the right-table hash table.

4. Multi-table Join

This case is a 5-table join.

After complex query mode is enabled, the query execution time drops from 8.583 s to 4.464 s, and all right tables can start reading data and building at the same time. To compare fairly with the existing mode, the runtime filter is not enabled for the complex query; it would be even faster with the runtime filter enabled.

It should also be emphasized that today's sharing mainly explains how complex queries are supported from the perspective of the execution mode. In fact, the optimizer also provides a significant performance gain for complex queries, through RBO rules such as common predicate pushdown and correlated-subquery handling; there are many such optimization rules, and they can greatly improve SQL execution efficiency. The SQL above is actually relatively simple: a 5-table join with some dimension-table filter conditions, written as subqueries so that the right-table filters can be pushed down under ClickHouse's existing mode. In complex query mode, thanks to the optimizer, users do not have to write SQL in such a complicated way; the optimizer automatically completes the pushdown and RBO optimizations.

The above are rule-based optimizations; in complex queries, CBO also has a large effect. As an example, in ClickHouse, for the same two tables, a large table joining a small table performs much better than a small table joining a large table. In effect 2 above, adjusting the order of the tables would make the query much faster. In addition, the choice of Join implementation has a big impact on join performance: if the distribution of the join keys allows it, a colocate join completely eliminates the data shuffle compared with a shuffle join. In a multi-table join, the join order and the join implementations have an even greater impact on execution time than in a two-table join. With the help of data statistics and CBO optimizations, a better execution plan can be obtained.

With the optimizer, business users can write SQL in whatever way matches their business logic, and the engine automatically computes a relatively optimal plan and executes it, accelerating the query.

5. Outlook

ClickHouse's current mode actually performs well in many single-table query scenarios. Our optimizations target complex query scenarios: mainly implementing the multi-stage execution mode and the data transfer between stages. In engineering practice, many attempts and optimizations have been made to improve execution and network-transfer performance, and we hope to lower the threshold of SQL analysis and tuning and reduce oncall pressure by improving metrics and intelligent diagnostics.

The current implementation is only the first step; there is a lot of work ahead.

First, continuing to improve execution and Exchange performance. This does not refer to general engine optimizations such as better indexes or operators, but to work specifically related to the complex query mode.

Second, enhancing metrics and intelligent diagnostics. As just mentioned, SQL is extremely flexible, and for some complex queries it is almost impossible to diagnose and tune without metrics; this is something we will keep working on for a long time.

You are welcome to try ByteHouse: scan the code or search for ByteHouse to visit the official website and click "Try Now" for a free trial. The complex query capability described here has also been made available in ByteHouse.

You are also welcome to scan the QR code to join the official ByteDance data platform exchange group, where you can get technical materials and information about live-streaming events, and discuss questions from the live session with me. In addition, our team is hiring many R&D engineers; please follow the ByteDance data platform official account for details.

This concludes today’s sharing, thank you.


Sharing guest

Dong Yifeng

ByteDance

Senior R&D Engineer, ByteHouse, Volcano Engine

Senior R&D engineer on the ByteDance data platform. He joined the ByteDance OLAP team in 2016 and has long been engaged in the development and promotion of big data query engines, having worked on Hive, Spark, Druid, ClickHouse, and other engines; he currently focuses mainly on R&D related to the ClickHouse execution layer.



