Recently, Volcano Engine, ByteDance's enterprise-facing technology service platform, officially released ByteHouse, a ClickHouse Enterprise Edition built to address two pain points of the open-source product: it is difficult to get started with, and the cost of trial and error is high. ByteHouse pairs the engine with commercial products and technical support services.
ByteDance is the largest ClickHouse user in China: it currently runs more than 15,000 ClickHouse nodes internally, manages a total data volume of more than 600 PB, and its largest cluster exceeds 2,400 nodes. Much of ByteDance's business growth analysis is built on a ClickHouse-based query engine. Drawing on the years of exploration that went into building ByteHouse, this article shares two typical application and optimization cases from ByteDance's use of ClickHouse.
Recommender system real-time metrics
A/B experiments are used widely within ByteDance, especially for verifying the effectiveness of recommendation algorithms and feature optimizations. Initially, the company's internal A/B experimentation platform provided only T+1 offline experiment metrics, but the recommender system team needed to observe the online effect of an algorithm model or a feature faster, so a real-time feedback path was needed as a supplement. The requirements were:
- Aggregate metrics and detailed data can be queried at the same time;
- Up to hundreds of dimensions and metrics must be supported, and the set changes flexibly and keeps growing;
- Data can be filtered efficiently by ID;
- Some machine learning and statistics-related metric calculations (such as AUC) must be supported.
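To illustrate the last requirement: community ClickHouse nowadays ships an arrayAUC function for exactly this kind of calculation (ByteHouse, as noted below, achieved such capabilities through UDFs). A minimal sketch against a hypothetical sample-log table:

```sql
-- Hypothetical table of model scores and ground-truth labels per experiment.
-- arrayAUC(scores, labels) is a community ClickHouse function.
SELECT
    experiment_id,
    arrayAUC(groupArray(predicted_score), groupArray(actual_label)) AS auc
FROM rec_sample_log
GROUP BY experiment_id;
```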
There are many analysis engines inside ByteDance: ClickHouse, Druid, Elasticsearch, Kylin, and so on. After analyzing the user requirements, ClickHouse was selected, because:
- It can serve results fast enough to observe algorithm models quickly, without the high data latency that pre-aggregation introduces;
- ClickHouse is not only suited to aggregate queries; combined with skip indexes it also performs well on detail lookups;
- ByteDance's self-developed ClickHouse supports the Map type, which accommodates dynamically changing dimensions and metrics and fits the requirements well;
- For filtering by ID, a Bloom Filter is the better solution, and ClickHouse has native Bloom Filter support;
- ByteDance's self-developed ClickHouse engine implements the relevant metric calculations through UDFs and is easily extensible.
Each product has its own suitable scenarios, but for the requirements at hand, ClickHouse was the better fit.
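To make the Map-type and Bloom-filter points concrete, here is a minimal sketch in community ClickHouse syntax (the Map type and the bloom_filter skip index are also available in recent community releases; all names are hypothetical, and ByteHouse's internals may differ):

```sql
-- Dynamically changing dimensions/metrics live in Map columns; a Bloom-filter
-- skip index accelerates point filtering by ID.
CREATE TABLE rec_metrics_local
(
    event_time  DateTime,
    user_id     UInt64,
    item_id     UInt64,
    dims        Map(String, String),   -- flexible, growing dimension set
    metrics     Map(String, Float64),  -- flexible, growing metric set
    INDEX idx_user user_id TYPE bloom_filter GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY toDate(event_time)
ORDER BY (event_time, user_id);

-- A detail lookup filtered by ID, reading one dynamic dimension and metric.
SELECT event_time, dims['ab_version'], metrics['ctr']
FROM rec_metrics_local
WHERE user_id = 123456789;
```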
After confirming the technology selection, there were two candidate implementation approaches:
[Figure: the two candidate implementation approaches, an external Flink ETL pipeline versus ClickHouse's built-in Kafka Engine]
Because externally driven writes are hard to control and the required Flink technology stack was not available, we finally adopted the Kafka Engine solution: ClickHouse's built-in consumers consume Kafka directly. The overall architecture is as follows:
- Data is generated directly by the recommender system and written to Kafka. To make up for the missing Flink ETL step, the recommender system cooperated by adjusting the message format of the Kafka topic so that it directly matches the schema of the ClickHouse table;
- The agile BI platform was also adapted to real-time scenarios and supports interactive query and analysis;
- If there is a problem with the real-time data, the data can also be re-imported into ClickHouse from Hive. In addition, the business side imports a 1% sample of the offline data for simple verification; this 1% sample is generally retained longer.
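In community terms this layout is a Kafka engine table plus a materialized view feeding the MergeTree table; a hedged sketch with hypothetical names (ByteHouse's modified Kafka Engine differs internally, but the shape is the same):

```sql
-- Kafka source table: the topic's message format was adapted to match the
-- ClickHouse schema directly, so no ETL step is needed in between.
CREATE TABLE rec_metrics_queue
(
    event_time  DateTime,
    user_id     UInt64,
    item_id     UInt64,
    dims        Map(String, String),
    metrics     Map(String, Float64)
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list  = 'rec_metrics',
         kafka_group_name  = 'clickhouse_rec_metrics',
         kafka_format      = 'JSONEachRow';

-- Materialized view moves each consumed block into the MergeTree table.
CREATE MATERIALIZED VIEW rec_metrics_mv TO rec_metrics_local AS
SELECT * FROM rec_metrics_queue;
```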
Beyond technology selection and the implementation scheme, we ran into many problems while supporting the recommender system's real-time data. The biggest is that as the volume of data generated by the recommender system keeps growing, ever more consumption capacity is demanded of a single node. The main problems were the following:
Problem 1: Insufficient write throughput
Challenge: In scenarios with a large number of auxiliary skip indexes, index construction seriously affects write throughput.
Solution: Build the index asynchronously.
The specific logic in the community implementation is as follows:
- Parse the input data and generate an in-memory block data structure;
- Slice the block and build the column data files according to the table schema;
- Finally, scan the block and build the skip index files according to the skip index schema. The part file is only committed once all three steps have completed.
Since users can already query normally once the column data is built, ByteHouse performs the first two steps synchronously and defers the third: the built part is placed into an asynchronous index-building queue, and a background thread builds the index files.
Effect: After switching to asynchronous building, overall write throughput improved by about 20%.
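The community edition exposes a related, coarser mechanism: an index can be added without touching existing parts and then materialized in the background as a mutation. This is only the nearest community analogue, not ByteHouse's per-insert asynchronous build:

```sql
-- Register a new skip index; parts already on disk are left untouched.
ALTER TABLE rec_metrics_local
    ADD INDEX idx_item item_id TYPE bloom_filter GRANULARITY 4;

-- Build the index for existing data asynchronously, as a background mutation.
ALTER TABLE rec_metrics_local MATERIALIZE INDEX idx_item;
```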
Problem 2: Insufficient Kafka consumption capacity
Challenge: The community Kafka table runs only one consumer by default, which both wastes resources and fails to meet performance requirements.
Attempted optimizations:
- Increase the number of consumers to raise consumption capacity. However, in the community implementation one thread manages multiple consumers, and the data they consume is flushed by a single output thread, so the potential of multithreading and of the disks is not fully exploited;
- Create multiple Kafka tables and materialized views writing into the same table. This works, but is troublesome to operate and maintain.
Solution: Support multi-threaded consumption.
Since the approaches above were unsatisfactory, we finally decided to modify the Kafka Engine to support multiple consumer threads internally. Simply put, each thread holds one consumer, and each consumer is responsible for parsing and writing its own data, which is equivalent to executing multiple INSERT queries into the same table concurrently.
Effect: With multiple consumers writing to the table concurrently across threads, write performance improves almost linearly.
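For reference, community ClickHouse later gained settings in the same direction: kafka_num_consumers raises the consumer count, and kafka_thread_per_consumer gives each consumer its own thread and write path. A sketch of the community mechanism (not ByteHouse's internal implementation; names are hypothetical):

```sql
CREATE TABLE rec_metrics_queue_mt
(
    event_time DateTime,
    user_id    UInt64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list  = 'rec_metrics',
         kafka_group_name  = 'clickhouse_rec_metrics_mt',
         kafka_format      = 'JSONEachRow',
         kafka_num_consumers = 4,        -- several consumers in one table
         kafka_thread_per_consumer = 1;  -- one thread (and flush) per consumer
```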
Problem 3: Data integrity cannot be guaranteed on node failure
Challenge: In active/standby mode, if data is written to both nodes at the same time, then once one node fails, various problems easily arise while the restarted node recovers: performance degradation, sharding guarantees being broken, and, most seriously, incorrect query results.
Solution: Ensure that only one node of the active/standby pair is written to.
To prevent the two nodes from consuming the same data, the improved Kafka Engine borrows ReplicatedMergeTree's ZooKeeper-based leader election logic: for each pair of replicas, the pair of consumers runs an election on ZooKeeper, the consumer elected leader consumes, and the other node stands by.
On top of this single-node consumption mechanism, the system detects whether the ReplicatedMergeTree table data is complete. If it is incomplete, the node cannot serve normally, so its consumer proactively gives up leadership and the replica node becomes the consumer. Newly written data therefore never lands on a node with missing data, and thanks to the query routing mechanism, queries are never routed to such a node either, so the latest data can always be queried.

Effect: The improved Kafka Engine ensures that in active/standby mode only one node consumes data, even while a failed node is recovering.
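For context, ReplicatedMergeTree already keeps its coordination state in ZooKeeper; a standard community definition is sketched below (the path and macros are hypothetical). The improved Kafka Engine reuses this same ZooKeeper layer for its consumer leader election.

```sql
-- Each replica registers under the table's ZooKeeper path; the consumer
-- election described above builds on the same coordination layer.
CREATE TABLE rec_metrics_replicated
(
    event_time DateTime,
    user_id    UInt64
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/rec_metrics',  -- ZooKeeper path (hypothetical)
    '{replica}'                                -- replica-name macro
)
PARTITION BY toDate(event_time)
ORDER BY (event_time, user_id);
```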
Real-time ad serving data
The second classic case concerns advertising data: operations personnel generally need to see the real-time effect of ad delivery, and because of the nature of the business, the data generated on a given day often touches data from multiple earlier days. The system was originally built on Druid, which ran into some difficulties in this scenario:
[Figure: difficulties of the original Druid-based implementation in this scenario]
Choosing ClickHouse addressed Druid's shortcomings, but some problems still had to be solved.
Problem 1: Buffer Engine cannot be used together with ReplicatedMergeTree
Challenge: The community provides the Buffer Engine to mitigate the problem of a single write generating too many parts, but it does not work well with ReplicatedMergeTree: Buffer tables on different replicas each cache only the data newly written on their own node, which makes queries inconsistent across replicas.
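For reference, a stock Buffer table in community ClickHouse looks like the sketch below (ads_local is a hypothetical target table). The buffered rows live only in the memory of the node that received the write, which is exactly why replicas diverge:

```sql
-- Buffer flushes to ads_local once any max threshold (time/rows/bytes) is
-- reached, or all min thresholds are; until then rows exist only on this node.
CREATE TABLE ads_buffer AS ads_local
ENGINE = Buffer(currentDatabase(), 'ads_local',
                16,                    -- num_layers
                10, 100,               -- min_time, max_time (seconds)
                10000, 1000000,        -- min_rows, max_rows
                10000000, 100000000);  -- min_bytes, max_bytes
```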
Solution: The Buffer Engine was improved with the following adjustments and optimizations:
- We chose to combine the three tables Kafka/Buffer/MergeTree to provide an easier interface;
- The Buffer is built into the Kafka Engine as an option that can be switched on or off, making it easier to use;
- Inside the Buffer table, a pipeline-like pattern handles multiple blocks;
- Queries are supported in the ReplicatedMergeTree case.
First, it is guaranteed that within a pair of replicas only one node consumes, so of the two Buffer tables in the pair, only one holds data. If a query lands on the replica that is not consuming, special query logic reads the data from the Buffer table on the other replica.
Effect: The enhanced Buffer Engine solves the query consistency problem when Buffer Engine and ReplicatedMergeTree are used together.
Problem 2: Data may be lost or consumed twice after downtime
Challenge: ClickHouse lacks transaction support. If a crash happens when a batch write has persisted only some of its parts, there is no transaction to guarantee atomicity, so after restart the batch may be lost or consumed again.

Solution: Referring to Druid's KIS scheme, Kafka offsets are managed by the engine itself to achieve atomic semantics for each consume/write batch. The implementation binds the offset to the part data, which strengthens consumption stability: each consumption round creates a transaction by default, and the transaction is responsible for writing the part data to disk together with the offset. On failure, the transaction rolls back the offset together with the written parts, and the batch is consumed again.
Effect: The atomicity of each batch insert is guaranteed, enhancing the stability of data consumption.
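Community ClickHouse does not bind offsets to parts this way, but its Kafka engine does expose _topic/_partition/_offset virtual columns, so the consumed position can at least be persisted next to the rows for auditing or deduplication. A hedged sketch of that weaker pattern (ads_queue/ads_local are hypothetical, and ads_local is assumed to carry the two extra columns):

```sql
-- Persist each row's Kafka coordinates alongside the data; this records how
-- far each partition was ingested, though it is not a transactional guarantee.
CREATE MATERIALIZED VIEW ads_offsets_mv TO ads_local AS
SELECT
    *,
    _partition AS kafka_partition,
    _offset    AS kafka_offset
FROM ads_queue;
```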
Real-time data analysis is where ClickHouse excels. Based on the characteristics of ByteDance's real-time data scenarios, we optimized and extended ClickHouse and distilled these capabilities into ByteHouse. Building on self-developed technology and experience at very large scale, ByteHouse gives enterprise big-data teams a new option for coping with complex, fast-changing business needs and rapidly growing data. Going forward, ByteHouse will continue to share ByteDance's internal and external best practices with industry users, help enterprises build interactive big-data analysis platforms, and share its experience broadly with the ClickHouse developer community to jointly advance the ClickHouse ecosystem.