The pain points of Flink CDC 1.x can be summarized as follows:

- Consistency is guaranteed by locking: to ensure data consistency, Debezium needs to lock the database or table being read; a global lock may cause the database to hang, and a table-level lock blocks reads on the table.
- No support for horizontal scaling: Flink CDC currently supports only single-parallelism reading, so in the full read phase, if the table is very large (hundreds of millions of rows), reading takes hours.
- No checkpoint support in the full read phase: CDC reading is divided into two phases, full reading and incremental reading; the full read phase does not support checkpoints, so it has to be re-read from the beginning after a failure.
Flink CDC 2.0 was designed to solve the above pain points: it borrows the lock-free algorithm of Netflix's DBLog and is implemented on top of FLIP-27, achieving lock-free reading, horizontal scaling, and checkpoint support in the full read phase.
FLIP-27
Before introducing Flink CDC 2.0, let's take a look at FLIP-27. FLIP-27 aims to address several pain points of SourceFunction:

- Split/work discovery logic and the actual data reading logic are coupled in SourceFunction and in the DataStream API.
- Batch and stream processing require different source implementations.
- Partitions/shards/splits are not explicit concepts in the interface, which makes it difficult to implement features such as event-time alignment, per-partition watermarks, dynamic split assignment, and work stealing.
- The checkpoint lock is held by the source function, which causes a series of problems and makes the framework difficult to optimize.
- There is no common framework, so every source implements its own complex threading model, making new sources difficult to implement and test.
FLIP-27's Source API consists of two components:

- SplitEnumerator: responsible for discovering and assigning splits, and runs in the JobManager.
- SourceReader: responsible for reading the actual data of the splits, and runs in the TaskManager.

These two components encapsulate the core functionality, so the Source interface itself is just a factory for creating SplitEnumerator and SourceReader. The following diagram shows the topological relationship between SplitEnumerator and SourceReader.

A source connector implemented under the new architecture can unify batch and stream processing. The only difference is that for batch processing the SplitEnumerator produces a fixed, finite set of splits and each split is a finite data set, while for stream processing the SplitEnumerator either produces an infinite number of splits or the splits themselves are infinite data sets.
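To make this factory role concrete, the FLIP-27 Source interface, slightly simplified from org.apache.flink.api.connector.source.Source (Flink 1.13/1.14 era, annotations, Javadoc, and imports omitted), looks roughly like this; it only creates the two components and their serializers, while all actual reading logic lives in SplitEnumerator and SourceReader:

public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {

    // whether the source is bounded (batch) or continuous/unbounded (stream)
    Boundedness getBoundedness();

    // factory method for the reader that runs in the TaskManager
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;

    // factory methods for the enumerator that runs in the JobManager
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext) throws Exception;

    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;

    // serializers used to persist splits and enumerator state in checkpoints
    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}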
SplitEnumerator and SourceReader require implementation classes provided by the user, and FLIP-27 introduces a common messaging mechanism between the two components: they communicate by passing SourceEvent messages, as shown in the following figure.
SourceCoordinator and SourceOperator, the OperatorCoordinator and Operator implementations of FLIP-27 shown in the figure above, have the following class structure (failover-related structures are omitted from the figure):
As can be seen, SourceCoordinator encapsulates SplitEnumerator, and SourceOperator encapsulates SourceReader; when failover-related structures are not considered, their interfaces are quite simple. The Flink framework calls SourceCoordinator#start to create SplitEnumerator and start it, and calls SourceOperator#open to create, register, and start SourceReader; the sequence diagram is as follows. SourceOperator#emitNext then calls SourceReader#pollNext to pass the records that have been read downstream.
The lowest-level interfaces are designed to be very generic, which makes them flexible but also makes it difficult to implement complex readers. FLIP-27 therefore proposes higher-level abstractions that provide simpler interfaces and allow blocking calls. SourceReaderBase, an abstract implementation of SourceReader, provides the synchronization mechanism between the main mailbox thread and the internal reading threads, so the user only needs to focus on:

- fetching records from the external system (by implementing the SplitReader interface),
- performing record parsing and transformation (by implementing the RecordEmitter interface),
- extracting timestamps and optionally handling watermarks.

SourceReaderBase's workflow is shown below:
[Figure: SourceReaderBase workflow]
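For example, a minimal RecordEmitter sketch could look like the following; the RawElement and MySplitState types are hypothetical placeholders (not flink-cdc classes), and the emitter simply parses a fetched element, emits it with an event timestamp, and records progress in the per-split state:

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

// A raw element as handed over by a (hypothetical) SplitReader implementation.
class RawElement {
    final String payload;
    final long timestampMs;
    RawElement(String payload, long timestampMs) {
        this.payload = payload;
        this.timestampMs = timestampMs;
    }
}

// Mutable per-split state that SourceReaderBase checkpoints for us.
class MySplitState {
    long lastEmittedTimestamp;
}

class MyRecordEmitter implements RecordEmitter<RawElement, String, MySplitState> {
    @Override
    public void emitRecord(RawElement element, SourceOutput<String> output, MySplitState splitState) {
        // parse/transform the raw element and emit it with an event timestamp
        output.collect(element.payload, element.timestampMs);
        // track progress in the per-split state so it can be checkpointed
        splitState.lastEmittedTimestamp = element.timestampMs;
    }
}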
Implementation principle of Flink CDC 2.0 full + incremental reading
FLIP-27 provides a framework-level mechanism for multiple operators to read in parallel. Let's take a look at how Flink CDC 2.0 combines FLIP-27 with DBLog's lock-free algorithm to read the full data concurrently and then seamlessly switch to single-threaded reading of the incremental data.
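Before diving into the internals, here is a minimal usage sketch based on the documented flink-connector-mysql-cdc 2.x builder API; hostname, credentials, database and table names below are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcExample {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings; replace with real host/credentials
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")                  // databases to capture
                .tableList("mydb.orders")              // tables to capture
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit changes as JSON
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpointing is required so that split progress and binlog offsets can be persisted
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(4)   // parallel snapshot reading in the full phase
                .print();

        env.execute("MySQL CDC job");
    }
}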
The table read at the source must have a physical primary key, which is used to partition the table. ChunkSplitter can divide the table evenly into (max - min)/chunkSize splits according to the primary key (min/max are the minimum and maximum values of the primary key), or use limit queries to ensure that each split contains chunkSize rows (the last split may contain fewer than or equal to chunkSize rows). A minimal sketch of the even-splitting idea is shown below.
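The following sketch illustrates only the even-splitting idea, assuming a numeric primary key; it is not the actual ChunkSplitter implementation, and the ChunkRange type here is a simplified placeholder:

import java.util.ArrayList;
import java.util.List;

// A simplified, illustrative chunk range over a numeric primary key.
class ChunkRange {
    final Long start; // inclusive
    final Long end;   // exclusive
    ChunkRange(Long start, Long end) {
        this.start = start;
        this.end = end;
    }
    @Override public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

class SimpleChunkSplitter {
    // Evenly split the primary key range [min, max] into chunks of size chunkSize.
    static List<ChunkRange> splitEvenly(long min, long max, long chunkSize) {
        List<ChunkRange> chunks = new ArrayList<>();
        long start = min;
        while (start <= max) {
            long end = start + chunkSize;
            // the last chunk may contain fewer than chunkSize keys
            chunks.add(new ChunkRange(start, Math.min(end, max + 1)));
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // primary keys 1..1000 with chunkSize 300 -> [1,301), [301,601), [601,901), [901,1001)
        System.out.println(splitEvenly(1, 1000, 300));
    }
}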
The algorithm proposed by DBLog is called watermark-based chunk selection. It maintains a single-row, single-column table in the source database as an aid; before and after querying the data of each chunk (that is, each split), it updates that record so that two events, lw (low watermark) and hw (high watermark), appear in the transaction log, and then reconciles the selected data with the log events between [lw, hw] to obtain, for that chunk, a set of data that reflects the point in time of hw. Unlike DBLog, Flink CDC 2.0 does not maintain an additional table; instead, it executes SHOW MASTER STATUS before and after the select to obtain the binlog offsets, which avoids intruding on the source system.
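For illustration, here is a minimal JDBC sketch of reading the current binlog offset via SHOW MASTER STATUS; the connection settings are placeholders, and the connector wraps this in its own helper (the currentBinlogOffset call seen in the code below):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowMasterStatusExample {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/mydb", "flinkuser", "flinkpw");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                // the first two columns are the current binlog file name and position
                String binlogFile = rs.getString(1);
                long binlogPosition = rs.getLong(2);
                System.out.println("binlog offset: " + binlogFile + "/" + binlogPosition);
            }
        }
    }
}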
The logic of snapshot reading is as follows:

- Execute SHOW MASTER STATUS to get lw, and insert it into the queue.
- Read the records of the shard and insert them into the queue.
- Execute SHOW MASTER STATUS to get hw, and insert it into the queue.
- Determine whether there are incremental changes between lw and hw.
- If there are no changes, insert a BINLOG_END record into the queue.
- Otherwise, read the binlog between [lw, hw] and insert it into the queue; the last record is BINLOG_END.
MySqlSnapshotSplitReadTask#doExecute
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
        "Snapshot step 1 - Determining low watermark {} for split {}",
        lowWatermark,
        snapshotSplit);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
        .setLowWatermark(lowWatermark);
signalEventDispatcher.dispatchWatermarkEvent(
        snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());

final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
        "Snapshot step 3 - Determining high watermark {} for split {}",
        highWatermark,
        snapshotSplit);
signalEventDispatcher.dispatchWatermarkEvent(
        snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
        .setHighWatermark(highWatermark);
After the above code is executed, the data in the queue is as follows:
[Figure: the queue contains lw, the snapshot records of the split, and hw]
The binlog between lw and hw is then read, and the records are appended to the same queue, so the queue becomes:
[Figure: the queue contains lw, the snapshot records, hw, the binlog records, and BINLOG_END]
Finally, the data in the queue is corrected to obtain the data of the shard at point-in-time hw.
RecordUtils#normalizedSplitRecords first separates the snapshot records from the binlog records based on the low and high watermarks:

SourceRecord lowWatermark = sourceRecords.get(0);
SourceRecord highWatermark = null;
int i = 1;
for (; i < sourceRecords.size(); i++) {
    SourceRecord sourceRecord = sourceRecords.get(i);
    if (!isHighWatermarkEvent(sourceRecord)) {
        snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
    } else {
        highWatermark = sourceRecord;
        i++;
        break;
    }
}
The binlog records may contain logs that do not belong to this shard, so only the logs whose split key falls within the shard's range are kept:

if (i < sourceRecords.size() - 1) {
    List<SourceRecord> allBinlogRecords =
            sourceRecords.subList(i, sourceRecords.size() - 1);
    for (SourceRecord binlog : allBinlogRecords) {
        if (isDataChangeRecord(binlog)) {
            Object[] key =
                    getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
            if (splitKeyRangeContains(
                    key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
                binlogRecords.add(binlog);
            }
        }
    }
}
Finally, an upsert of the binlog records onto the snapshot records produces the data at point-in-time hw:

normalizedRecords =
        upsertBinlog(
                snapshotSplit,
                lowWatermark,
                highWatermark,
                snapshotRecords,
                binlogRecords);
The logic of upsertBinlog:

- Add lw to normalizedBinlogRecords.
- Iterate over the records in binlogRecords:
  - for a create record, map.put it into snapshotRecords;
  - for a delete record, remove it from snapshotRecords;
  - for an update record, put the After part of the record, as a READ record, into snapshotRecords.
- Add snapshotRecords.values to normalizedBinlogRecords.
- Add hw to normalizedBinlogRecords.
- Return normalizedBinlogRecords.
RecordUtils#upsertBinlog
private static List<SourceRecord> upsertBinlog(
        MySqlSplit split,
        SourceRecord lowWatermarkEvent,
        SourceRecord highWatermarkEvent,
        Map<Struct, SourceRecord> snapshotRecords,
        List<SourceRecord> binlogRecords) {
    final List<SourceRecord> normalizedBinlogRecords = new ArrayList<>();
    normalizedBinlogRecords.add(lowWatermarkEvent);
    // upsert binlog events to snapshot events of split
    if (!binlogRecords.isEmpty()) {
        for (SourceRecord binlog : binlogRecords) {
            Struct key = (Struct) binlog.key();
            Struct value = (Struct) binlog.value();
            if (value != null) {
                Envelope.Operation operation =
                        Envelope.Operation.forCode(
                                value.getString(Envelope.FieldName.OPERATION));
                switch (operation) {
                    case UPDATE:
                        Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
                        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
                        Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
                        Instant ts =
                                Instant.ofEpochMilli(
                                        (Long) source.get(Envelope.FieldName.TIMESTAMP));
                        SourceRecord record =
                                new SourceRecord(
                                        binlog.sourcePartition(),
                                        binlog.sourceOffset(),
                                        binlog.topic(),
                                        binlog.kafkaPartition(),
                                        binlog.keySchema(),
                                        binlog.key(),
                                        binlog.valueSchema(),
                                        envelope.read(updateAfter, source, ts));
                        snapshotRecords.put(key, record);
                        break;
                    case DELETE:
                        snapshotRecords.remove(key);
                        break;
                    case CREATE:
                        snapshotRecords.put(key, binlog);
                        break;
                    case READ:
                        throw new IllegalStateException(
                                String.format(
                                        "Binlog record shouldn't use READ operation, the the record is %s.",
                                        binlog));
                }
            }
        }
    }
    normalizedBinlogRecords.addAll(snapshotRecords.values());
    normalizedBinlogRecords.add(highWatermarkEvent);
    return normalizedBinlogRecords;
}
The above process is executed concurrently by multiple SourceReaders without affecting each other. Suppose a task synchronizes three tables t1/t2/t3 that are divided into 6 shards; because of the concurrent execution, one possible arrangement of their low and high watermarks on the binlog is shown in the following figure. It can be seen that the binlog ranges read by t1.split1 and t2.split1 overlap (both read the binlog between [lw2.1, hw1.1]), and t3.split2 may be executed earlier than t3.split1. These overlaps and out-of-order executions do not affect correctness, because MySqlSourceReader reports the hw of each split to MySqlSourceEnumerator via a FinishedSnapshotSplitsReportEvent message, and these hws are passed on to the incremental phase to ensure that no binlog record is processed twice.
MySqlSourceReader#reportFinishedSnapshotSplitsIfNeed
private void reportFinishedSnapshotSplitsIfNeed() {
    if (!finishedUnackedSplits.isEmpty()) {
        final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
        for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
            finishedOffsets.put(split.splitId(), split.getHighWatermark());
        }
        FinishedSnapshotSplitsReportEvent reportEvent =
                new FinishedSnapshotSplitsReportEvent(finishedOffsets);
        context.sendSourceEventToCoordinator(reportEvent);
        LOG.debug(
                "The subtask {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                finishedOffsets);
    }
}
When MySqlSourceEnumerator has gathered the hw of every split, it creates a binlog split that contains the starting position from which the binlog needs to be read (the minimum of all shards' hws) and the hw information of all shards.
MySqlHybridSplitAssigner#createBinlogSplit
private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());

    Map<String, BinlogOffset> splitFinishedOffsets =
            snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

    BinlogOffset minBinlogOffset = null;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
    }

    // the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
    // then transfer them
    boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
            new HashMap<>(),
            finishedSnapshotSplitInfos.size());
}
When MySqlSourceEnumerator assigns this binlog split to a MySqlSourceReader, the task moves from the full phase to the incremental phase. MySqlSourceReader reads the binlog data and uses shouldEmit to decide whether a record should be sent downstream.
BinlogSplitReader#shouldEmit
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        if (hasEnterPureBinlogPhase(tableId, position)) {
            return true;
        }
        // only the table who captured snapshot splits need to filter
        if (finishedSplitsInfo.containsKey(tableId)) {
            RowType splitKeyType =
                    ChunkUtils.getSplitType(
                            statefulTaskContext.getDatabaseSchema().tableFor(tableId));
            Object[] key =
                    getSplitKey(
                            splitKeyType,
                            sourceRecord,
                            statefulTaskContext.getSchemaNameAdjuster());
            for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
                if (RecordUtils.splitKeyRangeContains(
                                key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                        && position.isAfter(splitInfo.getHighWatermark())) {
                    return true;
                }
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}
When a binlog record falls within the primary key range of a shard and its position is after that shard's hw, the record should be sent downstream. BinlogSplitReader also maintains maxSplitHighWatermarkMap to check whether a record is already beyond the maximum hw of its table, that is, whether the table has entered the pure binlog phase.
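Conceptually, that check can be sketched as follows; this is a simplified illustration rather than the connector's actual code, and table identifiers and binlog offsets are modeled with plain types:

import java.util.HashMap;
import java.util.Map;

// A simplified illustration of the "pure binlog phase" check.
class PureBinlogPhaseCheck {
    // per table: the maximum high watermark (binlog position) among all of its snapshot splits
    private final Map<String, Long> maxSplitHighWatermarkMap = new HashMap<>();

    void recordSplitHighWatermark(String table, long highWatermark) {
        maxSplitHighWatermarkMap.merge(table, highWatermark, Math::max);
    }

    // Once a binlog record's position is past the table's maximum hw, every snapshot split
    // of that table has already been reconciled, so no per-split filtering is needed any more.
    boolean hasEnteredPureBinlogPhase(String table, long binlogPosition) {
        Long maxHighWatermark = maxSplitHighWatermarkMap.get(table);
        return maxHighWatermark != null && binlogPosition > maxHighWatermark;
    }

    public static void main(String[] args) {
        PureBinlogPhaseCheck check = new PureBinlogPhaseCheck();
        check.recordSplitHighWatermark("mydb.orders", 1500L);
        check.recordSplitHighWatermark("mydb.orders", 2300L);
        System.out.println(check.hasEnteredPureBinlogPhase("mydb.orders", 2000L)); // false
        System.out.println(check.hasEnteredPureBinlogPhase("mydb.orders", 2400L)); // true
    }
}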
Checkpoint support in the full read phase
From the previous section we learned that Flink CDC 2.0 borrows DBLog's algorithm to achieve lock-free reading and, with the help of the FLIP-27 framework, elegantly realizes concurrent reading in the full phase. As for checkpoints, by implementing the interfaces defined by FLIP-27 it is guaranteed that the job returns to its previous state after a restart and that completed splits are not read again.
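For reference, the checkpoint-related hooks of the FLIP-27 interfaces look roughly like the following simplified excerpts of Flink's SplitEnumerator and SourceReader (method lists trimmed, extends clauses and imports omitted):

// JobManager side: the enumerator checkpoints its split-assignment state and
// gets splits back when a reader fails before acknowledging them.
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> {
    CheckpointT snapshotState(long checkpointId) throws Exception;
    void addSplitsBack(List<SplitT> splits, int subtaskId);
    // ... start(), handleSplitRequest(), addReader(), close(), ...
}

// TaskManager side: the reader checkpoints the splits it is currently reading,
// including their progress, so a finished snapshot split is not re-read after recovery.
public interface SourceReader<T, SplitT extends SourceSplit> {
    List<SplitT> snapshotState(long checkpointId);
    void addSplits(List<SplitT> splits);
    // ... start(), pollNext(), isAvailable(), close(), ...
}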
State management on the MySqlSourceEnumerator side is implemented under the flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/ directory, and state management on the MySqlSourceReader side is implemented under the flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/ directory. Since this logic is relatively complex, this article will not cover it further.
Careful readers may notice that after the full phase switches to the incremental phase, if binlog that was read earlier has already been purged on the source, the incremental phase will fail because the corresponding binlog position can no longer be read. This happens because Flink CDC has no persistent storage of its own to retain the binlog after the task starts. Increasing the parallelism of the full phase to raise its throughput can alleviate the problem to some extent.
As of Release-2.1.1, only flink-connector-mysql-cdc supports the CDC 2.0 features described above. A key feature on the Release-2.2 roadmap is to extract the 2.0 framework so that other database connectors can also use the 2.0 features; see the corresponding PR for details.
Original address: https://zhjwpku.com/2022/01/16/flink-cdc-2-0-analysis.html#flink-cdc-20-%E5%85%A8%E9%87%8F%E5%A2%9E%E9%87%8F%E8%AF%BB%E5%8F%96%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86