Databend
Databend sink 连接器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
描述
用于向 Databend 写入数据的 sink 连接器。支持批处理和流处理模式。 Databend sink 内部通过 stage attachment 实现数据的批量导入。
依赖
对于 Spark/Flink
- 你需要下载 Databend JDBC driver jar package 并添加到目录
${SEATUNNEL_HOME}/plugins/
.
对于 SeaTunnel Zeta
- 你需要下载 Databend JDBC driver jar package 并添加到目录
${SEATUNNEL_HOME}/lib/
.
Sink 选项
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
url | String | 是 | - | Databend JDBC 连接 URL |
username | String | 是 | - | Databend 数据库用户名 |
password | String | 是 | - | Databend 数据库密码 |
database | String | 否 | - | Databend 数据库名称,默认使用连接 URL 中指定的数据库名 |
table | String | 否 | - | Databend 表名称 |
batch_size | Integer | 否 | 1000 | 批量写入的记录数 |
auto_commit | Boolean | 否 | true | 是否自动提交事务 |
max_retries | Integer | 否 | 3 | 写入失败时的最大重试次数 |
schema_save_mode | Enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | 保存 Schema 的模式 |
data_save_mode | Enum | 否 | APPEND_DATA | 保存数据的模式 |
custom_sql | String | 否 | - | 自定义写入 SQL,通常用于复杂的写入场景 |
execute_timeout_sec | Integer | 否 | 300 | 执行SQL的超时时间(秒) |
jdbc_config | Map | 否 | - | 额外的 JDBC 连接配置,如连接超时参数等 |
schema_save_mode[Enum]
在开启同步任务之前,针对现有的表结构选择不同的处理方案。
选项介绍:
RECREATE_SCHEMA
:表不存在时创建,表存在时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST
:表不存在时会创建,表存在时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST
:表不存在时会报错。
IGNORE
:忽略对表的处理。
data_save_mode[Enum]
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
选项介绍:
DROP_DATA
: 保留数据库结构并删除数据。
APPEND_DATA
:保留数据库结构,保留数据。
CUSTOM_PROCESSING
:用户自定义处理。
ERROR_WHEN_DATA_EXISTS
:有数据时报错。
数据类型映射
SeaTunnel 数据类型 | Databend 数据类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
STRING | STRING |
BYTES | VARBINARY |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
任务示例
简单示例
env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 10
schema = {
fields {
name = string
age = int
score = double
}
}
}
}
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
batch_size = 1000
}
}
使用自定义 SQL 写入
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
}
}
使用 Schema 保存模式
sink {
Databend {
url = "jdbc:databend://localhost:8000"
username = "root"
password = ""
database = "default"
table = "target_table"
schema_save_mode = "RECREATE_SCHEMA"
data_save_mode = "APPEND_DATA"
}
}
相关链接
Changelog
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Connector-V2]Support Databend sink/source (#9331) | https://github.com/apache/seatunnel/pull/9331 | TODO |