跳到主要内容
版本:Next

Oracle

JDBC Oracle Sink 连接器

支持这些引擎

Spark
Flink
SeaTunnel Zeta

描述

通过jdbc写入数据。支持批处理模式和流模式,支持并发写入,支持“精确一次” 语义(使用XA事务保证)。

依赖

  1. 您需要确保 jdbc driver jar package已经添加到目录 ${SEATUNNEL_HOME}/plugins/.

对于 SeaTunnel Zeta 引擎

  1. 您需要确保 jdbc driver jar package 已经添加到目录 ${SEATUNNEL_HOME}/lib/.

主要特性

使用“Xa事务”来确保“精确一次”。因此,数据库只支持“精确一次”,即 支持“Xa事务”。您可以设置is_exactly_once=true 来启用它。

支持的数据源信息

数据源支持的版本驱动器网址Maven下载链接
Oracle不同的依赖版本具有不同的驱动程序类。oracle.jdbc.OracleDriverjdbc:oracle:thin:@datasource01:1523:xehttps://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8

数据库依赖关系

请下载“Maven”对应的支持列表,并将其复制到“$SEATUNNEL_HOME/plugins/jdbc/lib/”工作目录
例如,Oracle数据源:cp ojdbc8-xxxx.jar$SEATUNNEL_HOME/lib/
要支持i18n字符集,请将orai18n.jar复制到$SEATUNNEL_HOME/lib/目录。

数据类型映射

Oracle 数据类型SeaTunnel 数据类型
INTEGERINT
FLOATDECIMAL(38, 18)
NUMBER(precision <= 9, scale == 0)INT
NUMBER(9 < precision <= 18, scale == 0)BIGINT
NUMBER(18 < precision, scale == 0)DECIMAL(38, 0)
NUMBER(scale != 0)DECIMAL(38, 18)
BINARY_DOUBLEDOUBLE
BINARY_FLOAT
REAL
FLOAT
CHAR
NCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
STRING
DATEDATE
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES

参数

名称类型是否必填默认值描述
urlString-JDBC 连接的 URL。参见示例: jdbc:oracle:thin:@datasource01:1523:xe
driverString-用于连接远程数据源的 JDBC 类名,
如果使用 Oracle,值为 oracle.jdbc.OracleDriver
userString-连接实例用户名。
passwordString-连接实例密码。
queryString-使用此sql将上游输入数据写入数据库。例如: INSERT ...,query 具有更高的优先级
databaseString-使用此 databasetable-name 自动生成sql并接收上游输入数据写入数据库。
此选项与query 互斥,具有更高的优先级
tableString-使用数据库和此表名自动生成sql并接收上游输入数据写入数据库。
此选项与query 互斥,具有更高的优先级
primary_keysArray-此选项用于支持以下操作,例如 insert, delete, 和 update 当自动生成sql.
support_upsert_by_query_primary_key_existBooleanfalse选择使用INSERT sql、UPDATE sql根据查询主键是否存在来处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持升级语法时使用:此方法性能低
connection_check_timeout_secInt30等待用于验证连接的数据库操作完成的时间(秒)。
max_retriesInt0提交失败的重试次数(executeBatch)
batch_sizeInt1000对于批量写入,当缓冲记录的数量达到“batch_size”的数量或时间达到“checkpoint.interval”
时,数据将被刷新到数据库中。
batch_interval_msInt1000对于批写入,当缓冲区的数量达到“batch_size”的数量或时间达到“batch-interval_ms”时,数据将被刷新到数据库中。
is_exactly_onceBooleanfalse是否启用精确一次语义,这将使用Xa事务。如果启用,则需要
设置xa_data_source_class_name
generate_sink_sqlBooleanfalse根据要写入的数据库表生成sql语句
xa_data_source_class_nameString-数据库Driver的xa数据源类名,例如Oracle,是`Oracle.jdbc.xa.client。OracleXADataSource和
请参阅附录了解其他数据源
max_commit_attemptsInt3事务提交失败的重试次数
transaction_timeout_secInt-1事务打开后的超时,默认值为-1(永不超时)。请注意,设置超时可能会影响<br/>精确一次语义
auto_commitBooleantrue默认情况下启用自动事务提交
propertiesMap-其他连接配置参数,当属性和URL具有相同的参数时,优先级由驱动程序的特定实现决定。例如,在MySQL中,属性优先于URL。
common-options-Sink插件常用参数,请参考 Sink Common Options
schema_save_modeEnumCREATE_SCHEMA_WHEN_NOT_EXIST在启动同步任务之前,对目标侧的现有表面结构选择不同的处理方案。
data_save_modeEnumAPPEND_DATA在启动同步任务之前,对目标端的现有数据选择不同的处理方案。
custom_sqlString-当data_save_mode选择CUSTOM_PROCESSING时,您应该填写CUSTOM_SQL参数。此参数通常填充可以执行的SQL。SQL将在同步任务之前执行。
enable_upsertBooleantrue通过primary_keys存在启用upstart,如果任务只有“插入”,将此参数设置为“false”可以加快数据导入

提示

如果未设置partition_column,它将以单并发运行,如果设置了partition_column,它将根据任务的并发数并行执行。

任务示例

简单的例子:

此示例定义了一个SeaTunnel同步任务,该任务通过FakeSource自动生成数据并将其发送到JDBC Sink。FakeSource总共生成16行数据(row.num=16),每行有两个字段,name(字符串类型)和age(int类型)。最终的目标表是test_table,表中也将有16行数据。在运行此作业之前,您需要在Oracle中创建测试数据库和表test_table。如果您尚未安装和部署SeaTunnel,则需要按照[安装SeaTunnel](../../start-v2/local/deployment.md)中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start-SeaTunnel-Engine.md)中的说明运行此作业。

# 定义运行环境
env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
#如果你想了解更多关于如何配置seatunnel的信息,并查看完整的源插件列表,
#请前往https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
#如果你想了解更多关于如何配置seatunnel的信息,并查看转换插件的完整列表,
#请前往https://seatunnel.apache.org/docs/transform-v2
}

sink {
jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = root
password = 123456
query = "INSERT INTO TEST.TEST_TABLE(NAME,AGE) VALUES(?,?)"
}
#如果你想了解更多关于如何配置seatunnel的信息,并查看完整的sink插件列表,
#请前往https://seatunnel.apache.org/docs/connector-v2/sink
}

生成Sink SQL

此示例不需要编写复杂的sql语句,您可以配置数据库名称表名以自动为您生成add语句

sink {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = root
password = 123456

generate_sink_sql = true
database = XE
table = "TEST.TEST_TABLE"
}
}

精确一次:

为了准确的写入场景,我们保证一次准确

sink {
jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"

max_retries = 0
user = root
password = 123456
query = "INSERT INTO TEST.TEST_TABLE(NAME,AGE) VALUES(?,?)"

is_exactly_once = "true"

xa_data_source_class_name = "oracle.jdbc.xa.client.OracleXADataSource"
}
}

CDC(变更数据捕获)事件

我们也支持CDC更改数据。在这种情况下,您需要配置数据库、表和主键。

sink {
jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = root
password = 123456

generate_sink_sql = true
# You need to configure both database and table
database = XE
table = "TEST.TEST_TABLE"
primary_keys = ["ID"]
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}