
As an important direction in the big data field, the LakeHouse offers a new paradigm that unifies stream and batch processing and combines the data lake with the data warehouse. Many of the data timeliness, accuracy, and storage cost problems that enterprises encounter across their businesses can be solved with an integrated lakehouse solution.

The mainstream open-source lakehouse solutions are still iterating rapidly, and industry applications are likewise exploratory, so imperfections and unsupported features are inevitably encountered in practice. Shopee maintains an internal version customized from open-source Apache Hudi, implementing enterprise-grade applications and new features for internal business needs.

By adopting a Hudi-based data lake solution, the data pipelines of Shopee's Data Mart, Recommendation, ShopeeVideo, and other products gained stream-batch unification and incremental processing, which greatly simplifies these pipelines and improves performance.

The image above shows the overall solution the Shopee Data Infrastructure team provides to internal business teams.

Let's first analyze three typical problems encountered in building Shopee's data systems.

The first problem: in database-based data integration, the same copy of data must serve both stream processing and batch processing. The traditional approach is to implement two links: a full export link that meets batch processing needs, and a CDC link used for real-time and incremental processing scenarios.

However, full export is inefficient and puts a high load on the database. In addition, data consistency is difficult to guarantee.

At the same time, there is room to optimize storage efficiency when building batch datasets, so we want to build batch datasets on top of CDC data, serving all three processing scenarios at once and improving data timeliness.

The second problem is storing fine-grained state details. A traditional batch dataset can be seen as a snapshot of the overall state of the business data at a point in time, and compressing everything into a single snapshot merges away the change history of the business process. These change records reflect how users interact with our services and are very important analytical objects; once merged, they cannot be recovered.

In addition, in many scenarios the daily change to business data is only a small fraction of the total, so storing a full copy in every batch wastes a lot of resources.

The third problem is building large wide tables. Near-real-time wide table construction is a common data processing scenario, but traditional batch processing has too much latency, while a streaming compute engine wastes resources on it. We therefore build business wide tables from multiple data sources in a way that also supports ad-hoc OLAP queries.

Given the business problems above, we finally chose Apache Hudi as the solution based on the following three considerations.

First, we want to build the data integration environment in a purely streaming fashion, and Hudi supports streaming scenarios well.

The second point is compatibility with the broader big data ecosystem. The datasets we build carry batch, streaming, incremental, and ad-hoc exploration workloads, which currently run on a variety of compute engines, so support for multiple compute engines was also a consideration.

Another consideration is fit with Shopee's business needs. Most of the datasets we need to handle come from business systems and carry unique identifiers, so Hudi's primary-key-centric design matches our data characteristics well.

Currently, our platform provides Flink and Spark as general-purpose compute engines to carry data integration and data warehouse construction workloads, and uses Presto for data exploration. Hudi supports all three.

In practice, depending on the importance of the business data, we also provide users with different data indexing options.

During data integration, schema changes are a very common user need, and data changes in the ODS layer can break downstream computation tasks. Incremental processing also requires time-based processing semantics. Support for primary-keyed storage is therefore very valuable for the data in our business databases.

Shopee currently has a large amount of data in business databases. We use CDC technology to capture change data from the databases and build ODS-layer data for business teams, supporting both batch processing and near-real-time incremental processing.

When a business team's data needs to be onboarded, we first run a full bootstrap to build the base table before starting incremental real-time integration, and then keep building on it in real time from the newly arriving CDC data.

During construction, we generally choose between a COW (copy-on-write) table and a MOR (merge-on-read) table according to the user's needs.
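As a hedged illustration of what that choice looks like in practice, a Hudi write on Spark is typically configured through datasource options like the following (the table and field names here are hypothetical, not Shopee's actual configuration):

```python
# Illustrative Hudi write options (Spark datasource); table/field names are hypothetical.
hudi_options = {
    "hoodie.table.name": "ods_orders",                       # hypothetical table name
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",   # or "COPY_ON_WRITE"
    "hoodie.datasource.write.operation": "upsert",           # "bulk_insert" for the initial bootstrap
    "hoodie.datasource.write.recordkey.field": "order_id",   # primary key from the business database
    "hoodie.datasource.write.precombine.field": "update_ts", # ordering value for deduplication
}

# Usage (requires a SparkSession with the Hudi bundle on the classpath):
# df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
print(hudi_options["hoodie.datasource.write.table.type"])
```

The initial bootstrap would typically use `bulk_insert`, switching to `upsert` once the real-time CDC link takes over.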

Two problems commonly arise during real-time construction:

The first is that a user configures a dataset with a high change rate as a COW table, causing write amplification. What we need to do here is set up appropriate monitoring to detect such configurations. At the same time, for MOR tables we support synchronous or asynchronous compaction of data files according to the configured data merge logic.

The second is that the default Bloom filter index produces false positives when checking whether a record already exists. A better approach for writes into very large datasets is to use the HBase index.
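To see why this matters at scale: the classic false-positive probability of a Bloom filter with m bits, k hash functions, and n inserted keys is approximately (1 − e^(−kn/m))^k. A small sketch of the formula (this is the textbook approximation, not Hudi's actual index implementation):

```python
import math

def bloom_false_positive_rate(m_bits: int, k_hashes: int, n_keys: int) -> float:
    """Approximate false-positive probability of a Bloom filter."""
    return (1 - math.exp(-k_hashes * n_keys / m_bits)) ** k_hashes

# As the number of keys per file grows, false positives force the writer to
# open and scan more files just to decide whether a key already exists.
small = bloom_false_positive_rate(m_bits=8_000_000, k_hashes=7, n_keys=100_000)
large = bloom_false_positive_rate(m_bits=8_000_000, k_hashes=7, n_keys=1_000_000)
print(f"{small:.8f} -> {large:.8f}")
```

An external key-value index such as HBase answers the existence check directly, sidestepping this per-file probing cost at the price of running an extra service.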

This is the effect of switching some of our data integration links to real-time Hudi-based integration. The figure above plots the share of visible data against latency: currently, 80% of the data is visible and available within 10 minutes, and all data is visible and available within 15 minutes.

The figure below shows our resource consumption statistics. The blue part is the resource consumption of the real-time link, and the red part is the historical resource consumption of batch data integration.

After switching to real-time links, repeated processing of large tables with low change rates is avoided, as is the resource cost of inefficient, concentrated batch processing. As a result, our resource consumption is much lower than in batch mode.

For scenarios where users need state details, we provide a service based on Hudi's Savepoint feature that periodically builds snapshots at the intervals users need; these snapshots appear in the metadata management system as partitions.

Users can easily consume this data from Flink, Spark, or Presto. Because the stored data is complete and its change details are not merged away, it supports both full computation and incremental processing.

When storing incremental views, scenarios where changed data is only a small fraction of the total achieve good storage savings.

Here is a simple formula for the relative space usage: (1 + (t − 1) × p) / t, where p is the proportion of changed data and t is the number of time periods to retain. The lower the change ratio, the greater the storage savings; retaining data over longer periods also improves the savings.
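A quick worked example of the formula (the values are illustrative): with snapshots retained for t = 30 periods and a per-period change ratio of p = 5%, the incremental view uses under 9% of the space of full snapshots:

```python
def relative_space_usage(p: float, t: int) -> float:
    """Space of an incremental view relative to t full snapshots:
    one full copy plus (t - 1) increments of proportion p, divided by t full copies."""
    return (1 + (t - 1) * p) / t

ratio = relative_space_usage(p=0.05, t=30)
print(f"{ratio:.4f}")  # ≈ 0.0817, i.e. roughly 92% storage saved vs. full snapshots
```

When p = 1 (everything changes each period), the formula correctly degenerates to 1: incremental storage saves nothing over full snapshots.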

This method also saves resources for incremental computation. The downside is a degree of read amplification for full batch computation.

When our datasets are built on Hudi MOR tables, we can serve batch, incremental, and near-real-time processing workloads at the same time.

Taking the figure as an example, Table A is an incremental MOR table. When we build downstream tables B and C from Table A, if the computation logic supports incremental construction, the computation only needs to read the new and changed data. This significantly reduces the amount of data involved in the computation.
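Conceptually, an incremental pull reads only the commits after the last processed instant. Here is a minimal pure-Python sketch of the idea (Hudi itself exposes this through incremental queries keyed by commit time; the record layout below is hypothetical):

```python
# Each record carries the commit time of the write that produced it (hypothetical layout).
records = [
    {"key": 1, "commit_time": "20240101080000", "value": "a"},
    {"key": 2, "commit_time": "20240101090000", "value": "b"},
    {"key": 1, "commit_time": "20240101100000", "value": "a2"},  # later update to key 1
]

def incremental_pull(records, begin_instant: str):
    """Return only records committed strictly after begin_instant."""
    return [r for r in records if r["commit_time"] > begin_instant]

changed = incremental_pull(records, begin_instant="20240101080000")
print([r["key"] for r in changed])  # only the keys touched since the last run
```

A downstream incremental job then recomputes only the rows derived from these changed keys, instead of rescanning Table A in full.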

Here is a near-real-time user job analysis built by the offline computing platform on top of Hudi's incremental computing. When a user submits a Spark task to the cluster, the task's logs are collected automatically after it finishes, the relevant metrics and key logs are extracted and written to a Hudi table, and a processing task then incrementally reads these logs and analyzes optimization opportunities for the user's reference.

Within one minute of a user job finishing, its execution can be analyzed and an analysis report produced for the user.

In addition to incremental computing, incremental Join is also a very important application scenario.

Compared with a traditional Join, an incremental Join only needs to locate and read the data files relevant to the incremental data, determine which partitions must be rewritten, and rewrite them. This significantly reduces the amount of data involved in the computation relative to a full Join.
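A simplified pure-Python sketch of the idea: only the keys present in the incremental batch are re-joined and rewritten, instead of re-joining both full tables (the data layout is hypothetical):

```python
# Full state of the two join sides, keyed by the join key (hypothetical data).
table_a = {1: {"name": "alice"}, 2: {"name": "bob"}, 3: {"name": "carol"}}
table_b = {1: {"city": "SG"}, 2: {"city": "KL"}, 3: {"city": "JKT"}}
joined = {k: {**table_a[k], **table_b[k]} for k in table_a}  # previous full join result

def incremental_join(joined, table_b, delta_a):
    """Apply an incremental batch from side A, rewriting only the affected keys."""
    for key, new_row_a in delta_a.items():
        joined[key] = {**new_row_a, **table_b.get(key, {})}
    return joined

delta_a = {2: {"name": "bobby"}}  # only key 2 changed upstream
joined = incremental_join(joined, table_b, delta_a)
print(joined[2])  # {'name': 'bobby', 'city': 'KL'}
```

In the real table-format setting, "rewriting the affected keys" corresponds to rewriting only the file groups or partitions that contain them.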

Merge Into is a very useful way to build near-real-time wide tables in Hudi; it is mainly based on partial updates.

The Spark SQL-based Merge Into syntax, shown here, makes it easy to develop jobs that build wide tables.

Hudi implements this with Payloads, each of which may contain only a subset of the table's columns.

The Payload of the incremental data is written to a log file, and a subsequent merge produces the wide table the user consumes. Because that merge is delayed, we have optimized the write logic for it.
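In spirit, the partial-update Payload merges column subsets from different writes of the same key into one wide row. A minimal sketch of that merge logic (this is not Hudi's actual Payload API, and the column names are hypothetical):

```python
def merge_partial(base: dict, partial: dict) -> dict:
    """Overlay a partial-column payload onto the current row; None means 'not provided'."""
    merged = dict(base)
    for col, val in partial.items():
        if val is not None:
            merged[col] = val
    return merged

# Two sources each contribute their own columns for the same order key.
row = {"order_id": 42, "amount": None, "shipping_status": None}
row = merge_partial(row, {"amount": 99.9})                # from the orders source
row = merge_partial(row, {"shipping_status": "SHIPPED"})  # from the logistics source
print(row)
```

Because each source touches only its own columns, the storage layer can assemble the wide row without the compute layer holding join state.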

After the data merge completes, we record the merge time and the associated DML in metadata management; when reading the MOR table, we analyze that DML and time to provide data visibility.

The benefits of using Partial Update are:

While solving Shopee's internal business problems, we have also contributed a batch of code back to the community, sharing internal optimizations and new features. The larger ones are meta sync (RFC-55, completed), snapshot view (RFC-61), partial update (HUDI-3304), and FileSystemLocker (HUDI-4065, completed); we have also helped the community fix many bugs. Going forward, we hope to keep meeting business needs in this way while participating in building the community.

There are several typical scenarios for incremental views:

For the snapshot view requirements, Hudi can already support it to some extent through two key features:

A simple implementation is shown in the following figure:

However, in real business scenarios, meeting users' snapshot view needs requires thinking further about ease of use and usability.

For example, how does a user know that a snapshot has been published correctly? One issue here is visibility: the user should be able to reference the snapshot table explicitly throughout the pipeline, which calls for Git-like tag functionality to improve ease of use.

In addition, a common requirement in snapshot scenarios is precise slicing of the data. For example, users do not want records whose event time belongs to day 1 to drift into day 2's snapshot; instead, slicing should use a watermark within each FileGroup to pick a precise instant.

To better meet the needs of the production environment, we have implemented the following optimizations:

We are currently contributing this functionality back to the community through RFC-61; the benefits observed in production were described in the previous sections and are not repeated here.

Earlier, we briefly introduced the multi-source partial-column update (large wide table stitching) scenario. We rely on Hudi's multi-source merge capability to perform the Join in the storage layer, which greatly reduces the state and shuffle pressure on the compute layer.

Currently, we implement multi-source partial-column updates mainly through Hudi's internal Payload interface. The following diagram shows how the Payload interacts with Hudi's reader side.

The implementation works roughly as follows: a custom Payload class implements the merge logic for same-key data from different sources. The writer merges the sources within a batch and writes the result to the log, and the reader invokes the same merge logic at read time to handle the cross-batch case.

What needs attention here is out-of-order and late-arriving events. Left unhandled, they often cause old data to overwrite new data, or incomplete column updates downstream.

For out-of-order and late-arriving data, we extended Hudi with multiple ordering values, so that each source can only update the columns that belong to it, and so that, per the configured event-time (ordering value) column, only newer data can overwrite older data.
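The idea can be sketched as keeping one ordering value per source and rejecting any partial update older than what that source has already written (a simplification of the behavior, with hypothetical field names):

```python
def apply_partial_update(row: dict, source: str, ordering_value: int, columns: dict) -> dict:
    """Apply a partial update from one source only if it is newer than that
    source's last applied update, so late events cannot clobber fresh data."""
    orderings = row.setdefault("_orderings", {})
    if ordering_value <= orderings.get(source, -1):
        return row  # late or duplicate event from this source: ignore it
    orderings[source] = ordering_value
    row.update(columns)
    return row

row = {"order_id": 7}
row = apply_partial_update(row, "orders",    ordering_value=10, columns={"amount": 50})
row = apply_partial_update(row, "logistics", ordering_value=3,  columns={"status": "PACKED"})
row = apply_partial_update(row, "orders",    ordering_value=8,  columns={"amount": 40})  # late: dropped
print(row["amount"], row["status"])  # 50 PACKED
```

Note that each source's ordering value is tracked independently, so a slow source (logistics, above) can still land its columns without being blocked by a faster one.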

In the future, we also plan to combine this with lock-free multi-writer support to enable concurrent multi-source writes from multiple jobs.

For the problems faced in building Shopee's data systems, we proposed an integrated lakehouse solution and, after comparative evaluation, selected Hudi as the core component.

In the rollout, we served three main user scenarios by using Hudi's core features plus our extensions and modifications on top of them: real-time data integration, incremental views, and incremental computing. This has brought users low latency (around 10 minutes), reduced compute resource consumption, reduced storage consumption, and other benefits.

Next, we will provide more features and further improve the following two aspects, to cover more user scenarios and deliver better performance.

Currently, Hudi supports a single-writer mode based on file locks.

In practice, however, there are scenarios where multiple jobs need to write concurrently with overlapping write partitions, and the current OCC (optimistic concurrency control) does not support this well. We are working with the community on multi-writer support for both Flink and Spark.

Metadata reads and file listing operations incur a large performance cost on both the write and read sides, and a massive number of partitions also puts heavy pressure on external metadata systems such as HMS.

To address this, we plan first to move metadata other than the schema from HMS into the metadata table (MDT), and then, as a second step, to run a standalone MetaStore and Table Service server that is no longer strongly coupled to HDFS.

With such a server, we can optimize read performance more easily and adjust resources more flexibly.

The author of this article

Jian is a big data technology expert on the Shopee Data Infrastructure team.

Team profile

The Shopee Data Infrastructure team focuses on building a stable, efficient, secure, and easy-to-use big data infrastructure and platform for the company.

Our work includes: real-time data link support and Kafka/Flink development; development and maintenance of Hadoop ecosystem components such as HDFS and Spark; O&M of the Linux operating system and big data components; development and business support for OLAP components such as Presto, Druid, Trino, Elasticsearch, and ClickHouse; and development of big data platform systems such as resource management and task scheduling.