跳到主要内容
版本:Next

Databend

Databend sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

用于向 Databend 写入数据的 sink 连接器。支持批处理和流处理模式。 Databend sink 内部通过 stage attachment 实现数据的批量导入。

依赖

  1. 你需要下载 Databend JDBC driver jar package 并添加到目录 ${SEATUNNEL_HOME}/plugins/.

对于 SeaTunnel Zeta

  1. 你需要下载 Databend JDBC driver jar package 并添加到目录 ${SEATUNNEL_HOME}/lib/.

Sink 选项

名称类型是否必须默认值描述
urlString-Databend JDBC 连接 URL
usernameString-Databend 数据库用户名
passwordString-Databend 数据库密码
databaseString-Databend 数据库名称,默认使用连接 URL 中指定的数据库名
tableString-Databend 表名称
batch_sizeInteger1000批量写入的记录数
auto_commitBooleantrue是否自动提交事务
max_retriesInteger3写入失败时的最大重试次数
schema_save_modeEnumCREATE_SCHEMA_WHEN_NOT_EXIST保存 Schema 的模式
data_save_modeEnumAPPEND_DATA保存数据的模式
custom_sqlString-自定义写入 SQL,通常用于复杂的写入场景
execute_timeout_secInteger300执行SQL的超时时间(秒)
jdbc_configMap-额外的 JDBC 连接配置,如连接超时参数等

schema_save_mode[Enum]

在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:
RECREATE_SCHEMA :表不存在时创建,表存在时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST :表不存在时会创建,表存在时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :表不存在时会报错。
IGNORE :忽略对表的处理。

data_save_mode[Enum]

在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:
DROP_DATA: 保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户自定义处理。
ERROR_WHEN_DATA_EXISTS:有数据时报错。

数据类型映射

SeaTunnel 数据类型Databend 数据类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
STRINGSTRING
BYTESVARBINARY
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP

任务示例

简单示例

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
name = string
age = int
score = double
}
}
}
}

sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
batch_size = 1000
}
}

使用自定义 SQL 写入

sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
}
}

使用 Schema 保存模式

sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
schema_save_mode = "RECREATE_SCHEMA"
data_save_mode = "APPEND_DATA"
}
}

相关链接

Changelog

Change Log
ChangeCommitVersion
[Feature][Connector-V2]Support Databend sink/source (#9331)https://github.com/apache/seatunnel/pull/9331TODO