Abstract: This article was compiled by community volunteer Chen Zhengyu from “A Detailed Explanation of Flink-CDC”, shared by Alibaba senior development engineer Xu Bangjiang (Xuejian) at the Flink Meetup in Beijing on July 10. It explains the core features of the newly released Flink CDC 2.0.0, including major improvements such as concurrent reading of full data, checkpoint support during the full read, and lock-free reading.

GitHub project address: https://github.com/ververica/flink-cdc-connectors


I. CDC Overview

Typical CDC application scenarios include:

  • Data distribution: one data source is distributed to multiple downstream systems;
  • Data acquisition: ETL data integration for data warehouses / data lakes, which is a very important data source.
There are many technical solutions for CDC, and the mainstream implementation mechanisms in the industry fall into two categories:

  • Query-based CDC: offline scheduled query jobs, batch processing. A table is synchronized to other systems by querying it for the latest data on each run (see the sketch after this list);
    • Data consistency cannot be guaranteed, because the data may have changed several times between queries;
    • Real-time performance cannot be guaranteed, because offline scheduling carries an inherent delay.
  • Log-based CDC: real-time log consumption, stream processing. For example, MySQL’s binlog completely records all changes in the database, so the binlog file can be used as the data source of a stream;
    • Data consistency is guaranteed, because the binlog file contains the details of all historical changes;
    • Real-time performance is guaranteed, because log files such as the binlog can be consumed as a stream, providing real-time data.
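To make the contrast concrete, here is a minimal sketch of what a query-based sync runs on each schedule; the `orders` table and its `update_time` column are illustrative assumptions:

```sql
-- Query-based CDC, sketched as a periodically scheduled batch query:
-- each run fetches rows modified since the previous run's watermark.
SELECT id, amount, update_time
FROM orders
WHERE update_time > '2021-07-10 00:00:00';  -- watermark from the last run

-- The weaknesses follow from the query itself: a row updated several
-- times between two runs surfaces only in its latest state, and a hard
-- DELETE leaves no row behind for the query to find at all.
```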
Comparing common open-source CDC solutions, we can find:

  • Comparing incremental synchronization capability:
    • Log-based approaches can achieve incremental synchronization well;
    • Query-based approaches have difficulty achieving incremental synchronization.
  • Comparing full synchronization capability, query-based and log-based CDC solutions basically all support it, except for Canal.
  • Comparing full + incremental synchronization capability, only Flink CDC, Debezium, and Oracle GoldenGate support it well.
  • From the architecture perspective, the table divides architectures into standalone and distributed. Distributed here refers not only to horizontal scalability of data reading, but more importantly to the ability to connect to distributed systems in big-data scenarios. For example, when Flink CDC data enters a lake or warehouse, the downstream is usually a distributed system such as Hive, HDFS, Iceberg, or Hudi; from the perspective of the ability to connect to distributed systems, Flink CDC’s architecture supports such systems well.
  • In terms of data transformation / data cleaning capability: once data enters the CDC tool, can it be conveniently filtered, cleaned, or even aggregated?
    • In Flink CDC, such operations are quite simple, and the data can be manipulated through Flink SQL;
    • Tools such as DataX and Debezium require scripts or templates for this, so the threshold for users is relatively high.
  • Finally, in terms of ecosystem, this refers to support for downstream databases and data sources. Flink CDC has a rich set of downstream connectors, such as writing to common systems like TiDB, MySQL, PostgreSQL, HBase, Kafka, and ClickHouse, and it also supports various custom connectors.

II. The Flink CDC Project

At this point, let’s review the motivation for developing the Flink CDC project.

1. Dynamic Table & ChangeLog Stream

Everyone knows that Flink has two basic concepts: Dynamic Table and Changelog Stream.

  • A Dynamic Table is a dynamic table defined by Flink SQL; the concepts of dynamic table and stream are equivalent. As the figure above shows, a stream can be converted into a dynamic table, and a dynamic table can also be converted into a stream.
  • In Flink SQL, data flows from one operator to another in the form of a Changelog Stream, and a Changelog Stream at any point in time can be interpreted as a table or as a stream.
If you associate a MySQL table with its binlog, you will find that all changes to a table in a MySQL database are recorded in the binlog. If the table is continuously updated, the binlog stream keeps appending, and the table in the database is equivalent to the materialization of the binlog stream at a certain point in time, while the log stream is the result of continuously capturing the table’s change data. This shows that Flink SQL’s Dynamic Table is a very natural representation of an ever-changing MySQL table.

Based on this, we investigated several CDC technologies and finally chose Debezium as the underlying collection tool for Flink CDC. Debezium supports full synchronization, incremental synchronization, and full + incremental synchronization, which is very flexible, and its log-based CDC technology makes exactly-once processing possible.
Comparing Flink SQL’s internal data structure, RowData, with Debezium’s data structure, we can see that the two are very similar:

  • Each RowData carries a metadata field, RowKind, with four types: insert (INSERT), pre-update image (UPDATE_BEFORE), post-update image (UPDATE_AFTER), and delete (DELETE); these four types are consistent with the binlog concepts in the database.
  • Debezium’s data structure has a similar metadata field, op, with four values: c, u, d, and r, corresponding to create, update, delete, and read. For u, which represents an update operation, the data section contains both the pre-update image (before) and the post-update image (after).

By analyzing the two data structures, the underlying data of Flink and Debezium can be connected easily, and you will find that Flink is technically very well suited for CDC.
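As an illustration of this connection, the sketch below shows how a single MySQL UPDATE, encoded by Debezium, surfaces in Flink SQL as changelog rows; the Kafka topic and table schema are assumptions:

```sql
-- A table backed by Debezium change events, decoded by Flink's
-- built-in debezium-json format:
CREATE TABLE products_cdc (
  id INT,
  price DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydb.products',                       -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'
);

-- In MySQL:  UPDATE products SET price = 108 WHERE id = 2;
-- Debezium emits one op = 'u' event carrying both images, which Flink
-- translates into two RowData records in the changelog stream:
--   -U(2, 100)   RowKind = UPDATE_BEFORE  (from Debezium's "before")
--   +U(2, 108)   RowKind = UPDATE_AFTER   (from Debezium's "after")
```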

2. Traditional CDC ETL Analysis


Let’s look at the ETL analysis link of traditional CDC, as shown in the following figure:

In traditional CDC-based ETL analysis, a data collection tool is necessary: users abroad commonly use Debezium, while domestic users often use Alibaba’s open-source Canal. The collection tool is responsible for collecting incremental data from the database, and some tools also support synchronizing full data. The collected data is generally output to message middleware such as Kafka; the Flink compute engine then consumes this data and writes it to the destination, which can be various databases, data lakes, real-time data warehouses, or offline data warehouses.

Note that Flink provides a changelog-json format that can write changelog data to offline data warehouses such as Hive / HDFS; for real-time data warehouses, Flink supports writing the changelog directly to Kafka via the upsert-kafka connector.
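For example, a brief sketch of an upsert-kafka sink; the topic, schema, and upstream `orders` table are assumptions:

```sql
-- Writing a changelog into Kafka as an upsert stream keyed by the primary
-- key; downstream real-time warehouse jobs read it back as updates.
CREATE TABLE product_counts (
  product_id INT,
  cnt BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'product_counts',                      -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Any changelog-producing query can be materialized into the topic
-- (assumes an `orders` source table defined elsewhere):
INSERT INTO product_counts
SELECT product_id, COUNT(*) AS cnt FROM orders GROUP BY product_id;
```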

We have been thinking about whether Flink CDC can replace the collection components and message queues in the dotted box in the diagram above, simplifying the analysis link and reducing maintenance costs. Fewer components also mean that data timeliness can be further improved. The answer is yes, which leads to our ETL analysis process based on Flink CDC.

3. ETL Analysis Based on Flink CDC

After adopting Flink CDC, in addition to fewer components and easier maintenance, another advantage is that Flink SQL greatly lowers the barrier to entry for users, as the following example shows:

In this example, Flink CDC is used to synchronize database data and write it to TiDB: you directly use Flink SQL to create mysql-cdc tables for products and orders, JOIN the data streams, and write the processed result straight to the downstream database. A single Flink SQL job completes the analysis, processing, and synchronization of the CDC data.
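A sketch of such a job follows; hostnames, credentials, and column names are illustrative assumptions, and the TiDB sink is addressed through Flink’s JDBC connector, since TiDB speaks the MySQL protocol:

```sql
-- Source tables captured directly from MySQL via the mysql-cdc connector:
CREATE TABLE products (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

CREATE TABLE orders (
  order_id INT,
  product_id INT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Sink table living in TiDB, written through the JDBC connector:
CREATE TABLE enriched_orders (
  order_id INT,
  product_name STRING,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://tidb-host:4000/mydb',
  'table-name' = 'enriched_orders',
  'username' = 'flinkuser',
  'password' = 'flinkpw'
);

-- One statement joins the two change streams and keeps the sink in sync:
INSERT INTO enriched_orders
SELECT o.order_id, p.name, o.amount
FROM orders AS o
JOIN products AS p ON o.product_id = p.id;
```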
You will find that this is a pure SQL job, which means that anyone who knows SQL, including BI and business-line colleagues, can complete this kind of work. At the same time, users can also use the rich syntax provided by Flink SQL for data cleaning, analysis, and aggregation.

For existing CDC solutions, such data cleaning, analysis, and aggregation are very difficult to achieve.

In addition, with Flink SQL’s dual-stream JOIN, dimension-table JOIN, and UDTF syntax, data widening and various kinds of business-logic processing can be completed very easily.

4. Flink CDC Project Development

  • The first commit was submitted by Yunxie in July 2020; the project was incubated out of personal interest;
  • MySQL-CDC was supported in mid-July 2020;
  • Postgres-CDC was supported in late July 2020;
  • Within a year, the project earned more than 800 stars on GitHub.

III. Flink CDC 2.0 in Detail

1. Flink CDC Pain Points

MySQL CDC is the most used and most important connector in Flink CDC, and the Flink CDC connector described in the following sections refers to the MySQL CDC connector.

With the development of the Flink CDC project, there has been a lot of user feedback in the community, which can be summarized into three main points:

  • The full + incremental read process needs to guarantee the consistency of all data, which requires locking; but locking is a very high-risk operation at the database level. To guarantee data consistency, the underlying Debezium needs to lock the database or table being read: a global lock may cause the database to hang, while a table-level lock blocks reads of the table, and DBAs generally do not grant lock permission.
  • Horizontal scaling is not supported. Because the underlying layer of Flink CDC is based on Debezium, whose architecture is single-node, Flink CDC only supports single-parallelism reads. In the full read phase, if the table is very large (hundreds of millions of rows) and reading takes hours or even days, users cannot speed up the job by adding resources.
  • The full read phase does not support checkpoints. CDC reading is divided into two phases, full read and incremental read; currently the full read phase does not support checkpoints, which creates a problem: if synchronizing the full data takes, say, 5 hours and the job fails after 4 hours, it has to start over and read for another 5 hours.

2. Debezium Lock Analysis

The bottom layer of Flink CDC encapsulates Debezium, and Debezium synchronizes a table in two phases:

  • Full phase: query all records in the current table;
  • Incremental phase: consume change data from the binlog.

Most user scenarios are full + incremental synchronization. Locking happens in the full phase: it determines the starting offset for the full phase and ensures that incremental + full reads deliver each record exactly once (no more, no less), thereby guaranteeing data consistency. From the following figure we can analyze the locking process for global locks and table locks: the red line on the left is the lifetime of the lock, and on the right is the life cycle of MySQL opening a repeatable-read transaction.

In the global-lock case, the lock is acquired first, and then the repeatable-read transaction is opened. The locking step reads the starting binlog offset and the current table schema. The purpose is to ensure that the binlog starting offset and the schema that was read correspond to each other, because a table’s schema can change, for example by dropping or adding columns. After these two pieces of information are read, the SnapshotReader reads the full data within the repeatable-read transaction; after the full read completes, the BinlogReader starts reading incrementally from the recorded binlog starting offset, ensuring a seamless connection between the full and incremental data.
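In terms of plain MySQL statements, the global-lock flow described above looks roughly like the sketch below; Debezium issues the equivalent commands internally, so this is an illustration rather than its literal code:

```sql
FLUSH TABLES WITH READ LOCK;                 -- acquire the global read lock
SHOW MASTER STATUS;                          -- record the binlog starting offset
SHOW CREATE TABLE mydb.orders;               -- capture the current table schema
START TRANSACTION WITH CONSISTENT SNAPSHOT;  -- open the repeatable-read snapshot
UNLOCK TABLES;                               -- release the lock; the snapshot
                                             -- stays consistent
SELECT * FROM mydb.orders;                   -- SnapshotReader: full data read
COMMIT;
-- The BinlogReader then starts from the recorded offset to read increments.
```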

Table locks are a degraded version of global locks: global-lock permission is higher, so in some scenarios users only have table-lock permission. Table locks are held longer, due to a characteristic of table locks: releasing the lock early would implicitly commit the repeatable-read transaction (commit is the default), so the lock can only be released after the full data read completes.

After the above analysis, let’s see what serious consequences these locks will cause:

Flink CDC 1.x can be configured to run without locks, which meets most scenarios at the expense of some data accuracy. By default, Flink CDC 1.x acquires a global lock, which guarantees data consistency but carries the above-mentioned risk of hanging the database.

3. Flink CDC 2.0 Design (Taking MySQL as an Example)

Through the above analysis, we can see the design goals of 2.0: at its core, it must solve the three problems above, namely supporting lock-free reading, horizontal scaling, and checkpoints.

The lock-free algorithm described in the DBlog paper is shown in the following figure:

On the left is the description of the chunk splitting algorithm, which is similar in principle to the sharding of databases and tables: the data in the table is split into chunks by the table’s primary key. Assuming each chunk has a step size of 10, you only need to make the chunk intervals left-open right-closed or left-closed right-open to ensure that the concatenated intervals equal the table’s primary-key range.
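For a table with an integer primary key and a chunk step of 10, the splitting amounts to queries like the following; the table and key names are assumptions:

```sql
-- Left-closed, right-open chunks whose union covers the whole key range:
SELECT * FROM orders WHERE id >= 1  AND id < 11;   -- chunk-1: [1, 11)
SELECT * FROM orders WHERE id >= 11 AND id < 21;   -- chunk-2: [11, 21)
SELECT * FROM orders WHERE id >= 21 AND id < 31;   -- chunk-3: [21, 31)
-- ... while the first and last chunks are unbounded on one side:
SELECT * FROM orders WHERE id < 1;                 -- first chunk: (-inf, 1)
SELECT * FROM orders WHERE id >= 31;               -- last chunk: [31, +inf)
```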

On the right is the description of the lock-free read algorithm for each chunk. The core idea of the algorithm is that, once the chunks are divided, the full read and incremental read of each chunk are merged consistently without locks. The chunk splitting is shown in the following figure:

Because each chunk is only responsible for the data within its own primary-key range, it is not difficult to deduce that as long as each chunk’s read is consistent, the read of the entire table is consistent. This is the basic principle of the lock-free algorithm.

In Netflix’s DBLog paper, the chunk reading algorithm maintains a signal table in the database and uses it to place markers in the binlog: a low watermark is recorded before each chunk read, a high watermark after it, and the chunk’s full data is queried between the two. After this chunk data has been read, the binlog incremental data between the two watermarks is merged into the chunk’s full data, yielding the chunk’s full data as of the high watermark.

Flink CDC adapted this to its own situation and improved the signal-table part of the chunk reading algorithm: no signal table needs to be maintained, because reading the binlog offset directly replaces marking in the binlog. The overall chunk reading algorithm is described in the following figure:

For example, suppose Chunk-1, whose interval is [k1, k10], is being read. First the data in the interval is selected directly and stored in a buffer; a binlog offset (the low watermark) is recorded before the SELECT, and another binlog offset (the high watermark) is recorded after it. Then the incremental part is processed by consuming the binlog from the low watermark to the high watermark. Take the four binlog records in the figure as an example:

  • The first record, -(k2,100) +(k2,108), indicates that the value of this row is updated from 100 to 108;
  • The second record deletes k3;
  • The third record updates k2 to 119;
  • The fourth record changes the value of k5 from 77 to 100.
Looking at the final output in the lower-right corner of the figure, you will find that when the chunk’s binlog is consumed, the keys that appear are k2, k3, and k5, so we mark these keys in the buffer.

  • For k1, k4, k6, and k7, these records have not changed by the time the high watermark is reached, so they can be output directly;
  • For changed data, the incremental data must be merged into the full data, keeping only the final merged result. For example, since the final value of k2 is 119, only +(k2,119) needs to be output; none of the intermediate changes are required.

In this way, the final output of the chunk is the chunk’s latest data as of the high watermark.
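Putting the steps together, the per-chunk flow corresponds roughly to the following MySQL-level sketch; Flink CDC implements this internally rather than via literal SQL, and the table and key names are assumptions:

```sql
SHOW MASTER STATUS;               -- 1. record the LOW binlog watermark

SELECT * FROM orders              -- 2. snapshot the chunk [k1, k10]
  WHERE id >= 1 AND id <= 10;     --    into an in-memory buffer

SHOW MASTER STATUS;               -- 3. record the HIGH binlog watermark

-- 4. replay binlog events between LOW and HIGH whose primary key falls
--    inside [k1, k10], applying each as an upsert/delete on the buffer;
-- 5. emit the buffer: the chunk's latest data as of the HIGH watermark.
```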

The figure above depicts the consistent read of a single chunk. But if there are multiple tables divided into many chunks, and these chunks are distributed across different tasks, how are the chunks distributed and how is a globally consistent read guaranteed?

This is implemented elegantly on top of FLIP-27. In the figure below you can see the SourceEnumerator component, which is mainly responsible for dividing chunks. The divided chunks are provided to downstream SourceReaders, and distributing chunks to different SourceReaders enables concurrent reading of snapshot chunks; in addition, FLIP-27 makes it easy to implement checkpoints at chunk granularity.

After a snapshot chunk has been read, a reporting step is needed, shown as the orange report information in the figure below: the snapshot chunk’s completion is reported to the SourceEnumerator.

The main purpose of the report is the subsequent distribution of the binlog chunk (shown below). Because Flink CDC supports full + incremental synchronization, once all snapshot chunks have been read, the incremental binlog needs to be consumed; this is achieved by sending a binlog chunk to any one SourceReader for single-parallelism reading.

For most users, there is no need to pay too much attention to the details of the lock-free algorithm and chunk splitting; it is enough to understand the overall process.

The overall process can be summarized as follows: first, the table is divided into snapshot chunks by primary key, and the snapshot chunks are distributed to multiple SourceReaders; each snapshot chunk is read with the algorithm above, achieving consistent reads without locks, and the SourceReaders support chunk-granularity checkpoints while reading; after all snapshot chunks have been read, a binlog chunk is sent to read the incremental part from the binlog. This is the overall flow of Flink CDC 2.0, as shown in the following figure:
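From the user’s point of view, this machinery sits behind a few mysql-cdc connector options. The option names below match the flink-cdc-connectors 2.0 documentation, but treat the exact values as assumptions to verify against the version you run:

```sql
CREATE TABLE orders_cdc (
  order_id INT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders',
  'scan.incremental.snapshot.enabled' = 'true',    -- lock-free, parallel snapshot
  'scan.incremental.snapshot.chunk.size' = '8096', -- rows per snapshot chunk
  'server-id' = '5400-5404'                        -- one id per parallel reader
);
```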

Flink CDC is a completely open-source project: all of the project’s design and source code have been contributed to the open-source community. Flink CDC 2.0 has now been officially released, and its core improvements include:
  • MySQL CDC 2.0 is provided, with core features including concurrent reading of full data, lock-free operation, and checkpoint support;
  • To provide better documentation support, the Flink CDC community has built a documentation website that supports versioned documents;
  • The documentation site supports keyword search, which is very practical.

IV. Future Planning

Regarding the future planning of the CDC project, we hope to focus on three aspects: stability, advanced features, and ecosystem integration.

  • Stability
    • Attract more developers through the community, and use corporate open-source efforts to improve the maturity of Flink CDC;
    • Support lazy assigning. The idea of lazy assigning is to divide chunks in batches rather than all at once. For example, with 10,000 chunks, the first 1,000 can be divided and handed out, and division continues after the SourceReaders have read them, saving chunk-division time up front.
  • Advanced features
    • Support schema evolution. The scenario: a field is suddenly added to a table during database synchronization, and it is hoped that this field is automatically added when synchronizing to the downstream system;
    • Support watermark pushdown, obtaining heartbeat information from the CDC binlog; these heartbeats can serve as watermarks, through which the current consumption progress of the stream can be known;
    • Support meta data. In database-sharding and table-sharding scenarios, it may be necessary to know which database and which table a record comes from, enabling more flexible operations in downstream lake and warehouse systems;
    • Whole-database synchronization: users can synchronize an entire database with one line of SQL syntax, instead of defining a DDL and a query for each table.
  • Ecosystem integration
    • Integrate more upstream databases, such as Oracle and MS SQL Server. Cloudera is currently actively contributing the oracle-cdc connector;
    • At the lake level, Hudi and Iceberg still have room for write-path optimization; for example, under high-QPS lake ingestion, data distribution has a relatively large performance impact, which can be further optimized through deeper integration with the ecosystem.
Finally, welcome to join the Flink CDC user group.


Appendix

[1] Flink CDC project address: https://github.com/ververica/flink-cdc-connectors
[2] Flink CDC documentation site: https://ververica.github.io/flink-cdc-connectors/master/
[3] Percona – MySQL global lock time analysis: https://www.percona.com/blog/2014/03/11/introducing-backup-locks-percona-server-2/
[4] DBLog – lock-free algorithm paper: https://arxiv.org/pdf/2010.12597v1.pdf
[5] Flink FLIP-27 design document: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface