

  • Consistency is guaranteed by locking. To ensure data consistency, Debezium needs to lock the database or table being read: a global lock may cause the database to hang, and a table-level lock blocks reads on the table.

  • No support for horizontal scaling. Flink CDC currently only supports single parallelism; in the full read stage, if the table is very large (hundreds of millions of rows), the read time is at the hour level.

  • The full read phase does not support checkpoints. CDC reading is divided into two phases, full read and incremental read; since the full read phase does not support checkpoints, a failure means re-reading from the beginning.

    Flink CDC 2.0 is designed to solve the above pain points: it borrows the lock-free algorithm of Netflix DBLog and re-implements the source based on FLIP-27, achieving lock-free reads, horizontal scaling, and checkpoint support in the full read phase.


    Let’s take a look at FLIP-27 before introducing Flink CDC 2.0. FLIP-27 aims to address several pain points in SourceFunction:

    • Split discovery logic and the actual data reading logic are coupled in SourceFunction and the DataStream interface

    • Batch and stream processing require different source implementations

    • Concepts such as partitions/shards/splits are not explicitly defined in the interface, making it difficult to implement event-time alignment, per-partition watermarks, dynamic split assignment, work stealing, and other features

    • The checkpoint lock is held by the source function, which causes a series of problems and makes the framework difficult to optimize

    • There is no common framework, which means that each source implements its own complex threading model, making new sources more difficult to implement and test

    FLIP-27’s Source API consists of two components:

    • SplitEnumerator, responsible for discovering and assigning splits, running in the JobManager

    • SourceReader, responsible for reading the actual data of a split, running in the TaskManager

    These two components encapsulate the core functionality, leaving the Source interface itself as little more than a factory for creating the SplitEnumerator and SourceReader. The following diagram shows the topological relationship between SplitEnumerator and SourceReader.

    A source connector implemented under the new architecture can unify batch and stream processing. The only difference is that for batch processing the SplitEnumerator produces a fixed number of splits and each split is a finite data set, while for stream processing the SplitEnumerator either produces an infinite number of splits or the splits themselves are infinite data sets.
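    The factory role of the Source and the bounded/unbounded distinction can be sketched with simplified, hypothetical interfaces. Note that EnumeratorSketch, ReaderSketch, SourceSketch and ListSource below are illustrative stand-ins, not Flink's actual API:

```java
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Simplified stand-ins for the two FLIP-27 components: the enumerator hands out
// splits (it would run in the JobManager), the reader consumes them (TaskManager).
interface EnumeratorSketch<S> { S nextSplit(); }
interface ReaderSketch<T, S> { T read(S split); }

// The Source itself is only a factory for the two components.
interface SourceSketch<T, S> {
    EnumeratorSketch<S> createEnumerator();
    ReaderSketch<T, S> createReader();
}

// A bounded example: a fixed number of splits, each a finite data set.
// An unbounded source would differ only in that nextSplit() never runs dry.
class ListSource implements SourceSketch<String, Integer> {
    private final List<String> data;

    ListSource(List<String> data) { this.data = data; }

    @Override
    public EnumeratorSketch<Integer> createEnumerator() {
        Iterator<Integer> ids = IntStream.range(0, data.size()).boxed().iterator();
        return () -> ids.hasNext() ? ids.next() : null; // null = no more splits
    }

    @Override
    public ReaderSketch<String, Integer> createReader() {
        return split -> data.get(split);
    }
}
```

    Separating the enumerator from the reader is what lets the same reader code serve both batch and stream modes; only split generation differs.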

    SplitEnumerator and SourceReader require implementation classes provided by the user, and FLIP-27 introduces a generic messaging mechanism between these two components, which communicate through the pass-through SourceEvent interface, as shown in the following figure.

    SourceCoordinator and SourceOperator, the OperatorCoordinator and Operator implementations of FLIP-27 in the figure above, have the following class structure (the Failover-related structure is omitted from the figure):

    As can be seen, SourceCoordinator encapsulates SplitEnumerator, and SourceOperator encapsulates SourceReader. Ignoring the Failover-related structure, the interfaces are quite simple: the Flink framework calls SourceCoordinator#start to create and start the SplitEnumerator, and calls SourceOperator#open to create, register, and start the SourceReader. The sequence diagram is as follows:

    SourceOperator#emitNext calls the SourceReader#pollNext interface to pass the read data downstream.

    The lowest-level interface is designed to be very generic, which makes it flexible but also makes complex readers difficult to implement. FLIP-27 therefore proposes higher-level abstractions that provide simpler interfaces and allow blocking calls. SourceReaderBase, an abstract implementation of SourceReader, provides the synchronization mechanism between the main mailbox thread and the internal reading threads, so the user only needs to focus on:

    • fetching records from an external system (by implementing the SplitReader interface),

    • performing record parsing and transformation (by implementing the RecordEmitter interface),

    • extracting timestamps and optionally dealing with watermarks.

    SourceReaderBase's workflow is shown below:


    Implementation of full + incremental reads in Flink CDC 2.0


    FLIP-27 provides a framework-level mechanism for parallel reading across multiple operators. Let's now look at how Flink CDC 2.0 combines FLIP-27 with DBLog's lock-free algorithm to read the full data concurrently and then seamlessly switch to single-threaded reading of incremental data.

    The source table must have a physical primary key so that it can be partitioned. Based on the primary key, ChunkSplitter can either divide the table evenly into (max − min)/chunkSize splits (where min/max are the minimum and maximum primary key values), or use limit queries to ensure that each split contains chunkSize records (the last split may contain fewer than chunkSize records).
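    As a rough illustration of the even-division strategy, here is a minimal sketch assuming a numeric primary key with known min/max; ChunkSplitSketch and its names are hypothetical, not the actual ChunkSplitter code, which also handles non-numeric and sparse keys:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of even chunk splitting over a numeric primary key.
class ChunkSplitSketch {
    // A chunk covers the key range [start, end).
    record Chunk(long start, long end) {}

    static List<Chunk> splitEvenly(long min, long max, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        long start = min;
        while (start <= max) {
            long end = start + chunkSize;
            // the last chunk may hold fewer than chunkSize records
            chunks.add(new Chunk(start, Math.min(end, max + 1)));
            start = end;
        }
        return chunks;
    }
}
```

    For primary keys 1..100 and chunkSize 30 this yields four splits ([1,31), [31,61), [61,91), [91,101)), matching the (max − min)/chunkSize estimate rounded up.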

    The algorithm proposed by DBLog is called watermark-based chunk selection. It maintains a single-row, single-column table in the source database as an aid: before and after querying each chunk's (i.e. split's) data, it updates that record so that two events, lw (low watermark) and hw (high watermark), appear in the transaction log. The selected data is then reconciled with the log between [lw, hw] to obtain a version of the chunk's data at point-in-time hw. Unlike DBLog, Flink CDC 2.0 does not maintain an additional table; instead it uses SHOW MASTER STATUS to obtain the binlog offset before and after the select, which avoids intruding on the source system.






    1. Obtain the low watermark lw and insert it into the queue

    2. Read the snapshot data of the chunk and insert it into the queue

    3. Obtain the high watermark hw and insert it into the queue

    4. Determine whether there are incremental changes between lw and hw

    5. If there are no changes, insert a BINLOG_END record into the queue

    6. Otherwise, read the binlog between [lw, hw] and insert it into the queue, with the last record being BINLOG_END


    final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 1 - Determining low watermark {} for split {}",
            lowWatermark,
            snapshotSplit);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setLowWatermark(lowWatermark);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

    LOG.info("Snapshot step 2 - Snapshotting data");
    createDataEvents(ctx, snapshotSplit.getTableId());

    final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 3 - Determining high watermark {} for split {}",
            highWatermark,
            snapshotSplit);
    signalEventDispatcher.dispatchWatermarkEvent(
            snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
    ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
            .setHighWatermark(highWatermark);

    Data in the queue after the above code is executed:


    Then the binlog between lw and hw is read and appended to the same queue, so the data in the queue becomes:


    Finally, the data in the queue is corrected to obtain the chunk's data at point-in-time hw.

    RecordUtils#normalizedSplitRecords first separates snapshot records and binlog records based on the low and high watermarks:

    SourceRecord lowWatermark = sourceRecords.get(0);
    SourceRecord highWatermark = null;
    int i = 1;
    for (; i < sourceRecords.size(); i++) {
        SourceRecord sourceRecord = sourceRecords.get(i);
        if (!isHighWatermarkEvent(sourceRecord)) {
            snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
        } else {
            highWatermark = sourceRecord;
            break;
        }
    }


    The binlog records may contain logs that do not belong to this chunk, so the logs belonging to this chunk need to be filtered out:

    if (i < sourceRecords.size() - 1) {
        List<SourceRecord> allBinlogRecords =
                sourceRecords.subList(i, sourceRecords.size() - 1);
        for (SourceRecord binlog : allBinlogRecords) {
            if (isDataChangeRecord(binlog)) {
                Object[] key =
                        getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
                if (splitKeyRangeContains(
                        key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
                    binlogRecords.add(binlog);
                }
            }
        }
    }


    Finally, upsert logic is applied to the snapshot records and the binlog records to obtain the chunk's data at point-in-time hw.

    The upsertBinlog logic:

    1. Add lw to normalizedBinlogRecords

    2. Iterate over the records in binlogRecords: for a delete record, remove the corresponding key from snapshotRecords; for an update record, wrap the after image as a READ record and put it into snapshotRecords; for a create record, put it into snapshotRecords

    3. Add snapshotRecords.values() to normalizedBinlogRecords

    4. Add hw to normalizedBinlogRecords

    5. Return normalizedBinlogRecords


    private static List<SourceRecord> upsertBinlog(
            MySqlSplit split,
            SourceRecord lowWatermarkEvent,
            SourceRecord highWatermarkEvent,
            Map<Struct, SourceRecord> snapshotRecords,
            List<SourceRecord> binlogRecords) {
        final List<SourceRecord> normalizedBinlogRecords = new ArrayList<>();
        normalizedBinlogRecords.add(lowWatermarkEvent);
        // upsert binlog events to snapshot events of split
        if (!binlogRecords.isEmpty()) {
            for (SourceRecord binlog : binlogRecords) {
                Struct key = (Struct) binlog.key();
                Struct value = (Struct) binlog.value();
                if (value != null) {
                    Envelope.Operation operation =
                            Envelope.Operation.forCode(
                                    value.getString(Envelope.FieldName.OPERATION));
                    switch (operation) {
                        case UPDATE:
                            Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
                            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                            Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
                            Instant ts =
                                    Instant.ofEpochMilli(
                                            (Long) source.get(Envelope.FieldName.TIMESTAMP));
                            SourceRecord record =
                                    new SourceRecord(
                                            binlog.sourcePartition(),
                                            binlog.sourceOffset(),
                                            binlog.topic(),
                                            binlog.kafkaPartition(),
                                            binlog.keySchema(),
                                            binlog.key(),
                                            binlog.valueSchema(),
                                            envelope.read(updateAfter, source, ts));
                            snapshotRecords.put(key, record);
                            break;
                        case DELETE:
                            snapshotRecords.remove(key);
                            break;
                        case CREATE:
                            snapshotRecords.put(key, binlog);
                            break;
                        case READ:
                            throw new IllegalStateException(
                                    String.format(
                                            "Binlog record shouldn't use READ operation, the record is %s.",
                                            binlog));
                    }
                }
            }
        }
        normalizedBinlogRecords.addAll(snapshotRecords.values());
        normalizedBinlogRecords.add(highWatermarkEvent);
        return normalizedBinlogRecords;
    }

    The above process is executed concurrently on multiple SourceReaders without mutual interference. Suppose the three tables t1/t2/t3 synchronized by a task are divided into 6 chunks; because of concurrent execution, one possible arrangement of their low and high watermarks on the binlog is shown in the following figure:

    It can be seen that the binlog ranges read by t1.split1 and t2.split1 overlap (both read the binlog between [lw2.1, hw1.1]), and t3.split2 may be executed earlier than t3.split1. These overlaps and out-of-order executions do not affect correctness, because MySqlSourceReader reports each split's hw to MySqlSourceEnumerator via the FinishedSnapshotSplitsReportEvent message, and these hws are passed to the incremental phase to ensure that no binlog record is processed twice.


    private void reportFinishedSnapshotSplitsIfNeed() {
        if (!finishedUnackedSplits.isEmpty()) {
            final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
            for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
                finishedOffsets.put(split.splitId(), split.getHighWatermark());
            }
            FinishedSnapshotSplitsReportEvent reportEvent =
                    new FinishedSnapshotSplitsReportEvent(finishedOffsets);
            context.sendSourceEventToCoordinator(reportEvent);
            LOG.debug(
                    "The subtask {} reports offsets of finished snapshot splits {}.",
                    subtaskId,
                    finishedOffsets);
        }
    }

    After MySqlSourceEnumerator has gathered the hws of all splits, it creates a binlog split that contains the starting position from which the binlog needs to be read (the minimum of all chunks' hws) and the hw information of all chunks.


    private MySqlBinlogSplit createBinlogSplit() {
        final List<MySqlSnapshotSplit> assignedSnapshotSplit =
                snapshotSplitAssigner.getAssignedSplits().values().stream()
                        .sorted(Comparator.comparing(MySqlSplit::splitId))
                        .collect(Collectors.toList());

        Map<String, BinlogOffset> splitFinishedOffsets =
                snapshotSplitAssigner.getSplitFinishedOffsets();
        final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

        BinlogOffset minBinlogOffset = null;
        for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
            // find the min binlog offset
            BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
            if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
                minBinlogOffset = binlogOffset;
            }
            finishedSnapshotSplitInfos.add(
                    new FinishedSnapshotSplitInfo(
                            split.getTableId(),
                            split.splitId(),
                            split.getSplitStart(),
                            split.getSplitEnd(),
                            binlogOffset));
        }

        // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
        // then transfer them
        boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
        return new MySqlBinlogSplit(
                minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
                BinlogOffset.NO_STOPPING_OFFSET,
                divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
                new HashMap<>(),
                finishedSnapshotSplitInfos.size());
    }
    When MySqlSourceEnumerator assigns the binlog split to a MySqlSourceReader, the task moves from the full phase to the incremental phase. MySqlSourceReader reads the binlog data and uses shouldEmit to decide whether a record should be sent downstream.


    private boolean shouldEmit(SourceRecord sourceRecord) {
        if (isDataChangeRecord(sourceRecord)) {
            TableId tableId = getTableId(sourceRecord);
            BinlogOffset position = getBinlogPosition(sourceRecord);
            if (hasEnterPureBinlogPhase(tableId, position)) {
                return true;
            }
            // only the table who captured snapshot splits need to filter
            if (finishedSplitsInfo.containsKey(tableId)) {
                RowType splitKeyType =
                        ChunkUtils.getSplitType(
                                statefulTaskContext.getDatabaseSchema().tableFor(tableId));
                Object[] key =
                        getSplitKey(
                                splitKeyType,
                                sourceRecord,
                                statefulTaskContext.getSchemaNameAdjuster());
                for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                    if (RecordUtils.splitKeyRangeContains(
                                    key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                            && position.isAfter(splitInfo.getHighWatermark())) {
                        return true;
                    }
                }
            }
            // not in the monitored splits scope, do not emit
            return false;
        }
        // always send the schema change event and signal event
        // we need record them to state of Flink
        return true;
    }


    If a binlog record falls within the primary key range of some chunk and its position is after that chunk's hw, the record should be sent downstream. In addition, BinlogSplitReader maintains maxSplitHighWatermarkMap to determine whether a record's position has passed the maximum hw of its table, i.e. whether the table has entered the pure binlog phase, in which every record of that table can be emitted directly.
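    The pure-binlog-phase check can be sketched as follows. This is a minimal sketch with binlog positions modeled as plain longs; PureBinlogPhaseSketch and its names are illustrative, not the actual BinlogSplitReader internals, and the at-or-after comparison is an assumption of the sketch:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: once a record's binlog position passes the maximum split hw of its
// table, the table is in the pure binlog phase and per-split key-range checks
// can be skipped. Real code uses BinlogOffset instead of long.
class PureBinlogPhaseSketch {
    // table id -> maximum snapshot-split hw of that table
    final Map<String, Long> maxSplitHighWatermarkMap = new HashMap<>();

    boolean hasEnterPureBinlogPhase(String tableId, long position) {
        Long maxHw = maxSplitHighWatermarkMap.get(tableId);
        // tables with no recorded hw are not in scope
        return maxHw != null && position >= maxHw;
    }
}
```

    Records of tables that never had snapshot splits return false here and fall through to the per-split range check described above.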

    Checkpoint support in the full read phase

    From the previous section we learned that Flink CDC 2.0 borrows DBLog's algorithm to achieve lock-free reads and uses the FLIP-27 framework to elegantly realize concurrent reads in the full phase. As for checkpoints, implementing the interfaces defined by FLIP-27 guarantees that after a restart the task returns to its previous state and that completed splits are not re-read.
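    The idea can be sketched as follows: the split assigner's progress is plain data that can be snapshotted and restored, so finished splits are skipped after failover. SplitAssignerSketch and its names are hypothetical, not the actual assigner state classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of why the full phase is checkpointable under FLIP-27:
// pending and finished splits are ordinary collections the framework can persist.
class SplitAssignerSketch {
    final Deque<String> pendingSplits = new ArrayDeque<>();
    final Set<String> finishedSplits = new HashSet<>();

    record StateSketch(Deque<String> pending, Set<String> finished) {}

    StateSketch snapshotState() {
        // copy, so later mutation does not corrupt the checkpointed state
        return new StateSketch(new ArrayDeque<>(pendingSplits), new HashSet<>(finishedSplits));
    }

    void restoreState(StateSketch state) {
        // on failover, roll back to the last checkpoint: splits finished before
        // the checkpoint stay finished and are never re-read
        pendingSplits.clear();
        pendingSplits.addAll(state.pending());
        finishedSplits.clear();
        finishedSplits.addAll(state.finished());
    }
}
```

    The real assigner state additionally records each finished split's hw, which is what the binlog split creation above consumes.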

    State management on the MySqlSourceEnumerator side is implemented in the flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/ directory, and state management on the MySqlSourceReader side in the flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/ directory. Since this logic is relatively complex, this article will not cover it further.


    Careful readers may notice that after the full stage switches to the incremental stage, if a binlog segment read earlier has been purged at the source, the incremental stage will fail because the corresponding binlog position can no longer be read. This happens because Flink CDC has no persistent storage guaranteeing that binlogs are retained after the task starts. Increasing the parallelism to accelerate throughput in the full phase can alleviate this problem to some extent.


    As of Release-2.1.1, only flink-connector-mysql-cdc supports the CDC 2.0 features described above. In the Release-2.2 roadmap, a key feature is to extract the 2.0 framework so that other database connectors can also use it; see the PR for details.

    References:

    1. Flink CDC 2.0 officially released, detailing the core improvements, by Xu Bangjiang, 2021.7

    2. Flink 1.11.0 released, what are the new features worth paying attention to? by Wang Zhijiang, 2020.7

    3. The first draft for MySQL-CDC 2.0 implementation





