TiDB CDC
TiDB CDC模式的连接器
支持的引擎
SeaTunnel Zeta
Flink
主要功能
描述
TiDB-CDC连接器允许从 TiDB 数据库读取快照数据和增量数据。本文将介绍如何设置 TiDB-CDC 连接器,在 TiDB 数据库中对数据进行快照和捕获流事件。
支持的数据源信息
数据源 | 支持的版本 | 驱动 | Maven |
---|---|---|---|
MySQL | com.mysql.cj.jdbc.Driver | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 | |
tikv-client-java | 3.2.0 | - | https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0 |
Using Dependency
安装驱动
在 Flink 引擎下
- 你需要确保 jdbc 驱动 jar 包 和 tikv-client-java jar 包 已经放在目录
${SEATUNNEL_HOME}/plugins/
。
在 SeaTunnel Zeta 引擎下
- 你需要确保 jdbc 驱动 jar 包 和 tikv-client-java jar 包 已经放在目录
${SEATUNNEL_HOME}/lib/
。
请下载 Mysql 驱动和 tikv-java-client 并将其放在 ${SEATUNNEL_HOME}/lib/
目录中。例如:
cp mysql-connector-java-xxx.jar ${SEATUNNEL_HOME}/lib/
数据类型映射
Mysql 数据类型 | SeaTunnel 数据类型 |
---|---|
BIT(1) TINYINT(1) | BOOLEAN |
TINYINT | TINYINT |
TINYINT UNSIGNED SMALLINT | SMALLINT |
SMALLINT UNSIGNED MEDIUMINT MEDIUMINT UNSIGNED INT INTEGER YEAR | INT |
INT UNSIGNED INTEGER UNSIGNED BIGINT | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
DECIMAL(p, s) DECIMAL(p, s) UNSIGNED NUMERIC(p, s) NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) |
FLOAT FLOAT UNSIGNED | FLOAT |
DOUBLE DOUBLE UNSIGNED REAL REAL UNSIGNED | DOUBLE |
CHAR VARCHAR TINYTEXT MEDIUMTEXT TEXT LONGTEXT ENUM JSON ENUM | STRING |
DATE | DATE |
TIME(s) | TIME(s) |
DATETIME TIMESTAMP(s) | TIMESTAMP(s) |
BINARY VARBINAR BIT(p) TINYBLOB MEDIUMBLOB BLOB LONGBLOB GEOMETRY | BYTES |
源选项
名称 | 类型 | 必需 | 默认 | 描述 |
---|---|---|---|---|
base-url | String | 是 | - | JDBC 连接的 URL,例如:jdbc:mysql://tidb0:4000/inventory 。 |
username | String | 是 | - | 连接数据库服务器时使用的用户名。 |
password | String | 是 | - | 连接数据库服务器时使用的密码。 |
pd-addresses | String | 是 | - | TiKV 集群的 PD 地址。 |
database-name | String | 是 | - | 要监控的数据库名称。 |
table-name | String | 是 | - | 要监控的表名称。表名称需要包含数据库名称。 |
startup.mode | Enum | 否 | INITIAL | TiDB CDC 消费器的可选启动模式,可选值有 initial 、earliest 、latest 和 specific 。initial :启动时同步历史数据,然后同步增量数据。earliest :从最早的可用偏移量开始启动。latest :从最新的偏移量开始启动。specific :从用户提供的特定偏移量开始启动。 |
batch-size-per-scan | Int | 否 | 1000 | 每次扫描的大小。 |
tikv.grpc.timeout_in_ms | Long | 否 | - | TiKV GRPC 超时时间(毫秒)。 |
tikv.grpc.scan_timeout_in_ms | Long | 否 | - | TiKV GRPC 扫描超时时间(毫秒)。 |
tikv.batch_get_concurrency | Integer | 否 | - | TiKV GRPC 批量获取并发度。 |
tikv.batch_scan_concurrency | Integer | 否 | - | TiKV GRPC 批量扫描并发度。 |
任务示例
简单示例
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
TiDB-CDC {
plugin_output = "products_tidb_cdc"
base-url = "jdbc:mysql://tidb0:4000/inventory"
driver = "com.mysql.cj.jdbc.Driver"
tikv.grpc.timeout_in_ms = 20000
pd-addresses = "pd0:2379"
username = "root"
password = ""
database-name = "inventory"
table-name = "products"
}
}
transform {
}
sink {
jdbc {
plugin_input = "products_tidb_cdc"
url = "jdbc:mysql://tidb0:4000/inventory"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = ""
database = "inventory"
table = "products_sink"
generate_sink_sql = true
primary_keys = ["id"]
}
}
变更日志
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
[Feature] Support tidb cdc connector source #7199 (#7477) | https://github.com/apache/seatunnel/commit/87ec786bd6 | 2.3.8 |