About the author

Du Junling is a big data engineer on the ByteDance data platform team, with ten years of big data experience including years of Spark and Presto development, and is currently responsible for Apache Doris optimization.

Doris is an MPP analytical database aimed mainly at scenarios such as multi-dimensional analysis, data reporting, and user profile analysis. It ships with its own query and storage engines, supports vectorized execution, does not depend on other components, and is compatible with the MySQL protocol.

1. Introduction to Doris

Apache Doris has the following features:

1) Well-designed architecture that supports high-concurrency, low-latency query services as well as high-throughput interactive analysis. Multiple FEs can serve requests simultaneously, and as concurrency grows, FE and BE nodes can be scaled out linearly to handle high-concurrency query loads.

2) Support for both batch and streaming data loading, as well as data updates. Doris provides Update/Delete syntax and the unique/aggregate data models, so data can be updated dynamically and aggregate metrics refreshed in real time.

3) Enterprise-grade features: high availability, fault tolerance, and high scalability. If the FE Leader fails, an FE Follower is promoted to the new Leader within seconds and continues to serve requests.

4) Support for aggregation tables and materialized views. Doris offers a variety of data models, such as aggregate and replace, and allows rollup tables and materialized views to be created on top of them. Rollup tables and materialized views are updated automatically, without any manual maintenance (see the sketch after this list).

5) MySQL protocol compatibility: any MySQL client can connect directly, which makes integrating data applications very easy.
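As a concrete illustration of the aggregate data model, rollups, and materialized views mentioned above, the following is a minimal sketch; the database, table, column, and view names are illustrative, not taken from any production setup.

```sql
-- Aggregate-model table: rows with the same key are merged and pv is summed on load,
-- so the aggregate metric stays up to date as data arrives.
CREATE TABLE example_db.site_visits (
    event_day DATE,
    site_id   INT,
    city      VARCHAR(32),
    pv        BIGINT SUM DEFAULT "0"
)
AGGREGATE KEY(event_day, site_id, city)
DISTRIBUTED BY HASH(site_id) BUCKETS 10
PROPERTIES ("replication_num" = "3");   -- assumes at least 3 BE nodes

-- Rollup on a coarser set of dimensions; Doris keeps it in sync automatically.
ALTER TABLE example_db.site_visits ADD ROLLUP r_day_pv(event_day, pv);

-- Materialized view; also maintained automatically on load.
CREATE MATERIALIZED VIEW mv_city_pv AS
SELECT event_day, city, SUM(pv)
FROM example_db.site_visits
GROUP BY event_day, city;
```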

Doris is composed of Frontend (hereinafter FE) and Backend (hereinafter BE) components. The FE is responsible for accepting user requests, compiling and optimizing queries, distributing execution plans, managing metadata, and managing BE nodes; the BE executes the plans issued by the FE and stores and manages user data.

2. Introduction to the Hudi data lake format

Hudi is a next-generation streaming data lake platform that brings table management capabilities to data lakes, providing ACID transactions, MVCC, data update and delete, incremental reads, and more. It supports compute engines such as Spark, Flink, Presto, and Trino.

Hudi offers two table types, which differ in how data behaves when it is updated: Copy On Write (COW), where updates rewrite the columnar base files, and Merge On Read (MOR), where updates are written to delta files and merged with the base files at read time.

Hudi's two table types support three query types: Snapshot Queries, which read the latest snapshot of the table (for MOR tables this requires merging base and delta files); Read Optimized Queries, which read only the compacted base files; and Incremental Queries, which read the data changed since a given commit.

3. The technical background of Doris analyzing Hudi data

In data warehouse workloads, as businesses demand ever fresher data, the traditional T+1 warehouse has gradually evolved toward hour-, minute-, and even second-level latency. Real-time data warehouses are now widely used and have gone through several stages of development, with multiple solutions in use.

The Lambda architecture splits data processing into two independent paths, online (real-time) analysis and offline analysis, which do not affect each other.

The offline path processes T+1 data, using Hive/Spark to handle large volumes of immutable data, which is usually stored on systems such as HDFS. When data needs to be updated, the entire table or partition must be overwritten, which is relatively costly.

The online path processes real-time data with Flink/Spark Streaming, analyzing second- or minute-level streams and storing the results in Kafka or flushing them periodically (at minute granularity) to HDFS.

This approach has the following disadvantages:

The same set of metrics may have to be implemented twice, once for online and once for offline analysis, which is complex to maintain.

Data applications may need to query both offline and online data to assemble a metric, which complicates development.

Two separate engines, one for batch and one for streaming, must be deployed and operated at the same time, which complicates operations and maintenance.

Data updates require overwriting entire tables or partitions, which is costly.

As online analysis workloads grew, the drawbacks of the Lambda architecture became more and more obvious: each new metric has to be developed both online and offline, maintenance is difficult, offline and online metrics may not agree, and deployment involves many components. The Kappa architecture was born to address this.

The Kappa architecture processes online and offline data with a single architecture and a single engine, with data stored in message queues.

The Kappa architecture also has certain limitations:

Streaming compute engines have weak batch-processing capabilities and perform poorly on large data volumes.

Data is stored in message queues, which impose retention limits, so historical data cannot be traced back.

Data may arrive out of order, which can cause errors for applications with strict time-ordering requirements.

Data applications have to pull metrics from the message queue and must develop adapter interfaces, which adds development complexity.

To address the shortcomings of the Lambda and Kappa architectures, the industry has built data lake technologies such as Iceberg, Hudi, and DeltaLake on top of the data lake. They give the data warehouse ACID transactions, Update/Delete, data Time Travel, Schema Evolution, and other features, improving data freshness from the hour level to the minute level. They also support partial updates, which greatly improves update performance. This combines the low latency of streaming computing with the throughput of batch computing, and supports near-real-time scenarios.

Among the solutions above, the data lake approach is the most widely used, but the data lake alone cannot deliver second-level freshness, nor can it serve data directly to applications; additional data-serving components have to be built, making the system more complex. Against this background, some businesses began to adopt Doris. Business analysts need to run federated analysis across data in Doris and in Hudi; besides serving data from Doris itself, they also want to accelerate queries over data lake data in offline workloads. We therefore developed the ability for Doris to access data in the Hudi data lake.

4. The design principles of Doris analyzing Hudi data

Based on this background, we designed support for querying Hudi data lake data in Apache Doris. Because the Hudi ecosystem is written in Java while the Doris execution node (BE) is a C++ environment, C++ cannot call the Hudi Java SDK directly. We considered four solutions.

1) Implement a Hudi C++ client and call it directly from the BE to read and write Hudi tables.

This requires implementing a complete Hudi C++ client, which means a long development cycle, and any later changes in Hudi behavior would have to be mirrored in the C++ client, making it hard to maintain.

2) The BE sends read/write requests to the Broker over the Thrift protocol, and the Broker calls the Hudi Java client to read the Hudi table.

This requires adding Hudi read/write capability to the Broker. The Broker is currently positioned purely as a filesystem access interface, and introducing Hudi would break that positioning. In addition, data would have to be transferred between the BE and the Broker, which lowers performance.

3) Use JNI to create a JVM inside the BE and load the Hudi Java client to read and write Hudi tables.

This requires maintaining a JVM inside the BE process, with the JVM calling the Hudi Java client to read and write Hudi. The read/write logic reuses the Hudi community's Java implementation, so it stays in sync with the community, and because the data is processed within the same process, performance is high. However, maintaining a JVM inside the BE makes management more complicated.

4) Use the BE's Arrow Parquet C++ API to read the Hudi Parquet base files, leaving the delta files in the Hudi table unprocessed for now.

With this approach the BE reads the Parquet files of the Hudi table directly, giving the highest performance. However, merged reads of base files and delta files are not yet supported, so only Snapshot Queries on COW tables and Read Optimized Queries on MOR tables are supported; Incremental Queries are not.

In summary, we chose the fourth solution: the first phase implements Snapshot Queries on COW tables and Read Optimized Queries on MOR tables, and we will then work with the Hudi community to develop a C++ interface for merged reads of base files and delta files.

5. The technical implementation of Doris analyzing Hudi data

The steps to query and analyze Hudi tables in Doris are very simple.

When creating the table, specify Hudi as the engine and provide the relevant information about the Hudi external table, such as the Hive Metastore URI and the database and table name in the Hive Metastore.

Table creation simply adds a table to Doris’s metadata, without any data movement.

You can specify all or part of the Hudi schema when creating the table, or create the Hudi external table without specifying any schema at all. If a schema is specified, the column names and types must match those of the Hudi table registered in the Hive Metastore.

Example:
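The sketch below shows both forms described above, with and without a schema. The property names follow the Hudi external table syntax in the Doris documentation and may vary by version; the metastore URI, database, table, and column names are placeholders.

```sql
-- Hudi external table without a schema (the schema is taken from the Hive Metastore)
CREATE TABLE hudi_events_no_schema
ENGINE = HUDI
PROPERTIES (
    "hudi.database" = "hudi_db",
    "hudi.table" = "user_events",
    "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);

-- Hudi external table with an explicit (full or partial) schema;
-- column names and types must match the Hudi table in the Hive Metastore
CREATE TABLE hudi_events (
    `user_id`   INT,
    `city`      VARCHAR(32),
    `event_day` DATE
)
ENGINE = HUDI
PROPERTIES (
    "hudi.database" = "hudi_db",
    "hudi.table" = "user_events",
    "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
```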

When a Hudi table is queried, the process is as follows:

1) During the analysis stage, the FE looks up the metadata of the Hudi external table to obtain its Hive Metastore address, and fetches the table's schema information and file paths from the Hive Metastore.

2) The FE obtains the data locations of the Hudi table.

3) The FE plans the fragments and adds a HudiScanNode. The HudiScanNode fetches the list of data files for the Hudi table.

4) Scan ranges are generated from the list of data files.

5) The HudiScan tasks are sent to the BE nodes.

6) Each BE node calls the native Parquet reader to read the data at the Hudi external-table file paths specified by the HudiScanNode.
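Once the external table exists, it can be queried like any other Doris table, including in federated queries with internal Doris tables. A minimal sketch, reusing the hypothetical hudi_events table from the creation example above; doris_user_dim is an assumed internal table.

```sql
-- Query the Hudi external table like any internal Doris table
SELECT city, COUNT(*) AS pv
FROM hudi_events
WHERE event_day = '2022-06-01'
GROUP BY city;

-- Federated query: join Hudi data with a Doris internal table
SELECT d.user_name, COUNT(*) AS events
FROM hudi_events h
JOIN doris_user_dim d ON h.user_id = d.user_id
GROUP BY d.user_name;
```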

6. Future plans

At present, Apache Doris support for querying Hudi tables has been merged into the community. It currently supports Snapshot Queries on COW tables and Read Optimized Queries on MOR tables; Snapshot Queries on MOR tables are not yet supported, nor are Incremental Queries for streaming scenarios.

Several pieces of work remain, and we are actively collaborating with the community on them:

Snapshot Queries on MOR tables. Real-time reads of a MOR table require merging the data files with the corresponding delta files; the BE needs to support reading the Avro-format delta files, so a native Avro reader has to be added.

Incremental Queries on COW/MOR tables, to support incremental reads in real-time services.

A native interface for the BE to read Hudi base files and delta files. Currently, the BE can only read Hudi data files, using the Parquet C++ SDK. Going forward, we will work with the Hudi community to provide C++/Rust reading interfaces for Hudi base files and delta files, so that Doris BE can query Hudi data directly through a native interface.