跳到主要内容
版本:Next

OceanBase

JDBC OceanBase Sink 连接器

支持引擎

Spark
Flink
SeaTunnel Zeta

关键特性

描述

通过jdbc写入数据。支持批处理模式和流模式,支持并发写入,支持精确一次语义。

支持的数据源信息

数据源支持版本DriverUrlMaven
OceanBase所有OceanBase服务版本com.oceanbase.jdbc.Driverjdbc:oceanbase://localhost:2883/testDownload

数据库相关依赖

请下载“Maven”对应的支持列表,并将其复制到“$SEATUNNEL_HOME/plugins/jdbc/lib/”工作目录
例如: cp oceanbase-client-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

Mysql模式

Mysql Data typeSeaTunnel Data type
BIT(1)
INT UNSIGNED
BOOLEAN
TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR
INT
INT UNSIGNED
INTEGER UNSIGNED
BIGINT
BIGINT
BIGINT UNSIGNEDDECIMAL(20,0)
DECIMAL(x,y)(获取指定列的指定列大小<38)DECIMAL(x,y)
DECIMAL(x,y)(获取指定列的指定列大小>38)DECIMAL(38,18)
DECIMAL UNSIGNEDDECIMAL((获取指定列的指定列大小)+1,
(获取指定列小数点右侧的位数。)))
FLOAT
FLOAT UNSIGNED
FLOAT
DOUBLE
DOUBLE UNSIGNED
DOUBLE
CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON
STRING
DATEDATE
TIMETIME
DATETIME
TIMESTAMP
TIMESTAMP
TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n)
BYTES
GEOMETRY
UNK否WN
否t supported yet

Oracle 模式

Oracle Data typeSeaTunnel Data type
Number(p), p <= 9INT
Number(p), p <= 18BIGINT
Number(p), p > 18DECIMAL(38,18)
REAL
BINARY_FLOAT
FLOAT
BINARY_DOUBLEDOUBLE
CHAR
NCHAR
NVARCHAR2
NCLOB
CLOB
ROWID
STRING
DATEDATE
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES
UNK否WN否t supported yet

Sink 选项

NameTypeRequiredDefaultDescription
urlString-JDBC连接的URL。参考案例: jdbc:oceanbase://localhost:2883/test
driverString-用于连接到远程数据源的jdbc类名应为 com.oceanbase.jdbc.Driver.
userString-连接实例用户名
passwordString-连接实例密码
queryString-使用此sql将上游输入数据写入数据库。例如“insert…”查询具有更高的优先级
compatible_modeString-OceanBase的兼容模式可以是“mysql”或“oracle”。
databaseString-使用这个“database”和“table-name”自动生成sql并接收上游输入数据写入数据库
此选项与“query”互斥,具有更高的优先级。
tableString-使用数据库和此表名自动生成sql并接收上游输入数据写入数据库
此选项与“query”互斥,并且具有更高的 priority.
primary_keysArray-此选项用于在自动生成sql时支持“insert”、“delete”和“update”等操作。
support_upsert_by_query_primary_key_existBooleanfalse选择使用INSERT sql、UPDATE sql根据查询主键是否存在来处理更新事件(INSERT、UPDATE_AFTER)。此配置仅在数据库不支持升级语法时使用:此方法性能低
connection_check_timeout_secInt30等待用于验证连接的数据库操作完成的时间(秒)。
max_retriesInt0提交失败的重试次数(executeBatch)
batch_sizeInt1000对于批量写入,当缓冲记录的数量达到“batch_size”的数量或时间达到“checkpoint.interval”
时,数据将被刷新到数据库中
generate_sink_sqlBooleanfalse根据要写入的数据库表生成sql语句
max_commit_attemptsInt3事务提交失败的重试次数
transaction_timeout_secInt-1事务打开后的超时,默认值为-1(永不超时)。请注意,设置超时可能会影响<br/>精确一次语义
auto_commitBooleantrue默认情况下启用自动事务提交
propertiesMap-其他连接配置参数,当属性和URL具有相同的参数时,优先级由驱动程序的特定实现决定。例如,在MySQL中,属性优先于URL。
common-options-Sink插件常用参数,详见[Sink common Options](../sink-common-options.md)
enable_upsertBooleantrue通过primary_keys存在启用upsert,如果任务没有键重复数据,将此参数设置为“false”可以加快数据导入

提示

如果未设置partition_column,它将以单并发运行,如果设置了partition_column,它将根据任务的并发数并行执行。

任务示例

简单示例:

此示例定义了一个SeaTunnel同步任务,该任务通过FakeSource自动生成数据并将其发送到JDBC Sink。FakeSource总共生成16行数据(row.num=16),每行有两个字段,name(字符串类型)和age(int类型)。最终的目标表是test_table,表中也将有16行数据。在运行此作业之前,您需要在mysql中创建数据库测试和表test_table。如果您尚未安装和部署SeaTunnel,则需要按照[安装SeaTunnel](../../start-v2/local/deployment.md)中的说明安装和部署SeaTunnel。然后按照[快速启动SeaTunnel引擎](../../Start-v2/locale/Quick-Start-SeaTunnel-Engine.md)中的说明运行此作业。

# 定义运行环境
env {
parallelism = 1
job.mode = "BATCH"
}

source {
# 这是一个示例源插件,**仅用于测试和演示功能源插件**
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# 如果你想了解更多关于如何配置seatunnel的信息,并查看完整的source插件列表,
# 请前往https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
# 如果你想了解更多关于如何配置seatunnel的信息,并查看transform插件的完整列表,
# 请前往https://seatunnel.apache.org/docs/transform-v2
}

sink {
jdbc {
url = "jdbc:oceanbase://localhost:2883/test"
driver = "com.oceanbase.jdbc.Driver"
user = "root"
password = "123456"
compatible_mode = "mysql"
query = "insert into test_table(name,age) values(?,?)"
}
# 如果你想了解更多关于如何配置seatunnel的信息,并查看完整的sink插件列表,
# 请前往https://seatunnel.apache.org/docs/connector-v2/sink
}

生成 Sink SQL

此示例不需要编写复杂的sql语句,您可以配置数据库名称表名以自动为您生成add语句

sink {
jdbc {
url = "jdbc:oceanbase://localhost:2883/test"
driver = "com.oceanbase.jdbc.Driver"
user = "root"
password = "123456"
compatible_mode = "mysql"
# 根据数据库表名自动生成sql语句
generate_sink_sql = true
database = test
table = test_table
}
}

CDC(Change Data Capture) 数据变更事件

我们也支持CDC变更数据。在这种情况下,您需要配置数据库、表和主键。

sink {
jdbc {
url = "jdbc:oceanbase://localhost:3306/test"
driver = "com.oceanbase.jdbc.Driver"
user = "root"
password = "123456"
compatible_mode = "mysql"
generate_sink_sql = true
# 您需要同时配置数据库和表
database = test
table = sink_table
primary_keys = ["id","name"]
}
}