跳到主要内容
版本:Next

TiDB CDC

TiDB CDC模式的连接器

支持的引擎

SeaTunnel Zeta
Flink

主要功能

Description

TiDB-CDC连接器允许从 TiDB 数据库读取快照数据和增量数据。本文将介绍如何设置 TiDB-CDC 连接器,在 TiDB 数据库中对数据进行快照和捕获流事件。

支持的数据源信息

数据源支持的版本驱动Maven
MySQL
  • MySQL: 5.5, 5.6, 5.7, 8.0.x
  • RDS MySQL: 5.6, 5.7, 8.0.x
  • com.mysql.cj.jdbc.Driverhttps://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28
    tikv-client-java3.2.0-https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0

    Using Dependency

    安装驱动

    1. 你需要确保 [jdbc 驱动 jar 包](https:/mvnrepository.com/artifact/mysql/mysql-connector-java) 和 [tikv-client-java jar 包](https:/mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0) 已经放在目录 '${SEATUNNEL_HOME}/plugins/'.

    在 SeaTunnel Zeta 引擎下

    1. 你需要确保 [jdbc 驱动 jar 包](https:/mvnrepository.com/artifact/mysql/mysql-connector-java) 和 [tikv-client-java jar 包](https:/mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0) 已经放在目录 ${SEATUNNEL_HOME}/lib/ .

    请下载Mysql驱动和tikv-java-client并将其放在${SEATUNNEL_HOME}/lib/目录中。例如:cp mysql-connector-java-xxx.jar$SEATNUNNEL_HOME/lib/

    数据类型映射

    Mysql Data TypeSeaTunnel Data Type
    BIT(1)
    TINYINT(1)
    BOOLEAN
    TINYINTTINYINT
    TINYINT UNSIGNED
    SMALLINT
    SMALLINT
    SMALLINT UNSIGNED
    MEDIUMINT
    MEDIUMINT UNSIGNED
    INT
    INTEGER
    YEAR
    INT
    INT UNSIGNED
    INTEGER UNSIGNED
    BIGINT
    BIGINT
    BIGINT UNSIGNEDDECIMAL(20,0)
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    DECIMAL(p,s)
    FLOAT
    FLOAT UNSIGNED
    FLOAT
    DOUBLE
    DOUBLE UNSIGNED
    REAL
    REAL UNSIGNED
    DOUBLE
    CHAR
    VARCHAR
    TINYTEXT
    MEDIUMTEXT
    TEXT
    LONGTEXT
    ENUM
    JSON
    ENUM
    STRING
    DATEDATE
    TIME(s)TIME(s)
    DATETIME
    TIMESTAMP(s)
    TIMESTAMP(s)
    BINARY
    VARBINAR
    BIT(p)
    TINYBLOB
    MEDIUMBLOB
    BLOB
    LONGBLOB
    GEOMETRY
    BYTES

    Source Options

    NameTypeRequiredDefaultDescription
    base-urlStringYes-The URL of the JDBC connection. Refer to a case: jdbc:mysql://tidb0:4000/inventory.
    usernameStringYes-Name of the database to use when connecting to the database server.
    passwordStringYes-Password to use when connecting to the database server.
    pd-addressesStringYes-TiKV cluster's PD address
    database-nameStringYes-Database name of the database to monitor.
    table-nameStringYes-Table name of the database to monitor. The table name needs to include the database name.
    startup.modeEnumNoINITIALOptional startup mode for TiDB CDC consumer, valid enumerations are initial, earliest, latest and specific.
    initial: Synchronize historical data at startup, and then synchronize incremental data.
    earliest: Startup from the earliest offset possible.
    latest: Startup from the latest offset.
    specific: Startup from user-supplied specific offsets.
    tikv.grpc.timeout_in_msLongNo-TiKV GRPC timeout in ms.
    tikv.grpc.scan_timeout_in_msLongNo-TiKV GRPC scan timeout in ms.
    tikv.batch_get_concurrencyIntegerNo-TiKV GRPC batch get concurrency
    tikv.batch_scan_concurrencyIntegerNo-TiKV GRPC batch scan concurrency

    任务示例

    简单示例

    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    TiDB-CDC {
    result_table_name = "products_tidb_cdc"
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
    }
    }

    transform {
    }

    sink {
    jdbc {
    source_table_name = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
    }
    }