跳到主要内容
版本:Next

Oracle CDC

Oracle CDC 数据源连接器

支持的引擎

SeaTunnel Zeta
Flink

关键特性

描述

Oracle CDC 连接器允许从 Oracle 数据库读取快照数据和增量数据。本文档描述了如何设置 Oracle CDC 连接器以针对 Oracle 数据库运行 SQL 查询。

注意

Debezium Oracle 连接器不依赖于连续挖掘(continuous mining)选项。该连接器负责检测日志切换并自动调整正在挖掘的日志,这正是连续挖掘选项自动为您完成的工作。 因此,您不能在 debezium 中设置名为 log.mining.continuous.mine 的属性。

支持的数据源信息

数据源支持的版本驱动类UrlMaven
Oracle不同的依赖版本有不同的驱动类。oracle.jdbc.OracleDriverjdbc:oracle:thin:@datasource01:1523:xehttps://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8

数据库依赖

安装 Jdbc 驱动

  1. 您需要确保 jdbc 驱动 jar 包 已放置在 ${SEATUNNEL_HOME}/plugins/ 目录下。
  2. 为了支持 i18n 字符集,请将 orai18n.jar 复制到 $SEATUNNEL_HOME/plugins/ 目录。

适用于 SeaTunnel Zeta 引擎

  1. 您需要确保 jdbc 驱动 jar 包 已放置在 ${SEATUNNEL_HOME}/lib/ 目录下。
  2. 为了支持 i18n 字符集,请将 orai18n.jar 复制到 $SEATUNNEL_HOME/lib/ 目录。

启用 Oracle Logminer

要在 Seatunnel 中使用 Logminer(Oracle 提供的内置工具)启用 Oracle CDC(变更数据捕获),请按照以下步骤操作:

在非 CDB(容器数据库)模式下启用 Logminer。

  1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
