Background

This topic describes how to replicate data from TiDB into Kafka through TiCDC and then consume it with Flink.

To quickly verify the functionality of the entire process, all components are deployed as stand-alone instances. If you need to deploy in a production environment, we recommend replacing each component with a highly available cluster deployment.

We create a separate single-node Zookeeper environment, which is shared by Flink, Kafka, and the other components.

For all components that require a JRE, such as Flink, Kafka, and Zookeeper, each component uses its own independent JRE, since upgrading a shared JRE might affect other applications.

This article is divided into two parts: the first five sections introduce the basic environment setup, and the last section describes how data flows through each component.

As for application scenarios, the TiDB + Flink structure supports the development and operation of many different kinds of applications.

At present, the main features include:

  • batch-stream integration
  • precise state management
  • event-time support
  • exactly-once state consistency guarantees

Flink can run on a variety of resource management frameworks, including YARN, Mesos, and Kubernetes, and also supports independent deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and also supports independent deployment on bare-metal clusters using TiUP.

The common types of applications built on the TiDB + Flink structure are as follows:

  • Event-driven applications:
    • anti-fraud
    • anomaly detection
    • rule-based alerting
    • business process monitoring
  • Data analysis applications:
    • network quality monitoring
    • product update and test evaluation analysis
    • ad-hoc analysis of factual data
    • large-scale graph analysis
  • Data pipeline applications:
    • real-time query index building for e-commerce
    • continuous ETL for e-commerce

Operating System Environment

[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8

Software Environment

Machine Assignment

Compared with traditional stand-alone databases, a TiDB cluster has the following advantages:

  • Purely distributed architecture with good scalability, supporting elastic scale-out and scale-in.
  • SQL support: it exposes the MySQL network protocol and is compatible with most MySQL syntax, so it can directly replace MySQL in most scenarios (see the connection sketch after this list).
  • High availability by default: when a minority of replicas fail, the database can repair itself and fail over automatically, transparently to applications.
  • Support for ACID transactions, for scenarios with strong consistency requirements, such as bank transfers.
  • A rich tool-chain ecosystem covering data migration, replication, backup, and other scenarios.
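Because TiDB speaks the MySQL protocol, any standard MySQL client can connect to it directly. A minimal sketch, assuming the mysql client is installed and the tidb-server deployed later in this article is listening on 192.168.12.21:14000:

## connect with a standard MySQL client and check the server version
[root@r20 ~]# mysql -h 192.168.12.21 -P 14000 -u root -p
mysql> select tidb_version()\G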

In terms of kernel design, the TiDB distributed database splits the overall architecture into multiple modules that communicate with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

In this article, we only do the simplest functional tests, so we deploy a single-node, single-replica TiDB cluster, which involves the following three modules (a quick SQL check follows this list):

  • TiDB Server: the SQL layer. It exposes the connection endpoint of the MySQL protocol, is responsible for accepting client connections, parses and optimizes SQL, and finally generates a distributed execution plan.
  • PD (Placement Driver) Server: the meta-information management module of the entire TiDB cluster. It stores the real-time data distribution of each TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management interface, and assigns transaction IDs to distributed transactions.
  • TiKV Server: responsible for storing data. From the outside, TiKV is a distributed key-value storage engine that provides transactions.
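Once the cluster is running, the instances of these three modules can also be listed from SQL. A minimal sketch, assuming the single-node cluster deployed later in this article:

## list tidb/pd/tikv instances from the information schema
mysql> select type, instance, status_address, version from information_schema.cluster_info;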

TiUP Deployment Template File

# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"

TiDB Cluster Environment

This article does not focus on deploying a TiDB cluster; as a quick experimental environment, only a single-replica TiDB cluster is deployed on one machine. The monitoring environment does not need to be deployed.
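For reference, a cluster like this can be deployed and started from the template above with TiUP. A minimal sketch; the topology file name is an assumption:

[root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./tidb-c1-v409.yaml -u root
[root@r20 topology]# tiup cluster start tidb-c1-v409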

[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:       tidb
Cluster name:       tidb-c1-v409
Cluster version:    v4.0.9
SSH type:           builtin
Dashboard URL:      http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4

Create a table for testing

mysql> show create table t1;
+-------+------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                 |
+-------+------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (  `id` int(11) NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin  |
+-------+------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)


Deploy the Zookeeper environment

In this lab, the Zookeeper environment is configured separately to serve the Kafka and Flink environments. Since this is only an experimental demonstration, only a stand-alone environment is deployed.

Unzip the Zookeeper package

[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper

Deploy a JRE for Zookeeper

[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

Modify the /opt/zookeeper/bin/zkEnv.sh file to add the JAVA_HOME environment variable

## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre

Create Zookeeper's configuration file

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181

Start Zookeeper

[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start

Check the status of Zookeeper

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      942/sshd
tcp6       0      0 :::2181                 :::*                    LISTEN      15062/java
tcp6       0      0 :::8080                 :::*                    LISTEN      15062/java
tcp6       0      0 :::22                   :::*                    LISTEN      942/sshd
tcp6       0      0 :::44505                :::*                    LISTEN      15062/java

## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181

Advice about Zookeeper

I personally have one tentative suggestion about Zookeeper:

If you run a Zookeeper cluster, be sure to enable network monitoring. In particular, pay attention to the network bandwidth in the system metrics.
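For a quick spot check on a single node, ZooKeeper's mntr four-letter command exposes such metrics. A minimal sketch, assuming mntr has been added to 4lw.commands.whitelist in zoo.cfg:

## print ZooKeeper server metrics (requires 4lw.commands.whitelist to include mntr)
[root@r24 bin]# echo mntr | nc 192.168.12.24 2181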

Deploy Kafka

Kafka is a distributed stream processing platform that is mainly used for two types of applications:

  • building real-time streaming data pipelines that reliably move data between systems or applications (equivalent to a message queue);
  • building real-time streaming applications that transform or react to streams of data (i.e. stream processing, which transforms data internally between Kafka topics).

Kafka has four core APIs (a small console producer/consumer sketch follows this list):

  • The Producer API allows an application to publish a stream of data to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process streaming data published to them.
  • The Streams API allows an application to act as a stream processor, consuming input streams generated by one or more topics, and then producing an output stream into one or more topics, effectively transforming the input and output streams.
  • The Connector API allows you to build and run reusable producers or consumers, connecting Kafka topics to existing applications or data systems. For example, connect to a relational database and capture all changes to a table.
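The first two APIs are easy to try out with the console tools shipped with Kafka. A minimal sketch against the broker configured later in this article, using a hypothetical topic named demo-test:

## publish a few test messages (type them on stdin, Ctrl-C to quit)
[root@r22 bin]# /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.12.22:9092 --topic demo-test
## read them back from the beginning in another session
[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic demo-test --from-beginning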

In this experiment, only functional verification is performed, and only a stand-alone version of the Kafka environment is built.

Download and unzip Kafka

[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz

[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka

Deploy a JRE for Kafka

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

Modify Kafka's JRE environment variable

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre

Modify the Kafka configuration file /opt/kafka/config/server.properties

## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181

Start Kafka

[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

View Kafka version information

Kafka does not provide a --version option to view its version information, but you can infer it from the jar file names:

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root  4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root    41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root   892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root      821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...

Here 2.13 is the Scala version and 2.7.0 is the Kafka version.

Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale, with high throughput, low latency, and high performance.

This lab only performs functional testing and only deploys a stand-alone Flink environment.

Download and unzip Flink

[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink

Deploy a JRE for Flink

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

Add the required libraries

For Flink to consume Kafka data, the flink-sql-connector-kafka package is required; for Flink to connect to MySQL/TiDB, the flink-connector-jdbc package is required.

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
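If the two jars are not already available locally, they can be downloaded from Maven Central. A sketch; the URLs follow the standard repository layout and are an assumption, not taken from the original article:

[root@r23 soft]# wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.12.0/flink-sql-connector-kafka_2.12-1.12.0.jar
[root@r23 soft]# wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar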

Modify the Flink configuration file

## Add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre

Start Flink

[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.

Check out the Flink GUI

The TiCDC runtime is a stateless node that achieves high availability through etcd inside PD. TiCDC clusters support creating multiple synchronization tasks to synchronize data to different downstreams.

The system architecture of TiCDC is shown in the following figure

TiCDC system roles:

  • TiKV CDC component: outputs only key-value (KV) change logs.
    • Internally assembles KV change logs.
    • Provides an interface for outputting KV change logs, sending data that includes real-time change logs and incremental-scan change logs.
  • Capture: the TiCDC running process; multiple captures form a TiCDC cluster and are responsible for replicating KV change logs (an inspection sketch follows this list).
    • Each capture pulls a portion of the KV change logs.
    • Sorts the one or more KV change logs it has pulled.
    • Restores transactions downstream or outputs them according to the TiCDC Open Protocol.
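The captures that make up a TiCDC cluster can be inspected with the cdc command-line tool. A minimal sketch against the PD endpoint used later in this article:

## list the captures in the TiCDC cluster
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli capture list --pd=http://192.168.12.21:12379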

Create the Kafka topic ticdc-test

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic ticdc-test
Created topic ticdc-test.

View all topics

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
ticdc-test

View information about the ticdc-test topic in Kafka

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test       PartitionCount: 1       ReplicationFactor: 1    Configs: max.message.bytes=12800000,flush.messages=1
        Topic: ticdc-test       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Create Kafka's changefeed in TiCDC

Create a changefeed configuration file and enable enable-old-value:

## create a changefeed configuration file
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
enable-old-value=true

Create Kafka's changefeed:

## create a changefeed for kafka
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed create \
> --pd=http://192.168.12.21:12379 \
> --sink-uri="kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-old-value=true&protocol=canal-json" \
> --changefeed-id="ticdc-kafka" \
> --config=/opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
Create changefeed successfully!
ID: ticdc-kafka
Info: {"sink-uri":"kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026partition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal-json","opts":{"max-message-bytes":"67108864"},"create-time":"2021-02-22T00:08:50.185073755-05:00","start-ts":423092690661933057,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}

The parameters in Kafka's sink URL are configured as follows:

  • kafka-version: the version of the downstream Kafka (2.7.0 here)
  • partition-num: the number of Kafka partitions the data is written to (1 here)
  • max-message-bytes: the maximum size of a single batch of data sent to the Kafka broker
  • replication-factor: the number of replicas Kafka keeps for the messages
  • enable-old-value: whether to output the value before the change (also set in the changefeed configuration file above)
  • protocol: the message format output to Kafka (canal-json here)

View the changefeeds that have been created:

[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 list
[
  {
    "id": "ticdc-kafka",
    "summary": {
      "state": "normal",
      "tso": 423092789699936258,
      "checkpoint": "2021-02-22 00:15:07.974",
      "error": null
    }
  }
]

View information about the ticdc-kafka changefeed:

[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 query -c ticdc-kafka
{
  "info": {
    "sink-uri": "kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026partition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal-json",
    "opts": {
      "max-message-bytes": "67108864"
    },
    "create-time": "2021-02-22T00:08:50.185073755-05:00",
    "start-ts": 423092690661933057,
    "target-ts": 0,
    "admin-job-type": 0,
    "sort-engine": "memory",
    "sort-dir": ".",
    "config": {
      "case-sensitive": true,
      "enable-old-value": true,
      "force-replicate": false,
      "check-gc-safe-point": true,
      "filter": {
        "rules": [
          "*.*"
        ],
        "ignore-txn-start-ts": null,
        "ddl-allow-list": null
      },
      "mounter": {
        "worker-num": 16
      },
      "sink": {
        "dispatchers": null,
        "protocol": "canal-json"
      },
      "cyclic-replication": {
        "enable": false,
        "replica-id": 0,
        "filter-replica-ids": null,
        "id-buckets": 0,
        "sync-ddl": false
      },
      "scheduler": {
        "type": "table-number",
        "polling-time": -1
      }
    },
    "state": "normal",
    "history": null,
    "error": null,
    "sync-point-enabled": false,
    "sync-point-interval": 600000000000
  },
  "status": {
    "resolved-ts": 423093295690285057,
    "checkpoint-ts": 423093295428403201,
    "admin-job-type": 0
  },
  "count": 0,
  "task-status": []
}

View consumer information in Kafka

After creating a Kafka changefeed in TiCDC and streaming data to the ticdc-test topic in Kafka, the TiCDC -> Kafka channel is established.

Insert a row of data to test:

mysql> insert into t1 values(1);
Query OK, 1 row affected (0.00 sec)

You can see the following information in the log output of TiCDC:

[2021/02/22 01:14:02.816 -05:00] [INFO] [statistics.go:118] ["sink replication status"] [name=MQ] [changefeed=ticdc-kafka] [captureaddr=192.168.12.21:18300] [count=1] [qps=0]

At this point, look at Kafka's consumer information, and you can see that the data has arrived:

[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic ticdc-test --from-beginning
{"id":0,"database":"test","table":"t1","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1613974420325,"ts":0,"sql":"","sqlType":{"id":-5},"mysqlType":{"id":"int"},"data":[{"id":"1"}],"old":[null]}

The Kafka -> Flink path

In Flink's sql-client, create the t1 table using the kafka connector:

[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded

## create a test table t1 in Flink
Flink SQL> create table t1(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'ticdc-test',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'cdc-test-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t1;

Insert data in TiDB and query it from Flink:

## insert a test row in TiDB
mysql> insert into test.t1 values(4);
Query OK, 1 row affected (0.00 sec)

## check the result from Flink
                           SQL Query Result (Table)
 Refresh: 1 s              Page: Last of 1              Updated: 03:02:28.838

                        id
                         4
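The flink-connector-jdbc package added earlier can be used in the same way to write results from Flink back to TiDB over the MySQL protocol. A minimal sketch, assuming a table test.t2(id int primary key) already exists in TiDB, that the MySQL JDBC driver jar has also been placed in /opt/flink/lib/, and that the root user has an empty password:

Flink SQL> create table t2(id int, primary key (id) not enforced)
> WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>  'table-name' = 't2',
>  'username' = 'root',
>  'password' = ''
> );
Flink SQL> insert into t2 select id from t1;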

This article is transferred from: https://asktug.com/t/topic/68884
Author: Everyone who understands knows


