## Background
This article describes how to import MySQL data into Kafka in the form of Binlog + Canal, and then consume it with Flink.

To verify the functionality of the whole pipeline quickly, all components are deployed as single-node instances. If you do not have enough physical resources, you can build all the components in this article in a 4G 1U virtual machine environment.

If you need to deploy in a production environment, we recommend replacing each component with a highly available cluster deployment.

We created a separate single-node Zookeeper environment, which is shared by the Flink, Kafka, and Canal components.

For all components that require a JRE (Flink, Kafka, Canal, Zookeeper), considering that upgrading the JRE may affect other applications, we give each component its own independent JRE.

This article is divided into two parts: the first seven sections introduce the construction of the basic environment, and the last section describes how data flows through each component.

The data flows through the following components:

- The MySQL data source generates the Binlog.
- Canal reads the Binlog, generates the Canal JSON, and pushes it to the specified Kafka topic.
- Flink consumes the data in the Kafka topic using the flink-sql-connector-kafka API.
- Flink writes the data into TiDB through flink-connector-jdbc.

## TiDB + Flink

The TiDB + Flink structure supports the development and running of many different kinds of applications.
At present, its main features include:

- batch-stream unification
- precise state management
- event-time support
- exactly-once state consistency guarantee
Flink can run on a variety of resource management frameworks including YARN, Mesos, Kubernetes, and supports independent deployment on bare metal clusters. TiDB can be deployed on AWS, Kubernetes, GCP GKE, and also supports independent deployment on bare metal clusters using TiUP.
The common types of applications built on the TiDB + Flink structure are as follows:

- Event-driven applications:
  - anti-fraud
  - anomaly detection
  - rule-based alerting
  - business process monitoring
- Data analysis applications:
  - network quality monitoring
  - product update and test evaluation analysis
  - ad-hoc analysis of factual data
  - large-scale graph analysis
- Data pipeline applications:
  - real-time query index building for e-commerce
  - continuous ETL
## Environment Introduction

### Operating System Environment

```
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
```
### Software Environment

### Machine Assignment

## Deploy TiDB

Compared with traditional stand-alone databases, TiDB has the advantage of a pure distributed architecture with good horizontal scalability. In its kernel design, TiDB splits the overall architecture into multiple modules, and the modules communicate with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

In this article we only do the simplest functional tests, so we deploy a single-node, single-replica TiDB, which involves the following three modules:

- TiDB Server: the SQL layer. It exposes the connection endpoint of the MySQL protocol, is responsible for accepting client connections, performs SQL parsing and optimization, and finally generates a distributed execution plan.
- PD (Placement Driver) Server: the meta-information management module of the entire TiDB cluster. It is responsible for storing the real-time data distribution of each TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management and control interface, and assigns transaction IDs to distributed transactions.
- TiKV Server: responsible for storing data. From the outside, TiKV is a distributed key-value storage engine that provides transactions.
### TiUP Deployment Template File
```
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"

# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"

# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"

#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"

#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
```
### TiDB Cluster Environment
This article does not focus on deploying the TiDB cluster; as a quick experimental environment, only a single-replica TiDB cluster is deployed on one machine. There is no need to deploy a monitoring environment.
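For reference, with the template above saved as topology.yaml (the file name is an assumption), such a cluster can be deployed and started with TiUP roughly as follows; this is a minimal sketch, and the cluster name and version must match what is displayed below:

```
# deploy the cluster described by topology.yaml as tidb-c1-v409, version v4.0.9
tiup cluster deploy tidb-c1-v409 v4.0.9 topology.yaml --user root
# start all components of the cluster
tiup cluster start tidb-c1-v409
```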
```
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4
```

### Create a table for testing

```
mysql> show create table t1;
+-------+---------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                              |
+-------+---------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+---------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
```
## Deploy the Zookeeper Environment

In this lab, the Zookeeper environment is configured separately to serve the Kafka and Flink environments. As this is an experimental demonstration, only a stand-alone environment is deployed.
### Unzip the Zookeeper package

```
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
```

### Deploy a JRE for Zookeeper

```
[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
```

Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable:

```
## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
```
### Create a configuration file for Zookeeper

```
[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
```

### Start Zookeeper

```
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
```

### Check the status of Zookeeper

```
## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address     Foreign Address   State    PID/Program name
tcp        0      0 0.0.0.0:22        0.0.0.0:*         LISTEN   942/sshd
tcp6       0      0 :::2181           :::*              LISTEN   15062/java
tcp6       0      0 :::8080           :::*              LISTEN   15062/java
tcp6       0      0 :::22             :::*              LISTEN   942/sshd
tcp6       0      0 :::44505          :::*              LISTEN   15062/java

## use the zkCli tool to check the zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
```
### Suggestions for Zookeeper

I personally have a small piece of immature advice about Zookeeper: always enable network monitoring for a Zookeeper cluster deployment. In particular, pay attention to the network bandwidth in System Metrics.
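For a quick command-line health check of a single node, Zookeeper's built-in probes can be used. This is a minimal sketch: the mntr four-letter command must first be whitelisted in zoo.cfg on 3.5+, while the AdminServer HTTP endpoint on port 8080 (visible in the netstat output above) works out of the box in 3.6:

```
## option 1: four-letter-word probe (requires 4lw.commands.whitelist=mntr,ruok in zoo.cfg)
echo mntr | nc 192.168.12.24 2181

## option 2: AdminServer HTTP endpoint, enabled by default in Zookeeper 3.6
curl http://192.168.12.24:8080/commands/monitor
```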
## Deploy Kafka

Kafka is a distributed stream processing platform, mainly used for two types of applications:

- Building real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue).
- Building real-time streaming applications that transform or react to the streaming data (i.e. stream processing; internally, Kafka Streams transforms data between topics).

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
- The Streams API allows an application to act as a stream processor, consuming input streams from one or more topics and producing output streams to one or more topics, effectively transforming input streams into output streams.
- The Connector API allows you to build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
In this experiment, only functional verification is performed, and only a stand-alone version of the Kafka environment is built.
### Download and unzip Kafka

```
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
```

### Deploy a JRE for Kafka

```
[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
```

### Modify Kafka's JRE environment variable

```
[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre
```

### Modify the Kafka configuration file

Modify the Kafka configuration file /opt/kafka/config/server.properties:

```
## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
```

### Start Kafka

```
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
```
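To confirm that the broker is usable before wiring Canal in, the console producer and consumer that ship with Kafka make a quick smoke test. This is a sketch assuming a throwaway topic named demo, which is auto-created on first write unless auto.create.topics.enable has been turned off:

```
## type a few messages, then Ctrl-C (exercises the Producer API)
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic demo

## read them back from the beginning (exercises the Consumer API)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic demo --from-beginning
```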
### View Kafka version information

Kafka does not provide a --version option to view its version information, but it can be read from the library file names:

```
[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root   41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root  892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...
```

Here 2.13 is the Scala version and 2.7.0 is the Kafka version.
## Deploy Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.

This lab only performs functional testing and only deploys a stand-alone Flink environment.
### Download and unzip Flink

```
[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
```

### Deploy a JRE for Flink

```
[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
```

### Add the libraries required by Flink

To consume Kafka data, Flink needs the flink-sql-connector-kafka package; to write to MySQL/TiDB, it needs the flink-connector-jdbc package:

```
[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
```

### Modify the Flink configuration file

```
## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
```

### Start Flink

```
[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
```
### Check out the Flink GUI
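If no browser is at hand, the JobManager can also be checked through its REST API; a minimal sketch, assuming the web/REST endpoint is on its default port 8081:

```
## returns task slot and job counts as JSON
curl http://192.168.12.23:8081/overview
```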

## Deploy MySQL

### Unzip the MySQL package

```
[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/
```
### Create the MySQL service file

```
[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target

[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE=10000
# Set environment variable MYSQLD_PARENT_PID. This is required for the SQL restart command.
Environment=MYSQLD_PARENT_PID=1

[Install]
WantedBy=multi-user.target

## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/
```
### Create the my.cnf file

```
[root@r34 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/show.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables
```
### Initialize and start MySQL

```
[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>
```
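Canal requires the binlog to be enabled and in ROW format. MySQL 8.0 enables both by default, and since the my.cnf above does not override them, a quick check once MySQL is running (not part of the original steps) can confirm it:

```
mysql> show variables like 'log_bin';        -- expect ON
mysql> show variables like 'binlog_format';  -- expect ROW
```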
### Create a new MySQL user for Canal to connect with

```
## change the mysql temp password first
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)

## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)

## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
```
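Before moving on, it may be worth confirming from the Canal node that the new user can log in and read the binlog position; a sketch, relying on the REPLICATION CLIENT privilege granted above:

```
[root@r26 ~]# mysql -ucanal -pcanal -h192.168.12.25 -e "show master status;"
```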
### Create a table for testing in MySQL

```
mysql> show create table test.t2;
+-------+--------------------------------------------------------------------------------+
| Table | Create Table                                                                   |
+-------+--------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` ( `id` int DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+--------------------------------------------------------------------------------+
1 row in set (0.00 sec)
```
## Deploy Canal

The main purpose of Canal is to provide incremental data subscription and consumption based on MySQL binlog parsing.

In the early days, Alibaba had cross-data-center synchronization requirements because it ran dual data centers in Hangzhou and the United States, and the implementation was mainly based on business triggers to capture incremental changes. Since 2010, the business gradually switched to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption services.

Services based on incremental log subscription and consumption include:

- real-time database backup
- index construction and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
- database mirroring

The MySQL versions currently supported by the Canal source side include 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
### Unzip the Canal package

```
[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
```

### Deploy Canal's JRE

```
[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java
```
### Modify the configuration files for Canal

Modify the /opt/canal/conf/canal.properties configuration file:

```
## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example    ## you need to create an example folder in /opt/canal/conf to store the destination configuration
canal.mq.servers = 192.168.12.22:9092
```

Modify the /opt/canal/conf/example/instance.properties configuration file:

```
## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*    ## filter the database tables
canal.mq.topic=canal-kafka
```
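The regex .*\\..* above subscribes Canal to every schema and table. If only the test schema used in this article were wanted, the filter could be narrowed, for example like this (a sketch; Canal accepts a comma-separated list of schema\\.table patterns):

```
## subscribe only to the tables of the test schema
canal.instance.filter.regex=test\\..*
```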
## Configure the Data Flow

### The MySQL Binlog -> Canal -> Kafka path

View the MySQL Binlog information to make sure the Binlog is working normally:

```
mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
```
### Create a topic in Kafka

Create a topic canal-kafka in Kafka. The name of the topic must correspond to canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:

```
[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)
```
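As a side note, since Kafka 2.2 the topic tools can also talk to the broker directly instead of going through Zookeeper, so an equivalent creation command would look like this (a sketch of the alternative form):

```
/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.12.22:9092 \
  --config max.message.bytes=12800000 \
  --config flush.messages=1 \
  --replication-factor 1 \
  --partitions 1 \
  --topic canal-kafka
```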
View all Kafka topics:

```
[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test
```

View information about the topic ticdc-test in Kafka:

```
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test   PartitionCount: 1   ReplicationFactor: 1   Configs: max.message.bytes=12800000,flush.messages=1
    Topic: ticdc-test   Partition: 0   Leader: 0   Replicas: 0   Isr: 0
```

### Start Canal

Before starting Canal, you need to check the port status on the Canal node:
```
## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306

## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092

## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181
```
Start Canal:

```
[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH : /opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue
```
### View the Canal log

See /opt/canal/logs/example/example.log:

```
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=,timestamp=1614134832000] cost : 244ms , the next step is binlog dump
```

### View consumer information in Kafka
Insert a test message in MySQL:

```
mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)
```

Looking at the consumer output, the test data just inserted is already there:

```
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
```
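The messages are in canal-json format. If jq happens to be installed (it is not part of this deployment), the fields Flink cares about, such as type, table, and data, can be extracted for a quicker sanity check (a sketch):

```
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 \
  --topic canal-kafka --from-beginning --max-messages 3 | jq '{type, table, data}'
```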
### The Kafka -> Flink path

Create a t2 table in Flink with connector type kafka:

```
## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t2;
```
Insert a test record in MySQL:

```
mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)
```

The data can be seen synchronized into Flink in real time:

```
Flink SQL> select * from t2;
Refresh: 1 s    Page: Last of 1    Updated: 02:49:27.366

 id
  2
```
### The Flink -> TiDB path

Create a table for testing in the downstream TiDB:

```
[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)
```

Create a test table in Flink:

```
Flink SQL> CREATE TABLE t3 (
>  id int
> ) with (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>  'table-name' = 't3',
>  'username' = 'root',
>  'password' = 'mysql'
> );

Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e
```
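Instead of inserting a constant, the two paths can also be chained into a continuous pipeline by submitting an INSERT ... SELECT, so that every row arriving from Kafka is written on to TiDB (a sketch, assuming the t2 and t3 definitions above):

```
Flink SQL> insert into t3 select id from t2;
```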
View the inserted data in the downstream TiDB:

```
mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)
```

This article is transferred from: https://asktug.com/t/topic/68731
Author: I understand everything