StarRocks
StarRocks sink connector
Supported engines
Spark
Flink
SeaTunnel Zeta
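Key features
Description
This sink writes data to StarRocks. It supports both batch and streaming modes. Internally, the StarRocks sink buffers data in a cache and imports it in batches through Stream Load.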
Dependencies
For Spark/Flink
- You need to download the JDBC driver jar package and add it to the directory ${SEATUNNEL_HOME}/plugins/.
For SeaTunnel Zeta
- You need to download the JDBC driver jar package and add it to the directory ${SEATUNNEL_HOME}/lib/.
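Sink options
Name | Type | Required | Default | Description |
---|---|---|---|---|
nodeUrls | list | yes | - | StarRocks cluster addresses, in the format ["fe_ip:fe_http_port", ...] |
base-url | string | yes | - | JDBC URL style connection information, for example jdbc:mysql://localhost:9030/, jdbc:mysql://localhost:9030, or jdbc:mysql://localhost:9030/db |
username | string | yes | - | User name for the target StarRocks |
password | string | yes | - | Password for the target StarRocks |
database | string | yes | - | Name of the database that contains the target StarRocks table |
table | string | no | - | Name of the target StarRocks table. If not set, the table name is the same as the upstream table name |
labelPrefix | string | no | - | Label prefix for StarRocks Stream Load jobs |
batch_max_rows | long | no | 1024 | For batch writing, data is flushed to StarRocks when the number of buffered rows reaches batch_max_rows, the buffered size reaches batch_max_bytes, or the time reaches checkpoint.interval |
batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, data is flushed to StarRocks when the number of buffered rows reaches batch_max_rows, the buffered size reaches batch_max_bytes, or the time reaches checkpoint.interval |
max_retries | int | no | - | Number of retries after a write to StarRocks fails |
retry_backoff_multiplier_ms | int | no | - | Multiplier used to generate the next backoff delay |
max_retry_backoff_ms | int | no | - | Time to wait before sending a retry request to StarRocks |
enable_upsert_delete | boolean | no | false | Whether to enable synchronization of upsert/delete events. Only Primary Key model tables are supported |
save_mode_create_template | string | no | See the description below the table | See the description below the table |
starrocks.config | map | no | - | Stream Load data_desc parameters |
http_socket_timeout_ms | int | no | 180000 | HTTP socket timeout. The default is 3 minutes |
schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronization task starts, selects how a table schema that already exists on the target is handled |
data_save_mode | Enum | no | APPEND_DATA | Before the synchronization task starts, selects how data that already exists on the target is handled |
custom_sql | String | no | - | When data_save_mode is set to CUSTOM_PROCESSING, the custom_sql parameter must also be set. Its value is an executable SQL statement, which is executed before the synchronization task starts |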
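As a hedged illustration of the batching and retry options in the table above, the fragment below sets them explicitly. The host name fe_host, the table name example_table, and the three retry values are placeholders chosen for this example, not recommended settings; the batch and timeout values simply restate the documented defaults.

```hocon
sink {
  StarRocks {
    // fe_host is a hypothetical FE address
    nodeUrls = ["fe_host:8030"]
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    table = "example_table"

    // Flush thresholds (documented defaults)
    batch_max_rows = 1024
    batch_max_bytes = 5242880          // 5 * 1024 * 1024

    // Illustrative retry settings; the table lists no defaults for these
    max_retries = 3
    retry_backoff_multiplier_ms = 100
    max_retry_backoff_ms = 10000

    http_socket_timeout_ms = 180000    // 3 minutes (documented default)
  }
}
```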
save_mode_create_template
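The StarRocks sink uses a template, which you can modify when needed, and combines it with the upstream data types and schema to generate the CREATE TABLE statement that automatically creates the StarRocks table. Currently this only takes effect in multi-table mode.
The default template is: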
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
    ${rowtype_primary_key},
    ${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
    "replication_num" = "1"
)
To add a custom field to the template, for example an id field, modify the template as follows:
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
    id,
    ${rowtype_fields}
) ENGINE = OLAP
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
    "replication_num" = "1"
);
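The StarRocks sink automatically obtains the corresponding information from the upstream data to fill in the template, and removes the id field information from rowtype_fields. This method can be used to change the type and related attributes of a custom field.
The available placeholders are:
- database: the database name of the upstream schema
- table_name: the table name of the upstream schema
- rowtype_fields: all field information of the upstream schema; the connector automatically maps each field to the corresponding StarRocks type
- rowtype_primary_key: the primary key information of the upstream schema; the result may be a list
- rowtype_unique_key: the unique key information of the upstream schema; the result may be a list
- comment: the comment of the upstream schema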
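A template is passed to the sink as the value of save_mode_create_template. The fragment below is a minimal sketch that assumes the job file is written in HOCON, where a triple-quoted string can span several lines; fe_host and the replication_num value of 3 are illustrative. The placeholders are left untouched for the connector to fill in from the upstream schema.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]        // hypothetical FE address
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    // Default template with only replication_num changed (illustrative)
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
      ) ENGINE=OLAP
      PRIMARY KEY (${rowtype_primary_key})
      COMMENT '${comment}'
      DISTRIBUTED BY HASH (${rowtype_primary_key})
      PROPERTIES (
        "replication_num" = "3"
      )
    """
  }
}
```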
table [string]
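Uses the database option and the table-name option to automatically generate SQL, and writes the received upstream data to StarRocks.
This option is mutually exclusive with query and has higher priority.
The table option accepts any table name, which is ultimately used as the name of the target table, and it supports the variables ${table_name} and ${schema_name}.
The replacement rules are: ${schema_name} is replaced with the SCHEMA name passed to the target, and ${table_name} is replaced with the table name passed to the target.
For example: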
- test_${schema_name}_${table_name}_test
- sink_sinktable
- ss_${table_name}
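To make the replacement rule concrete, the sketch below uses the last pattern in the list. The upstream table name orders and the host fe_host are hypothetical; by the rule above, ${table_name} would be replaced with orders, giving a target table named ss_orders.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]        // hypothetical FE address
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    // With an upstream table named orders (assumed), this resolves to ss_orders
    table = "ss_${table_name}"
  }
}
```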
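schema_save_mode [Enum]
Before the synchronization task starts, selects how a table schema that already exists on the target is handled. The options are:
- RECREATE_SCHEMA: creates the table if it does not exist; if it already exists, drops it and recreates it according to the parameters
- CREATE_SCHEMA_WHEN_NOT_EXIST: creates the table if it does not exist; skips it if it already exists
- ERROR_WHEN_SCHEMA_NOT_EXIST: reports an error if the table does not exist
- IGNORE: skips table handling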
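data_save_mode [Enum]
Before the synchronization task starts, selects how data that already exists on the target is handled. The options are:
- DROP_DATA: keeps the database structure but deletes the existing data in the table
- APPEND_DATA: keeps the database structure and the existing data in the table
- CUSTOM_PROCESSING: user-defined processing
- ERROR_WHEN_DATA_EXISTS: reports an error if the table already contains data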
custom_sql [String]
When data_save_mode is set to CUSTOM_PROCESSING, the custom_sql parameter must also be set. Its value is an executable SQL statement, which is executed before the synchronization task starts.
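A minimal sketch of CUSTOM_PROCESSING, reusing the connection values from the job examples in this document. The SQL statement is only an illustration and assumes the table test.e2e_table_sink already exists; any executable statement could be used instead.

```hocon
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    data_save_mode = "CUSTOM_PROCESSING"
    // Executed before the synchronization task starts (illustrative statement)
    custom_sql = "TRUNCATE TABLE `test`.`e2e_table_sink`"
  }
}
```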
Data type mapping
StarRocks data type | SeaTunnel data type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
DATE | STRING |
TIME | STRING |
DATETIME | STRING |
STRING | STRING |
ARRAY | STRING |
MAP | STRING |
BYTES | STRING |
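Supported import data formats
The StarRocks sink supports the CSV and JSON formats.
Job examples
Simple example
The following example writes data of multiple types. The user needs to create the corresponding table on the target side.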
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}
Example of writing CDC change events (INSERT/UPDATE/DELETE)
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    ...
    // Enables synchronization of upsert/delete events (requires enable_upsert_delete = true); only Primary Key model tables are supported
    enable_upsert_delete = true
  }
}
JSON format import example
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}
CSV format import example
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}
Example using save_mode
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "test_${schema_name}_${table_name}"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}
Change Log
Change | Commit | Version |
---|---|---|
[Fix][Connector-V2] Fix StarRocksCatalogTest#testCatalog() NPE (#8987) | https://github.com/apache/seatunnel/commit/53f0a9eb5 | 2.3.10 |
[Improve][Connector-V2] Random pick the starrocks fe address which can be connected (#8898) | https://github.com/apache/seatunnel/commit/bef76078f | 2.3.10 |
[Feature][Connector-v2] Support multi starrocks source (#8789) | https://github.com/apache/seatunnel/commit/26b5529aa | 2.3.10 |
[Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768) | https://github.com/apache/seatunnel/commit/3c6f21613 | 2.3.10 |
[Fix][Connector-V2]Fix Descriptions for CUSTOM_SQL in Connector (#8778) | https://github.com/apache/seatunnel/commit/96b610eb7 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[improve] add StarRocks options (#8639) | https://github.com/apache/seatunnel/commit/da8d9cbd3 | 2.3.10 |
[Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568) | https://github.com/apache/seatunnel/commit/c4cb1fc4a | 2.3.10 |
[Fix][Connector-V2] Fixed adding table comments (#8514) | https://github.com/apache/seatunnel/commit/edca75b0d | 2.3.10 |
[Feature][Connector-V2] Starrocks implements multi table sink (#8467) | https://github.com/apache/seatunnel/commit/55eebfa8a | 2.3.9 |
[Improve][Connector-V2] Add pre-check starrocks version before exeucte alter table field name (#8237) | https://github.com/apache/seatunnel/commit/c24e3b12b | 2.3.9 |
[Fix][Connector-starrocks] Fix drop column bug for starrocks (#8216) | https://github.com/apache/seatunnel/commit/082814da1 | 2.3.9 |
[Feature][Core] Support read arrow data (#8137) | https://github.com/apache/seatunnel/commit/4710ea0f8 | 2.3.9 |
[Feature][Clickhouse] Support sink savemode (#8086) | https://github.com/apache/seatunnel/commit/e6f92fd79 | 2.3.9 |
[Feature][Connector-V2] StarRocks-sink support schema evolution (#8082) | https://github.com/apache/seatunnel/commit/d33b0da8a | 2.3.9 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Improve][Connector-V2] Add doris/starrocks create table with comment (#7847) | https://github.com/apache/seatunnel/commit/207b8c16f | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 |
[Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342) | https://github.com/apache/seatunnel/commit/8ee129d20 | 2.3.8 |
[Improve][Connector-V2] Remove system table limit (#7391) | https://github.com/apache/seatunnel/commit/adf888e00 | 2.3.8 |
[Improve][Connector-V2] Close all ResultSet after used (#7389) | https://github.com/apache/seatunnel/commit/853e97321 | 2.3.8 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
[Fix][Connector-V2] Fix starrocks Content-Length header already present error (#7034) | https://github.com/apache/seatunnel/commit/a485a74ef | 2.3.6 |
[Feature][Connector-V2]Support StarRocks Fe Node HA | https://github.com/apache/seatunnel/commit/9c36c4581 | 2.3.6 |
[Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679) | https://github.com/apache/seatunnel/commit/88263cd69 | 2.3.6 |
[Fix][StarRocks] Fix NPE when upstream catalogtable table path only have table name part (#6540) | https://github.com/apache/seatunnel/commit/5795b265c | 2.3.5 |
[Fix][Connector-V2] Fixed doris/starrocks create table sql parse error (#6580) | https://github.com/apache/seatunnel/commit/f2ed1fbde | 2.3.5 |
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
[Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5 |
[Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498) | https://github.com/apache/seatunnel/commit/aded56299 | 2.3.5 |
[Improve] StarRocksSourceReader use the existing client (#6480) | https://github.com/apache/seatunnel/commit/1a02c571a | 2.3.5 |
[Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5 |
[Feature][Connector] add starrocks save_mode (#6029) | https://github.com/apache/seatunnel/commit/66b0f1e1d | 2.3.4 |
[Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a | 2.3.4 |
[Improve] StarRocks support create table template with unique key (#5905) | https://github.com/apache/seatunnel/commit/25b01125e | 2.3.4 |
[Improve][StarRocksSink] add http socket timeout. (#5918) | https://github.com/apache/seatunnel/commit/febdb262b | 2.3.4 |
[Improve] Support create varchar field type in StarRocks (#5911) | https://github.com/apache/seatunnel/commit/602589516 | 2.3.4 |
[Improve]Change System.out.println to log output. (#5912) | https://github.com/apache/seatunnel/commit/bbedb07a9 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
[Improve][Connector] Add field name to DataTypeConvertor to improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0 | 2.3.4 |
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized (#5663) | https://github.com/apache/seatunnel/commit/eff17ccbe | 2.3.4 |
[Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
[Hotfix][Connector-V2][StarRocks] fix starrocks template sql parser #5071 (#5332) | https://github.com/apache/seatunnel/commit/23d79b0d1 | 2.3.4 |
[Improve][Connector-V2] Remove scheduler in StarRocks sink (#5269) | https://github.com/apache/seatunnel/commit/cb7b79491 | 2.3.4 |
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3 |
Fix StarRocksJsonSerializer will transform array/map/row to string (#5281) | https://github.com/apache/seatunnel/commit/f94195377 | 2.3.3 |
[Improve] Improve savemode api (#4767) | https://github.com/apache/seatunnel/commit/4acd370d4 | 2.3.3 |
[Improve][Connector-V2] Improve StarRocks Auto Create Table To Support Use Primary Key Template In Field (#4487) | https://github.com/apache/seatunnel/commit/e601cd4c3 | 2.3.2 |
Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628) | https://github.com/apache/seatunnel/commit/2d1933195 | 2.3.2 |
[hotfix][starrocks] fix error on get starrocks source typeInfo (#4619) | https://github.com/apache/seatunnel/commit/f7b094f9e | 2.3.2 |
[Improve][Catalog] refactor catalog (#4540) | https://github.com/apache/seatunnel/commit/b0a701cb8 | 2.3.2 |
[Improve][Connector-V2] Throw StarRocks Serialize Error To Client (#4484) | https://github.com/apache/seatunnel/commit/e2c107323 | 2.3.2 |
[Improve][Connector-V2] Improve StarRocks Serialize Error Message (#4458) | https://github.com/apache/seatunnel/commit/465e75cbf | 2.3.2 |
[Hotfix][Zeta] Adapt StarRocks With Multi-Table And Single-Table Mode (#4324) | https://github.com/apache/seatunnel/commit/c11c171d3 | 2.3.1 |
[improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39 | 2.3.1 |
[Improve][Zeta] Improve Client Job Info Message | https://github.com/apache/seatunnel/commit/56febf011 | 2.3.1 |
[Fix][Connector-V2] Fix StarRocksSink Without Format Field In Header | https://github.com/apache/seatunnel/commit/463ae6437 | 2.3.1 |
[Improve] Support StarRocksCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/d00ced6ec | 2.3.1 |
[Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f | 2.3.1 |
[Improve] Change StarRocks Sink Default Format To Json | https://github.com/apache/seatunnel/commit/870335783 | 2.3.1 |
[Fix] Fix StarRocks Default Url Can't Use | https://github.com/apache/seatunnel/commit/67c45d353 | 2.3.1 |
[hotfix] fixed schema options import error | https://github.com/apache/seatunnel/commit/656805f2d | 2.3.1 |
[chore] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/291214ad6 | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
[Fix] Fix StarRocks Default Url Can't Use (#4229) | https://github.com/apache/seatunnel/commit/ed74d1109 | 2.3.1 |
[Bug] Remove StarRocks Auto Creat Table Default Value (#4220) | https://github.com/apache/seatunnel/commit/80b5cd40a | 2.3.1 |
[Feature] Add SaveMode For StarRocks (#4217) | https://github.com/apache/seatunnel/commit/0674f10a5 | 2.3.1 |
[Improve] Improve StarRocks Catalog Base Url (#4215) | https://github.com/apache/seatunnel/commit/6632a4047 | 2.3.1 |
[Improve] Improve StarRocks Sink Config (#4212) | https://github.com/apache/seatunnel/commit/8d5712c1d | 2.3.1 |
[Hotfix][Zeta] keep deleteCheckpoint method synchronized (#4209) | https://github.com/apache/seatunnel/commit/061f9b587 | 2.3.1 |
[Improve] Improve StarRocks Auto Create Table (#4208) | https://github.com/apache/seatunnel/commit/bc9cd6bf6 | 2.3.1 |
[hotfix][zeta] fix zeta multi-table parser error (#4193) | https://github.com/apache/seatunnel/commit/98f2ad0c1 | 2.3.1 |
[feature][starrocks] add StarRocks factories (#4191) | https://github.com/apache/seatunnel/commit/c485d887e | 2.3.1 |
[Feature] Change StarRocks CreatTable Template (#4184) | https://github.com/apache/seatunnel/commit/4cf07f3be | 2.3.1 |
[Feature][Connector-V2] StarRocks source connector (#3679) | https://github.com/apache/seatunnel/commit/9681173b1 | 2.3.1 |
[Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Connector-v2][StarRocks] Support write cdc changelog event(INSERT/UPDATE/DELETE) (#3865) | https://github.com/apache/seatunnel/commit/8e3d158c0 | 2.3.1 |
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
[Improve][Connector-V2][StarRocks] Unified exception for StarRocks source and sink (#3593) | https://github.com/apache/seatunnel/commit/612d0297a | 2.3.0 |
[Improve][Connector-V2][StarRocks] Delete the Mapper may not be used (#3579) | https://github.com/apache/seatunnel/commit/1e868ecf2 | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
[Improve][Connector-V2][StarRocks]Add StarRocks connector option rules (#3402) | https://github.com/apache/seatunnel/commit/5d187f69b | 2.3.0 |
[Bugfix][Connector-V2][StarRocks]Fix StarRocks StreamLoad retry bug and fix doc (#3406) | https://github.com/apache/seatunnel/commit/071f9aa05 | 2.3.0 |
[Feature][Connector-V2] Starrocks sink connector (#3164) | https://github.com/apache/seatunnel/commit/3e6caf705 | 2.3.0 |