跳到主要内容
版本:Next

SelectDB Cloud

SelectDB Cloud Sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

用于将数据发送到 SelectDB Cloud。支持流式和批处理模式。

SelectDB Cloud 接收器连接器的内部实现是在批量缓存后上传数据,并提交 CopyInto SQL 以将数据加载到表中。

支持的数据源信息

:::提示

支持的版本

  • 支持的 SelectDB Cloud 版本 >= 2.2.x

:::

接收器选项

名称类型是否必填默认值描述
load-urlString-SelectDB Cloud 仓库的 HTTP 地址,格式为 warehouse_ip:http_port
jdbc-urlString-SelectDB Cloud 仓库的 JDBC 地址,格式为 warehouse_ip:mysql_port
cluster-nameString-SelectDB Cloud 集群名称
usernameString-SelectDB Cloud 用户名
passwordString-SelectDB Cloud 用户密码
sink.enable-2pcbooltrue是否启用两阶段提交(2pc),默认为 true,以确保 Exactly-Once 语义。SelectDB 使用缓存文件加载数据。当数据量较大时,缓存数据可能会失效(默认过期时间为 1 小时)。如果遇到大量数据写入丢失的情况,请将 sink.enable-2pc 配置为 false。
table.identifierString-SelectDB Cloud 表的名称,格式为 database.table
sink.enable-deleteboolfalse是否启用删除功能。此选项要求 SelectDB Cloud 表启用批量删除功能,并且仅支持 Unique 模型。
sink.max-retriesint3写入数据库失败时的最大重试次数
sink.buffer-sizeint10 1024 1024 (1MB)用于流式加载的数据缓存缓冲区大小
sink.buffer-countint10000用于流式加载的数据缓存缓冲区数量
selectdb.configmap-此选项用于在自动生成 SQL 时支持 insertdeleteupdate 等操作,并支持多种格式。

数据类型映射

SelectDB Cloud 数据类型SeaTunnel 数据类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
TINYINT
INTINT
SMALLINT
TINYINT
BIGINTBIGINT
INT
SMALLINT
TINYINT
LARGEINTBIGINT
INT
SMALLINT
TINYINT
FLOATFLOAT
DOUBLEDOUBLE
FLOAT
DECIMALDECIMAL
DOUBLE
FLOAT
DATEDATE
DATETIMETIMESTAMP
CHARSTRING
VARCHARSTRING
STRINGSTRING
ARRAYARRAY
MAPMAP
JSONSTRING
HLL尚未支持
BITMAP尚未支持
QUANTILE_STATE尚未支持
STRUCT尚未支持

支持的导入数据格式

支持的格式包括 CSV 和 JSON

任务示例

简单示例:

以下示例描述了将多种数据类型写入 SelectDBCloud,用户需要在下游创建相应的表

env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 10000
}

source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, array<int>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

sink {
SelectDBCloud {
load-url = "warehouse_ip:http_port"
jdbc-url = "warehouse_ip:mysql_port"
cluster-name = "Cluster"
table.identifier = "test.test"
username = "admin"
password = "******"
selectdb.config {
file.type = "json"
}
}
}

使用 JSON 格式导入数据

sink {
SelectDBCloud {
load-url = "warehouse_ip:http_port"
jdbc-url = "warehouse_ip:mysql_port"
cluster-name = "Cluster"
table.identifier = "test.test"
username = "admin"
password = "******"
selectdb.config {
file.type = "json"
}
}
}

使用 CSV 格式导入数据

sink {
SelectDBCloud {
load-url = "warehouse_ip:http_port"
jdbc-url = "warehouse_ip:mysql_port"
cluster-name = "Cluster"
table.identifier = "test.test"
username = "admin"
password = "******"
selectdb.config {
file.type = "csv"
file.column_separator = ","
file.line_delimiter = "\n"
}
}
}