Skip to main content
Version: 2.3.8

TiDB CDC

TiDB CDC source connector

Support Those Engines

SeaTunnel Zeta
Flink

Key features

Description

The TiDB CDC connector allows for reading snapshot data and incremental data from TiDB database. This document describes how to set up the TiDB CDC connector to snapshot data and capture streaming event in TiDB database.

Supported DataSource Info

DatasourceSupported versionsDriverUrlMaven
MySQL
  • MySQL: 5.5, 5.6, 5.7, 8.0.x
  • RDS MySQL: 5.6, 5.7, 8.0.x
  • com.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306/testhttps://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28
    tikv-client-java3.2.0--https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0

    Using Dependency

    Install Jdbc Driver

    1. You need to ensure that the jdbc driver jar package and the tikv-client-java jar package has been placed in directory ${SEATUNNEL_HOME}/plugins/.

    For SeaTunnel Zeta Engine

    1. You need to ensure that the jdbc driver jar package and the tikv-client-java jar package has been placed in directory ${SEATUNNEL_HOME}/lib/.

    Please download and put Mysql driver and tikv-java-client in ${SEATUNNEL_HOME}/lib/ dir. For example: cp mysql-connector-java-xxx.jar $SEATNUNNEL_HOME/lib/

    Data Type Mapping

    Mysql Data TypeSeaTunnel Data Type
    BIT(1)
    TINYINT(1)
    BOOLEAN
    TINYINTTINYINT
    TINYINT UNSIGNED
    SMALLINT
    SMALLINT
    SMALLINT UNSIGNED
    MEDIUMINT
    MEDIUMINT UNSIGNED
    INT
    INTEGER
    YEAR
    INT
    INT UNSIGNED
    INTEGER UNSIGNED
    BIGINT
    BIGINT
    BIGINT UNSIGNEDDECIMAL(20,0)
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    DECIMAL(p,s)
    FLOAT
    FLOAT UNSIGNED
    FLOAT
    DOUBLE
    DOUBLE UNSIGNED
    REAL
    REAL UNSIGNED
    DOUBLE
    CHAR
    VARCHAR
    TINYTEXT
    MEDIUMTEXT
    TEXT
    LONGTEXT
    ENUM
    JSON
    ENUM
    STRING
    DATEDATE
    TIME(s)TIME(s)
    DATETIME
    TIMESTAMP(s)
    TIMESTAMP(s)
    BINARY
    VARBINAR
    BIT(p)
    TINYBLOB
    MEDIUMBLOB
    BLOB
    LONGBLOB
    GEOMETRY
    BYTES

    Source Options

    NameTypeRequiredDefaultDescription
    base-urlStringYes-The URL of the JDBC connection. Refer to a case: jdbc:mysql://tidb0:4000/inventory.
    usernameStringYes-Name of the database to use when connecting to the database server.
    passwordStringYes-Password to use when connecting to the database server.
    pd-addressesStringYes-TiKV cluster's PD address
    database-nameStringYes-Database name of the database to monitor.
    table-nameStringYes-Table name of the database to monitor. The table name needs to include the database name.
    startup.modeEnumNoINITIALOptional startup mode for TiDB CDC consumer, valid enumerations are initial, earliest, latest and specific.
    initial: Synchronize historical data at startup, and then synchronize incremental data.
    earliest: Startup from the earliest offset possible.
    latest: Startup from the latest offset.
    specific: Startup from user-supplied specific offsets.
    tikv.grpc.timeout_in_msLongNo-TiKV GRPC timeout in ms.
    tikv.grpc.scan_timeout_in_msLongNo-TiKV GRPC scan timeout in ms.
    tikv.batch_get_concurrencyIntegerNo-TiKV GRPC batch get concurrency
    tikv.batch_scan_concurrencyIntegerNo-TiKV GRPC batch scan concurrency

    Task Example

    Simple

    env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
    }

    source {
    # This is a example source plugin **only for test and demonstrate the feature source plugin**
    TiDB-CDC {
    result_table_name = "products_tidb_cdc"
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
    }
    }

    transform {
    }

    sink {
    jdbc {
    source_table_name = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
    }
    }

    Changelog

    • Add TiDB CDC Source Connector

    next version