Sharing guest: Zhang Youjun, ByteDance
Editor: Wang Yuxiang
Production platform: DataFunTalk
Introduction: Today I am very happy to share with you some of the practices of the ByteDance data platform in the real-time data warehouse. The community has already discussed the basic technical principles of data lakes and Hudi extensively, so today's sharing focuses on the practical side.
Today's sharing is divided into four parts:
Introduction to real-time data warehouse scenarios
A preliminary study of the data lake in the real-time data warehouse scenario
Data lake practice and deep optimization in typical real-time data warehouse scenarios
Future planning
Introduction to real-time data warehouse scenarios
To land the data lake well, we communicated in depth with the business teams beforehand and, based on the characteristics of the different businesses, grouped them into three scenarios:
(1) Scenario one: typical businesses are short video and live streaming. The data volume is generally large, such as high-traffic log data; the computation cycle is usually at the day, hour, or minute level; and the real-time requirement is generally within five minutes. The main appeal is stream-batch reuse, and a small amount of data inconsistency can be tolerated.
(2) Scenario two: typically parts of live streaming and e-commerce. The data volume is medium and the computations run over long windows; the real-time requirement is generally within one minute. The main appeals are low-cost data backtracking and cold start.
(3) Scenario three: mainly e-commerce and education. The business data is generally small in scale and computed in full; the real-time requirement is second-level. The main appeals are strong consistency and high QPS.
Combining these characteristics, we built complete data-lake-based solutions, which we will walk through one by one with actual scenarios and cases.
A preliminary study of the data lake in the real-time data warehouse scenario
In this section, we discuss our preliminary exploration of the data lake in ByteDance's real-time data warehouse scenarios, along with the problems we encountered and how we solved them.
Frankly speaking, when we first landed the data lake, everyone was skeptical about whether it could support online production, so our initial plan was relatively conservative: we first selected scenarios where the data lake had prominent advantages over existing solutions, and tried to land it on a small scale against those pain points.
There are two relatively big problems in offline data warehouses. The first is timeliness: the status quo is generally day- or hour-level latency. The second is updates: for example, to update part of the data within a certain hour, the status quo requires rewriting all the data in the partition, which is very inefficient. For such scenarios, the data lake offers both timeliness and efficient updates. At the same time, compared with a real-time data warehouse, the data lake can store data once and serve both stream and batch use, enabling efficient data analysis directly on it.
Based on the above analysis of the business, we followed the steps below to land the first pipeline.
1. Landing scheme based on video metadata
Looking at the above figure, the original scheme has three Hive tables: Table 1, 2, and 3. For the whole link, we import data from the MySQL source on the left into Table 1 and data from Redis on the right into Table 2, then join the two tables. There are two big problems here. First, resource utilization is high during peak periods, because the day-level dump volume is large and concentrated in the early morning. Second, the data-ready time is long: because there is deduplication logic, the data of the T-1 day partition must be combined with the current day's data and deduplicated before landing in the same-day (T-day) partition.
By introducing Hudi for Upsert, we spread the day-level dump across each hour. Since Hudi itself supports deduplication logic, Table 1 can be regarded as a real-time full snapshot: once the hour-level (e.g., 23:00) Upsert completes, we can directly run the downstream Join logic. This advances the data-ready time by about 3.5 hours, and peak-period resource consumption drops by 40%.
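The key property being relied on here is Upsert-by-primary-key semantics. As a minimal sketch (not ByteDance's code; record layout and key name are hypothetical), each hourly batch updates existing keys and inserts new ones, so the table always holds a deduplicated full snapshot:

```python
# Sketch: hourly upserts into a {primary_key: record} map emulate how
# Hudi's Upsert keeps Table 1 as a deduplicated real-time full snapshot.

def upsert(snapshot: dict, batch: list, key: str = "id") -> dict:
    """Apply an hourly incremental batch; later records win on key collision."""
    for record in batch:
        snapshot[record[key]] = record  # update if key exists, insert otherwise
    return snapshot

snapshot = {}
upsert(snapshot, [{"id": 1, "title": "a"}, {"id": 2, "title": "b"}])   # hour 00
upsert(snapshot, [{"id": 2, "title": "b2"}, {"id": 3, "title": "c"}])  # hour 01
# snapshot now holds ids 1..3, with id 2 carrying the hour-01 version
```

Once the last hourly batch lands, the downstream join can read this snapshot directly instead of waiting for a day-level dedup job.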
2. Near real-time data verification scheme
For real-time scenarios, when a real-time task changes frequently, such as an optimization or the addition of new indicators, we generally need to verify whether the task's output meets expectations. The current approach is to run an hour-level job that dumps an hour of data from Kafka to Hive and then verifies the full data; in more urgent cases we can only check part of the data, and the timeliness requirements are high. With the Hudi-based scheme, we can write the data directly into a Hudi table through Flink and query it directly through Presto, making the full data visible and verifiable in near real time. Judging from the online effect, this greatly improves the development efficiency of real-time tasks and helps guarantee data quality.
During the above exploration we encountered many problems. The first was poor ease of use, with high operation/maintenance and explanation costs. Initially we submitted SQL through scripts; as you can see, the SQL carried many parameters, and writing DDL containing the full schema was cumbersome, which made usability poor and was unacceptable to the business side.
To address this, we replaced the previous submission method with pure SQL submission, and read the schema and necessary parameters automatically through a unified Catalog, so the into-the-lake SQL can be simplified to the form shown in the figure.
Typical scenario practice
Next, let's look at ByteDance's current Hudi-based real-time data warehouse overall link.
Next, let’s look at Byte’s current Hudi-based real-time digital warehouse overall link.
Let’s look at the specific scenario.
1. Real-time multidimensional summarization
For the real-time multidimensional summarization scenario, we write Kafka data incrementally into Hudi's light-summarization layer. For analysis, multidimensional re-aggregation can then be computed on demand with Presto, directly feeding visualization dashboards. This scenario does not demand very high QPS or very low latency, so it can be served directly; for data products that do demand high QPS and low latency, the current solution is to pre-compute the multidimensional aggregates with Presto and import them into a KV system that backs the data product. In the medium to long term, we plan a materialized-view-based approach to further simplify operations on the business side.
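The two-stage idea can be sketched as follows (hypothetical data and dimension names; this illustrates the light-summarization-then-re-aggregation pattern, not ByteDance's actual pipeline):

```python
# Sketch: a light-summarization layer keeps partial sums per fine-grained
# dimension combination; coarser rollups are re-aggregated on demand,
# as Presto would over the Hudi light-summary layer.
from collections import defaultdict

events = [("android", "cn", 3), ("ios", "cn", 5), ("android", "us", 2)]

# light summarization: sum per (os, country)
light = defaultdict(int)
for os_, country, value in events:
    light[(os_, country)] += value

def rollup(light, dims):
    """Re-aggregate the light layer onto a subset of dimension positions."""
    out = defaultdict(int)
    for key, value in light.items():
        out[tuple(key[i] for i in dims)] += value
    return dict(out)

by_country = rollup(light, dims=[1])  # keys are 1-tuples: {('cn',): 8, ('us',): 2}
```

Because the light layer is much smaller than raw events, on-demand re-aggregation stays cheap while still answering arbitrary dimension combinations.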
In the above link, we encountered quite a few problems:
(1) Poor write stability. First, Flink consumes a lot of resources during ingestion; second, frequent task restarts easily lead to failures; third, Compaction cannot execute in time, which affects queries.
(2) Poor update performance, which causes severe task backpressure.
(3) Concurrency is hard to scale up, as it puts heavy pressure on the stability of the Hudi Metastore Service (ByteDance's self-developed Hudi metadata service, compatible with the Hive interface and planned to be contributed to the community).
(4) Poor query performance: queries can lag by more than ten minutes and even fail frequently.
Facing these problems, let me briefly introduce some targeted solutions:
(1) Write stability governance
We solved this with an asynchronous Compaction + Compaction Service scheme. Previously, Flink performed Compaction inside the ingestion job, and we found that this step was the key cause of the series of problems above. After optimization, the Flink ingestion task is only responsible for writing incremental data and scheduling Compaction plans, while Compaction execution is handled by the Compaction Service. Specifically, the Compaction Service asynchronously pulls pending Compaction plans from the Hudi Metastore and submits Spark batch jobs to perform the actual compaction. Compaction execution is thus completely asynchronous and isolated from the Flink write task, which greatly improves stability.
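The division of labor can be sketched as a simplified single-process simulation (class and method names are hypothetical; the real executor is a Spark batch job, not an in-process loop):

```python
# Sketch: the writer only appends log increments and *schedules* compaction
# plans; a separate Compaction Service pulls pending plans from a (simulated)
# Hudi Metastore and executes them asynchronously.

class Metastore:
    def __init__(self):
        self.pending_plans = []

class Writer:
    """Stands in for the Flink ingestion task: write + schedule only."""
    def __init__(self, ms, schedule_every=3):
        self.ms, self.n, self.every, self.log = ms, 0, schedule_every, []
    def write(self, batch):
        self.log.append(batch)
        self.n += 1
        if self.n % self.every == 0:            # cheap: only schedule a plan
            self.ms.pending_plans.append(list(self.log))
            self.log = []

class CompactionService:
    """Stands in for the Spark batch jobs that do the actual compaction."""
    def run(self, ms):
        compacted = []
        while ms.pending_plans:                 # pull & execute, off the write path
            plan = ms.pending_plans.pop(0)
            compacted.append([r for batch in plan for r in batch])
        return compacted

ms = Metastore()
writer = Writer(ms)
for i in range(6):
    writer.write([i])
base_files = CompactionService().run(ms)        # [[0, 1, 2], [3, 4, 5]]
```

Because the writer never blocks on compaction, a slow or failed compaction no longer destabilizes ingestion.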
(2) An index for efficient updates
We introduced the Bucket Index, which supports updates at a significantly larger data magnitude. In simple terms, a hash computed from the record key locates the target file directly, improving write performance; the same hash can also be used for filtering, so the query/analysis side benefits as well.
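As a rough illustration (hypothetical hash function and bucket count, not Hudi's actual implementation), locating the target file via the record key looks like:

```python
# Sketch: a bucket index hashes the record key to a fixed bucket (file group),
# so both writes and key-based lookups go straight to one file without
# probing every file's index.

NUM_BUCKETS = 4  # hypothetical; fixed when the table is created

def bucket_of(record_key: str, num_buckets: int = NUM_BUCKETS) -> int:
    # stable string hash (Python's built-in hash() is salted per process)
    h = 0
    for ch in record_key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF
    return h % num_buckets

# write side: route the record to its bucket file
# query side: a predicate `key = 'user_42'` only scans bucket_of('user_42')
target_bucket = bucket_of("user_42")
```

The trade-off, discussed later in the talk, is that the bucket count is fixed up front, so growing it requires re-hashing.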
(3) Optimization of the request model
In the current Hudi community version, WriteTasks poll the Timeline, continuously hitting the Hudi Metastore and limiting scalability. We shifted the WriteTasks' polling from the Hudi Metastore to a JobManager-side cache, which greatly reduces the load on the Hudi Metastore. This optimization raises the supportable RPS (requests per second) from the order of hundreds of thousands to tens of millions.
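The effect can be seen in a toy simulation (hypothetical counts and class names): moving polling behind a JobManager cache turns O(tasks × polls) Metastore requests into O(refreshes).

```python
# Sketch: before, every WriteTask polls the Metastore directly; after, the
# JobManager refreshes a cached timeline once per interval and all tasks
# read the cache.

class Metastore:
    def __init__(self):
        self.requests = 0
    def get_timeline(self):
        self.requests += 1
        return {"latest_instant": "t42"}

class JobManagerCache:
    def __init__(self, ms):
        self.ms, self.cached = ms, None
    def refresh(self):                      # one RPC per refresh interval
        self.cached = self.ms.get_timeline()
    def get_timeline(self):                 # served locally to all write tasks
        return self.cached

ms_direct, ms_cached = Metastore(), Metastore()
tasks, polls = 1000, 10

# before: every task polls the Metastore on every round
for _ in range(tasks * polls):
    ms_direct.get_timeline()

# after: one refresh per round; tasks hit the cache
jm = JobManagerCache(ms_cached)
for _ in range(polls):
    jm.refresh()
    for _ in range(tasks):
        jm.get_timeline()
# ms_direct.requests == 10000, ms_cached.requests == 10
```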
Next, let’s talk about query-related optimizations.
(4) MergeOnRead column pruning
In native MergeOnRead, the full rows of LogFile and BaseFile are read and then merged, which wastes performance when only some columns are queried, especially when there are many columns. Our optimization pushes column reading down to the Scan layer; during the log-file merge, records are stored in a map structure of K, V pairs (K is the primary key, V is the row record), each row record is column-pruned, and only then is the Log Merge performed. This significantly reduces serialization and deserialization overhead, as well as memory usage.
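The merge-with-pruning idea can be sketched like this (hypothetical row layout and helper names; an illustration of the technique, not Hudi's code):

```python
# Sketch: column pruning pushed below the log merge. Log records are held in
# a {primary_key: row} map, each row is pruned to the queried columns first,
# and only then merged with the base file.

def prune(row: dict, columns: list) -> dict:
    return {c: row[c] for c in columns if c in row}

def merged_read(base: list, logs: list, columns: list, pk: str = "id") -> list:
    log_map = {}                               # K = primary key, V = pruned row
    for row in logs:                           # later log entries win
        log_map[row[pk]] = prune(row, columns + [pk])
    out = []
    for row in base:
        # log version (already pruned) overrides the base version
        out.append(log_map.pop(row[pk], None) or prune(row, columns + [pk]))
    out.extend(log_map.values())               # log-only inserts
    return out

base = [{"id": 1, "a": 1, "b": 1, "c": 1}, {"id": 2, "a": 2, "b": 2, "c": 2}]
logs = [{"id": 2, "a": 20, "b": 20, "c": 20}, {"id": 3, "a": 3, "b": 3, "c": 3}]
rows = merged_read(base, logs, columns=["a"])  # only `a` and the key survive
```

Only the queried column (plus the key) is ever materialized, so wide rows never pay full serialization cost during the merge.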
(5) Parallel read optimization
In general, when the engine layer reads Hudi, one FileGroup corresponds to one task, which easily becomes a performance bottleneck when a single FileGroup holds a lot of data. Our optimization splits the BaseFile, with each split corresponding to one task, to increase read parallelism.
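A minimal split-planning sketch (hypothetical sizes; real engines split on row-group or block boundaries rather than raw byte offsets):

```python
# Sketch: splitting one large BaseFile into byte ranges so each range becomes
# its own read task, instead of one task per FileGroup.

def plan_splits(file_size: int, split_size: int) -> list:
    """Return (offset, length) read splits covering the whole file."""
    splits = []
    offset = 0
    while offset < file_size:
        length = min(split_size, file_size - offset)
        splits.append((offset, length))
        offset += length
    return splits

splits = plan_splits(file_size=1000, split_size=300)
# 4 parallel tasks instead of 1: [(0, 300), (300, 300), (600, 300), (900, 100)]
```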
(6) Combine Engine optimization
The current Hudi community implementation of in-memory merging and data transfer is based entirely on the Avro format, which incurs heavy serialization and deserialization when interfacing with specific engines and causes significant performance problems. Together with the community, we optimized this with the Combine Engine, pushing the interface down to the engine-level data structures. For example, when reading a FileGroup we can directly produce Spark's InternalRow or Flink's RowData, minimizing the dependence on the Avro format. This greatly improves the performance of both MergeOnRead and Compaction.
The next two optimizations will not be described in detail for time reasons.
2. Real-time data analysis
In this scenario, detail data can be imported directly into Hudi through Flink, or joined with DIM tables into a wide table that also lands in Hudi. There are two main requirements here: efficient lake ingestion of log-type data, and correlation of real-time data. We made targeted optimizations for both.
(1) Efficient lake ingestion of log-type data
For log data, we support a NonIndex mode. The Hudi community mainly supports primary-key-based indexes, such as the Bloom Filter index or the Bucket Index we provided to the community. A primary-key index involves two costly steps: on write, each record must first be located — querying whether historical data exists, updating if so and inserting if not — before being appended to the Log of the corresponding file; then, during Merge or Compaction, records must be merged and deduplicated in memory. With NonIndex there is no primary-key concept, so log data without primary keys can also enter the lake: records are appended directly to the Log File on write, and during merge the incremental data is applied directly onto the Base File without deduplication. This greatly improves ingestion efficiency.
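The contrast between the two paths can be sketched as follows (hypothetical records; an illustration of the dedup-vs-append trade-off, not Hudi's code):

```python
# Sketch: with a primary-key index, every merge must deduplicate by key;
# with NonIndex (no primary key), compaction simply concatenates the log
# increment onto the base file.

def compact_with_pk(base: list, log: list, pk: str = "id") -> list:
    merged = {row[pk]: row for row in base}
    for row in log:                 # dedup / update by key — the costly path
        merged[row[pk]] = row
    return list(merged.values())

def compact_nonindex(base: list, log: list) -> list:
    return base + log               # no lookup, no dedup: just append

base = [{"msg": "a"}, {"msg": "b"}]
log = [{"msg": "b"}, {"msg": "c"}]
rows = compact_nonindex(base, log)  # duplicates are kept, which is fine for logs
```

For append-only log data, keeping duplicates is acceptable, so skipping the key lookup and dedup makes both write and compaction much cheaper.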
(2) Real-time data correlation
To address the series of issues that arise with real-time Joins today, we support correlation at the storage layer based on Hudi: different streams write their own columns, which are stitched together at Merge time, presenting a complete wide table to external queries. Concretely, a big problem with multi-stream real-time writes is how to handle write conflicts across streams; we do conflict detection mainly in the Hudi Metastore.
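The column-level conflict check can be sketched like this (hypothetical function and parameter names; the real check happens at commit time in the Hudi Metastore):

```python
# Sketch: column-level conflict detection for multi-stream writes to one wide
# table. Streams writing disjoint (non-key) column sets may commit
# concurrently; overlapping column sets are rejected.

def check_commit(committed: list, new_cols: set, pk: str = "id") -> bool:
    """Return True (and record the writer) iff no column overlaps, key aside."""
    for cols in committed:
        if (cols & new_cols) - {pk}:
            return False                    # column conflict: reject commit
    committed.append(new_cols)
    return True

writers = []
ok1 = check_commit(writers, {"id", "clicks"})         # first stream commits
ok2 = check_commit(writers, {"id", "orders"})         # disjoint columns: ok
ok3 = check_commit(writers, {"id", "clicks", "gmv"})  # "clicks" overlaps: rejected
```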
For the read path, we first read multiple LogFiles into memory and merge them, then do a final merge with the BaseFile and output the query result; both Merge and Compaction use this optimization.
Future planning
1. Elastic and scalable indexing system
As just introduced, the Bucket Index supports updates in large-data-volume scenarios and stores data in buckets, but the number of buckets must be estimated from the current data volume, and re-hashing later is costly. We expect to improve the scalability of the hash index by building an Extensible Hash Index.
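A small sketch of why extensible (power-of-two) hashing makes growth cheap, under the assumption of modulo bucketing: when the bucket count doubles, a key in bucket b can only move to b or b + old_n, so each bucket splits locally and no global reshuffle is needed. (The real Extensible Hash Index additionally keeps a directory so buckets can split one at a time; that part is omitted here.)

```python
# Sketch: splitting one bucket of a (hash % old_n) table into two buckets
# of a (hash % 2*old_n) table. Only this bucket's keys are touched.

def split_bucket(bucket_id: int, keys: list, old_n: int):
    low = [k for k in keys if k % (2 * old_n) == bucket_id]
    high = [k for k in keys if k % (2 * old_n) == bucket_id + old_n]
    assert sorted(low + high) == sorted(keys)   # no key leaves the pair
    return low, high

old_n = 4
bucket2 = [2, 6, 10, 14, 18]              # all keys with k % 4 == 2
low, high = split_bucket(2, bucket2, old_n)
# low  -> k % 8 == 2: [2, 10, 18]
# high -> k % 8 == 6: [6, 14]
```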
2. Adaptive table optimization service
To reduce users' cost of understanding and use, we will work with the community to launch a Table Management Service that hosts Compaction, Clean, Clustering, and Index Building jobs. The relevant optimizations thus become transparent to users, lowering their cost of use.
3. Metadata service enhancements
At present we already use the Hudi Metastore to support some online services internally, but more needs are coming. The planned metadata service enhancements are:
(1) Schema Evolution: Support business requests for Hudi Schema changes.
(2) Concurrency Control: support concurrent stream and batch writes in the Hudi Metastore.
4. Stream-batch integration
For stream-batch integration, our plan is as follows:
(1) Unified SQL: a unified stream-batch SQL layer, with the runtime computed jointly by multiple engines (Flink/Spark/Presto).
(2) Unified Storage: Hudi-based real-time data lake storage, with Hudi providing the unified storage layer.
(3) Unified Catalog: Construction and access of unified metadata.
Q1: Is the file format for MergeOnRead column pruning columnar or row-based?
A1: Before optimization it was Avro-based row storage; we now support a Parquet-based columnar log format, which improves both storage and query performance.
Q2: For the scheduling of async Compaction, is the Hudi Metastore accessed from inside Flink?
A2: By default, Flink previously did three things: first, Hudi incremental writes; second, scheduling a Compaction plan after several incremental writes; third, executing the Compaction plan. What we have done so far is split out only the execution step, using the Compaction Service to pull metadata from the Hudi Metastore and execute the Compaction. (See Write stability governance.)
Q3: How are Hudi tables managed?
A3: Via the Hudi Metastore. The Hudi Metastore we use is currently deployed mainly on MySQL and supports management of file and library/table metadata, a Snapshot service, Hudi's own Timeline service, and some concurrency-control handling. It is fully compatible with the Hive Metastore interface standard, and based on Hudi's features we extended Hudi-specific interfaces, such as the Timeline-related ones. It also supports unified access from both streaming and batch applications for data processing. (See Metadata service enhancements for the figure.)
Q4: Can you get an in-depth look at the process of writing Hudi and conflict resolution?
A4: With multi-stream incremental writes to Hudi, different streams can write to different Log files, so that part does not conflict. Conflicts arise at the two-phase commit: if two streams write the same column, that is a column conflict, so we do column-level conflict detection in the Hudi Metastore. If there is a conflict, the commit is rejected directly; if not, the two streams are considered independent and can commit concurrently. (See Real-time data correlation – write flow.)
Q5: What is the relationship between the Kafka flow table and the Hudi flow table in the real-time data warehouse, and will the Hudi table be used instead of the Kafka flow table in the future?
A5: What we introduced today is mainly the landing and trial of near-real-time (minute-level) scenarios supported by Hudi; some scenarios need second-level responses, so we still need to plan and experiment with stream-batch integration. In the long run, we will replace Kafka's streaming capability with Hudi's streaming capability. (See Future planning – stream-batch integration.)
Q6: Does the real-time data warehouse use Hudi at each layer?
A6: In-lake computing is still in a small-scale promotion stage; in some scenarios we are doing POCs, and some scenarios are preparing to go online.
Q7: Why use the Bucket Index?
A7: Before the Bucket Index we used the Bloom Filter index. A Bloom filter is fine at small data volumes, but at the 100 TB scale false positives become prominent: when the data does not actually exist, many unnecessary files still get scanned, wasting resources. With the Bucket Index we can locate the file holding the data much faster, directly through the hash-value calculation.
This concludes today’s sharing, thank you.
01/ Sharing guest
Zhang Youjun, big data engine R&D engineer at ByteDance
He works in the data lake team of ByteDance's data engine department as a senior data lake engineer. He previously worked on Spark engine development and intelligent data warehouse development, and is now responsible for core development of the Hudi-based real-time data lake and its landing in ByteDance scenarios.
DataFun: focused on sharing and exchange around big data and AI technology applications. Founded in 2017, it has held more than 100 offline and 100 online salons, forums, and summits in Beijing, Shanghai, Shenzhen, Hangzhou, and other cities, inviting more than 2,000 experts and scholars to share. Its public account, DataFunTalk, has produced 700+ original articles with millions of reads and 140,000+ followers.