Classic scenarios

Flink-side implementation

Business teams usually join multiple data sources on the stream with a real-time computation engine and output the result, but this approach faces many challenges in practice. They fall into the following two categories:

1. Dimension table JOIN

• Scenario challenge: Metric data must be joined with dimension data. The dimension data volume is relatively large and the QPS of the metric data is relatively high, which delays data output.

• Current solution: Cache part of the dimension data to relieve the task backpressure caused by querying the dimension storage engine under high QPS.

• Problems: Because the business side’s dimension data and metric data can be far apart in time, no reasonable TTL can be set for the metric stream. In addition, the dimension data in the cache is not refreshed in time, which makes the downstream data inaccurate.

2. Multi-stream JOIN

• Scenario challenge: Multiple metric streams must be joined, and the time gap between different metric streams can be abnormally large.

• Current solution: Use a window-based JOIN and maintain a relatively large state.

• Problems: A large state not only puts pressure on memory but also makes Checkpoint and Restore take longer, which can cause task backpressure.

Based on the merge mechanism of Hudi Payload, we developed a new multi-stream join solution:

• Multi-stream data is stitched together entirely at the storage layer, independently of the compute engine, so there is no need to keep state or tune its TTL.

• Dimension data and metric data are written independently as separate streams, with no multi-stream merging during the write. The streams are merged when downstream consumers read the table, so dimension data no longer needs to be cached. Merging can also be performed during compaction to speed up downstream queries.

This scheme provides the ability to correlate multi-stream data at the storage layer, aiming to solve a series of problems encountered by multi-stream joins in real-time scenarios.

Hudi maintains a Timeline of all operations (such as inserts, updates, and deletes) performed on the table’s dataset at different instant times. Every operation on a Hudi table generates an Instant on the table’s timeline. This makes it possible to query only the data committed successfully after a given point in time, or only the data before a given point in time, which avoids scanning data over a large time range. It also allows efficient queries of the files as they were before a change: after an Instant commits a change, a query for the data as of an earlier point in time still returns the data before the modification.
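To make the point-in-time queries above concrete, here is a minimal Python sketch of a timeline as an ordered list of instants. It is a toy model, not Hudi’s API: the `Instant` and `Timeline` classes and their method names are hypothetical, and the sketch assumes fixed-width `yyyyMMddHHmmss` timestamps so that string comparison follows time order. Only completed instants are visible to queries.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class State(Enum):
    REQUESTED = "requested"
    INFLIGHT = "inflight"
    COMPLETED = "completed"


@dataclass(frozen=True)
class Instant:
    timestamp: str  # fixed-width yyyyMMddHHmmss, so string order == time order
    action: str     # e.g. "commit", "clean", "compaction"
    state: State


class Timeline:
    """Toy timeline: instants kept in timestamp order (hypothetical API)."""

    def __init__(self, instants: List[Instant]) -> None:
        self.instants = sorted(instants, key=lambda i: i.timestamp)

    def completed(self) -> List[Instant]:
        # Requested or in-flight instants are invisible to readers.
        return [i for i in self.instants if i.state is State.COMPLETED]

    def completed_after(self, ts: str) -> List[Instant]:
        """Instants committed after ts: the basis of an incremental query."""
        return [i for i in self.completed() if i.timestamp > ts]

    def as_of(self, ts: str) -> List[Instant]:
        """Instants committed at or before ts: the basis of a point-in-time query."""
        return [i for i in self.completed() if i.timestamp <= ts]


timeline = Timeline([
    Instant("20240101100000", "commit", State.COMPLETED),
    Instant("20240101100500", "commit", State.COMPLETED),
    Instant("20240101101000", "clean", State.COMPLETED),
    Instant("20240101101500", "commit", State.INFLIGHT),
])
print([i.timestamp for i in timeline.completed_after("20240101100000")])
```

Because the in-flight commit at 10:15 is filtered out, a reader never sees data from a commit that has not finished.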

Actions:

• COMMITS: Data commits

• CLEANS: Cleanup of old data file versions

• COMPACTION: Merging delta log files into base files

• ROLLBACK: Rollback

• SAVEPOINT: A savepoint

The Timeline is the abstraction Hudi uses to manage commits. Each commit is bound to a fixed timestamp, and these timestamps are spread along the timeline. On the Timeline, each commit is abstracted as a HoodieInstant, which records the action, timestamp, and state of the commit.

The example in the figure above shows upsert operations on a Hudi table every 5 minutes from 10:00 to 10:20, with commit, clean, and compaction actions on the timeline. Note that the commit time records the data arrival time (e.g., 10:20 AM), while the data itself is organized into hourly partitions by event time, starting at 7:00. Arrival time and event time are the two main concepts used to balance data latency and completeness.

Late-arriving data (e.g., data with an event time of 9:00 that arrives at 10:20, more than an hour late) is written to the partition that corresponds to its event time. With the timeline, an incremental query only needs to read the change files committed successfully after a given instant time to get the new data, without scanning all the files.
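The following Python sketch illustrates the two ideas in the previous paragraph with a toy in-memory table; all names are hypothetical. Each record lands in the hourly partition of its event time no matter when it arrives, and an incremental query reads only what later commits wrote instead of scanning every file. Commit times are zero-padded "HH:MM" strings so that string comparison follows time order.

```python
from typing import Dict, List, Tuple

Record = Dict[str, str]


def partition_for(event_time: str) -> str:
    """Hourly partition for an event time such as '2024-01-01 09:00'."""
    day, clock = event_time.split(" ")
    return f"{day}/{clock[:2]}"


class ToyTable:
    def __init__(self) -> None:
        # commit time -> (partition, record) pairs written by that commit
        self.commits: Dict[str, List[Tuple[str, Record]]] = {}

    def upsert(self, commit_time: str, records: List[Record]) -> None:
        # Partitioning uses event time; the commit itself is keyed by arrival time.
        self.commits[commit_time] = [
            (partition_for(r["event_time"]), r) for r in records
        ]

    def incremental_query(self, begin_time: str) -> List[Tuple[str, Record]]:
        """Read only the changes committed strictly after begin_time."""
        changes: List[Tuple[str, Record]] = []
        for commit_time in sorted(self.commits):
            if commit_time > begin_time:
                changes.extend(self.commits[commit_time])
        return changes


table = ToyTable()
table.upsert("10:15", [{"id": "1", "event_time": "2024-01-01 10:12"}])
# Late record: event time 09:00, arriving with the 10:20 commit.
table.upsert("10:20", [{"id": "2", "event_time": "2024-01-01 09:00"}])
print(table.incremental_query("10:15"))
```

The late record is still picked up by an incremental reader that last consumed the 10:15 commit, even though it belongs to the 09 partition.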

Today, transactions on the data lake are considered a key feature of the Lakehouse. But what has actually been achieved so far? What approaches are in use, and how do they perform in the real world? These questions are the focus of this article.

Having had the privilege of working on a variety of database projects, including an RDBMS (Oracle[1]), a NoSQL key-value store (Voldemort[2]), a streaming database (ksqlDB[3]), a closed-source real-time data store, and of course Apache Hudi, I can say with confidence that differences in workloads profoundly affect the concurrency control mechanisms that different databases adopt. This article also explains how we rethought concurrency control for the Apache Hudi data lake.

To get straight to the point: RDBMS databases offer the richest set of transactional features and the widest range of concurrency control mechanisms[4], with different isolation levels, fine-grained locks, deadlock detection/avoidance, and much more, because they must support row-level changes and reads across multiple tables while enforcing key constraints[5] and maintaining indexes[6].

NoSQL stores, on the other hand, offer much weaker guarantees, such as only eventual consistency and simple row-level atomicity, in exchange for better scalability on simpler workloads. Traditional data warehouses offer more or less the full feature set of an RDBMS on top of columnar storage, enforcing locks and key constraints[7], whereas cloud data warehouses appear to focus more on compute-storage separation and offer weaker isolation levels. As a surprising example, some do not enforce key constraints at all[8].

Historically, data lakes have been viewed as batch jobs that read and write files on cloud storage, and it is interesting to see that most new work extends this view and implements file versioning with some form of “optimistic concurrency control[9]” (OCC).

OCC jobs take a table-level lock to check whether they have touched overlapping files, and abort the operation if there is a conflict. The lock is sometimes just a JVM-level lock held on a single Apache Spark driver node. This may be fine for lightweight coordination of legacy batch jobs that mostly append files to tables, but it does not apply broadly to modern data lake workloads. Such approaches are built with immutable, append-only data models in mind and are ill-suited to incremental data processing or keyed updates and deletes.
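A minimal sketch of the commit-time validation that OCC performs, assuming each writer remembers the table version it started from and the set of file groups it touched. `OccTable` and `ConflictError` are hypothetical names; a real implementation would run `commit` under the table-level lock described above, which this single-threaded toy omits.

```python
from typing import FrozenSet, Iterable, List, Tuple


class ConflictError(Exception):
    """Raised when a commit overlaps a commit that finished after our start."""


class OccTable:
    def __init__(self) -> None:
        self.version = 0
        self.history: List[Tuple[int, FrozenSet[str]]] = []  # (version, file groups)

    def begin(self) -> int:
        # The writer records the snapshot version it reads from.
        return self.version

    def commit(self, start_version: int, touched: Iterable[str]) -> int:
        touched_set = frozenset(touched)
        # Validation: a newer commit that touched the same file groups means
        # this writer's work was based on stale data, so it must abort.
        for version, files in self.history:
            overlap = files & touched_set
            if version > start_version and overlap:
                raise ConflictError(f"conflicts with v{version} on {sorted(overlap)}")
        self.version += 1
        self.history.append((self.version, touched_set))
        return self.version


table = OccTable()
ingest_start = table.begin()
delete_start = table.begin()  # long-running delete starts from the same snapshot
table.commit(ingest_start, {"fg1", "fg2"})
try:
    table.commit(delete_start, {"fg2", "fg7"})  # all of its work is thrown away
except ConflictError as err:
    print("delete job aborted:", err)
```

The losing writer only learns about the conflict at the very end, which is the source of the wasted work discussed in the GDPR example that follows.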

OCC is very optimistic that real contention never happens. Developer evangelism that equates OCC with the full transactional capabilities of an RDBMS or a traditional data warehouse is simply wrong. To quote Wikipedia directly: “If data resources are frequently contended, the cost of repeatedly restarting transactions can significantly hurt performance, in which case other concurrency control methods[10] may be more appropriate.” When conflicts do occur, they waste a lot of resources, because you end up with batch jobs that run for hours and then fail on every attempt!

Consider a real-world scenario with two writer processes: an ingestion job that writes new data every 30 minutes, and a delete job that enforces GDPR and takes 2 hours to complete. With random deletes, the two jobs are very likely to touch overlapping files, so the delete job is almost guaranteed to starve and fail to commit every time. In database terms, mixing long-running transactions with optimism leads to disappointment, because the longer the transactions, the more likely they are to overlap.

So what are the alternatives? Locking? Wikipedia also says: “However, lock-based (‘pessimistic’) approaches may also provide poorer performance, since locks can greatly limit effective concurrency even if deadlocks are avoided.” This is where Hudi takes a different approach, one we believe is better suited to modern data lake transactions, which are often long-running or even continuous. Compared with standard read/write databases, data lake workloads have more in common with high-throughput stream processing jobs, and that is where we draw inspiration. In stream processing, events are serialized into a single ordered log, avoiding lock/concurrency bottlenecks, so users can continuously process millions of events per second. Hudi implements a file-level, log-based concurrency control protocol on the Hudi timeline[11], which in turn relies on minimal atomic writes to cloud storage. By making the event log a core part of inter-process coordination, Hudi can offer flexible deployment models that provide higher concurrency than pure OCC approaches that track only table snapshots.

The simplest form of concurrency control is no concurrency at all. Data lake tables typically run table services on top of them to keep them efficient: reclaiming storage from older versions and logs, coalescing files (Clustering in Hudi), merging deltas (Compaction in Hudi), and so on. Hudi can eliminate the need for concurrency control altogether and maximize throughput by supporting these table services out of the box and running them inline after each write to the table. Their execution plans are idempotent, persisted to the timeline, and recovered automatically after failures. For most simple use cases, this means that simply writing to the table is enough to get a well-managed table that needs no concurrency control.

Our delete/ingest example above is not that simple. While the ingest job may only update the last N partitions of the table, deletes may span the entire table, and mixing them in the same workload can significantly hurt ingest latency. Hudi therefore offers the option of running table services asynchronously, so that most of the heavy lifting (such as actually rewriting columnar data through the compaction service) happens asynchronously, eliminating wasteful retries, while also applying clustering techniques. As a result, a single writer can consume both regular updates and GDPR deletes and serialize them into the log. Given that Hudi has record-level indexes and that Avro log writes are much cheaper than Parquet writes (which can be 10 times or more as expensive), ingest latency can be sustained while enjoying excellent replayability. In fact, we scaled this model to 100 PB of data at Uber[12] by ordering all deletes and updates into the same source Apache Kafka topic. Concurrency control is more than just locks: Hudi does all of this without any external locks.

However, it is not always possible to serialize deletes into the same write stream, and SQL-based deletes may be required. With multiple distributed processes, some form of locking is unavoidable, but, like a real database, Hudi’s concurrency model is smart enough to distinguish between actually writing to the table and the table services that manage or optimize it. Hudi provides similar optimistic concurrency control across multiple writers, but table services can still run completely lock-free and asynchronously. This means the delete job only needs to encode the deletes, the ingest job logs the updates, and the compaction service then applies the updates/deletes to the base files. Although the delete and ingest jobs can still contend with each other and starve as described above, they run for much less time and far less work is wasted, because compaction does the heavy lifting of writing Parquet/columnar data.

That said, there are still many ways to improve on this foundation.

• First, Hudi has implemented a marker mechanism[13] that tracks all files that are part of an active write transaction, and a heartbeat mechanism that tracks the active writers to a table. Other active transactions/writers can use these directly to detect what other writers are doing and, if a conflict is detected, abort early[14], returning cluster resources to other jobs sooner.

• Optimistic concurrency control is attractive when serializable snapshot isolation is required, but it is neither the best nor the only way to handle concurrency between writers. We plan to achieve fully lock-free concurrency control using CRDTs and widely adopted stream processing concepts, through our log merge API, which has already been proven[15] to sustain massive continuous writes to the data lake.

• On key constraints, Hudi is currently the only lake transaction layer that enforces unique key constraints[16], but only on the table’s record key. We will look to extend this to non-primary-key fields in a more general form, using the newer concurrency model described above.

Hudi supports automatically cleaning up data from failed commits during writes. Apache Hudi introduces a marker mechanism at write time to efficiently track the data files written to storage. In this article, we dive into the design of the existing direct marker file mechanism, explain its performance problems for very large batch writes on cloud storage (e.g., AWS S3, Aliyun OSS), and show how timeline-server-based markers improve write performance.

A marker in Hudi is a label indicating that a corresponding data file exists in storage; Hudi uses markers to automatically clean up uncommitted data in failure and rollback scenarios. Each marker entry consists of three parts:

• Data file name

• Marker extension (.marker)

• The I/O operation that created the file (CREATE for inserts, MERGE for updates/deletes, or APPEND for either).

For example, the marker 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet.marker.CREATE indicates that the corresponding data file is 91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_20210820173605.parquet and that the I/O type is CREATE. Before writing each data file, the Hudi write client first creates a marker in storage; the marker is persisted and is explicitly deleted by the write client after the commit succeeds. Markers let the write client perform different operations efficiently, and they serve two main purposes:
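A small Python sketch of building and parsing marker names in the format just described (data file name, then `.marker.`, then the I/O type). The helper names are hypothetical, and only the three I/O types listed above are accepted.

```python
from typing import NamedTuple

IO_TYPES = ("CREATE", "MERGE", "APPEND")
SEPARATOR = ".marker."


class Marker(NamedTuple):
    data_file: str
    io_type: str


def marker_name(data_file: str, io_type: str) -> str:
    if io_type not in IO_TYPES:
        raise ValueError(f"unknown I/O type: {io_type}")
    return f"{data_file}{SEPARATOR}{io_type}"


def parse_marker(name: str) -> Marker:
    # rpartition splits on the last ".marker.", leaving the data file name intact.
    data_file, sep, io_type = name.rpartition(SEPARATOR)
    if not sep or io_type not in IO_TYPES:
        raise ValueError(f"not a marker name: {name}")
    return Marker(data_file, io_type)


example = ("91245ce3-bb82-4f9f-969e-343364159174-0_140-579-0_"
           "20210820173605.parquet.marker.CREATE")
print(parse_marker(example))
```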

• Remove duplicate/partial data files: When writing to Hudi via Spark, multiple executors write concurrently. An executor can fail and leave a partially written data file behind, in which case Spark retries the task. When speculative execution is enabled, multiple attempts may successfully write the same data to different files, but only one attempt is finally handed to the Spark driver for commit. Markers make it possible to efficiently identify the partially written data files, whose data duplicates the files written successfully later, and to clean up these duplicates before the write and commit complete.

• Roll back failed commits: A write may fail midway and leave partially written data files behind. In that case, the marker entries remain in storage because the commit failed. In the next write operation, the write client first rolls back the failed commits, uses the markers to identify the data files written in those commits, and deletes them.

Next, we will look at the existing marker mechanism in more detail, explain its performance issues, and present a new timeline-server-based marker mechanism that solves them.
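Both marker functions in the list above reduce to simple set operations on marker names. In this minimal sketch the function names and file paths are hypothetical, and the rollback step simply deletes every marked file, ignoring any special handling Hudi may apply to appended log files.

```python
from typing import Iterable, List

SEPARATOR = ".marker."


def data_file_of(marker: str) -> str:
    return marker.rpartition(SEPARATOR)[0]


def duplicates_to_clean(markers: Iterable[str], committed: Iterable[str]) -> List[str]:
    """Files written by failed or losing task attempts: marked but not committed."""
    written = {data_file_of(m) for m in markers}
    return sorted(written - set(committed))


def files_to_roll_back(markers_of_failed_commit: Iterable[str]) -> List[str]:
    """Every file marked by a failed commit is treated as partial output."""
    return sorted({data_file_of(m) for m in markers_of_failed_commit})


markers = [
    "2024/01/01/f1_0-1-0_001.parquet.marker.CREATE",  # first attempt of a task
    "2024/01/01/f1_0-1-1_001.parquet.marker.CREATE",  # speculative retry of the same task
    "2024/01/01/f2_1-1-0_001.parquet.marker.MERGE",
]
committed = ["2024/01/01/f1_0-1-1_001.parquet", "2024/01/01/f2_1-1-0_001.parquet"]
print(duplicates_to_clean(markers, committed))
```

The key design point is that markers are written before the data files, so even a file that was only partly written is always discoverable without listing the whole table.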

This section focuses on the timeline-server-based marker mechanism, which reduces the latency of storing markers. The timeline server in Hudi serves file system and timeline views. As shown in the following figure, the new mechanism delegates marker creation and other marker-related operations from the individual executors to the timeline server for centralized processing. The timeline server keeps the markers created for incoming marker requests in memory, and achieves consistency by periodically flushing the in-memory markers to a bounded number of underlying files in storage. This way, even with a very large number of data files, the number of actual file operations and the latency related to markers are significantly reduced, which improves write performance.

To handle marker creation requests efficiently, we batch them on the timeline server. Each marker creation request is handled asynchronously in the Javalin-based timeline server and queued before processing. For every batch interval, e.g., 20 milliseconds, a dispatching thread pulls the pending requests from the queue and sends them to a worker thread for processing. Each worker thread processes its marker creation requests and overwrites the underlying file that stores the markers. Multiple worker threads run concurrently, and because a file overwrite can take longer than the batch interval, each worker thread writes to an exclusive file that no other thread touches, which guarantees consistency and correctness. Both the batch interval and the number of worker threads can be configured through write options.

Note that the worker thread always checks whether a marker has already been created by comparing the marker name in the request against the in-memory copy of all markers maintained on the timeline server. The underlying files that store the markers are read only on the first marker request (lazy loading). A response is returned only after the new markers have been flushed to the files, so if the timeline server fails, it can recover the markers that were already created. Together, these measures keep storage and the in-memory copy consistent and improve the performance of handling marker requests.
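The batching and duplicate check in the two paragraphs above can be modeled with a single-threaded toy. Real threads and the 20 ms timer are replaced by an explicit `dispatch_batch` call, lazy loading and crash recovery are not modeled, and sending each batch to the next worker in round-robin order is our assumption rather than necessarily Hudi’s scheduling policy. `MarkerServer` and its methods are hypothetical.

```python
from collections import deque
from typing import Deque, Dict, List, Set


class MarkerServer:
    """Toy timeline-server marker handler (hypothetical API, single-threaded)."""

    def __init__(self, num_workers: int) -> None:
        self.queue: Deque[str] = deque()   # pending marker-creation requests
        self.all_markers: Set[str] = set()  # in-memory copy of every created marker
        # One exclusive underlying file per worker, so no two workers share a file.
        self.files: Dict[int, List[str]] = {w: [] for w in range(num_workers)}
        self._next_worker = 0

    def request(self, marker: str) -> None:
        self.queue.append(marker)

    def dispatch_batch(self) -> List[str]:
        """Runs once per batch interval and returns the newly created markers."""
        batch: List[str] = []
        while self.queue:
            batch.append(self.queue.popleft())
        if not batch:
            return []
        worker = self._next_worker
        self._next_worker = (worker + 1) % len(self.files)
        created: List[str] = []
        for marker in batch:
            if marker in self.all_markers:  # duplicate: already created earlier
                continue
            self.all_markers.add(marker)
            created.append(marker)
        # Overwrite the worker's file with its full marker list; only after this
        # "flush" would the server answer the requests in the batch.
        self.files[worker] = self.files[worker] + created
        return created


server = MarkerServer(num_workers=2)
for m in ["a.parquet.marker.CREATE", "b.parquet.marker.CREATE", "a.parquet.marker.CREATE"]:
    server.request(m)
print(server.dispatch_batch())
```

Answering only after the flush is what lets a restarted server rebuild its in-memory set from the worker files.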

Hudi implements timeline-based OCC (Optimistic Concurrency Control) to ensure data consistency, integrity, and correctness across multiple writers. However, its conflict detection happens after the data is written and before the metadata is committed. Any conflict detected at that point wastes cluster resources, because the computation and writes have already finished. To address this problem, this RFC[17] proposes an early conflict detection mechanism based on Hudi’s existing marker mechanism. The early conflict detection workflow differs slightly between the marker types:

• For direct markers, Hudi lists the necessary marker files directly and performs the conflict check before the writer creates a marker and starts writing the corresponding data file.

• For timeline-server-based markers, Hudi simply fetches the result of the marker conflict check before the writer creates a marker and starts writing the corresponding data file. Conflicts are checked asynchronously and periodically so that write conflicts are detected as early as possible. Two writers can still write data files of the same file slice until the conflict is detected in the next round of checks.

Moreover, because early conflict detection lets Hudi stop conflicting writes early, resources are released to the cluster sooner and resource utilization improves.

Data lake transactions and multiple writers are becoming key features for building a Lakehouse today. To quote “Lakehouse Concurrency Control: Are We Overly Optimistic?”[18]:

“Hudi implements a file-level, log-based concurrency control protocol on the Hudi timeline, which in turn relies on minimal atomic puts to cloud storage. By building the event log as a core part of interprocess coordination, Hudi is able to provide flexible deployment models that provide higher concurrency than a pure OCC approach that only tracks table snapshots.”

In a multi-writer scenario, Hudi’s existing conflict detection happens after a writer has written its data and before it commits the metadata. In other words, the writer detects the conflict only when the commit starts, after all the computation and data writes are done, which wastes resources. For example, suppose there are two write jobs: job1 writes 10 MB of data to a Hudi table, including updates to file group 1, and job2 writes 100 GB to the same table and also updates file group 1. Job1 completes and commits successfully. A few hours later, job2 finishes writing its data files (100 GB) and starts committing metadata. Only then is the conflict with job1 detected, so job2 fails and must be aborted and rerun. Clearly, a lot of compute resources and time are wasted on job2.

Hudi currently has two important mechanisms, the marker mechanism and the heartbeat mechanism:

1. The marker mechanism tracks all files that are part of an active write.

2. The heartbeat mechanism tracks all active writers to a Hudi table.

Based on the marker and heartbeat mechanisms, this RFC proposes a new type of conflict detection: Early Conflict Detection. Before a writer creates a marker and starts writing a file, Hudi performs this new check, either detecting the write conflict directly or fetching the timeline-server-based result as early as possible, and aborts the writer when a conflict occurs, so that resources are released as quickly as possible and resource utilization improves.

Figure 1 shows the workflow for early conflict detection. Early conflict detection is used only when both supportsOptimisticConcurrencyControl and isEarlyConflictDetectionEnable are true; otherwise, the check is skipped and the marker is created directly.
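A minimal sketch of the direct-marker variant of this workflow. The two boolean parameters mirror `supportsOptimisticConcurrencyControl` and `isEarlyConflictDetectionEnable` from the text; everything else is hypothetical: heartbeats are plain timestamps, a writer counts as active if its last heartbeat is within a timeout, and the file group is taken as the partition path plus the part of the base file name before the first `_`.

```python
from typing import Dict, List


class EarlyConflictError(Exception):
    pass


def file_group(marker: str) -> str:
    """Partition path plus file ID (base file name up to the first '_')."""
    data_file = marker.rpartition(".marker.")[0]
    partition, _, name = data_file.rpartition("/")
    return f"{partition}/{name.split('_')[0]}"


def active_writers(heartbeats: Dict[str, float], now: float, timeout: float) -> List[str]:
    """Writers whose last heartbeat is recent enough to count as alive."""
    return [w for w, last in heartbeats.items() if now - last <= timeout]


def create_marker(marker: str, my_markers: List[str],
                  other_markers: Dict[str, List[str]], heartbeats: Dict[str, float],
                  now: float, timeout: float,
                  supports_occ: bool, early_detection_enabled: bool) -> None:
    if supports_occ and early_detection_enabled:
        target = file_group(marker)
        for writer in active_writers(heartbeats, now, timeout):
            if any(file_group(m) == target for m in other_markers.get(writer, [])):
                # Abort before any data is written, releasing resources early.
                raise EarlyConflictError(f"{writer} is already writing {target}")
    my_markers.append(marker)  # only now does the writer start the data file


others = {"job1": ["2024/01/01/fg1_0-1-0_001.parquet.marker.MERGE"]}
beats = {"job1": 100.0}
mine: List[str] = []
create_marker("2024/01/01/fg2_0-2-0_002.parquet.marker.CREATE", mine, others, beats,
              now=105.0, timeout=60.0, supports_occ=True, early_detection_enabled=True)
try:
    create_marker("2024/01/01/fg1_0-2-0_002.parquet.marker.MERGE", mine, others, beats,
                  now=105.0, timeout=60.0, supports_occ=True, early_detection_enabled=True)
except EarlyConflictError as err:
    print("aborted before writing:", err)
```

In the job1/job2 example above, this check would stop job2 before it writes its 100 GB rather than after.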

Traditional data lakes handle transactional writes poorly, but this is changing as more business-critical processing moves to the data lake. We need a mechanism to atomically publish a batch of data, so that only valid data becomes visible and partial failures are rolled back without corrupting the existing dataset. Query results must also be reproducible: queries must never see partially written data, and all committed data must be written durably. Hudi offers strong ACID capabilities, and its efficient rollback mechanism ensures data consistency and avoids leaving behind “orphan files” or intermediate data files.

Apache Hudi’s Payload is an extensible data processing mechanism: with different Payloads we can implement customized write behavior for complex scenarios, which greatly increases the flexibility of data processing. A Hudi Payload is a utility class for deduplicating, filtering, and merging data when writing to and reading from Hudi tables; the Payload class to use is specified with the parameter “hoodie.datasource.write.payload.class”. In this article, we dive into the Hudi Payload mechanism, the differences between the Payloads, and their use cases.

When writing data, whole-row inserts and whole-row overwrites cannot meet the requirements of every scenario, because the written data often needs customized processing. A more flexible write method that can process the incoming data is therefore needed. Hudi’s Payload mechanism solves this well, for example for deduplicating data on write or updating only some of the fields.

When writing to a Hudi table, you specify the parameter hoodie.datasource.write.precombine.field, also known as the Precombine Key. Hudi Payload processes data based on this field: it wraps each record into a Payload, so comparing records becomes comparing Payloads. You only need to implement the Payload’s comparison methods according to your business needs to customize how data is processed. All Hudi Payloads implement the HoodieRecordPayload interface, and all the built-in Payload classes that implement this interface are listed below.

The following figure lists the methods that the HoodieRecordPayload interface requires. Two of them are especially important, preCombine and combineAndGetUpdateValue, and we analyze both below.

As the following figure shows, the preCombine method compares the current record with oldValue and returns one of them.

The Javadoc of the preCombine method also shows that it is used first to deduplicate data when multiple records with the same primary key are written to Hudi in the same batch. Its call location is shown in the figure below.

In fact, this method is also called in a second place: when a MOR table is read, records with the same primary key in the log files are processed with it. If the same record is modified multiple times and written to the log files of a MOR table, preCombine is also applied on read.

The combineAndGetUpdateValue method compares currentValue (i.e., the data in the existing Parquet file) with the new data to decide whether the new data needs to be persisted.

Because COW and MOR tables read and write data differently, combineAndGetUpdateValue is also called differently for COW and MOR:

• On COW write, the newly written data is compared with the currentValue stored in the Hudi table, and the data that needs to be persisted is returned.

• On MOR read, the log data already processed by preCombine is compared with the data in the Parquet file, and the merged record is returned to the reader.
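The sketch below shows how the two methods cooperate, using plain dicts instead of Avro records. `LatestWinsPayload` is a hypothetical class modeled loosely on our understanding of the built-in OverwriteWithLatestAvroPayload and DefaultHoodieRecordPayload, not their actual code: preCombine keeps the record with the larger precombine value, and combineAndGetUpdateValue refuses to let older data overwrite a newer stored record. Deletes follow Hudi’s `_hoodie_is_deleted` column convention. The same `merge` function stands in for both the COW write path and the MOR read path.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

Row = Dict[str, object]


@dataclass
class LatestWinsPayload:
    """Toy payload keyed on a precombine (ordering) field; hypothetical class."""
    row: Row
    ordering_field: str = "ts"

    @property
    def ordering(self):
        return self.row[self.ordering_field]

    def pre_combine(self, old: "LatestWinsPayload") -> "LatestWinsPayload":
        # Same key twice in one batch (or in MOR logs): keep the larger
        # ordering value; on a tie the later record (self) wins.
        return old if old.ordering > self.ordering else self

    def combine_and_get_update_value(self, current: Row) -> Optional[Row]:
        # Compare with the stored record: stale data must not overwrite newer data.
        if current[self.ordering_field] > self.ordering:
            return current
        if self.row.get("_hoodie_is_deleted"):
            return None  # None means "delete the record"
        return self.row


def dedup(batch: List[Row], key: str) -> Dict[object, LatestWinsPayload]:
    merged: Dict[object, LatestWinsPayload] = {}
    for row in batch:
        payload = LatestWinsPayload(row)
        k = row[key]
        merged[k] = payload.pre_combine(merged[k]) if k in merged else payload
    return merged


def merge(stored: Dict[object, Row], batch: List[Row], key: str) -> Dict[object, Row]:
    """COW: runs at write time on the new batch. MOR: runs at read time on log records."""
    out = dict(stored)
    for k, payload in dedup(batch, key).items():
        if k in out:
            result = payload.combine_and_get_update_value(out[k])
        else:
            result = None if payload.row.get("_hoodie_is_deleted") else payload.row
        if result is None:
            out.pop(k, None)
        else:
            out[k] = result
    return out


stored = {"k1": {"id": "k1", "ts": 5, "v": "old"}, "k2": {"id": "k2", "ts": 1, "v": "keep"}}
batch = [
    {"id": "k1", "ts": 3, "v": "stale"},            # older than stored: ignored
    {"id": "k3", "ts": 1, "v": "a"},
    {"id": "k3", "ts": 2, "v": "b"},                # preCombine keeps this one
    {"id": "k2", "ts": 2, "_hoodie_is_deleted": True},
]
print(merge(stored, batch, "id"))
```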

Our internal Hudi version supports concurrent writes from multiple Flink writers, based on a file lock and the OCC mechanism.

Hudi supports inline compaction and cleaning within the job, which merges small files and cleans up old files promptly and so avoids the small-file problem. Inline compaction can also be turned off with a parameter, and Hudi provides offline compaction and cleaning for both Spark and Flink.
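For reference, here is a sketch of Spark datasource options that turn on inline compaction and automatic cleaning for a MERGE_ON_READ table. The option keys are standard Hudi write configs as far as we know, but the table name, column names, and numeric values are illustrative assumptions; check the configuration reference for your Hudi version before relying on them.

```python
# Illustrative Hudi write options for Spark; values are examples, not recommendations.
hudi_options = {
    "hoodie.table.name": "wide_table",                   # hypothetical table name
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "id",     # hypothetical key column
    "hoodie.datasource.write.precombine.field": "ts",    # hypothetical ordering column
    "hoodie.compact.inline": "true",                     # compact inside the writing job
    "hoodie.compact.inline.max.delta.commits": "5",      # ...after every 5 delta commits
    "hoodie.clean.automatic": "true",                    # clean old file versions after commits
    "hoodie.cleaner.commits.retained": "10",
}

# Usage with PySpark (df and base_path are assumed to exist):
# df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```

Setting `hoodie.compact.inline` to "false" leaves compaction to a separate offline job.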

Next, we introduce the core process of a Snapshot Query in the multi-stream stitching scenario: first deduplicate and merge the LogFiles, then merge the BaseFile data with the deduplicated LogFile data. The following diagram shows the whole merge process, which can be split into two steps:

• Merge LogFile

Hudi’s existing logic reads the data in the LogFiles into a Map. For each Record in a LogFile, if its key is not yet in the Map, the Record is put into the Map directly; if the key already exists, an update is performed.

In multi-stream stitching, the LogFiles contain data written by different streams, so the records may have different columns, and an update must first determine whether two Records with the same key come from the same stream. As shown in Figure 3, when the Record with primary key key1 is read from LogFile2, a Record for key1 already exists in the Map, but the two Records come from different streams, so they must be stitched into a new Record (key1, b0_new, c0_new, d0_new) that is put into the Map.
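To contrast the two LogFile merge behaviors described above, here is a minimal Python sketch. It assumes each stream writes a disjoint set of columns and that log order equals arrival order (out-of-order data is not handled here). The split of Figure 3’s columns between streams A and B is our assumption, and both function names are hypothetical.

```python
from typing import Dict, Iterable, Tuple

Columns = Dict[str, str]
LogEntry = Tuple[str, str, Columns]  # (record key, source stream, columns written)


def merge_log_default(entries: Iterable[LogEntry]) -> Dict[str, Columns]:
    """Single-stream behaviour, simplified: a later record replaces the earlier one."""
    merged: Dict[str, Columns] = {}
    for key, _source, cols in entries:
        merged[key] = dict(cols)
    return merged


def merge_log_multi_stream(entries: Iterable[LogEntry]) -> Dict[str, Columns]:
    """Stream-aware merge: same stream replaces, different streams are stitched."""
    per_key: Dict[str, Dict[str, Columns]] = {}
    for key, source, cols in entries:
        streams = per_key.setdefault(key, {})
        # Same stream: the newer record replaces that stream's columns.
        # Different stream: both are kept and stitched together below.
        streams[source] = dict(cols)
    stitched: Dict[str, Columns] = {}
    for key, streams in per_key.items():
        row: Columns = {}
        for cols in streams.values():
            row.update(cols)  # streams write disjoint columns, so nothing is lost
        stitched[key] = row
    return stitched


log = [
    ("key1", "A", {"b": "b0_new", "c": "c0_new"}),  # from LogFile1, stream A
    ("key1", "B", {"d": "d0_new"}),                 # from LogFile2, stream B
]
print(merge_log_default(log))       # stream A's columns are lost
print(merge_log_multi_stream(log))  # all three columns survive
```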

• Merge BaseFile and LogFile

Hudi’s existing default logic checks, for each Record in the BaseFile, whether a Record with the same key exists in the Map and, if so, overwrites the BaseFile Record with the one in the Map. In multi-stream stitching, the Record in the Map does not completely overwrite the BaseFile Record; it may update only some of the columns, namely the columns that the Map’s Record contains.

As shown in the following figure, take the simplest overwrite logic as an example. When the Record with primary key key1 is read from the BaseFile and key1 is found in the Map with values for the three columns B, C, and D, those columns of the BaseFile Record are updated to produce a new Record (key1, b0_new, c0_new, d0_new, e0). Note that column E is not updated, so it keeps its original value e0. For a new key, such as the Record for key3, the three columns B, C, and E must be filled with default values to form a complete Record.
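A minimal Python sketch of this BaseFile/LogFile merge, using the example values from the figure. The function name is hypothetical, the value `d3` for key3 is made up, and filling missing columns with `None` is an assumption made for illustration, since the real default values depend on the table schema.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def merge_base_and_log(base: Dict[str, Row], log_map: Dict[str, Row],
                       columns: List[str], defaults: Row) -> Dict[str, Row]:
    merged: Dict[str, Row] = {}
    for key, base_row in base.items():
        row = dict(base_row)
        if key in log_map:
            # Partial update: only the columns carried by the log record change.
            row.update(log_map[key])
        merged[key] = row
    for key, log_row in log_map.items():
        if key not in base:
            # New key: start from defaults so that every column is present.
            row = {col: defaults.get(col) for col in columns}
            row.update(log_row)
            merged[key] = row
    return merged


base = {"key1": {"b": "b0", "c": "c0", "d": "d0", "e": "e0"}}
log_map = {
    "key1": {"b": "b0_new", "c": "c0_new", "d": "d0_new"},
    "key3": {"d": "d3"},
}
result = merge_base_and_log(base, log_map, ["b", "c", "d", "e"], defaults={})
print(result["key1"])
print(result["key3"])
```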

In essence, the implementation uses a custom Payload class to define how data from different sources with the same key is merged. The writer merges data from multiple sources within a batch and writes the result to the log, and the reader calls the same logic during read-time merging to handle data across batches.

Out-of-order and late-arriving events need special attention. If left unhandled, they often cause old data to overwrite new data, or incomplete column updates downstream.

For out-of-order and late-arriving data, we enhanced Hudi with support for multiple ordering values. This ensures that each source can update only its own columns and that, based on each source’s event-time (ordering value) column, only newer data overwrites older data. Finally, combined with lock-less multi-writer support, this enables concurrent writes from multiple jobs and multiple sources.
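A minimal sketch of the multiple-ordering-value idea: every source owns a fixed set of columns and carries its own ordering value, so a late event from one source cannot roll back that source’s columns, and it never touches the other sources’ columns. The column ownership map, the integer event times, and the row layout are hypothetical. Equal event times let the later arrival win, in line with the FIFO note further below.

```python
from typing import Dict, Tuple

# Hypothetical ownership: which columns each source stream may write.
SOURCE_COLUMNS: Dict[str, Tuple[str, ...]] = {"A": ("b", "c"), "B": ("d",)}


def apply_update(row: dict, source: str, event_time: int, values: Dict[str, str]) -> dict:
    """row = {"cols": {...}, "ordering": {source: latest event time}}."""
    extra = set(values) - set(SOURCE_COLUMNS[source])
    if extra:
        raise ValueError(f"source {source} may not write columns {sorted(extra)}")
    latest = row["ordering"].get(source)
    if latest is not None and event_time < latest:
        return row  # late, out-of-order event: keep the newer data
    # Equal event times fall through, so the later arrival wins (FIFO).
    return {
        "cols": {**row["cols"], **values},
        "ordering": {**row["ordering"], source: event_time},
    }


row = {"cols": {}, "ordering": {}}
row = apply_update(row, "A", 10, {"b": "b@10", "c": "c@10"})
row = apply_update(row, "B", 5, {"d": "d@5"})               # B has its own ordering value
row = apply_update(row, "A", 8, {"b": "b@8", "c": "c@8"})   # late event: ignored
print(row["cols"])
```

Keeping one ordering value per source, rather than one per row, is what allows stream B’s older event time (5) to be accepted even though stream A has already written data with event time 10.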

A snapshot version based on 0.12.0-1-tencent was released for this feature.


• Source table A

• Target table

• Writing stream A data

• Source table B

• Target table

• Writing stream B data

• Create source table A, source table B, and the target table with the DDL in 5.2

• Write stream A and stream B data with the same INSERT statements as in 5.2

Note:
1. Whether to update a record is decided by comparing the precombine key, which suits real-time ingestion where data arrives in the lake out of order.
2. If the time field values in the user’s source table are equal and cannot be compared, records are stitched and merged in FIFO order.

Note: Presto support is pending.

Finally, besides solving the problems described in the background, the Hudi multi-stream stitching solution has been deployed in the DWS layer of our real-time data warehouse, where a single table supports concurrent ingestion of 3+ data streams and covers hundreds of terabytes of data. In addition, when wide-table data is queried with Spark, the data has already been deduplicated and stitched into one large wide table, so queries that scan tens of terabytes run more than 200% faster than the equivalent multi-table joins, and some more complex queries also improve by 40-140%.

• Further improve the usability of the Hudi multi-stream stitching solution: reduce parameter configuration, add SQL syntax support for column-level inserts and updates, and consolidate parameters.

• Use the payload mechanism to implement Flink left join, right join, TopN, and other functions.

• Contribute the multi-writer feature back to the community.


• Apache Hudi’s flexible payload mechanism[19]

• Snapshot Isolation using Optimistic Concurrency Control for multi-writers[20]



[1] Oracle
[2] Voldemort
[3] ksqlDB
[4] Mechanism
[5] Key constraints
[6] Index
[7] Enforces
[8] Enforced: #supported-constraint-types
[9] Optimistic concurrency control
[10] Concurrency control method
[11] Timeline
[12] Uber
[13] Markup mechanism
[14] Early Stop
[15] Proven: #functionality-support
[16] Unique key constraint
[17] This RFC
[18] Lakehouse Concurrency Control: Are We Overly Optimistic?
[19] Apache Hudi’s flexible payload mechanism
[20] Snapshot Isolation using Optimistic Concurrency Control for multi-writers