跳到主要内容
版本:Next

PostgreSQL CDC

PostgreSQL CDC 源连接器

支持的引擎

SeaTunnel Zeta
Flink

主要特性

描述

Postgre CDC 连接器允许从 Postgre 数据库读取快照数据和增量数据。本文件描述了如何设置 Postgre CDC 连接器,以便对 Postgre 数据库执行 SQL 查询。

支持的数据源信息

数据源支持的版本驱动UrlMaven
PostgreSQL不同的依赖版本有不同的驱动类。org.postgresql.Driverjdbc:postgresql://localhost:5432/test下载
PostgreSQL如果您想在 PostgreSQL 中操作 GEOMETRY/GEOGRAPHY 类型。org.postgresql.Driverjdbc:postgresql://localhost:5432/test下载

使用依赖

安装 Jdbc 驱动

  1. 您需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/plugins/ 中。

对于 SeaTunnel Zeta 引擎

  1. 您需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/lib/ 中。

请下载并将 PostgreSQL 驱动放入 ${SEATUNNEL_HOME}/lib/ 目录。例如:cp postgresql-xxx.jar $SEATUNNEL_HOME/lib/

以下是启用 PostgreSQL 中的 CDC(变化数据捕获)的步骤:

  1. 确保 wal_level 设置为 logical:通过在 postgresql.conf 配置文件中添加 "wal_level = logical" 来修改,重启 PostgreSQL 服务器以使更改生效。 或者,您可以使用 SQL 命令直接修改配置:
ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();
  1. 将指定表的 REPLICA 策略更改为 FULL,除非 require-replica-identity-full 设置为 false
ALTER TABLE your_table_name REPLICA IDENTITY FULL;

数据类型映射

PostgreSQL 数据类型SeaTunnel 数据类型
BOOL
BOOLEAN
_BOOL
ARRAY<BOOLEAN>
BYTEA
BYTES
_BYTEA
ARRAY<TINYINT>
INT2
SMALLSERIAL
INT4
SERIAL
INT
_INT2
_INT4
ARRAY<INT>
INT8
BIGSERIAL
BIGINT
_INT8
ARRAY<BIGINT>
FLOAT4
FLOAT
_FLOAT4
ARRAY<FLOAT>
FLOAT8
DOUBLE
_FLOAT8
ARRAY<DOUBLE>
NUMERIC(指定列的列大小>0)DECIMAL(指定列的列大小, 获取指定列小数点右侧的位数)
NUMERIC(指定列的列大小<0)DECIMAL(38, 18)
BPCHAR
CHARACTER
VARCHAR
TEXT
GEOMETRY
GEOGRAPHY
JSON
JSONB
STRING
_BPCHAR
_CHARACTER
_VARCHAR
_TEXT
ARRAY<STRING>
TIMESTAMP
TIMESTAMP
TIME
TIME
DATE
DATE
其他数据类型尚不支持

源选项

名称类型必需默认描述
urlString-JDBC 连接的 URL。参考案例:jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF
usernameString-连接到数据库服务器时使用的数据库名称。
passwordString-连接到数据库服务器时使用的密码。
database-namesList-需要监控的数据库名称。
table-namesList-需要监控的数据库表名称。表名称需要包含数据库名称,例如:database_name.table_name
table-names-configList-表配置列表。例如: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]。 snapshotSplitColumn 选项必须配置为唯一键(主键或唯一索引)。 如果指定了非唯一列,该配置将被忽略,SeaTunnel 会在内部自动选择合适的拆分列。
startup.modeListINITIALPostgreSQL CDC 消费者的可选启动模式,有效枚举为 initialearliestlatest
initial: 启动时同步历史数据,然后同步增量数据。
earliest: 从可能的最早偏移量启动。
latest: 从最新偏移量启动。
snapshot.split.sizeInteger8096表快照的拆分大小(行数),捕获的表在读取表快照时被拆分成多个拆分。
snapshot.fetch.sizeInteger1024读取表快照时每次轮询的最大获取大小。
slot.nameString-为特定数据库/模式创建的用于流式传输更改的 PostgreSQL 逻辑解码槽的名称。服务器使用此槽将事件流式传输到您正在配置的连接器。默认值为 seatunnel。
decoding.plugin.nameStringpgoutput安装在服务器上的 Postgres 逻辑解码插件的名称,支持的值有 decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming 和 pgoutput。
server-time-zoneStringUTC数据库服务器中的会话时区。如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。
connect.timeout.msDuration30000连接器在尝试连接到数据库服务器后应等待的最大时间,以防超时。
connect.max-retriesInteger3连接器应重试建立数据库服务器连接的最大重试次数。
connection.pool.sizeInteger20JDBC 连接池大小。
chunk-key.even-distribution.factor.upper-boundDouble100块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) / 行数),则将优化表块以实现均匀分布。否则,如果分布因子更大,则将认为该表分布不均匀,并且如果估计的分片数量超过 sample-sharding.threshold 指定的值,则将使用基于采样的分片策略。默认值为 100.0。
chunk-key.even-distribution.factor.lower-boundDouble0.05块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) / 行数),则将优化表块以实现均匀分布。否则,如果分布因子更小,则将认为该表分布不均匀,并且如果估计的分片数量超过 sample-sharding.threshold 指定的值,则将使用基于采样的分片策略。默认值为 0.05。
sample-sharding.thresholdInteger1000此配置指定触发采样分片策略的估计分片数量阈值。当分布因子超出由 chunk-key.even-distribution.factor.upper-boundchunk-key.even-distribution.factor.lower-bound 指定的范围,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用采样分片策略。这可以帮助更有效地处理大数据集。默认值为 1000 个分片。
inverse-sampling.rateInteger1000在采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则意味着在采样过程中应用 1/1000 的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理非常大数据集时,较低的采样率尤为有用。默认值为 1000。
split.allow-samplingBooleantrue是否允许基于采样的分片策略。当设置为 false 时,无论预估分片数是否超过阈值,系统都将回退到非均匀分片方式(迭代查询方式)。默认值为 true。
exactly_onceBooleanfalse启用精确一次语义。
formatEnumDEFAULTPostgreSQL CDC 的可选输出格式,有效枚举为 DEFAULTCOMPATIBLE_DEBEZIUM_JSON
require-replica-identity-fullBooleantrue要求表具有 REPLICA IDENTITY FULL。设置为 false 时,允许表使用其他副本标识设置,但 UPDATE/DELETE 事件可能不包含之前的状态。此选项仅应用于仅追加的表(例如 outbox 模式)。默认为 true 以保持向后兼容性。
debeziumConfig-Debezium 的属性 传递给用于捕获 PostgreSQL 服务器数据更改的 Debezium 嵌入式引擎。
common-options-源插件的公共参数,请参阅 源公共选项 获取详细信息。

