Background

This article describes how to import MySQL data into Kafka using Binlog + Canal, and then consume it with Flink.

To quickly verify the functionality of the entire process, all components are deployed on stand-alone machines. If you do not have enough physical resources, you can build all the components in this article in a virtual machine with 4 GB of memory and 1 CPU.

If you need to deploy in a production environment, we recommend that you replace each component with a highly available cluster deployment solution.

We created a separate single-node Zookeeper environment, which is shared by Flink, Kafka, Canal, and the other components.

For all components that require a JRE, such as Flink, Kafka, Canal, and Zookeeper, each component uses its own independent JRE, so that upgrading the JRE of one component does not affect the other applications.

This article is divided into two parts: the first seven sections introduce the construction of the basic environment, and the last section introduces how data flows through each component.

The data flows through the following components:

  • MySQL as the data source generates the Binlog
  • Canal reads the Binlog, generates Canal JSON, and pushes it to the specified Kafka topic
  • Flink consumes the data in the Kafka topic using the flink-sql-connector-kafka API
  • Flink writes the data to TiDB through flink-connector-jdbc

The TiDB + Flink structure supports the development and running of many different kinds of applications.

At present, the main features include:

  • batch-stream integration
  • precise state management
  • event-time support
  • exactly-once state consistency guarantees

Flink can run on a variety of resource management frameworks, including YARN, Mesos, and Kubernetes, and also supports independent deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and also supports independent deployment on bare-metal clusters using TiUP.

The common types of applications of the TiDB + Flink structure are as follows:

  • Event-driven applications
    • anti-fraud
    • anomaly detection
    • rule-based alerting
    • business process monitoring
  • Data analysis applications
    • network quality monitoring
    • product update and test evaluation analysis
    • ad-hoc analysis of factual data
    • large-scale graph analysis
  • Data pipeline applications
    • real-time query index building for e-commerce
    • continuous ETL for e-commerce

Environment Introduction

Operating System Environment

[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8

Software Environment

Machine Assignment

Compared with traditional stand-alone databases, TiDB has the following advantages:

  • A pure distributed architecture with good scalability, supporting elastic scale-out and scale-in
  • Supports SQL, exposes the MySQL network protocol, and is compatible with most MySQL syntax, so it can directly replace MySQL in most scenarios
  • Supports high availability by default; when a few replicas fail, the database can automatically repair data and fail over, transparently to the business
  • Supports ACID transactions, which is friendly to scenarios with strong consistency requirements, such as bank transfers
  • Has a rich tool-chain ecosystem, covering data migration, synchronization, backup, and other scenarios

In terms of kernel design, TiDB splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

    In this article, we only do the simplest functional tests, so we deploy a single-node, single-replica TiDB, involving the following three modules:

    • TiDB Server: the SQL layer, which exposes the connection endpoint of the MySQL protocol, is responsible for accepting client connections, parsing and optimizing SQL, and finally generating a distributed execution plan.
    • PD (Placement Driver) Server: The meta-information management module of the entire TiDB cluster, responsible for storing the real-time data distribution of each TiKV node and the overall topology of the cluster, providing the TiDB Dashboard management and control interface, and assigning transaction IDs to distributed transactions.
    • TiKV Server: responsible for storing data; from the outside, TiKV is a distributed key-value storage engine that provides transactions.

    TiUP Deployment Template File

    # # Global variables are applied to all deployments and used as the default value of
    # # the deployments if a specific deployment value is missing.
    global:
      user: "tidb"
      ssh_port: 22
      deploy_dir: "/opt/tidb-c1/"
      data_dir: "/opt/tidb-c1/data/"
    # # Monitored variables are applied to all the machines.
    #monitored:
    #  node_exporter_port: 19100
    #  blackbox_exporter_port: 39115
    #  deploy_dir: "/opt/tidb-c3/monitored"
    #  data_dir: "/opt/tidb-c3/data/monitored"
    #  log_dir: "/opt/tidb-c3/log/monitored"
    # # Server configs are used to specify the runtime configuration of TiDB components.
    # # All configuration items can be found in TiDB docs:
    # # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
    # # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
    # # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
    # # All configuration items use points to represent the hierarchy, e.g:
    # #   readpool.storage.use-unified-pool
    # #
    # # You can overwrite this configuration via the instance-level `config` field.
    server_configs:
      tidb:
        log.slow-threshold: 300
        binlog.enable: false
        binlog.ignore-error: false
        tikv-client.copr-cache.enable: true
      tikv:
        server.grpc-concurrency: 4
        raftstore.apply-pool-size: 2
        raftstore.store-pool-size: 2
        rocksdb.max-sub-compactions: 1
        storage.block-cache.capacity: "16GB"
        readpool.unified.max-thread-count: 12
        readpool.storage.use-unified-pool: false
        readpool.coprocessor.use-unified-pool: true
        raftdb.rate-bytes-per-sec: 0
      pd:
        schedule.leader-schedule-limit: 4
        schedule.region-schedule-limit: 2048
        schedule.replica-schedule-limit: 64
    pd_servers:
      - host: 192.168.12.21
        ssh_port: 22
        name: "pd-2"
        client_port: 12379
        peer_port: 12380
        deploy_dir: "/opt/tidb-c1/pd-12379"
        data_dir: "/opt/tidb-c1/data/pd-12379"
        log_dir: "/opt/tidb-c1/log/pd-12379"
        numa_node: "0"
        # # The following configs are used to overwrite the `server_configs.pd` values.
        config:
          schedule.max-merge-region-size: 20
          schedule.max-merge-region-keys: 200000
    tidb_servers:
      - host: 192.168.12.21
        ssh_port: 22
        port: 14000
        status_port: 12080
        deploy_dir: "/opt/tidb-c1/tidb-14000"
        log_dir: "/opt/tidb-c1/log/tidb-14000"
        numa_node: "0"
        # # The following configs are used to overwrite the `server_configs.tidb` values.
        config:
          log.slow-query-file: tidb-slow-overwrited.log
          tikv-client.copr-cache.enable: true
    tikv_servers:
      - host: 192.168.12.21
        ssh_port: 22
        port: 12160
        status_port: 12180
        deploy_dir: "/opt/tidb-c1/tikv-12160"
        data_dir: "/opt/tidb-c1/data/tikv-12160"
        log_dir: "/opt/tidb-c1/log/tikv-12160"
        numa_node: "0"
        # # The following configs are used to overwrite the `server_configs.tikv` values.
        config:
          server.grpc-concurrency: 4
          #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
    #monitoring_servers:
    #  - host: 192.168.12.21
    #    ssh_port: 22
    #    port: 19090
    #    deploy_dir: "/opt/tidb-c1/prometheus-19090"
    #    data_dir: "/opt/tidb-c1/data/prometheus-19090"
    #    log_dir: "/opt/tidb-c1/log/prometheus-19090"
    #grafana_servers:
    #  - host: 192.168.12.21
    #    port: 13000
    #    deploy_dir: "/opt/tidb-c1/grafana-13000"
    #alertmanager_servers:
    #  - host: 192.168.12.21
    #    ssh_port: 22
    #    web_port: 19093
    #    cluster_port: 19094
    #    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
    #    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
    #    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
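    With the template above saved to a local file, the single-node cluster can be deployed and started with TiUP. This is a minimal sketch; the topology file name topology.yaml is an assumption, while the cluster name and version match the ones used later in this article:

    ## deploy and start the cluster with TiUP (topology.yaml is assumed to contain the template above)
    [root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml -u root -p
    [root@r20 topology]# tiup cluster start tidb-c1-v409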

    TiDB Cluster Environment

    This article does not focus on deploying a TiDB cluster. As a quick experimental environment, only a single-replica TiDB cluster is deployed on one machine. There is no need to deploy a monitoring environment.

    [root@r20 topology]# tiup cluster display tidb-c1-v409
    Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
    Cluster type:       tidb
    Cluster name:       tidb-c1-v409
    Cluster version:    v4.0.9
    SSH type:           builtin
    Dashboard URL:      http://192.168.12.21:12379/dashboard
    ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
    --                   ----  ----           -----        -------       ------   --------                      ----------
    192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
    192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
    192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
    Total nodes: 4

    Create a table for testing

    mysql> show create table t1;
    +-------+------------------------------------------------------------------------------------------------------------------------------+
    | Table | Create Table                                                                                                                 |
    +-------+------------------------------------------------------------------------------------------------------------------------------+
    | t1    | CREATE TABLE `t1` (  `id` int(11) NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin  |
    +-------+------------------------------------------------------------------------------------------------------------------------------+
    1 row in set (0.00 sec)


    Deploy the Zookeeper Environment

    In this lab, the Zookeeper environment is configured separately to serve the Kafka and Flink environments.

    As this is only an experimental demonstration, only a stand-alone environment is deployed.

    Unzip the Zookeeper package

    [root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
    [root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper

    Deploy the JRE for Zookeeper

    [root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
    [root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

    Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable

    ## add the following env var at the head of zkEnv.sh
    JAVA_HOME=/opt/zookeeper/jre

    Create a configuration file for Zookeeper

    [root@r24 conf]# cat zoo.cfg | grep -v "#"
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/zookeeper/data
    clientPort=2181
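    The dataDir referenced above must exist before Zookeeper is started. Assuming it has not been created yet, a minimal preparation step would be:

    ## create the data directory referenced by zoo.cfg
    [root@r24 conf]# mkdir -p /opt/zookeeper/data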

    Start Zookeeper

    [root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start

    Check the status of Zookeeper

    ## check zk status
    [root@r24 bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: standalone

    ## check OS port status
    [root@r24 bin]# netstat -ntlp
    Active Internet connections (only servers)
    Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
    tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
    tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
    tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
    tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
    tcp6       0      0 :::44505                :::*                    LISTEN      15062/java

    ## use zkCli tool to check zk connection


    [root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181

    Suggestions for Zookeeper

    I have one small piece of personal advice about Zookeeper: when running the clustered version of Zookeeper, network monitoring must be enabled.

    In particular, pay attention to the network bandwidth in the system metrics.

    Deploy Kafka

    Kafka is a distributed stream processing platform that is mainly used in two types of applications:

    • Building real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue).
    • Building real-time streaming applications that transform or react to streams of data (i.e. stream processing, transforming data internally between Kafka topics through Kafka Streams).

    Kafka has four core APIs:

    • The Producer API allows an application to publish a stream of data to one or more Kafka topics.
    • The Consumer API allows an application to subscribe to one or more topics and process streaming data published to them.
    • The Streams API allows an application to act as a stream processor, consuming input streams generated by one or more topics, and then producing an output stream into one or more topics, effectively transforming the input and output streams.
    • The Connector API allows you to build and run reusable producers or consumers, connecting Kafka topics to existing applications or data systems. For example, connect to a relational database and capture all changes to a table.

    In this experiment, only functional verification is performed, and only a stand-alone version of the Kafka environment is built.

    Download and unzip Kafka

    [root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
    [root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka

    Deploy the JRE for Kafka

    [root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
    [root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

    Modify Kafka's JRE environment variable

    [root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
    ## add the following line at the head of kafka-run-class.sh
    JAVA_HOME=/opt/kafka/jre

    Modify the Kafka configuration file

    Modify the Kafka configuration file /opt/kafka/config/server.properties:

    ## change the following variables in /opt/kafka/config/server.properties
    broker.id=0
    listeners=PLAINTEXT://192.168.12.22:9092
    log.dirs=/opt/kafka/logs
    zookeeper.connect=192.168.12.24:2181

    Start Kafka

    [root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
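    To do a quick sanity check of the broker, the console producer and consumer scripts shipped with Kafka can be used. This is a minimal sketch; the topic name smoke-test is only an assumption for this check:

    ## produce a message (type a line, then press Ctrl-C to exit)
    [root@r22 bin]# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test
    ## consume it back from the beginning
    [root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic smoke-test --from-beginning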

    View Kafka version information

    Kafka does not provide a --version option to view Kafka's version information, so we check the library file names instead:

    [root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
    -rw-r--r-- 1 root root  4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
    -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
    -rw-r--r-- 1 root root    41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
    -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
    -rw-r--r-- 1 root root   892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
    -rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
    ... ...

    where 2.13 is the Scala version and 2.7.0 is the Kafka version.

    Deploy Flink

    Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams, supporting high throughput, low latency, and high performance. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale.

    This lab only performs functional testing and only deploys a stand-alone Flink environment.

    Download and unzip Flink

    [root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
    [root@r23 soft]# mv flink-1.12.1 /opt/flink

    Deploy the JRE for Flink

    [root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
    [root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

    Add the libraries required by Flink

    To consume Kafka data, Flink requires the flink-sql-connector-kafka package. To write to MySQL/TiDB, Flink requires the flink-connector-jdbc package.

    [root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
    [root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/

    Modify the Flink config file

    ## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
    jobmanager.rpc.address: 192.168.12.23
    env.java.home: /opt/flink/jre

    Start Flink

    [root@r23 ~]# /opt/flink/bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host r23.
    Starting taskexecutor daemon on host r23.

    Check out the Flink GUI
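    The Flink web UI is served by the JobManager on rest.port, which defaults to 8081, so it should be reachable at http://192.168.12.23:8081 assuming the default port has not been changed. A quick reachability check from the command line:

    ## query the cluster overview endpoint of the Flink REST API
    [root@r23 ~]# curl -s http://192.168.12.23:8081/overview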

    Deploy MySQL

    Unzip the MySQL package

    [root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
    [root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/

    Create the MySQL service file

    [root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
    [root@r25 support-files]# cat mysqld.service
    [Unit]
    Description=MySQL 8.0 database server
    After=syslog.target
    After=network.target

    [Service]
    Type=simple
    User=mysql
    Group=mysql
    #ExecStartPre=/usr/libexec/mysql-check-socket
    #ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
    # Note: we set --basedir to prevent probes that might trigger SELinux alarms,
    # per bug #547485
    ExecStart=/opt/mysql/bin/mysqld_safe
    #ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
    #ExecStopPost=/opt/mysql/bin/mysql-wait-stop
    # Give a reasonable amount of time for the server to start up/shut down
    TimeoutSec=300
    # Place temp files in a secure directory, not /tmp
    PrivateTmp=true
    Restart=on-failure
    RestartPreventExitStatus=1
    # Sets open_files_limit
    LimitNOFILE = 10000
    # Set environment variable MYSQLD_PARENT_PID. This is required for the SQL restart command.
    Environment=MYSQLD_PARENT_PID=1

    [Install]
    WantedBy=multi-user.target

    ## copy mysqld.service to /usr/lib/systemd/system/
    [root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/
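    After copying the unit file, systemd needs to re-read its configuration before the service can be started. Assuming the service should also start on boot, a minimal follow-up would be:

    ## reload systemd and (optionally) enable the service at boot
    [root@r25 support-files]# systemctl daemon-reload
    [root@r25 support-files]# systemctl enable mysqld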

    Create the my.cnf file

    [root@r34 opt]# cat /etc/my.cnf
    [mysqld]
    port=3306
    basedir=/opt/mysql
    datadir=/opt/mysql/data
    socket=/opt/mysql/data/mysql.socket
    max_connections = 100
    default-storage-engine = InnoDB
    character-set-server=utf8
    log-error = /opt/mysql/log/error.log
    slow_query_log = 1
    long-query-time = 30
    slow_query_log_file = /opt/mysql/log/show.log
    min_examined_row_limit = 1000
    log-slow-slave-statements
    log-queries-not-using-indexes
    #skip-grant-tables

    Initialize and start MySQL

    [root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
    [root@r25 ~]# chown -R mysql:mysql /opt/mysql
    [root@r25 ~]# systemctl start mysqld
    ## check the mysql temp password in /opt/mysql/log/error.log
    2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-

    Create a new MySQL user for Canal to connect with

    ## change the mysql temp password first
    mysql> alter user 'root'@'localhost' identified by 'mysql';
    Query OK, 0 rows affected (0.00 sec)

    ## create a management user 'root'@'%'
    mysql> create user 'root'@'%' identified by 'mysql';
    Query OK, 0 rows affected (0.01 sec)

    mysql> grant all privileges on *.* to 'root'@'%';
    Query OK, 0 rows affected (0.00 sec)

    ## create a canal replication user 'canal'@'%'
    mysql> create user 'canal'@'%' identified by 'canal';
    Query OK, 0 rows affected (0.01 sec)

    mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
    Query OK, 0 rows affected (0.00 sec)

    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
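    To confirm that the canal user has the privileges Canal needs, its grants can be checked from any host that can reach the MySQL node. This is a minimal sketch and assumes the mysql client is available on that host:

    ## verify the replication-related grants of the canal user
    [root@r26 ~]# mysql -ucanal -pcanal -h192.168.12.25 -e "show grants;"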

    Create a table for testing in MySQL

    mysql> show create table test.t2;
    +-------+---------------------------------------------------------------------------------+
    | Table | Create Table                                                                    |
    +-------+---------------------------------------------------------------------------------+
    | t2    | CREATE TABLE `t2` (  `id` int DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
    +-------+---------------------------------------------------------------------------------+
    1 row in set (0.00 sec)


    Deploy Canal

    The main purpose of Canal is to provide incremental data subscription and consumption based on parsing MySQL database incremental logs (Binlog).

    In the early days, Alibaba had cross-data-center synchronization requirements because of its dual data center deployment in Hangzhou and the United States, and the implementation was mainly based on business triggers to capture incremental changes.

    Since 2010, the business has gradually tried parsing database logs to obtain incremental changes for synchronization, which has produced a large number of incremental database subscription and consumption services.

    Services based on incremental log subscription and consumption include:

    • real-time database backup
    • index construction and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
    • database mirroring

    The MySQL versions currently supported by the Canal source side include 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

    Unzip the Canal package

    [root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal

    Deploy the JRE for Canal

    [root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
    [root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
    ## configure the jre, add the following line at the head of /opt/canal/bin/startup.sh
    JAVA=/opt/canal/jre/bin/java

    Modify the configuration files for Canal

    Modify the /opt/canal/conf/canal.properties configuration file:

    ## modify the following configuration
    canal.zkServers = 192.168.12.24:2181
    canal.serverMode = kafka
    canal.destinations = example        ## You need to create an example folder in the /opt/canal/conf directory to store the destination configuration
    canal.mq.servers = 192.168.12.22:9092

    Modify the /opt/canal/conf/example/instance.properties configuration file:

    ## modify the following configuration
    canal.instance.master.address=192.168.12.25:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.filter.regex=.*\\..*        ## filter the database tables
    canal.mq.topic=canal-kafka

    Configure the MySQL Binlog -> Canal -> Kafka path

    View the MySQL binlog information to ensure that the Binlog is working normally:

    mysql> show master status;
    +---------------+----------+--------------+------------------+-------------------+
    | File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
    +---------------+----------+--------------+------------------+-------------------+
    | binlog.000001 |     2888 |              |                  |                   |
    +---------------+----------+--------------+------------------+-------------------+
    1 row in set (0.00 sec)
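    Canal parses row-based binlog events, so it is also worth confirming that binlog_format is ROW (the default in MySQL 8.0). A minimal check, assuming the mysql client is run on the MySQL node:

    ## confirm the binlog format expected by Canal
    [root@r25 ~]# /opt/mysql/bin/mysql -uroot -pmysql -e "show variables like 'binlog_format';"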


    Create a topic in Kafka

    Create a topic named canal-kafka in Kafka; its name must correspond to canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:

    [root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
    > --zookeeper 192.168.12.24:2181 \
    > --config max.message.bytes=12800000 \
    > --config flush.messages=1 \
    > --replication-factor 1 \
    > --partitions 1 \
    > --topic canal-kafka
    Created topic canal-kafka.

    [2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)

    [2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

    [2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false , min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false , retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)


    [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
    [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)

    View all of the Kafka topics:

    [root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
    __consumer_offsets
    canal-kafka
    ticdc-test

    View information about the topic ticdc-test in Kafka:

    [root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
    Topic: ticdc-test       PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=12800000,flush.messages=1
            Topic: ticdc-test       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

    Start Canal

    Before starting Canal, you need to check the port status on the Canal node:

     ## check MySQL 3306 port 
    ## canal.instance.master.address=192.168.12.25:3306
    [root@r26 bin]# telnet 192.168.12.25 3306
    ## check Kafka 9092 port
    ## canal.mq.servers = 192.168.12.22:9092
    [root@r26 bin]# telnet 192.168.12.22 9092
    ## check zookeeper 2181 port
    ## canal.zkServers = 192.168.12.24:2181
    [root@r26 bin]# telnet 192.168.12.24 2181

    Start Canal:

    [root@r26 bin]# /opt/canal/bin/startup.sh
    cd to /opt/canal/bin for workaround relative path
    LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
    canal conf : /opt/canal/bin/../conf/canal.properties
    CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
    cd to /opt/canal/bin for continue


    View the Canal log

    See /opt/canal/logs/example/example.log:

    2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

    View consumer information in Kafka

    Insert a test message in MySQL:

    mysql> insert into t2 values(1);
    Query OK, 1 row affected (0.00 sec)

    Looking at the consumer's output, the test data just inserted is already there:

    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
    {"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
    {"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
    {"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

    The Kafka -> Flink path

    Create a t2 table in Flink, with connector type kafka:

    ## create a test table t2 in Flink
    Flink SQL> create table t2(id int)
    > WITH (
    >  'connector' = 'kafka',
    >  'topic' = 'canal-kafka',
    >  'properties.bootstrap.servers' = '192.168.12.22:9092',
    >  'properties.group.id' = 'canal-kafka-consumer-group',
    >  'format' = 'canal-json',
    >  'scan.startup.mode' = 'latest-offset'
    > );

    Flink SQL> select * from t2;

    Insert a row of test data in MySQL:

    mysql> insert into test.t2 values(2);
    Query OK, 1 row affected (0.00 sec)

    The data can be seen synchronized in real time from Flink:

    Flink SQL> select * from t2;
     Refresh: 1 s        Page: Last of 1        Updated: 02:49:27.366

        id
         2

    The Flink -> TiDB path

    Create a table for testing in the downstream TiDB

    [root@r20 soft]# mysql -uroot -P14000 -hr21
    mysql> create table t3 (id int);
    Query OK, 0 rows affected (0.31 sec)

    Create a test table in Flink

    Flink SQL> CREATE TABLE t3 (
    >     id int
    > ) with (
    >     'connector' = 'jdbc',
    >     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
    >     'table-name' = 't3',
    >     'username' = 'root',
    >     'password' = 'mysql'
    > );

    Flink SQL> insert into t3 values(3);
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: a0827487030db177ee7e5c8575ef714e

    View the inserted data in the downstream TiDB:

    mysql> select * from test.t3;
    +------+
    | id   |
    +------+
    |    3 |
    +------+
    1 row in set (0.00 sec)
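    To wire the two paths together into one continuous job (MySQL -> Canal -> Kafka -> Flink -> TiDB), a streaming INSERT can be submitted in Flink SQL. This is a sketch under the assumption that the t2 (Kafka source) and t3 (TiDB sink) tables were created as above:

    Flink SQL> insert into t3 select id from t2;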


    This article is transferred from: https://asktug.com/t/topic/68731
    Author: I understand everything