chown -R oracle /opt/oracle/***
  1. 以管理员身份登录并启用 Oracle 归档日志。
sqlplus /nolog;
connect sys as sysdba;
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
archive log list;
  1. 以管理员身份登录并创建一个名为 logminer_user 的账户,密码为 "oracle",并授予其读取表和日志的权限。
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;

GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE to logminer_user;
GRANT SELECT ON V_$LOG TO logminer_user;
GRANT SELECT ON V_$LOGFILE TO logminer_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
GRANT SELECT ANY TRANSACTION TO logminer_user;
GRANT SELECT ON V_$TRANSACTION TO logminer_user;
注意:Oracle 11g 不支持以下命令
GRANT LOGMINING TO logminer_user;
仅授予需要采集的表的权限
GRANT SELECT ANY TABLE TO logminer_user;
GRANT ANALYZE ANY TO logminer_user;

在 Oracle CDB (容器数据库) + PDB (可插拔数据库) 模式下启用 Logminer

  1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
mkdir -p /opt/oracle/oradata/ORCLCDB/ORCLPDB1
chown -R oracle /opt/oracle/***
  1. 以管理员身份登录并启用日志记录
sqlplus /nolog
connect sys as sysdba; # 密码: oracle
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
archive log list;
  1. 在 CDB 中执行
ALTER TABLE TEST.* ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE TEST.T2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  1. 创建 debeziume 账户

在 CDB 中操作

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

在 PDB 中操作

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
  1. 在 CDB 中操作
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
CONTAINER=ALL;

GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;

GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;

GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;

GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT analyze any TO debeziume_1 CONTAINER=ALL;

exit;

数据类型映射

Oracle 数据类型SeaTunnel 数据类型
INTEGERINT
FLOATDECIMAL(38, 18)
NUMBER(precision <= 9, scale == 0)INT
NUMBER(9 < precision <= 18, scale == 0)BIGINT
NUMBER(18 < precision, scale == 0)DECIMAL(38, 0)
NUMBER(precision == 0, scale == 0)DECIMAL(38, 18)
NUMBER(scale != 0)DECIMAL(38, 18)
BINARY_DOUBLEDOUBLE
BINARY_FLOAT
REAL
FLOAT
CHAR
NCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
STRING
DATEDATE
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES

源端选项

参数名称类型是否必选默认值描述
urlString-JDBC 连接的 URL。例如:jdbc:oracle:thin:datasource01:1523:xe
usernameString-连接数据库服务器时使用的数据库用户名。
passwordString-连接数据库服务器时使用的数据库密码。
database-namesList-要监控的数据库名称。
schema-namesList-要监控的数据库 Schema 名称。
table-namesList-要监控的数据库表名。表名需要包含数据库名,例如:database_name.table_name
table-names-configList-表配置列表。例如:[{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}]
startup.modeEnumINITIALOracle CDC 使用者的可选启动模式,有效枚举值为 initialearliestlatesttimestampspecific
initial:启动时同步历史数据,然后同步增量数据。
earliest:从尽可能早的偏移量启动。
latest:从最新的偏移量启动。
specific:从用户提供的特定偏移量启动。
startup.timestampLong-从指定的时间戳(自 Unix 纪元以来的毫秒数)启动。当 startup.mode = timestamp 时,该时间戳会按 server-time-zone 转换。注意,当 startup.mode 选项使用 timestamp 时,此选项是必需的。
startup.specific-offset.fileString-从指定的 binlog 文件名启动。注意,当 startup.mode 选项使用 specific 时,此选项是必需的。
startup.specific-offset.posLong-从指定的 binlog 文件位置启动。注意,当 startup.mode 选项使用 specific 时,此选项是必需的。
stop.modeEnumNEVEROracle CDC 使用者的可选停止模式,有效枚举值为 neverlatestspecific
never:实时任务不停止源。
latest:从最新的偏移量停止。
specific:从用户提供的特定偏移量停止。
stop.specific-offset.fileString-从指定的 binlog 文件名停止。注意,当 stop.mode 选项使用 specific 时,此选项是必需的。
stop.specific-offset.posLong-从指定的 binlog 文件位置停止。注意,当 stop.mode 选项使用 specific 时,此选项是必需的。
snapshot.split.sizeInteger8096表快照的拆分大小(行数),在读取表快照时,捕获的表将被拆分为多个拆分块。
snapshot.fetch.sizeInteger1024读取表快照时每次轮询的最大获取大小。
server-time-zoneStringUTC数据库服务器中的会话时区。如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。该参数也用于将 startup.timestamp 转换为 SCN。若数据库时区与 JVM 时区不同,建议显式配置。
connect.timeout.msDuration30000连接器在尝试连接数据库服务器后超时的最大等待时间。
connect.max-retriesInteger3连接器尝试建立数据库服务器连接的最大重试次数。
connection.pool.sizeInteger20JDBC 连接池大小。
chunk-key.even-distribution.factor.upper-boundDouble100分块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较大,则表将被视为分布不均,如果估计的分片数超过 sample-sharding.threshold 指定的值,则将使用基于采样的分片策略。默认值为 100.0。
chunk-key.even-distribution.factor.lower-boundDouble0.05分块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) / 行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较小,则表将被视为分布不均,如果估计的分片数超过 sample-sharding.threshold 指定的值,则将使用基于采样的分片策略。默认值为 0.05。
sample-sharding.thresholdInteger1000此配置指定触发采样分片策略的预估分片数阈值。当分布因子超出 chunk-key.even-distribution.factor.upper-boundchunk-key.even-distribution.factor.lower-bound 指定的范围,并且预估的分片数(计算为近似行数 / 分块大小)超过此阈值时,将使用采样分片策略。这有助于更有效地处理大型数据集。默认值为 1000 个分片。
inverse-sampling.rateInteger1000采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则意味着在采样过程中应用 1/1000 的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理首选较低采样率的极大型数据集时,它特别有用。默认值为 1000。
exactly_onceBooleanfalse启用精确一次语义。
use_select_countBooleanfalse使用 select count 统计表行数,而不是在全量阶段使用其他方法。在这种情况下,当通过分析表使用 SQL 更新统计信息更快时,直接使用 select count
skip_analyzeBooleanfalse在全量阶段跳过表行数的分析。在这种情况下,您需要定期调度分析表 SQL 以更新相关表统计信息,或者您的表数据更改不频繁。
formatEnumDEFAULTOracle CDC 的可选输出格式,有效枚举值为 DEFAULTCOMPATIBLE_DEBEZIUM_JSON
schema-changes.enabledBooleanfalseSchema 演进默认禁用。目前我们仅支持 add columndrop columnrename columnmodify column
debeziumConfig-透传 Debezium 属性 给 Debezium Embedded Engine,该引擎用于捕获 Oracle 服务器的数据更改。
common-options-源端插件常用参数,详情请参阅 源端常用选项
decimal_type_narrowingBooleantrue数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持 Oracle。请参阅下文的 decimal_type_narrowing

decimal_type_narrowing

数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持 Oracle。

例如:

decimal_type_narrowing = true

OracleSeaTunnel
NUMBER(1, 0)Boolean
NUMBER(6, 0)INT
NUMBER(10, 0)BIGINT

decimal_type_narrowing = false

OracleSeaTunnel
NUMBER(1, 0)Decimal(1, 0)
NUMBER(6, 0)Decimal(6, 0)
NUMBER(10, 0)Decimal(10, 0)

任务示例

简单示例

支持多表读取

source {
# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
Oracle-CDC {
plugin_output = "customers"
username = "system"
password = "oracle"
database-names = ["XE"]
schema-names = ["DEBEZIUM"]
table-names = ["XE.DEBEZIUM.FULL_TYPES", "XE.DEBEZIUM.FULL_TYPES2"]
url = "jdbc:oracle:thin:@oracle-host:1521:xe"
source.reader.close.timeout = 120000
}
}

在全量阶段使用 select count(*) 代替 analysis table 来统计表行数

source {
# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
Oracle-CDC {
plugin_output = "customers"
use_select_count = true
username = "system"
password = "oracle"
database-names = ["XE"]
schema-names = ["DEBEZIUM"]
table-names = ["XE.DEBEZIUM.FULL_TYPES"]
url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
source.reader.close.timeout = 120000
}
}

使用 select NUM_ROWS from all_tables 获取表行数,但跳过 analyze table 操作。

source {
# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
Oracle-CDC {
plugin_output = "customers"
skip_analyze = true
username = "system"
password = "oracle"
database-names = ["XE"]
schema-names = ["DEBEZIUM"]
table-names = ["XE.DEBEZIUM.FULL_TYPES"]
url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
source.reader.close.timeout = 120000
}
}

支持表的自定义主键

source {
Oracle-CDC {
plugin_output = "customers"
url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
source.reader.close.timeout = 120000
username = "system"
password = "oracle"
database-names = ["XE"]
schema-names = ["DEBEZIUM"]
table-names = ["XE.DEBEZIUM.FULL_TYPES"]
table-names-config = [
{
table = "XE.DEBEZIUM.FULL_TYPES"
primaryKeys = ["ID"]
}
]
}
}

支持以兼容 debezium 的格式发送到 kafka

必须与 kafka 连接器 sink 配合使用,详情请参阅 兼容 debezium 格式

更新日志