任务示例

简单

支持多表读取



env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second=7000000
read_limit.rows_per_second=400
}

source {
Postgres-CDC {
plugin_output = "customers_Postgre_cdc"
username = "postgres"
password = "postgres"
database-names = ["postgres_cdc"]
schema-names = ["inventory"]
table-names = ["postgres_cdc.inventory.postgres_cdc_table_1,postgres_cdc.inventory.postgres_cdc_table_2"]
url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
}
}

transform {

}

sink {
jdbc {
plugin_input = "customers_Postgre_cdc"
url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
driver = "org.postgresql.Driver"
username = "postgres"
password = "postgres"

generate_sink_sql = true
# You need to configure both database and table
database = postgres_cdc
schema = "inventory"
tablePrefix = "sink_"
primary_keys = ["id"]
}
}

支持自定义表的主键

source {
Postgres-CDC {
plugin_output = "customers_mysql_cdc"
username = "postgres"
password = "postgres"
database-names = ["postgres_cdc"]
schema-names = ["inventory"]
table-names = ["postgres_cdc.inventory.full_types_no_primary_key"]
url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
decoding.plugin.name = "decoderbufs"
exactly_once = false
table-names-config = [
{
table = "postgres_cdc.inventory.full_types_no_primary_key"
primaryKeys = ["id"]
}
]
}
}

常见问题

PostgreSQL CDC 需要哪些权限?

CDC 用户需要具备 REPLICATION 角色以及对监控表的 SELECT 权限:

CREATE USER replication_user REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;

同时需要在 postgresql.conf 中设置 wal_level = logical,并在 pg_hba.conf 中添加允许该用户复制连接的条目。

支持哪些逻辑解码插件?

SeaTunnel PostgreSQL CDC 支持 pgoutput(PostgreSQL 10 起内置)、wal2jsondecoderbufs,默认使用 pgoutput。通过 decoding.plugin.name 参数指定插件。

SeaTunnel 能从 PostgreSQL 备库读取 CDC 数据吗?

不能。PostgreSQL 逻辑复制槽必须在主库上创建和消费,SeaTunnel 无法直接从备库读取逻辑复制槽,需将 CDC 连接器指向主库实例。

PostgreSQL CDC 是否支持无主键表?

默认需要主键。如果表有可作为唯一标识的列,可以通过 table-names-config 中的 primaryKeys 字段自定义主键。

复制槽如何管理?

SeaTunnel 在任务启动时会创建或复用 slot.name 指定的复制槽。未使用的复制槽会持续占用 磁盘上的 WAL 段,导致 WAL 持续增长。当 CDC 任务永久下线时,应在 PostgreSQL 侧手动删除 不再使用的复制槽。

PostgreSQL CDC 为什么会滞后?

滞后可能由逻辑解码插件处理慢或 WAL sender 负载过高引起。可通过监控 pg_replication_slots 中的 confirmed_flush_lsn 漂移情况来排查。确保 CDC 任务持续消费事件,并保持 SeaTunnel 与 PostgreSQL 之间的网络低延迟。

另请参阅

若需要一份面向生产的端到端实践指南,涵盖全量 + 增量同步生命周期、2PC sink 配置、Schema 演进与常见故障排查,请参阅 CDC 生产实战手册

变更日志