SelectDB Cloud
SelectDB Cloud sink connector
Supported engines
Spark
Flink
SeaTunnel Zeta
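Key features
Description
Sends data to SelectDB Cloud. Both streaming and batch modes are supported.
Internally, the SelectDB Cloud sink connector caches data in batches, uploads the cached data, and then commits a CopyInto SQL statement to load the data into the table.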
Supported data source info
:::tip
Supported versions
- Supported SelectDB Cloud version: >= 2.2.x
:::
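Sink options
Name | Type | Required | Default | Description |
---|---|---|---|---|
load-url | String | Yes | - | HTTP address of the SelectDB Cloud warehouse, in the format warehouse_ip:http_port |
jdbc-url | String | Yes | - | JDBC address of the SelectDB Cloud warehouse, in the format warehouse_ip:mysql_port |
cluster-name | String | Yes | - | SelectDB Cloud cluster name |
username | String | Yes | - | SelectDB Cloud user name |
password | String | Yes | - | SelectDB Cloud user password |
sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2PC). Defaults to true to ensure Exactly-Once semantics. SelectDB loads data through cached files. With large data volumes, the cached data may expire (the default expiration time is 1 hour). If a large write loses data, set sink.enable-2pc to false. |
table.identifier | String | Yes | - | Name of the SelectDB Cloud table, in the format database.table |
sink.enable-delete | bool | No | false | Whether to enable deletes. This option requires the batch delete feature to be enabled on the SelectDB Cloud table and is supported only for the Unique model. |
sink.max-retries | int | No | 3 | Maximum number of retries when writing to the database fails |
sink.buffer-size | int | No | 10 * 1024 * 1024 (10 MB) | Size of the buffer used to cache data for stream load |
sink.buffer-count | int | No | 10000 | Number of buffers used to cache data for stream load |
selectdb.config | map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and supports multiple formats. |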
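To make the optional settings in the table above concrete, here is a minimal sketch of a sink block that sets each of them explicitly. The values restate the documented defaults, except sink.enable-2pc, which is switched off as the table suggests when large writes lose data. The connection values are the same placeholders used elsewhere in this document. Writing sink.buffer-size as a plain byte count (10485760, that is 10 * 1024 * 1024) is an assumption about how the option is parsed.

```hocon
sink {
  SelectDBCloud {
    # Placeholder connection values; replace them with your own.
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"

    # Turn off two-phase commit, e.g. when cached files may expire before commit.
    sink.enable-2pc = false
    # Deletes stay disabled; enabling them requires a Unique model table with batch delete.
    sink.enable-delete = false
    # Retry a failed write up to 3 times (documented default).
    sink.max-retries = 3
    # Assumption: the buffer size is given in bytes (10 * 1024 * 1024).
    sink.buffer-size = 10485760
    # Documented default buffer count.
    sink.buffer-count = 10000

    selectdb.config {
      file.type = "json"
    }
  }
}
```

Only the required options and selectdb.config need to be present; any optional line can be dropped to fall back to its default.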
Data type mapping
SelectDB Cloud data type | SeaTunnel data type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT, TINYINT |
INT | INT, SMALLINT, TINYINT |
BIGINT | BIGINT, INT, SMALLINT, TINYINT |
LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE, FLOAT |
DECIMAL | DECIMAL, DOUBLE, FLOAT |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
HLL | Not supported yet |
BITMAP | Not supported yet |
QUANTILE_STATE | Not supported yet |
STRUCT | Not supported yet |
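Supported import data formats
The supported formats are CSV and JSON.
Task examples
Simple example:
The following example writes several data types to SelectDB Cloud. You must create the corresponding table downstream before running the job.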
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
Importing data in JSON format
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
Importing data in CSV format
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "csv"
      file.column_separator = ","
      file.line_delimiter = "\n"
    }
  }
}
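The description states that the sink supports streaming as well as batch mode, while the examples so far all run in BATCH. The following is a minimal sketch of the same kind of job in streaming mode. It assumes that changing job.mode to "STREAMING" is the only change needed on the env side, reuses the checkpoint interval and placeholder connection values from the simple example, and trims the FakeSource schema to two fields for brevity. The target table test.test is still assumed to exist with matching columns.

```hocon
env {
  parallelism = 1
  # Run as an unbounded streaming job instead of a batch job.
  job.mode = "STREAMING"
  # Same checkpoint interval (in milliseconds) as the simple example.
  checkpoint.interval = 10000
}

source {
  # Test source with a reduced schema, for illustration only.
  FakeSource {
    row.num = 10
    string.length = 10
    schema = {
      fields {
        c_int = int
        c_string = string
      }
    }
  }
}

sink {
  SelectDBCloud {
    # Placeholder connection values; replace them with your own.
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```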
Change Log
Change | Commit | Version |
---|---|---|
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Improve] Add disable 2pc in SelectDB cloud sink (#6266) | https://github.com/apache/seatunnel/commit/aa0b2119a | 2.3.5 |
[Feature] Support nanosecond in SelectDB DateTimeV2 type (#6332) | https://github.com/apache/seatunnel/commit/a0ef5dac9 | 2.3.5 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
[improve][SelectDB] Add a jobId to the selectDB label to distinguish between tasks (#4864) | https://github.com/apache/seatunnel/commit/84be0f9fd | 2.3.2 |
[Improve][Connector-V2][SelectDB Cloud]Refactor some SelectDB Cloud Sink code as well as support copy into batch and async flush and cdc (#4312) | https://github.com/apache/seatunnel/commit/11e94b216 | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Connector-V2][SelectDB Cloud] Support SelectDB Cloud Sink Connector (#3958) | https://github.com/apache/seatunnel/commit/79a134a03 | 2.3.1 |