How Bilibili built an efficient lake warehouse integrated platform based on Trino+Iceberg
Foreword: Li Chengxiang, Head of Bilibili OLAP Platform, shared the theme content entitled “How Bilibili Builds an Efficient Lake Warehouse Integrated Platform Based on Trino+Iceberg” at the Hadoop meetup 2022 Shanghai event on September 24, 2022, and the following is the specific sharing.
Hello everyone, I am Li Chengxiang from Bilibili, very happy to participate in Hadoop Meetup, can communicate with you offline, this time I mainly share the content is how Bilibili based on Trino + Iceberg to create an efficient lake warehouse integrated platform.
First of all, I will introduce the data processing process of Bilibili, our data mainly comes from three aspects: buried data from APP/web page, log data from the server and data in the business system database, collected into the big data platform through our data collection tools and services, including offline HDFS data and real-time Kafka data.
After entering the big data platform, our data developers process the data according to the data requirements and model the data stratified. Mainly offline data processing with Hive and Spark, real-time data processing with Flink. Users can use Spark or Trino to directly access the data tables modeled by the students, but many times, in order to better support the actual needs of the business in terms of data exploration/BI reports/data services/full-text retrieval, etc., we also need to export the data from the Hive table on HDFS to external storage, so as to meet the needs of users more efficiently, such as ClickHouse/Redis/ES.
The current architecture can basically meet our needs for data analysis, but there are many shortcomings, the main problems are:
First, exporting from Hive tables to external storage requires additional data synchronization and data storage costs, and the entire data processing link is also longer, and the reliability is reduced.
Second, there are actually islands of data between the various stores, and cross-origin queries are costly, inefficient, and basically unfeasible.
Third, it is difficult for our internal platform service toolchain based on the Hadoop/Hive ecosystem to fully cover each external storage computing engine.
So we introduced the lake warehouse integrated architecture based on Trino+Iceberg, hoping to simplify the current data processing process, the core goals are two: First, there were a large number of cases of accessing Hive through Trino/Spark, and we hope to accelerate the query efficiency of this part through the lake warehouse integration, improve the user experience while reducing the cost of machine resources for querying.
The second is that for many previous scenarios that need to be synchronized to external storage, in the case of no particularly high performance requirements, you can directly use Iceberg table responses without syncing to external storage, simplifying the data development process of the business and improving development efficiency.
Finally, the integrated architecture of the lake warehouse based on Trino+Iceberg can basically be fully compatible with all the toolchains of our big data development platform, and the cost of adaptation is very low.
We hope to implement an efficient OLAP engine based on Trino and Iceberg, providing second-level query response and supporting most interactive analysis needs, and a small number of special needs through ClickHouse/ElasticSearch and KV class storage. The industry has multiple directions of practice for the integration of the lake warehouse, such as solving the data upsert scenario, or real-time data visibility, etc., and the B station also has some exploration and landing practice in these directions, but the focus I want to introduce today is our exploration of the query performance direction of the warehouse in the lake warehouse integration.
The overall architecture of our lake warehouse integration is shown in the figure, based on Spark/Flink’s ETL task to read and write Iceberg tables, and external services query Iceberg table data through the Trino engine.
Magnus is our self-developed Iceberg intelligent data management service, Spark/Flink will notify Magnus of the commit information every time it is a new file to Iceberg commit, Magnus according to the commit information of the table and the related policy asynchronous scheduling Spark tasks for optimizing the organization of data in Iceberg tables, such as small file merge, data sorting and so on.
Our goal is to build a second-level responsive lake warehouse all-in-one platform based on Trino and Iceberg, and this goal is mainly based on the following facts that we observe:
First, our main target business scenarios, such as reports/data products, their tables are strong schema normalized data after our data development classmates ETL, and the query scenarios are mainly projection/filtering/association/aggregation These basic operators are combined, such as two large table associations, or complex nested subqueries, of course, can be executed, but not our main target scenario. For such SPJA queries, the result set is generally very small.
Second, we can enhance Iceberg and Trino to support OLAP advanced features such as sorting/indexing/precomputation, so that the query only scans the data logically required by SQL, and the unwanted data is Skip off, while controlling the amount of data that needs to be scanned within a certain range.
Third, Iceberg’s transactional support is that we can safely reorganize the data, and this foundation is the basis for our ability to sort data/index/precompute asynchronously through the Magnus service.
At the same time, we do not pursue the ability to respond milliseconds to ClickHouse, mainly different from ClickHouse’s memory integration architecture, Iceberg data is stored on the HDFS distributed file system, introducing additional network and file system overhead, Iceberg is mainly metadata management at the file level, the file is generally around 256M size level, the granularity is thicker than ClickHouse.
In addition, there are many gaps between the open query engine and the OLAP engine that makes full use of the vectorization ability compared with other OLAP engines developed based on the native language on the computing side. Based on the predictable amount of data scanning and the controllable SPJA computational complexity, we can have a predictable query response time, and the question is: how can we execute the query based on Trino and Iceberg, try to access only the data that the query logically needs?
For SPJA operators in a typical multi-bit analysis scenario, we analyze each operator type. The first is projection, the actual data storage type of the Iceberg table is ORC column-memory format, so the projected fields in the query are pushed down to the TableScan layer, and the ORC Reader will only read the required fields instead of all the fields, which is a problem that has been solved.
For filtration, we need to consider different filtration types to find suitable solutions. Filtering can generally be divided into two kinds of filtering conditions, equivalent value and range filtering, and the filter field itself can also be divided into high cardinality segment and low cardinality segment according to the different base numbers of the field.
In the current community versions of Trino and Iceberg, some data skipping-related technologies for filtering have been implemented. On the engine side, Trino’s FiterPushDown correlation optimizer rules push filtering conditions as far as possible to the bottom TableScan layer.
Iceberg first supports Partition Prunning, partitioned files are stored in the partition directory, the filter condition contains the partition can skip the irrelevant partition directory, and will record the minmax value information of all fields of each file in the metadata of the table, when generating InputSplit, if the filter condition of a field and the minmax value of the field match to determine whether the file needs to be scanned.
In addition, Iceberg also supports user-defined sort fields, for example, age is a commonly used filter field, users can define age as a sort field, Magnus will pull up the asynchronous Spark task, the data of the Iceberg table according to the user’s definition of the data according to the age field sorted. The advantage of data sorting is to readjust the aggregation of data so that they can aggregate according to the age field, for example, for a query with age=16, 3 files need to be scanned before sorting, and only 1 file needs to be scanned after sorting.
The above example explains the impact of the sort distribution of data on index and query performance, and we further expand Iceberg’s ability to sort the distribution of data. We first divide the sorted distribution of Iceberg table data into two categories: data organization between files and data organization within files, which are independent of each other and can be configured separately. We mainly extended the data organization between files, supporting a total of four data organization methods: Hash/Range/Zorder and HibertCurve. Hash and Range are familiar to everyone, here is a brief introduction to Zorder and HibertCurve two kinds of distribution.
If for an Iceberg table, there are multiple commonly used filter fields, we use Order By a, b, c to sort multiple fields, the aggregation of the data for a, b, c in turn decreases, the data skipping effect will also decrease sequentially, especially when the cardinality of a is relatively high, it is likely that the query with only b or c filter conditions cannot skip any file.
Zorder’s practice is to map the multidimensional data of multiple field values into one-dimensional data according to the rules, we organize according to the one-dimensional data sorted by the mapped, and the one-dimensional data is connected in size order is a nested Z-shape, so it is called Zorder sorting.
Zorder can ensure that the order of the mapped one-dimensional data can ensure the aggregation of the original dimensions at the same time, so as to ensure that there is a good Data Skipping effect for each filter field participating in Zorder sorting.
For different data types and data distribution, the implementation of Zorder is also a more challenging thing, interested students can refer to our previous article: https://zhuanlan.zhihu.com/p/354334895.
We support Zorder in the inter-file sorting phase, so we actually need this nested Z-shaped data distribution to be cut into many segments, and each piece of data is stored in a file.
It can be seen that the span of some of the Zorder connectors is relatively large, if the data of the two points of the connection line with a large span is split into a file, the range of minmax values on the corresponding field of the file will be very large, and the corresponding field filter conditions are likely to be unable to skip this file, resulting in a decrease in the probability of data skipping. The Hibbert curve is similar to Zorder, with the advantage that it does not have a large span of connecting lines, so it is a better way to sort multidimensional fields than Zorder.
This is a specific test scenario for us, using the star schema 1TB dataset, a total of 1000 files, you can see that after sorting according to Zorder, for the three participating Zorder sorting fields of equivalent filtering, only need to scan more than a hundred files, can skip more than 80% of the files, and after the Hibbert curve sorting, the number of files that need to be scanned has been further reduced.
In addition to the data sort distribution, we have also enhanced the indexes supported by Iceberg, supporting multiple indexes to handle different filter criteria and field types.
BitMap indexes can support range filtering, and bitmaps with multiple filter criteria can be combined, increasing the skip probability. However, there are two main problems with bitmap, one is that it is too expensive to store a bitmap for each cardinal value, and the other is that the range query needs to read a large number of bitmaps to calculate the intersection and difference, which greatly limits the application scenarios of bitmap indexes, and the use of indexes may lead to inverse optimization of performance. We also have some exploration in this area, if you are interested, you can refer to: https://zhuanlan.zhihu.com/p/433622640.
BloomRangeFilter is an index similar to BloomFilter but supports Range filtering scenarios implemented by us with reference to the public paper, there is a possibility of False Positive, but the storage space required is greatly reduced compared to the precise index of BitMap, in our actual tests, it can generally reach one-tenth of the size of our optimized Bitmap index.
Based on Iceberg’s support for rich index types, as well as improving data aggregation through data sorting distribution, and ensuring the effectiveness of indexing, how did Trino use Iceberg’s index?
It can be divided into two stages, the first stage is when the Coordinator gets the InputSplit, this stage uses the relevant index information stored in the metadata file of the Iceberg table, such as the minmax value of each field of each file, and the skipped file does not generate the InputSplit.
The second stage is that when the Trino Worker receives the assigned task and processes the data in the Input Split, it first reads the index file data corresponding to the file according to the file and interprets whether the current file can be skipped.
The index file and the data file we generate are one-to-one correspondence, when the index size is less than a certain threshold is saved in the metadata of the table, used in the stage for a while, when the threshold is greater than the threshold, saved in a separate file, the second phase is used.
In queries with Joins, how to effectively Skip does not need to access the data is a difficult problem to solve. For a typical star model scenario, the key to performance is the amount of data scanned in the fact table, but the filter conditions are generally filtered according to the dimension fields in the dimension table, and Trino cannot use the filter conditions of the dimension fields to skip the files of the fact table.
We support the definition of virtual related columns on the Iceberg table, the association column is equivalent to the dimension field of the dimension table to the fact table, of course, it will not really be stored, just a logical definition, and then the user can treat the associated column like the original column, you can define the data sorting organization based on the associated column, and you can define the index based on the associated column. The association column requires that the fact table and the dimension table meet a certain constraint relationship, that is, the result after the fact table and the dimension table Join is equivalent to the widening of the fact table, and the number of rows in the fact table has not increased or decreased, which is called Record-Preserved Join.
Generally satisfying this condition is: the fact table Left Join dimension table, and the Join key of the dimension table satisfies the constraint of the Unique Key, or the join key of the two tables satisfies the constraint of PKFK, then the fact table and the dimension table LEFT JOIN or Inner Join can guarantee the Record-Preserved Join.
After we can define the data sorting organization and index according to the associated columns, for the filter conditions of the dimension field, by adding a Trino optimizer Rule, the filter conditions that meet the conditions can be extracted from the TableScan of the dimension table and pushed down to the TableScan of the fact table, and the index data of the field defined on the fact table can be used to determine whether the current fact table data file can be Skip, so as to make the data of the star model The Skipping effect can achieve a similar effect to large watches, which is a very big improvement for our business scenarios that support star models.
Through the support for data sorting organization, indexes and associated columns, Trino + Iceberg can basically do only the actual scanning of SQL logically required data at the file level to participate in the calculation in the engine, but for some query scenarios that contain aggregation operators, you can SQL logically need to calculate a large amount of data, aggregate into a small number of result sets to return to the user, such a scenario is mainly to solve performance problems through precalculation, and directly respond to the query through the results of precalculation. This avoids the actual scan to calculate a large amount of data.
We currently support direct response to user table/partition level count/min/max aggregate queries directly from data in Iceberg metadata. For a more general precomputing scheme, in the development process, how to achieve efficient file-level precomputing storage and query, how to use part of the file precomputing results to speed up the query, how to solve the precomputing cube dimension explosion problem, etc., this is a very interesting and challenging direction, we have the actual results of the time to share with you in this aspect of the work.
B station lake warehouse integrated platform is currently in the stage of rapid development, here and you to share some of the current key indicators, our Trino cluster is about 5376 core, there are 70,000 queries per day, the total amount of data access is currently 2PB, through the data sorting / index and other extensive applications, the average query only needs to scan 2GB of data, the overall P90 response time is within 2s, basically reached our goal of building a second-level response to the lake warehouse integrated platform.