SQL Server CDC
SQL Server CDC Source Connector
Supported SQL Server Versions
- server: 2019 (or later, for reference only)
Supported Engines
SeaTunnel Zeta
Flink
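Key Features
Description
The SQL Server CDC connector reads snapshot data and incremental data from a SQL Server database. This document describes how to set up the SQL Server CDC connector to run SQL queries against SQL Server databases.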
Supported Data Source Info
| Data Source | Supported Versions | Driver | Url | Maven |
|---|---|---|---|---|
| SqlServer | server: 2019 (or later, for reference only) | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433;databaseName=column_type_test | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
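Required Dependencies
Install the JDBC Driver
For the Spark/Flink engine
- Make sure the JDBC driver jar has been placed in the ${SEATUNNEL_HOME}/plugins/ directory.
For the SeaTunnel Zeta engine
- Make sure the JDBC driver jar has been placed in the ${SEATUNNEL_HOME}/lib/ directory.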
Data Type Mapping
| SQL Server Data Type | SeaTunnel Data Type |
|---|---|
| CHAR VARCHAR NCHAR NVARCHAR TEXT NTEXT XML | STRING |
| BINARY VARBINARY IMAGE | BYTES |
| INTEGER INT | INT |
| SMALLINT TINYINT | SMALLINT |
| BIGINT | BIGINT |
| FLOAT(1~24) REAL | FLOAT |
| DOUBLE FLOAT(>24) | DOUBLE |
| NUMERIC(p,s) DECIMAL(p,s) MONEY SMALLMONEY | DECIMAL(p, s) |
| TIMESTAMP | BYTES |
| DATE | DATE |
| TIME(s) | TIME(s) |
| DATETIME(s) DATETIME2(s) DATETIMEOFFSET(s) SMALLDATETIME | TIMESTAMP(s) |
| BOOLEAN BIT | BOOLEAN |
Source Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
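| username | String | Yes | - | The username to use when connecting to the database server. |
| password | String | Yes | - | The password to use when connecting to the database server. |
| database-names | List | Yes | - | The names of the databases to monitor. |
| table-names | List | Yes | - | The tables to capture. Each name combines the database, schema, and table name (databaseName.schemaName.tableName). |
| table-names-config | List | No | - | Per-table configuration list. For example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}] |
| url | String | Yes | - | The URL must include the database name, e.g. "jdbc:sqlserver://localhost:1433;databaseName=test". |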
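| startup.mode | Enum | No | INITIAL | Optional startup mode of the SQL Server CDC consumer. Valid enumerations are "initial", "earliest", "latest" and "specific". |
| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds). Note: this option is required when the "startup.mode" option is 'timestamp'. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. Note: this option is required when the "startup.mode" option is 'specific'. |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. Note: this option is required when the "startup.mode" option is 'specific'. |
| stop.mode | Enum | No | NEVER | Optional stop mode of the SQL Server CDC consumer. The only valid enumeration is "never". |
| stop.timestamp | Long | No | - | Stop at the specified epoch timestamp (in milliseconds). Note: this option is required when the "stop.mode" option is 'timestamp'. |
| stop.specific-offset.file | String | No | - | Stop at the specified binlog file name. Note: this option is required when the "stop.mode" option is 'specific'. |
| stop.specific-offset.pos | Long | No | - | Stop at the specified binlog file position. Note: this option is required when the "stop.mode" option is 'specific'. |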
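| incremental.parallelism | Integer | No | 1 | The number of parallel readers in the incremental phase. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot. When a table snapshot is read, each captured table is divided into multiple splits. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum number of rows fetched per poll when reading a table snapshot. |
| server-time-zone | String | No | UTC | The session time zone of the database server. |
| connect.timeout | Duration | No | 30s | The maximum time the connector waits for a connection to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries establishing a connection to the database server. |
| connection.pool.size | Integer | No | 20 | The size of the connection pool. |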
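| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor determines whether the table data is evenly distributed. If the computed distribution factor ((MAX(id) - MIN(id) + 1) / row count) is less than or equal to this upper bound, table chunking is optimized for even distribution. Otherwise, if the factor is larger, the table is treated as unevenly distributed, and the sampling-based sharding strategy is used when the estimated number of shards exceeds the value of sample-sharding.threshold. The default value is 100.0. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor determines whether the table data is evenly distributed. If the computed distribution factor ((MAX(id) - MIN(id) + 1) / row count) is greater than or equal to this lower bound, table chunking is optimized for even distribution. Otherwise, if the factor is smaller, the table is treated as unevenly distributed, and the sampling-based sharding strategy is used when the estimated number of shards exceeds the value of sample-sharding.threshold. The default value is 0.05. |
| sample-sharding.threshold | Integer | No | 1000 | The estimated shard count that triggers the sampling-based sharding strategy. When the distribution factor falls outside the range set by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated number of shards (approximate row count / chunk size) exceeds this threshold, the sampling-based strategy is used. This helps handle large datasets more efficiently. The default value is 1000 shards. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used by the sampling-based sharding strategy. For example, a value of 1000 applies a 1/1000 sampling rate. This option controls the granularity of sampling and therefore the final number of shards; it is especially useful for very large datasets where a lower sampling rate is preferred. The default value is 1000. |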
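| exactly_once | Boolean | No | false | Enable exactly-once semantics. |
| debezium.* | config | No | - | Pass-through Debezium properties to the Debezium Embedded Engine, which captures data changes from the SQL Server server. Learn more about Debezium's SQL Server connector properties. |
| format | Enum | No | DEFAULT | Optional output format of SQL Server CDC. Valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON". |
| common-options | | No | - | Source plugin common parameters; see Source Common Options for details. |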
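To make the split-strategy options concrete, here is a worked example with invented numbers: a hypothetical table whose id column runs from 1 to 2,000,000,000 and holds 10,000,000 rows, read with the defaults above. It assumes that the "chunk size" in the sample-sharding.threshold description is snapshot.split.size.

```latex
\begin{aligned}
\text{factor} &= \frac{\mathrm{MAX}(id) - \mathrm{MIN}(id) + 1}{\text{rowCount}}
  = \frac{2\,000\,000\,000 - 1 + 1}{10\,000\,000} = 200 \\
\text{estimatedShards} &= \frac{\text{rowCount}}{\text{snapshot.split.size}}
  = \frac{10\,000\,000}{8096} \approx 1235.2
\end{aligned}
```

Since 200 lies outside [0.05, 100] and roughly 1235 estimated shards exceed the threshold of 1000, the option descriptions indicate that this table would be treated as unevenly distributed and split with the sampling-based strategy; with inverse-sampling.rate = 1000, about one row in 1000 would be sampled. A table with ids from 1 to 10,000,000 and the same row count would instead give a factor of 1 and be split evenly.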
Enable SQL Server CDC
- Check whether SQL Server Agent, which runs the CDC capture and cleanup jobs, is running
EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';
If the result is "Running", the agent is already enabled. Otherwise, you need to enable it manually.
- Enable the agent
/opt/mssql/bin/mssql-conf setup
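- The command prompts you to choose an edition:
1) Evaluation (free, no production use rights, 180-day limit)
2) Developer (free, no production use rights)
3) Express (free)
4) Web (paid)
5) Standard (paid)
6) Enterprise (paid)
7) Enterprise Core (paid)
8) I bought a license through a retail sales channel and have a product key to enter.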
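- Enable CDC at the database level, then for each table to capture. Running sys.sp_cdc_enable_db enables CDC for the database but does not capture any table by itself; each table to be captured must also be enabled with sys.sp_cdc_enable_table. The is_tracked_by_cdc column shows whether a table is tracked.
USE TestDB; -- replace with the actual database name
EXEC sys.sp_cdc_enable_db;
-- Enable CDC for one table; the schema and table names here are examples
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'full_types', @role_name = NULL;
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- replace 'table' with the name of the table to check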
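Task Example
Simple Example: Initial Read
This is a streaming-mode CDC job: it first reads the existing table data and, once that initial read succeeds, switches to incremental reading. The following configuration is for reference only.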
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
SqlServer-CDC {
plugin_output = "customers"
username = "sa"
password = "Y.sa123456"
startup.mode="initial"
database-names = ["column_type_test"]
table-names = ["column_type_test.dbo.full_types"]
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
}
}
transform {
}
sink {
console {
plugin_input = "customers"
}
}
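The option table also lists a format option. As a sketch, the source block of the job above could switch to the Debezium-compatible JSON output by adding that option with one of the enumeration values listed in the table. Only the source block is shown; whether a particular sink consumes this layout should be checked in that sink's documentation.

```hocon
source {
  SqlServer-CDC {
    plugin_output = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode = "initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    # Emit records in the Debezium-compatible JSON format instead of the DEFAULT format
    format = "COMPATIBLE_DEBEZIUM_JSON"
  }
}
```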
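Simple Example: Incremental Read
This job skips the initial snapshot and reads only the changes made after it starts (startup.mode = "latest"), printing the changed data to the console.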
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
SqlServer-CDC {
# Enable exactly-once reading
exactly_once=true
plugin_output = "customers"
username = "sa"
password = "Y.sa123456"
startup.mode="latest"
database-names = ["column_type_test"]
table-names = ["column_type_test.dbo.full_types"]
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
}
}
transform {
}
sink {
console {
plugin_input = "customers"
}
}
Custom Primary Keys for Tables
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
SqlServer-CDC {
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
username = "sa"
password = "Y.sa123456"
database-names = ["column_type_test"]
table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
table-names-config = [
{
table = "column_type_test.dbo.full_types"
primaryKeys = ["id"]
}
]
}
}
sink {
console {
}
}
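Besides primaryKeys, the table-names-config option also accepts a snapshotSplitColumn key, as shown in the option table. The sketch below adds that key to the source block above. split_col is a hypothetical column name, assumed to be a numeric column of full_types that is suitable for splitting the snapshot into ranges; replace it with a real column.

```hocon
source {
  SqlServer-CDC {
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    username = "sa"
    password = "Y.sa123456"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
    table-names-config = [
      {
        table = "column_type_test.dbo.full_types"
        # Treat "id" as the primary key of this table
        primaryKeys = ["id"]
        # Hypothetical column used to split the snapshot phase into chunks
        snapshotSplitColumn = "split_col"
      }
    ]
  }
}
```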
Change Log
| Change | Commit | Version |
|---|---|---|
| [Feature][Core] Add plugin directory support for each connector (#9650) | https://github.com/apache/seatunnel/commit/4beb2b9336 | 2.3.12 |
| [improve] jdbc options (#9541) | https://github.com/apache/seatunnel/commit/d041e5fb32 | 2.3.12 |
| [Feature][Connectors-v2] Optimize the size of CDC JAR Files (#9546) | https://github.com/apache/seatunnel/commit/1dd19c6823 | 2.3.12 |
| [Improve][CDC] Extract duplicate code (#8906) | https://github.com/apache/seatunnel/commit/b922bb90e6 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Improve][Connector-V2] Add pre-check for table enable cdc (#8152) | https://github.com/apache/seatunnel/commit/9a5da78176 | 2.3.9 |
| [Improve][Connector-V2] Fix SqlServer cdc memory leak (#8083) | https://github.com/apache/seatunnel/commit/69cd4ae1a2 | 2.3.9 |
| [Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840) | https://github.com/apache/seatunnel/commit/b6c6dc0438 | 2.3.9 |
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 |
| [Feature][Connector-V2] SqlServer support user-defined type (#7706) | https://github.com/apache/seatunnel/commit/fb89033273 | 2.3.8 |
| [Improve][Connector-V2] Optimize sqlserver package structure (#7715) | https://github.com/apache/seatunnel/commit/9720f118e5 | 2.3.8 |
| [Hotfix][CDC] Fix package name spelling mistake (#7415) | https://github.com/apache/seatunnel/commit/469112fa64 | 2.3.8 |
| [Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac953524 | 2.3.6 |
| [Improve][CDC] Close idle subtasks gorup(reader/writer) in increment phase (#6526) | https://github.com/apache/seatunnel/commit/454c339b9c | 2.3.6 |
| [Improve][JDBC Source] Fix Split can not be cancel (#6825) | https://github.com/apache/seatunnel/commit/ee3b7c3723 | 2.3.6 |
| [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684) | https://github.com/apache/seatunnel/commit/868ba4d7c7 | 2.3.6 |
| [Improve] Improve read table schema in cdc connector (#6702) | https://github.com/apache/seatunnel/commit/a8c6cc6e0c | 2.3.6 |
| [Improve][Jdbc] Add quote identifier for sql (#6669) | https://github.com/apache/seatunnel/commit/849d748d3d | 2.3.5 |
| [Improve][CDC] Optimize split state memory allocation in increment phase (#6554) | https://github.com/apache/seatunnel/commit/fe33422161 | 2.3.5 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
| [Improve][CDC-Connector]Fix CDC option rule. (#6454) | https://github.com/apache/seatunnel/commit/1ea27afa87 | 2.3.5 |
| [Improve][CDC] Optimize memory allocation for snapshot split reading (#6281) | https://github.com/apache/seatunnel/commit/4856645837 | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 |
| [Improve] Support int identity type in sql server (#6186) | https://github.com/apache/seatunnel/commit/1a8da1c843 | 2.3.4 |
| [Feature][CDC] Support custom table primary key (#6106) | https://github.com/apache/seatunnel/commit/1312a1dd27 | 2.3.4 |
| [Feature][CDC] Support read no primary key table (#6098) | https://github.com/apache/seatunnel/commit/b42d78de3f | 2.3.4 |
| [Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005) | https://github.com/apache/seatunnel/commit/d41af8a6ed | 2.3.4 |
| [Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784) | https://github.com/apache/seatunnel/commit/37fcff347e | 2.3.4 |
| [Improve][CDC] Clean unused code (#5785) | https://github.com/apache/seatunnel/commit/b5a66d3dbe | 2.3.4 |
| [Improve][Jdbc] Fix database identifier (#5756) | https://github.com/apache/seatunnel/commit/dbfc8a670a | 2.3.4 |
| [improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils type conversion mode (#5668) | https://github.com/apache/seatunnel/commit/75b814bc3d | 2.3.4 |
| [feature][connector-cdc-sqlserver] add dataType datetimeoffset (#5548) | https://github.com/apache/seatunnel/commit/0cf63eed6d | 2.3.4 |
| [Improve] Remove catalog tag for config file (#5645) | https://github.com/apache/seatunnel/commit/dc509aa080 | 2.3.4 |
| [Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f8 | 2.3.4 |
| [Imporve][CDC Base] Add a fast sampling method that supports character types (#5179) | https://github.com/apache/seatunnel/commit/c0422dbfeb | 2.3.3 |
| [improve][CDC Base] Add some split parameters to the optionRule (#5161) | https://github.com/apache/seatunnel/commit/94fd6755e6 | 2.3.3 |
| [Feature][Connector-V2][CDC] Support string type shard fields. (#5147) | https://github.com/apache/seatunnel/commit/e1be9d7f8a | 2.3.3 |
| [Feature][CDC] Support tables without primary keys (with unique keys) (#163) (#5150) | https://github.com/apache/seatunnel/commit/32b7f2b690 | 2.3.3 |
| [Bugfix][zeta] Fix cdc connection does not close (#4922) | https://github.com/apache/seatunnel/commit/a2d2f2dda8 | 2.3.3 |
| [Feature][CDC] Support disable/enable exactly once for INITIAL (#4921) | https://github.com/apache/seatunnel/commit/6d9a3e5957 | 2.3.3 |
| [Improve][CDC]change driver scope to provider (#5002) | https://github.com/apache/seatunnel/commit/745c0b9e92 | 2.3.3 |
| [Improve][CDC]Remove driver for cdc connector (#4952) | https://github.com/apache/seatunnel/commit/b65f40c3c9 | 2.3.3 |
| [Bugfix][zeta] Fix the deadlock issue with JDBC driver loading (#4878) | https://github.com/apache/seatunnel/commit/c30a2a1b1c | 2.3.2 |
| [improve][CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate (#4856) | https://github.com/apache/seatunnel/commit/d827c700f0 | 2.3.2 |
| [Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877) | https://github.com/apache/seatunnel/commit/9a2efa51c7 | 2.3.2 |
| [Hotfix][CDC] Fix chunk start/end parameter type error (#4777) | https://github.com/apache/seatunnel/commit/c13c031995 | 2.3.2 |
| [Feature][CDC][SqlServer] Support multi-table read (#4377) | https://github.com/apache/seatunnel/commit/c4e3f2dc03 | 2.3.2 |
| [Improve][CDC] Optimize jdbc fetch-size options (#4352) | https://github.com/apache/seatunnel/commit/fbb60ce1be | 2.3.1 |
| [Improve][CDC] Improve startup.mode/stop.mode options (#4360) | https://github.com/apache/seatunnel/commit/b71d8739d5 | 2.3.1 |
| [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f590498 | 2.3.1 |
| Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357) | https://github.com/apache/seatunnel/commit/f60ac1a5e9 | 2.3.1 |
| [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327) | https://github.com/apache/seatunnel/commit/ac61409bd8 | 2.3.1 |
| [improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39f | 2.3.1 |
| [Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f8 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][cdc] support sharding-tables (#4207) | https://github.com/apache/seatunnel/commit/5c3f0c9b00 | 2.3.1 |
| [Hotfix][CDC] Fix multiple-table data read (#4200) | https://github.com/apache/seatunnel/commit/7f5671d2ce | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Improve][Connector-V2][SQLServer-CDC] Add sqlserver cdc optionRule (#4019) | https://github.com/apache/seatunnel/commit/78df503392 | 2.3.1 |
| [Improve][CDC][base] Guaranteed to be exactly-once in the process of switching from SnapshotTask to IncrementalTask (#3837) | https://github.com/apache/seatunnel/commit/8379aaf876 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve][CDC] Add mysql-cdc source factory (#3791) | https://github.com/apache/seatunnel/commit/356538de8a | 2.3.1 |
| [feature][connector-v2] add sqlServer CDC (#3686) | https://github.com/apache/seatunnel/commit/0f0afb58af | 2.3.0 |