SelectDB Cloud
SelectDB Cloud sink connector
Supported engines
Spark
Flink
SeaTunnel Zeta
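Key features
Description
Used to send data to SelectDB Cloud. Supports both streaming and batch mode.
Internally, the SelectDB Cloud sink connector caches data in batches, uploads the cached data, and then submits a CopyInto SQL statement to load it into the table.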
Supported data source info
:::tip
Supported versions
- SelectDB Cloud version >= 2.2.x
:::
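Sink options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| load-url | String | Yes | - | HTTP address of the SelectDB Cloud warehouse, in the format warehouse_ip:http_port |
| jdbc-url | String | Yes | - | JDBC address of the SelectDB Cloud warehouse, in the format warehouse_ip:mysql_port |
| cluster-name | String | Yes | - | SelectDB Cloud cluster name |
| username | String | Yes | - | SelectDB Cloud user name |
| password | String | Yes | - | SelectDB Cloud user password |
| sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2pc). Defaults to true to ensure exactly-once semantics. SelectDB loads data through cached files. When the data volume is large, the cached data may expire (the default expiration time is 1 hour). If large writes lose data, set sink.enable-2pc to false. |
| table.identifier | String | Yes | - | Name of the SelectDB Cloud table, in the format database.table |
| sink.enable-delete | bool | No | false | Whether to enable deletion. This option requires batch delete to be enabled on the SelectDB Cloud table, and only the Unique model is supported. |
| sink.max-retries | int | No | 3 | Maximum number of retries when writing to the database fails |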
| sink.buffer-size | int | No | 10 * 1024 * 1024 (10 MB) | Size of the buffer used to cache data for stream load |
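| sink.buffer-count | int | No | 10000 | Number of buffers used to cache data for stream load |
| selectdb.config | map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and to select among the supported import formats. |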
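
As an illustration of how the optional settings in the table combine, the sketch below disables two-phase commit and sets the retry and buffer options explicitly for a large batch load. The connection values are placeholders, and the numeric values are examples chosen for illustration (assumptions), not tuned recommendations.

```
sink {
  SelectDBCloud {
    # Placeholder connection values; replace with your own warehouse details
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"

    # Turn off 2pc when cached files may expire before the commit on large loads
    sink.enable-2pc = false

    # Illustrative values only (assumptions)
    sink.max-retries = 5
    # 10 * 1024 * 1024 bytes, written out as a literal
    sink.buffer-size = 10485760
    sink.buffer-count = 10000

    selectdb.config {
      file.type = "json"
    }
  }
}
```

Because the table describes 2pc as the mechanism that ensures exactly-once semantics, turning it off presumably gives up that guarantee in exchange for avoiding expired cache files, so it is probably best reserved for jobs that actually hit the expiration problem.
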
Data type mapping
| SelectDB Cloud data type | SeaTunnel data type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
Supported import data formats
The supported formats are CSV and JSON.
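Task example
Simple example:
The following example writes multiple data types to SelectDB Cloud. The corresponding table must be created downstream beforehand.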
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
        file.type = "json"
    }
  }
}
Use JSON format to import data
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
        file.type = "json"
    }
  }
}
Use CSV format to import data
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
        file.type = "csv"
        file.column_separator = "," 
        file.line_delimiter = "\n" 
    }
  }
}
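
The description notes that streaming mode is also supported, and the options table lists sink.enable-delete for Unique-model tables. A minimal sketch combining the two might look like the following. It is a fragment rather than a complete job: it assumes an upstream source that emits change events (not shown here) and a target table that already has batch delete enabled. The connection values are placeholders.

```
env {
  parallelism = 1
  # Run continuously instead of as a one-off batch
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

sink {
  SelectDBCloud {
    # Placeholder connection values; replace with your own warehouse details
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"

    # Requires a Unique-model table with batch delete enabled
    sink.enable-delete = true

    selectdb.config {
      file.type = "json"
    }
  }
}
```
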
Changelog
| Change | Commit | Version | 
|---|---|---|
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Improve] Add disable 2pc in SelectDB cloud sink (#6266) | https://github.com/apache/seatunnel/commit/aa0b2119a | 2.3.5 | 
| [Feature] Support nanosecond in SelectDB DateTimeV2 type (#6332) | https://github.com/apache/seatunnel/commit/a0ef5dac9 | 2.3.5 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 | 
| [improve][SelectDB] Add a jobId to the selectDB label to distinguish between tasks (#4864) | https://github.com/apache/seatunnel/commit/84be0f9fd | 2.3.2 | 
| [Improve][Connector-V2][SelectDB Cloud]Refactor some SelectDB Cloud Sink code as well as support copy into batch and async flush and cdc (#4312) | https://github.com/apache/seatunnel/commit/11e94b216 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Feature][Connector-V2][SelectDB Cloud] Support SelectDB Cloud Sink Connector (#3958) | https://github.com/apache/seatunnel/commit/79a134a03 | 2.3.1 |