Version: Next

SQL Server CDC

Sql Server CDC Source Connector

Supported SQL Server Versions

  • server: 2019 (or later, for reference only)

Supported Engines

SeaTunnel Zeta
Flink

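Key Features

Description

The Sql Server CDC connector allows reading snapshot data and incremental data from a SqlServer database. This document describes how to set up the Sql Server CDC connector to run SQL queries against SqlServer databases.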

Supported DataSource Info

Datasource: SqlServer
Supported versions: server: 2019 (or later, for reference only)
Driver: com.microsoft.sqlserver.jdbc.SQLServerDriver
Url: jdbc:sqlserver://localhost:1433;databaseName=column_type_test
Maven: https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
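The Url value above follows the Microsoft JDBC driver's jdbc:sqlserver://host:port;property=value form, and the url option expects the database to be named in it through databaseName. As a minimal sketch, such a URL can be assembled and checked in Python; the helper names build_jdbc_url and jdbc_url_database are hypothetical and not part of SeaTunnel.

```python
from typing import Optional


def build_jdbc_url(host: str, port: int, database: str) -> str:
    """Assemble a SQL Server JDBC URL in the form used in the table above."""
    if not database:
        # The url option must name a database, so refuse to build one without it.
        raise ValueError("database must not be empty")
    return f"jdbc:sqlserver://{host}:{port};databaseName={database}"


def jdbc_url_database(url: str) -> Optional[str]:
    """Return the databaseName property of a JDBC URL, or None if it is missing."""
    # Everything after the first ';' is a list of key=value properties.
    for prop in url.split(";")[1:]:
        key, _, value = prop.partition("=")
        # Property names are compared case-insensitively in this sketch.
        if key.strip().lower() == "databasename":
            return value.strip()
    return None


if __name__ == "__main__":
    url = build_jdbc_url("localhost", 1433, "column_type_test")
    print(url)  # jdbc:sqlserver://localhost:1433;databaseName=column_type_test
    print(jdbc_url_database(url))  # column_type_test
```

A check like jdbc_url_database(url) is not None can catch a url that would otherwise connect to the login's default database instead of the one listed in database-names.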

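Required Dependencies

Install Jdbc Driver

1. Make sure the jdbc driver jar package has been placed in the ${SEATUNNEL_HOME}/plugins/ directory.

For SeaTunnel Zeta Engine

1. Make sure the jdbc driver jar package has been placed in the ${SEATUNNEL_HOME}/lib/ directory.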
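To see which of the two directories already holds the driver, a small Python sketch can list jars named like the Maven artifact above. It assumes the jar file name starts with the artifact id mssql-jdbc and that SEATUNNEL_HOME is set in the environment; the helper names are hypothetical, so adjust the pattern if your jar is named differently.

```python
import fnmatch
import os
from typing import Iterable, List

# Assumption: the driver jar keeps the Maven artifact id as its file-name prefix.
DRIVER_JAR_PATTERN = "mssql-jdbc*.jar"


def select_driver_jars(filenames: Iterable[str]) -> List[str]:
    """Keep only file names that look like the Microsoft SQL Server JDBC driver jar."""
    return sorted(name for name in filenames if fnmatch.fnmatch(name, DRIVER_JAR_PATTERN))


def driver_jars_in(directory: str) -> List[str]:
    """List driver jars directly inside `directory`; a missing directory yields []."""
    if not os.path.isdir(directory):
        return []
    return select_driver_jars(os.listdir(directory))


if __name__ == "__main__":
    home = os.environ.get("SEATUNNEL_HOME", ".")
    # Check both locations mentioned above.
    for sub in ("plugins", "lib"):
        path = os.path.join(home, sub)
        print(path, driver_jars_in(path) or "no driver jar found")
```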

Data Type Mapping

SQL Server data type | SeaTunnel data type
CHAR, VARCHAR, NCHAR, NVARCHAR, TEXT, NTEXT, XML | STRING
BINARY, VARBINARY, IMAGE | BYTES
INTEGER, INT | INT
SMALLINT, TINYINT | SMALLINT
BIGINT | BIGINT
FLOAT(1~24), REAL | FLOAT
DOUBLE, FLOAT(>24) | DOUBLE
NUMERIC(p,s), DECIMAL(p,s), MONEY, SMALLMONEY | DECIMAL(p, s)
TIMESTAMP | BYTES
DATE | DATE
TIME(s) | TIME(s)
DATETIME(s), DATETIME2(s), DATETIMEOFFSET(s), SMALLDATETIME | TIMESTAMP(s)
BOOLEAN, BIT | BOOLEAN
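The mapping can be read as a lookup on the base type name, with FLOAT as the one case decided by its precision (SQL Server's TIMESTAMP is a binary row-version type, hence BYTES). The Python sketch below is illustrative only and is not SeaTunnel's converter: the function name to_seatunnel_type is hypothetical, it assumes a bare FLOAT means FLOAT(53) (SQL Server's default precision), and it returns a bare DECIMAL for MONEY and SMALLMONEY because the table does not state their precision and scale.

```python
import re

# Base-type lookup transcribed from the table above; FLOAT is decided by precision below.
_TYPE_MAP = {
    **dict.fromkeys(["CHAR", "VARCHAR", "NCHAR", "NVARCHAR", "TEXT", "NTEXT", "XML"], "STRING"),
    **dict.fromkeys(["BINARY", "VARBINARY", "IMAGE", "TIMESTAMP"], "BYTES"),
    **dict.fromkeys(["INTEGER", "INT"], "INT"),
    **dict.fromkeys(["SMALLINT", "TINYINT"], "SMALLINT"),
    "BIGINT": "BIGINT",
    "REAL": "FLOAT",
    "DOUBLE": "DOUBLE",
    **dict.fromkeys(["NUMERIC", "DECIMAL", "MONEY", "SMALLMONEY"], "DECIMAL"),
    "DATE": "DATE",
    "TIME": "TIME",
    **dict.fromkeys(["DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME"], "TIMESTAMP"),
    **dict.fromkeys(["BOOLEAN", "BIT"], "BOOLEAN"),
}

# Targets that carry the source type's (p, s) or (s) arguments along.
_KEEP_ARGS = {"DECIMAL", "TIME", "TIMESTAMP"}

_TYPE_RE = re.compile(r"\s*([A-Za-z0-9]+)\s*(?:\(([^)]*)\))?\s*")


def to_seatunnel_type(sql_type: str) -> str:
    """Map a SQL Server column type such as 'nvarchar(50)' to a SeaTunnel type name."""
    match = _TYPE_RE.fullmatch(sql_type)
    if match is None:
        raise ValueError(f"cannot parse type: {sql_type!r}")
    base, args = match.group(1).upper(), match.group(2)
    if base == "FLOAT":
        # FLOAT(1~24) -> FLOAT, FLOAT(>24) -> DOUBLE; a bare FLOAT is assumed to be FLOAT(53).
        precision = int(args) if args else 53
        return "FLOAT" if precision <= 24 else "DOUBLE"
    if base not in _TYPE_MAP:
        raise ValueError(f"no mapping for type: {sql_type!r}")
    target = _TYPE_MAP[base]
    if target in _KEEP_ARGS and args:
        return f"{target}({args.replace(' ', '')})"
    return target


if __name__ == "__main__":
    for t in ["nvarchar(50)", "float(24)", "float", "decimal(10, 2)", "datetime2(7)", "timestamp"]:
        print(t, "->", to_seatunnel_type(t))
```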

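Source Options

Name | Type | Default | Description
username | String | - | Name of the database user to use when connecting to the database server.
password | String | - | Password to use when connecting to the database server.
database-names | List | - | Names of the databases to monitor.
table-names | List | - | Table names, each written as a combination of database name, schema name and table name (databaseName.schemaName.tableName).
table-names-config | List | - | List of per-table configurations, for example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
url | String | - | The URL must include the database name, for example "jdbc:sqlserver://localhost:1433;databaseName=test".
startup.mode | Enum | INITIAL | Optional startup mode for the SqlServer CDC consumer; valid enumerations are "initial", "earliest", "latest" and "specific".
startup.timestamp | Long | - | Start from the specified epoch timestamp (in milliseconds). Note: this option is required when the "startup.mode" option is set to 'timestamp'.
startup.specific-offset.file | String | - | Start from the specified binlog file name. Note: this option is required when the "startup.mode" option is set to 'specific'.
startup.specific-offset.pos | Long | - | Start from the specified binlog file position. Note: this option is required when the "startup.mode" option is set to 'specific'.
stop.mode | Enum | NEVER | Optional stop mode for the SqlServer CDC consumer; the valid enumeration is "never".
stop.timestamp | Long | - | Stop at the specified epoch timestamp (in milliseconds). Note: this option is required when the "stop.mode" option is set to 'timestamp'.
stop.specific-offset.file | String | - | Stop at the specified binlog file name. Note: this option is required when the "stop.mode" option is set to 'specific'.
stop.specific-offset.pos | Long | - | Stop at the specified binlog file position. Note: this option is required when the "stop.mode" option is set to 'specific'.
incremental.parallelism | Integer | 1 | The number of parallel readers in the incremental phase.
snapshot.split.size | Integer | 8096 | The split size (number of rows) of the table snapshot; when the table snapshot is read, each captured table is divided into multiple splits of this size.
snapshot.fetch.size | Integer | 1024 | The maximum fetch size per poll when reading the table snapshot.
server-time-zone | String | UTC | The session time zone in the database server.
connect.timeout | Duration | 30s | The maximum time the connector waits after trying to connect to the database server before timing out.
connect.max-retries | Integer | 3 | The maximum number of times the connector retries building a database server connection.
connection.pool.size | Integer | 20 | The connection pool size.
chunk-key.even-distribution.factor.upper-bound | Double | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the calculated distribution factor, (MAX(id) - MIN(id) + 1) / row count, is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise, if the distribution factor is larger, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated number of shards exceeds the value specified by sample-sharding.threshold. The default value is 100.0.
chunk-key.even-distribution.factor.lower-bound | Double | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the calculated distribution factor, (MAX(id) - MIN(id) + 1) / row count, is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise, if the distribution factor is smaller, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated number of shards exceeds the value specified by sample-sharding.threshold. The default value is 0.05.
sample-sharding.threshold | int | 1000 | The threshold of the estimated shard count that triggers the sample sharding strategy. When the distribution factor falls outside the range specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps handle large datasets more efficiently. The default value is 1000 shards.
inverse-sampling.rate | int | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, a sampling rate of 1/1000 is applied during sampling. This option gives flexible control over the sampling granularity, which in turn affects the final number of shards. It is especially useful for very large datasets where a lower sampling rate is preferred. The default value is 1000.
exactly_once | Boolean | false | Enable exactly-once semantics.
debezium.* | config | - | Pass Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the SqlServer server. See Debezium's SqlServer connector properties for more details.
format | Enum | DEFAULT | Optional output format for SqlServer CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".
common-options | | - | Source plugin common parameters; see Source Common Options for details.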
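The chunk-key bounds, sample-sharding.threshold and inverse-sampling.rate work together to pick a snapshot splitting strategy. A minimal Python sketch of that decision, using the defaults from the table, is shown below. The function names and the "even"/"sampling"/"uneven" labels are hypothetical, "chunk size" is assumed to be snapshot.split.size, and the real splitter may apply further conditions that are not modelled here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ShardingOptions:
    # Defaults copied from the option table above.
    upper_bound: float = 100.0         # chunk-key.even-distribution.factor.upper-bound
    lower_bound: float = 0.05          # chunk-key.even-distribution.factor.lower-bound
    sample_threshold: int = 1000       # sample-sharding.threshold
    inverse_sampling_rate: int = 1000  # inverse-sampling.rate
    chunk_size: int = 8096             # assumed to be snapshot.split.size


def distribution_factor(min_id: int, max_id: int, row_count: int) -> float:
    """(MAX(id) - MIN(id) + 1) / row count, as defined in the option descriptions."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_id - min_id + 1) / row_count


def choose_strategy(min_id: int, max_id: int, approx_rows: int,
                    opts: Optional[ShardingOptions] = None) -> str:
    opts = opts or ShardingOptions()
    factor = distribution_factor(min_id, max_id, approx_rows)
    if opts.lower_bound <= factor <= opts.upper_bound:
        return "even"      # key range is dense enough for evenly sized chunks
    estimated_shards = approx_rows / opts.chunk_size
    if estimated_shards > opts.sample_threshold:
        return "sampling"  # uneven and large: sample-based sharding
    return "uneven"        # uneven but small: non-sampling splitting


def sampled_rows(approx_rows: int, opts: Optional[ShardingOptions] = None) -> int:
    """Roughly how many key values a sampling pass reads (1 per inverse_sampling_rate rows)."""
    opts = opts or ShardingOptions()
    return approx_rows // opts.inverse_sampling_rate


if __name__ == "__main__":
    print(choose_strategy(1, 1_000_000, 1_000_000))  # even (factor 1.0)
    print(choose_strategy(1, 10**12, 10_000_000))    # sampling (factor 1e5, ~1235 shards)
    print(choose_strategy(1, 10**9, 100_000))        # uneven (factor 1e4, ~12 shards)
    print(sampled_rows(10_000_000))                  # 10000
```

Raising inverse-sampling.rate reads fewer sample rows at the cost of coarser split boundaries, which is the trade-off the option description refers to.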

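Enable Sql Server CDC

1. Check whether SQL Server Agent (which runs the CDC capture jobs) is running

    EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';

If the result is "Running", the agent is already enabled. Otherwise, you need to enable it manually.

2. Enable SQL Server Agent

    /opt/mssql/bin/mssql-conf setup

3. The output looks like this:

    1) Evaluation (free, no production use rights, 180-day limit)
    2) Developer (free, no production use rights)
    3) Express (free)
    4) Web (PAID)
    5) Standard (PAID)
    6) Enterprise (PAID)
    7) Enterprise Core (PAID)
    8) I bought a license through a retail sales channel and have a product key to enter.

4. Enable CDC at the database level, then for each table to be captured. Enabling CDC for a database does not by itself enable it for the tables in that database: every table to be captured must also be enabled with sys.sp_cdc_enable_table. Afterwards, check that is_tracked_by_cdc is 1 for the table.

    USE TestDB; -- replace with the actual database name
    EXEC sys.sp_cdc_enable_db;
    -- replace dbo and table with the schema and table to be captured
    EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'table', @role_name = NULL;
    SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- replace table with the table name to check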
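Enabling many tables by hand is repetitive. As a sketch under stated assumptions, the Python helper below turns table-names entries (databaseName.schemaName.tableName) into T-SQL statements of the kind used in the enable-CDC step. The function name enable_cdc_statements is hypothetical; it does not escape names (plain identifiers are assumed), and the generated script assumes CDC is not yet enabled for these tables.

```python
from typing import Dict, List


def enable_cdc_statements(table_names: List[str]) -> List[str]:
    """Generate T-SQL that enables CDC for each 'database.schema.table' name."""
    by_db: Dict[str, List[str]] = {}
    for name in table_names:
        parts = name.split(".")
        if len(parts) != 3:
            raise ValueError(f"expected databaseName.schemaName.tableName, got {name!r}")
        db, schema, table = parts
        # @role_name = NULL: no gating role controls access to the change data.
        by_db.setdefault(db, []).append(
            "EXEC sys.sp_cdc_enable_table "
            f"@source_schema = N'{schema}', @source_name = N'{table}', @role_name = NULL;"
        )
    statements: List[str] = []
    for db, table_stmts in by_db.items():
        # CDC must be enabled on the database before any table in it.
        statements.append(f"USE [{db}];")
        statements.append("EXEC sys.sp_cdc_enable_db;")
        statements.extend(table_stmts)
    return statements


if __name__ == "__main__":
    for stmt in enable_cdc_statements(["column_type_test.dbo.full_types"]):
        print(stmt)
```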

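Task Example

Initial Read Simple Example

This is a streaming-mode CDC job: it first reads the existing table data and, once that read succeeds, switches to incremental reading. The following configuration is for reference only.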

    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      # This is an example source plugin, only for testing and demonstrating the source plugin feature
      SqlServer-CDC {
        plugin_output = "customers"
        username = "sa"
        password = "Y.sa123456"
        startup.mode = "initial"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.full_types"]
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
      }
    }

    transform {
    }

    sink {
      console {
        plugin_input = "customers"
      }
    }

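Incremental Read Simple Example

This is an incremental read that prints the changed data. Because startup.mode is set to "latest", the job does not read the existing table data first; it only captures changes made after it starts.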

    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      # This is an example source plugin, only for testing and demonstrating the source plugin feature
      SqlServer-CDC {
        # Enable exactly-once reading
        exactly_once = true
        plugin_output = "customers"
        username = "sa"
        password = "Y.sa123456"
        startup.mode = "latest"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.full_types"]
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
      }
    }

    transform {
    }

    sink {
      console {
        plugin_input = "customers"
      }
    }

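Support Custom Primary Key for Tables

In this example, table-names-config sets id as the primary key of full_types; simple_types has no entry, so no custom key is applied to it.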

    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }

    source {
      SqlServer-CDC {
        url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        username = "sa"
        password = "Y.sa123456"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
        table-names-config = [
          {
            table = "column_type_test.dbo.full_types"
            primaryKeys = ["id"]
          }
        ]
      }
    }

    sink {
      console {
      }
    }
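The table-names-config entry above overrides the primary key of full_types only. A hypothetical Python sketch of that per-table resolution (not SeaTunnel's code) makes the effect explicit: tables without an entry map to None, meaning no custom key is applied. Rejecting entries for tables missing from table-names is an assumption of this sketch.

```python
from typing import Dict, List, Optional


def resolve_primary_keys(table_names: List[str],
                         table_names_config: List[Dict]) -> Dict[str, Optional[List[str]]]:
    """Map every captured table to its custom primary key, or None if it has no override."""
    overrides: Dict[str, List[str]] = {}
    for entry in table_names_config:
        table = entry["table"]
        # Assumption of this sketch: an override only makes sense for a captured table.
        if table not in table_names:
            raise ValueError(f"{table!r} is configured but not listed in table-names")
        keys = entry.get("primaryKeys")
        if not keys:
            raise ValueError(f"{table!r}: primaryKeys must be a non-empty list")
        overrides[table] = list(keys)
    # None means no custom key was configured for that table.
    return {table: overrides.get(table) for table in table_names}


if __name__ == "__main__":
    print(resolve_primary_keys(
        ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"],
        [{"table": "column_type_test.dbo.full_types", "primaryKeys": ["id"]}],
    ))
```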

Changelog
Change | Commit | Version
[Feature][Core] Add plugin directory support for each connector (#9650) | https://github.com/apache/seatunnel/commit/4beb2b9336 | 2.3.12
[improve] jdbc options (#9541) | https://github.com/apache/seatunnel/commit/d041e5fb32 | 2.3.12
[Feature][Connectors-v2] Optimize the size of CDC JAR Files (#9546) | https://github.com/apache/seatunnel/commit/1dd19c6823 | 2.3.12
[Improve][CDC] Extract duplicate code (#8906) | https://github.com/apache/seatunnel/commit/b922bb90e6 | 2.3.10
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9
[Improve][Connector-V2] Add pre-check for table enable cdc (#8152) | https://github.com/apache/seatunnel/commit/9a5da78176 | 2.3.9
[Improve][Connector-V2] Fix SqlServer cdc memory leak (#8083) | https://github.com/apache/seatunnel/commit/69cd4ae1a2 | 2.3.9
[Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840) | https://github.com/apache/seatunnel/commit/b6c6dc0438 | 2.3.9
[Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9
[Feature][Connector-V2] SqlServer support user-defined type (#7706) | https://github.com/apache/seatunnel/commit/fb89033273 | 2.3.8
[Improve][Connector-V2] Optimize sqlserver package structure (#7715) | https://github.com/apache/seatunnel/commit/9720f118e5 | 2.3.8
[Hotfix][CDC] Fix package name spelling mistake (#7415) | https://github.com/apache/seatunnel/commit/469112fa64 | 2.3.8
[Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac953524 | 2.3.6
[Improve][CDC] Close idle subtasks gorup(reader/writer) in increment phase (#6526) | https://github.com/apache/seatunnel/commit/454c339b9c | 2.3.6
[Improve][JDBC Source] Fix Split can not be cancel (#6825) | https://github.com/apache/seatunnel/commit/ee3b7c3723 | 2.3.6
[Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684) | https://github.com/apache/seatunnel/commit/868ba4d7c7 | 2.3.6
[Improve] Improve read table schema in cdc connector (#6702) | https://github.com/apache/seatunnel/commit/a8c6cc6e0c | 2.3.6
[Improve][Jdbc] Add quote identifier for sql (#6669) | https://github.com/apache/seatunnel/commit/849d748d3d | 2.3.5
[Improve][CDC] Optimize split state memory allocation in increment phase (#6554) | https://github.com/apache/seatunnel/commit/fe33422161 | 2.3.5
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5
[Improve][CDC-Connector]Fix CDC option rule. (#6454) | https://github.com/apache/seatunnel/commit/1ea27afa87 | 2.3.5
[Improve][CDC] Optimize memory allocation for snapshot split reading (#6281) | https://github.com/apache/seatunnel/commit/4856645837 | 2.3.5
[Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5
[Improve] Support int identity type in sql server (#6186) | https://github.com/apache/seatunnel/commit/1a8da1c843 | 2.3.4
[Feature][CDC] Support custom table primary key (#6106) | https://github.com/apache/seatunnel/commit/1312a1dd27 | 2.3.4
[Feature][CDC] Support read no primary key table (#6098) | https://github.com/apache/seatunnel/commit/b42d78de3f | 2.3.4
[Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005) | https://github.com/apache/seatunnel/commit/d41af8a6ed | 2.3.4
[Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784) | https://github.com/apache/seatunnel/commit/37fcff347e | 2.3.4
[Improve][CDC] Clean unused code (#5785) | https://github.com/apache/seatunnel/commit/b5a66d3dbe | 2.3.4
[Improve][Jdbc] Fix database identifier (#5756) | https://github.com/apache/seatunnel/commit/dbfc8a670a | 2.3.4
[improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils type conversion mode (#5668) | https://github.com/apache/seatunnel/commit/75b814bc3d | 2.3.4
[feature][connector-cdc-sqlserver] add dataType datetimeoffset (#5548) | https://github.com/apache/seatunnel/commit/0cf63eed6d | 2.3.4
[Improve] Remove catalog tag for config file (#5645) | https://github.com/apache/seatunnel/commit/dc509aa080 | 2.3.4
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f8 | 2.3.4
[Imporve][CDC Base] Add a fast sampling method that supports character types (#5179) | https://github.com/apache/seatunnel/commit/c0422dbfeb | 2.3.3
[improve][CDC Base] Add some split parameters to the optionRule (#5161) | https://github.com/apache/seatunnel/commit/94fd6755e6 | 2.3.3
[Feature][Connector-V2][CDC] Support string type shard fields. (#5147) | https://github.com/apache/seatunnel/commit/e1be9d7f8a | 2.3.3
[Feature][CDC] Support tables without primary keys (with unique keys) (#163) (#5150) | https://github.com/apache/seatunnel/commit/32b7f2b690 | 2.3.3
[Bugfix][zeta] Fix cdc connection does not close (#4922) | https://github.com/apache/seatunnel/commit/a2d2f2dda8 | 2.3.3
[Feature][CDC] Support disable/enable exactly once for INITIAL (#4921) | https://github.com/apache/seatunnel/commit/6d9a3e5957 | 2.3.3
[Improve][CDC]change driver scope to provider (#5002) | https://github.com/apache/seatunnel/commit/745c0b9e92 | 2.3.3
[Improve][CDC]Remove driver for cdc connector (#4952) | https://github.com/apache/seatunnel/commit/b65f40c3c9 | 2.3.3
[Bugfix][zeta] Fix the deadlock issue with JDBC driver loading (#4878) | https://github.com/apache/seatunnel/commit/c30a2a1b1c | 2.3.2
[improve][CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate (#4856) | https://github.com/apache/seatunnel/commit/d827c700f0 | 2.3.2
[Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877) | https://github.com/apache/seatunnel/commit/9a2efa51c7 | 2.3.2
[Hotfix][CDC] Fix chunk start/end parameter type error (#4777) | https://github.com/apache/seatunnel/commit/c13c031995 | 2.3.2
[Feature][CDC][SqlServer] Support multi-table read (#4377) | https://github.com/apache/seatunnel/commit/c4e3f2dc03 | 2.3.2
[Improve][CDC] Optimize jdbc fetch-size options (#4352) | https://github.com/apache/seatunnel/commit/fbb60ce1be | 2.3.1
[Improve][CDC] Improve startup.mode/stop.mode options (#4360) | https://github.com/apache/seatunnel/commit/b71d8739d5 | 2.3.1
[Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f590498 | 2.3.1
Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357) | https://github.com/apache/seatunnel/commit/f60ac1a5e9 | 2.3.1
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327) | https://github.com/apache/seatunnel/commit/ac61409bd8 | 2.3.1
[improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39f | 2.3.1
[Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f8 | 2.3.1
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1
[improve][cdc] support sharding-tables (#4207) | https://github.com/apache/seatunnel/commit/5c3f0c9b00 | 2.3.1
[Hotfix][CDC] Fix multiple-table data read (#4200) | https://github.com/apache/seatunnel/commit/7f5671d2ce | 2.3.1
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1
[Improve][Connector-V2][SQLServer-CDC] Add sqlserver cdc optionRule (#4019) | https://github.com/apache/seatunnel/commit/78df503392 | 2.3.1
[Improve][CDC][base] Guaranteed to be exactly-once in the process of switching from SnapshotTask to IncrementalTask (#3837) | https://github.com/apache/seatunnel/commit/8379aaf876 | 2.3.1
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1
[Improve][CDC] Add mysql-cdc source factory (#3791) | https://github.com/apache/seatunnel/commit/356538de8a | 2.3.1
[feature][connector-v2] add sqlServer CDC (#3686) | https://github.com/apache/seatunnel/commit/0f0afb58af | 2.3.0