Version: 2.3.5

SelectDB Cloud

SelectDB Cloud sink connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Used to send data to SelectDB Cloud. Both streaming and batch modes are supported. Internally, the SelectDB Cloud sink connector caches data in batches, uploads each batch, and then commits a CopyInto SQL statement to load the data into the table.
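As a rough illustration of the streaming mode mentioned above, the env block below is a minimal sketch rather than a recommended setup. It assumes that "STREAMING" is the job.mode counterpart of the "BATCH" value used in the task examples, and that checkpoint.interval is given in milliseconds; the interval value itself is only illustrative.

```hocon
env {
  parallelism = 1
  # Assumption: "STREAMING" is the streaming counterpart of job.mode = "BATCH"
  job.mode = "STREAMING"
  # Assumption: interval is in milliseconds; 10000 is an illustrative value
  checkpoint.interval = 10000
}
```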

Supported DataSource Info

Tip

Version Supported

  • Supported SelectDB Cloud versions: >= 2.2.x

Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| load-url | String | Yes | - | SelectDB Cloud warehouse HTTP address, in the format warehouse_ip:http_port |
| jdbc-url | String | Yes | - | SelectDB Cloud warehouse JDBC address, in the format warehouse_ip:mysql_port |
| cluster-name | String | Yes | - | SelectDB Cloud cluster name |
| username | String | Yes | - | SelectDB Cloud username |
| password | String | Yes | - | SelectDB Cloud user password |
| sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2PC) to ensure exactly-once semantics. SelectDB uses cache files to load data. When the amount of data is large, cached data may expire before it is committed (the default expiration time is 1 hour). If you see a large amount of written data being lost, set sink.enable-2pc to false. |
| table.identifier | String | Yes | - | The name of the SelectDB Cloud table, in the format database.table |
| sink.enable-delete | bool | No | false | Whether to enable deletion. This option requires the SelectDB Cloud table to have the batch delete function enabled, and only the Unique model is supported. |
| sink.max-retries | int | No | 3 | The maximum number of retries if writing records to the database fails |
| sink.buffer-size | int | No | 10 * 1024 * 1024 (10MB) | The buffer size used to cache data for stream load |
| sink.buffer-count | int | No | 10000 | The buffer count used to cache data for stream load |
| selectdb.config | Map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and to set the import data format |
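The optional settings in the table above could be combined as in the following sketch. The connection values are the same placeholders used in the task examples, the retry count is an arbitrary illustration, and the buffer size is assumed to be expressed in bytes. Since two-phase commit is what provides exactly-once semantics, turning it off trades that guarantee for robustness against cache expiry on large loads.

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # Turn off two-phase commit when cached data expires before commit on large loads
    sink.enable-2pc = false
    # Illustrative value: retry failed writes up to 5 times (the default is 3)
    sink.max-retries = 5
    # Assumption: the size is given in bytes; 10485760 = 10 * 1024 * 1024
    sink.buffer-size = 10485760
    sink.buffer-count = 10000
    selectdb.config {
      file.type = "json"
    }
  }
}
```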

Data Type Mapping

| SelectDB Cloud Data Type | SeaTunnel Data Type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
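Read from the SeaTunnel side, the mapping above suggests which column types a downstream table could use for each source field. The schema fragment below is a sketch only: the field names are hypothetical (borrowed from the task example), and the comments simply restate the mapping table rather than describe any additional connector behavior.

```hocon
schema = {
  fields {
    # SeaTunnel type           # SelectDB Cloud column types that accept it
    c_boolean = boolean         # BOOLEAN
    c_tinyint = tinyint         # TINYINT, SMALLINT, INT, BIGINT, LARGEINT
    c_smallint = smallint       # SMALLINT, INT, BIGINT, LARGEINT
    c_int = int                 # INT, BIGINT, LARGEINT
    c_bigint = bigint           # BIGINT, LARGEINT
    c_float = float             # FLOAT, DOUBLE, DECIMAL
    c_double = double           # DOUBLE, DECIMAL
    c_decimal = "decimal(16, 1)" # DECIMAL
    c_date = date               # DATE
    c_timestamp = timestamp     # DATETIME
    c_string = string           # CHAR, VARCHAR, STRING, JSON
    c_array = "array<int>"      # ARRAY
    c_map = "map<string, int>"  # MAP
  }
}
```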

Supported import data formats

The supported import formats are CSV and JSON.

Task Example

Simple:

The following example writes multiple data types to SelectDB Cloud. The corresponding table must be created downstream by the user beforehand.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}

Use JSON format to import data

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}

Use CSV format to import data

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "csv"
      file.column_separator = ","
      file.line_delimiter = "\n"
    }
  }
}
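
For the sink.enable-delete option listed under Sink Options, a configuration might look like the sketch below. It assumes the target table test.test uses the Unique model and has the batch delete function enabled, as that option requires; the connection values are the same placeholders as in the examples above.

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    # Assumption: test.test is a Unique model table with batch delete enabled
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # Propagate delete operations to the table (the default is false)
    sink.enable-delete = true
    selectdb.config {
      file.type = "json"
    }
  }
}
```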