> Background

This topic describes how to import data from TiDB into Kafka through TiCDC and then consume it with Flink.
To verify the functionality of the entire process quickly, all components are deployed on stand-alone machines. If you need to deploy in a production environment, we recommend replacing each component with a highly available cluster deployment.
We created a separate single-node Zookeeper environment, which Flink, Kafka, and the other components share.
For all components that require a JRE, such as Flink, Kafka, and Zookeeper, each component uses its own independent JRE, since upgrading a shared JRE may affect other applications.
This article is divided into two parts: the first five sections cover the basic environment setup, and the last section describes how data flows through each component.

The application scenarios of the TiDB + Flink structure support the development and operation of many different kinds of applications.
At present, the main features include:

- batch and stream integration
- precise state management
- event-time support
- exactly-once state consistency guarantees
Flink can run on a variety of resource management frameworks including YARN, Mesos, Kubernetes, and supports independent deployment on bare metal clusters. TiDB can be deployed on AWS, Kubernetes, GCP GKE, and also supports independent deployment on bare metal clusters using TiUP.
The common types of applications built on the TiDB + Flink structure are as follows:

- Event-driven applications
  - anti-fraud
  - anomaly detection
  - rule-based alerting
  - business process monitoring
- Data analysis applications
  - network quality monitoring
  - product update and test evaluation analysis
  - ad-hoc analysis of factual data
  - large-scale graph analysis
- Data pipeline applications
  - real-time search index building in e-commerce
  - continuous ETL in e-commerce
  - monitoring data analysis
Operating System Environment
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
Software Environment
Machine Assignment
Compared with traditional stand-alone databases, TiDB Cluster has the following advantages:

- a pure distributed architecture with good scalability that supports elastic scale-out and scale-in
- SQL support, exposing the MySQL protocol to the outside world and compatible with most MySQL syntax, so it can directly replace MySQL in most scenarios
- high availability by default: when a minority of replicas fail, the database can automatically repair data and fail over, transparently to the application
- support for ACID transactions, suitable for scenarios with strong consistency requirements, such as bank transfers
- a rich tool chain ecosystem covering data migration, replication, backup, and other scenarios
In terms of kernel design, the TiDB distributed database splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

In this article, we only do the simplest functional tests, so we deploy a single-node, single-replica TiDB cluster, which involves the following three modules:

- TiDB Server: the SQL layer, which exposes the connection endpoint of the MySQL protocol, is responsible for accepting client connections, performing SQL parsing and optimization, and finally generating a distributed execution plan.
- PD (Placement Driver) Server: the meta-information management module of the entire TiDB cluster, responsible for storing the real-time data distribution of each TiKV node and the overall topology of the cluster, providing the TiDB Dashboard management and control interface, and assigning transaction IDs to distributed transactions.
- TiKV Server: responsible for storing data; from the outside, TiKV is a distributed key-value storage engine that provides transactions.
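Because TiDB Server speaks the MySQL protocol, any standard MySQL client can connect to it directly. A minimal sketch, assuming the tidb-server port 14000 from the TiUP template below and an empty root password (the default for a fresh cluster):

```
# connect to the TiDB SQL layer with a plain MySQL client
mysql -h 192.168.12.21 -P 14000 -u root
```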
TiUP Deployment Template File
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"

# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"

# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"

#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"

#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
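This template can then be fed to TiUP to deploy and start the cluster. A minimal sketch, assuming the file above is saved as tidb-c1-v409.yaml (the file name is an assumption):

```
# deploy a cluster named tidb-c1-v409 with TiDB v4.0.9 from the topology file,
# connecting to the target hosts as root (password prompted by -p)
tiup cluster deploy tidb-c1-v409 v4.0.9 tidb-c1-v409.yaml --user root -p
# start all components of the freshly deployed cluster
tiup cluster start tidb-c1-v409
```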
TiDB Cluster Environment
This article does not focus on deploying a TiDB cluster. As a quick experimental environment, only a single-replica TiDB cluster is deployed on one machine, and there is no need to deploy the monitoring environment.
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:    tidb
Cluster name:    tidb-c1-v409
Cluster version: v4.0.9
SSH type:        builtin
Dashboard URL:   http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4

Create a table for testing

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin     |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
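For reference, the test table shown above can be created with a statement like the following (a sketch matching the schema in the `show create table` output; the database name test is taken from the later insert examples):

```sql
CREATE TABLE test.t1 (
  id INT NOT NULL,
  PRIMARY KEY (id)
);
```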
Deploy the Zookeeper environment

In this lab, the Zookeeper environment is configured separately to serve the Kafka and Flink environments.
As an experimental demonstration, only a stand-alone environment is deployed.
Unzip the Zookeeper package
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper

Deploy a jre for Zookeeper

[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable

## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre

Create Zookeeper's configuration file

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
Start Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start

Check the status of Zookeeper

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address    Foreign Address    State    PID/Program name
tcp        0      0 0.0.0.0:22       0.0.0.0:*          LISTEN   942/sshd
tcp6       0      0 :::2181          :::*               LISTEN   15062/java
tcp6       0      0 :::8080          :::*               LISTEN   15062/java
tcp6       0      0 :::22            :::*               LISTEN   942/sshd
tcp6       0      0 :::44505         :::*               LISTEN   15062/java

## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
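Inside the zkCli shell, a quick sanity check is to list the root znodes; on a fresh stand-alone deployment this typically shows only the built-in /zookeeper node (a minimal sketch):

```
[zk: 192.168.12.24:2181(CONNECTED) 0] ls /
[zookeeper]
```

Once Kafka registers itself later, broker metadata znodes will appear here as well.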
Advice about Zookeeper
I personally have one modest suggestion about Zookeeper:

When running Zookeeper as a cluster, be sure to enable network monitoring.
In particular, pay attention to the network bandwidth in System Metrics.
Deploy Kafka

Kafka is a distributed stream processing platform that is mainly used for two types of applications:

- constructing real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue)
- building real-time streaming applications that transform or react to these streams of data (i.e., stream processing, transforming data internally between topics via Kafka Streams)

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of data to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the streams of data published to them.
- The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more topics, effectively transforming the input streams into output streams.
- The Connector API allows you to build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, connect to a relational database and capture all changes to a table.
In this experiment, only functional verification is performed, and only a stand-alone version of the Kafka environment is built.
Download and unzip Kafka
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
Deploy a jre for Kafka

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
Modify Kafka’s jre environment variable

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre

Modify the Kafka configuration file /opt/kafka/config/server.properties

## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
Start Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
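Started this way, the broker stays attached to the current terminal. If you prefer to run it in the background, the same script accepts a -daemon option (optional; a sketch):

```
# start the Kafka broker as a background daemon instead of in the foreground
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
```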
View Kafka version information
Kafka does not provide a --version option to view its version information, so we check the names of the Kafka library jars instead.

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root   41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root  892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...

Here 2.13 is the Scala version and 2.7.0 is the Kafka version.
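With the broker running, the Producer and Consumer APIs described earlier can be exercised by hand with the console tools that ship with Kafka. A minimal sketch — the topic name smoke-test is a hypothetical scratch topic, not one used elsewhere in this article:

```
# publish test messages interactively (Producer API via the console tool)
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test

# read them back from the beginning (Consumer API via the console tool)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test --from-beginning
```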
Deploy Flink

Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. It supports high throughput, low latency, and high performance, is designed to run in all common cluster environments, and performs computations at in-memory speed and at any scale.

This lab only performs functional testing and only deploys a stand-alone Flink environment.
Download and unzip Flink
[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
Deploy a jre for Flink

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
Add the required libs

For Flink to consume Kafka data, the flink-sql-connector-kafka package is required; for Flink to connect to MySQL/TiDB, the flink-connector-jdbc package is required.

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
Modify the Flink configuration file
## Add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
Start Flink
[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
Check out the Flink GUI

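If a browser is not handy, the same information shown in the GUI is available through Flink's REST API on the Web UI port (8081 by default; this assumes rest.port has not been changed):

```
# query the cluster overview (task managers, slots, running jobs) via the REST API
curl http://192.168.12.23:8081/overview
```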
Deploy TiCDC

TiCDC runs as stateless nodes that achieve high availability through the etcd inside PD. A TiCDC cluster supports creating multiple synchronization tasks to replicate data to different downstreams.
The system architecture of TiCDC is shown in the following figure

TiCDC system roles:

- TiKV CDC component: outputs only key-value (KV) change logs.
  - Assembles KV change logs with internal logic.
  - Provides an interface for outputting KV change logs; the data sent includes real-time change logs and incremental-scan change logs.
- capture: the TiCDC runtime process. Multiple captures form a TiCDC cluster that is responsible for replicating KV change logs.
  - Each capture is responsible for pulling a portion of the KV change logs.
  - Sorts the one or more KV change logs it has pulled.
  - Restores transactions downstream, or outputs them according to the TiCDC Open Protocol.
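In this lab the TiCDC node was deployed together with the TiDB cluster under /opt/tidb-c1/cdc-18300. For reference, starting a capture process by hand would look roughly like the sketch below; the port 18300 is inferred from the deploy directory naming, and the log path is an assumption:

```
# start a single TiCDC capture process against this cluster's PD endpoint
/opt/tidb-c1/cdc-18300/bin/cdc server \
  --pd=http://192.168.12.21:12379 \
  --addr=0.0.0.0:18300 \
  --log-file=/opt/tidb-c1/log/cdc-18300/cdc.log
```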
Create Kafka Topic ticdc-test
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic ticdc-test
Created topic ticdc-test.
View all topics

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
ticdc-test

View information about the ticdc-test topic in Kafka

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test  PartitionCount: 1  ReplicationFactor: 1  Configs: max.message.bytes=12800000,flush.messages=1
    Topic: ticdc-test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
Create Kafka’s changefeed in TiCDC

Create a changefeed configuration file and enable enable-old-value:

## create a changefeed configuration file
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
enable-old-value=true
Create Kafka’s changefeed:

## create a changefeed for kafka
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed create \
> --pd=http://192.168.12.21:12379 \
> --sink-uri="kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-old-value=true&protocol=canal-json" \
> --changefeed-id="ticdc-kafka" \
> --config=/opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
Create changefeed successfully!
ID: ticdc-kafka
Info: {"sink-uri":"kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026partition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal-json","opts":{"max-message-bytes":"67108864"},"create-time":"2021-02-22T00:08:50.185073755-05:00","start-ts":423092690661933057,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}
The parameters in Kafka’s sink URI are configured as follows: kafka-version=2.7.0, partition-num=1, max-message-bytes=67108864, replication-factor=1, enable-old-value=true, and protocol=canal-json.
View the changefeeds that have been created:

[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 list
[
  {
    "id": "ticdc-kafka",
    "summary": {
      "state": "normal",
      "tso": 423092789699936258,
      "checkpoint": "2021-02-22 00:15:07.974",
      "error": null
    }
  }
]
View information about the ticdc-kafka changefeed:

[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 query -c ticdc-kafka
{
  "info": {
    "sink-uri": "kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026partition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal-json",
    "opts": {
      "max-message-bytes": "67108864"
    },
    "create-time": "2021-02-22T00:08:50.185073755-05:00",
    "start-ts": 423092690661933057,
    "target-ts": 0,
    "admin-job-type": 0,
    "sort-engine": "memory",
    "sort-dir": ".",
    "config": {
      "case-sensitive": true,
      "enable-old-value": true,
      "force-replicate": false,
      "check-gc-safe-point": true,
      "filter": {
        "rules": [
          "*.*"
        ],
        "ignore-txn-start-ts": null,
        "ddl-allow-list": null
      },
      "mounter": {
        "worker-num": 16
      },
      "sink": {
        "dispatchers": null,
        "protocol": "canal-json"
      },
      "cyclic-replication": {
        "enable": false,
        "replica-id": 0,
        "filter-replica-ids": null,
        "id-buckets": 0,
        "sync-ddl": false
      },
      "scheduler": {
        "type": "table-number",
        "polling-time": -1
      }
    },
    "state": "normal",
    "history": null,
    "error": null,
    "sync-point-enabled": false,
    "sync-point-interval": 600000000000
  },
  "status": {
    "resolved-ts": 423093295690285057,
    "checkpoint-ts": 423093295428403201,
    "admin-job-type": 0
  },
  "count": 0,
  "task-status": []
}

View consumer information in Kafka
After creating a Kafka changefeed in TiCDC and streaming data to the ticdc-test topic in Kafka, the TiCDC -> Kafka channel is established.
Insert a piece of data to test:
mysql> insert into t1 values(1);
Query OK, 1 row affected (0.00 sec)

You can see the following information in the log output of TiCDC:

[2021/02/22 01:14:02.816 -05:00] [INFO] [statistics.go:118] ["sink replication status"] [name=MQ] [changefeed=ticdc-kafka] [captureaddr=192.168.12.21:18300] [count=1] [qps=0]
Now look at Kafka’s consumer output, and you can see that the data has arrived:

[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic ticdc-test --from-beginning
{"id":0,"database":"test","table":"t1","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1613974420325,"ts":0,"sql":"","sqlType":{"id":-5},"mysqlType":{"id":"int"},"data":[{"id":"1"}],"old":[null]}
The Kafka -> Flink path

In Flink’s sql-client, create the t1 table, using the kafka connector type:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded

## create a test table t1 in Flink
Flink SQL> create table t1(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'ticdc-test',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'cdc-test-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t1;
Insert data in TiDB and query it from Flink:

## insert a test row in TiDB
mysql> insert into test.t1 values(4);
Query OK, 1 row affected (0.00 sec)

## check the result from Flink
SQL Query Result (Table)
 Refresh: 1 s    Page: Last of 1    Updated: 03:02:28.838

 id
  4
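The flink-connector-jdbc jar added earlier is not exercised in this walkthrough. If you wanted to write the rows consumed from Kafka back into TiDB (or any MySQL-compatible database), a JDBC sink table could be declared in the same sql-client session. A minimal sketch — the sink table t1_sink, the username, and the empty password are assumptions, and the MySQL JDBC driver jar must also be placed in /opt/flink/lib for this to work:

```sql
-- hypothetical JDBC sink table pointing back at TiDB through the MySQL protocol
Flink SQL> create table t1_sink(id int)
> WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>  'table-name' = 't1_sink',
>  'username' = 'root',
>  'password' = ''
> );

-- continuously copy rows read from Kafka into the JDBC sink
Flink SQL> insert into t1_sink select id from t1;
```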
This article is reproduced from: https://asktug.com/t/topic/68884