Version: 2.3.10

StarRocks

StarRocks sink connector

Supported engines

Spark
Flink
SeaTunnel Zeta

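Key features

Description

This sink writes data into StarRocks and supports both batch and streaming modes. Internally, the StarRocks sink buffers incoming rows and imports them into StarRocks in batches via Stream Load.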

Dependencies

For Spark/Flink

  1. Download the JDBC driver jar package and add it to the directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta

  1. Download the JDBC driver jar package and add it to the directory ${SEATUNNEL_HOME}/lib/.

Sink options

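Name | Type | Required | Default | Description
--- | --- | --- | --- | ---
nodeUrls | list | yes | - | StarRocks cluster addresses, in the format ["fe_ip:fe_http_port", ...]
base-url | string | yes | - | Connection information in JDBC URL style, e.g. jdbc:mysql://localhost:9030/, jdbc:mysql://localhost:9030 or jdbc:mysql://localhost:9030/db
username | string | yes | - | Username for the target StarRocks
password | string | yes | - | Password for the target StarRocks
database | string | yes | - | Name of the database that contains the target StarRocks table
table | string | no | - | Name of the target StarRocks table. If not set, the upstream table name is used
labelPrefix | string | no | - | Label prefix for StarRocks Stream Load jobs
batch_max_rows | long | no | 1024 | In batch writing, buffered data is flushed to StarRocks when the buffer reaches batch_max_rows rows or batch_max_bytes bytes, or when checkpoint.interval elapses
batch_max_bytes | int | no | 5 * 1024 * 1024 (5 MB) | In batch writing, buffered data is flushed to StarRocks when the buffer reaches batch_max_rows rows or batch_max_bytes bytes, or when checkpoint.interval elapses
max_retries | int | no | - | Number of retries after a write to StarRocks fails
retry_backoff_multiplier_ms | int | no | - | Multiplier used to compute the next backoff delay
max_retry_backoff_ms | int | no | - | Time to wait before sending a retry request to StarRocks
enable_upsert_delete | boolean | no | false | Whether to synchronize upsert/delete events. Only supported for tables that use the Primary Key model
save_mode_create_template | string | no | See the description below | See the description below
starrocks.config | map | no | - | Stream Load data_desc parameters
http_socket_timeout_ms | int | no | 180000 | HTTP socket timeout. The default is 3 minutes
schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How existing table structures on the target are handled before the sync job starts
data_save_mode | Enum | no | APPEND_DATA | How existing data on the target is handled before the sync job starts
custom_sql | String | no | - | When data_save_mode is set to CUSTOM_PROCESSING, custom_sql must also be set. Its value is an executable SQL statement that runs before the sync job starts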
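The fragment below sketches how the batching, retry and timeout options in the table might be combined for a streaming job. The host name fe_host, the label prefix and the retry values are illustrative assumptions, since the table documents no defaults for the retry options. The source block is omitted.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  # Buffered rows are also flushed whenever this interval (ms) elapses
  checkpoint.interval = 10000
}

sink {
  StarRocks {
    # fe_host is a placeholder; list one or more FE HTTP addresses
    nodeUrls = ["fe_host:8030"]
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    # Hypothetical prefix for the Stream Load job labels
    labelPrefix = "seatunnel_"
    # Flush after 1024 rows or 5 MB (5 * 1024 * 1024 bytes), whichever comes first
    batch_max_rows = 1024
    batch_max_bytes = 5242880
    # Illustrative retry settings, not documented defaults
    max_retries = 3
    retry_backoff_multiplier_ms = 100
    max_retry_backoff_ms = 1000
    # 3 minutes, the documented default
    http_socket_timeout_ms = 180000
  }
}
```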

save_mode_create_template

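The StarRocks sink uses a template, which you can modify when needed. It combines the template with the upstream data types and schema to generate a CREATE TABLE statement and creates the StarRocks table automatically. This currently takes effect only in multi-table mode.

The default template is as follows: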

CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
    ${rowtype_primary_key},
    ${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
    "replication_num" = "1"
)

To add a custom field to the template, for example an id field, modify the template as follows:

CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
    id,
    ${rowtype_fields}
) ENGINE = OLAP
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
    "replication_num" = "1"
);

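The StarRocks sink fills in the template with information it obtains from the upstream data, and it removes the id field from rowtype_fields. You can use this approach to change the type and related attributes of a custom field.

The available placeholders are:

  • database: the database name of the upstream schema
  • table_name: the table name of the upstream schema
  • rowtype_fields: all field definitions of the upstream schema. The connector maps each field to the corresponding StarRocks type automatically
  • rowtype_primary_key: the primary key of the upstream schema. The result may be a list
  • rowtype_unique_key: the unique key of the upstream schema. The result may be a list
  • comment: the comment of the upstream schema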
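As a sketch, you can supply a modified template through the save_mode_create_template option as a HOCON triple-quoted string. Because the ${...} placeholders sit inside a quoted string, HOCON leaves them untouched for the connector to fill in. Only replication_num differs from the default template here (3 instead of 1), which assumes a cluster with at least three BE nodes. fe_host is a placeholder host name.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    # Placeholders inside the quoted template are filled by the connector, not by HOCON
    save_mode_create_template = """
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
    ${rowtype_primary_key},
    ${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
    "replication_num" = "3"
)
"""
  }
}
```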

table [string]

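Uses the database and table-name options to generate SQL automatically, and writes the received upstream data into StarRocks.

This option is mutually exclusive with query and takes precedence over it.

The table option accepts any table name, which is used as the name of the target table. It also supports the variables ${table_name} and ${schema_name}, which are replaced as follows: ${schema_name} is replaced with the SCHEMA name passed to the target, and ${table_name} is replaced with the table name passed to the target.

For example:

  1. test_${schema_name}_${table_name}_test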
  2. sink_sinktable
  3. ss_${table_name}
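For instance, take a hypothetical upstream table whose schema name is inventory and whose table name is orders. The first pattern above resolves as noted in the comment:

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]
    base-url = "jdbc:mysql://fe_host:9030/"
    username = root
    password = ""
    database = "test"
    # Hypothetical upstream inventory.orders is written to test.test_inventory_orders_test
    table = "test_${schema_name}_${table_name}_test"
  }
}
```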

schema_save_mode[Enum]

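Determines how existing table structures on the target are handled before the sync job starts. Options:
RECREATE_SCHEMA: tables that do not exist are created; existing tables are dropped and recreated according to the options
CREATE_SCHEMA_WHEN_NOT_EXIST: existing tables are left as they are; tables that do not exist are created
ERROR_WHEN_SCHEMA_NOT_EXIST: an error is raised if any table does not exist
IGNORE: table handling is skipped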
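Here is a minimal sketch for a job whose target tables are assumed to be created manually in advance. With ERROR_WHEN_SCHEMA_NOT_EXIST, the job reports an error instead of creating a missing table. The connection options are left out for brevity.

```hocon
sink {
  StarRocks {
    # nodeUrls, base-url, username, password and database as in the job examples
    table = "e2e_table_sink"
    # Fail before syncing if the target table does not exist
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
    # Keep whatever data the pre-created table already holds
    data_save_mode = "APPEND_DATA"
  }
}
```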

data_save_mode[Enum]

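Determines how existing data on the target is handled before the sync job starts. Options:
DROP_DATA: keep the database structure but delete the existing data in the table
APPEND_DATA: keep the database structure and the existing table data
CUSTOM_PROCESSING: user-defined processing
ERROR_WHEN_DATA_EXISTS: raise an error if the table already contains data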
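For example, a full reload that keeps the existing table definition but clears previously loaded rows might pair the two save modes as follows. Connection options are omitted, and the table name is taken from the job examples.

```hocon
sink {
  StarRocks {
    # nodeUrls, base-url, username, password and database as in the job examples
    table = "e2e_table_sink"
    # Create the table only if it is missing
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # Keep the table structure but delete its existing rows before writing
    data_save_mode = "DROP_DATA"
  }
}
```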

custom_sql[String]

When data_save_mode is set to CUSTOM_PROCESSING, custom_sql must also be set. Its value is an executable SQL statement that runs before the sync job starts.
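Here is a sketch of CUSTOM_PROCESSING, assuming the intent is to delete only part of the existing data before writing. The c_date condition and the cutoff date are hypothetical. The DELETE conditions StarRocks accepts depend on the table model, so check them against your table.

```hocon
sink {
  StarRocks {
    # nodeUrls, base-url, username, password and database as in the job examples
    table = "e2e_table_sink"
    data_save_mode = "CUSTOM_PROCESSING"
    # Executed once before the sync job starts; c_date and the cutoff are hypothetical
    custom_sql = "DELETE FROM test.e2e_table_sink WHERE c_date < '2024-01-01'"
  }
}
```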

Data type mapping

SeaTunnel data type | StarRocks data type
--- | ---
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL | DECIMAL
DATE | STRING
TIME | STRING
DATETIME | STRING
STRING | STRING
ARRAY | STRING
MAP | STRING
BYTES | STRING

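Supported import formats

The StarRocks sink supports the CSV and JSON formats.

Job examples

Simple example

The following example writes data of several types. You need to create the corresponding table in the downstream target.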

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}

Example: writing CDC change events (INSERT/UPDATE/DELETE)

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    ...

    // Synchronizing upsert/delete events requires enable_upsert_delete = true and is only supported for tables using the Primary Key model
    enable_upsert_delete = true
  }
}

Example: importing data in JSON format

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}

Example: importing data in CSV format

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}

Example using save_mode

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "test_${schema_name}_${table_name}"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}

Changelog

Change | Commit | Version
--- | --- | ---
[Fix][Connector-V2] Fix StarRocksCatalogTest#testCatalog() NPE (#8987) | https://github.com/apache/seatunnel/commit/53f0a9eb5 | 2.3.10
[Improve][Connector-V2] Random pick the starrocks fe address which can be connected (#8898) | https://github.com/apache/seatunnel/commit/bef76078f | 2.3.10
[Feature][Connector-v2] Support multi starrocks source (#8789) | https://github.com/apache/seatunnel/commit/26b5529aa | 2.3.10
[Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768) | https://github.com/apache/seatunnel/commit/3c6f21613 | 2.3.10
[Fix][Connector-V2]Fix Descriptions for CUSTOM_SQL in Connector (#8778) | https://github.com/apache/seatunnel/commit/96b610eb7 | 2.3.10
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10
[improve] add StarRocks options (#8639) | https://github.com/apache/seatunnel/commit/da8d9cbd3 | 2.3.10
[Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568) | https://github.com/apache/seatunnel/commit/c4cb1fc4a | 2.3.10
[Fix][Connector-V2] Fixed adding table comments (#8514) | https://github.com/apache/seatunnel/commit/edca75b0d | 2.3.10
[Feature][Connector-V2] Starrocks implements multi table sink (#8467) | https://github.com/apache/seatunnel/commit/55eebfa8a | 2.3.9
[Improve][Connector-V2] Add pre-check starrocks version before exeucte alter table field name (#8237) | https://github.com/apache/seatunnel/commit/c24e3b12b | 2.3.9
[Fix][Connector-starrocks] Fix drop column bug for starrocks (#8216) | https://github.com/apache/seatunnel/commit/082814da1 | 2.3.9
[Feature][Core] Support read arrow data (#8137) | https://github.com/apache/seatunnel/commit/4710ea0f8 | 2.3.9
[Feature][Clickhouse] Support sink savemode (#8086) | https://github.com/apache/seatunnel/commit/e6f92fd79 | 2.3.9
[Feature][Connector-V2] StarRocks-sink support schema evolution (#8082) | https://github.com/apache/seatunnel/commit/d33b0da8a | 2.3.9
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9
[Improve][Connector-V2] Add doris/starrocks create table with comment (#7847) | https://github.com/apache/seatunnel/commit/207b8c16f | 2.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9
[Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8
[Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342) | https://github.com/apache/seatunnel/commit/8ee129d20 | 2.3.8
[Improve][Connector-V2] Remove system table limit (#7391) | https://github.com/apache/seatunnel/commit/adf888e00 | 2.3.8
[Improve][Connector-V2] Close all ResultSet after used (#7389) | https://github.com/apache/seatunnel/commit/853e97321 | 2.3.8
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6
[Fix][Connector-V2] Fix starrocks Content-Length header already present error (#7034) | https://github.com/apache/seatunnel/commit/a485a74ef | 2.3.6
[Feature][Connector-V2]Support StarRocks Fe Node HA | https://github.com/apache/seatunnel/commit/9c36c4581 | 2.3.6
[Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679) | https://github.com/apache/seatunnel/commit/88263cd69 | 2.3.6
[Fix][StarRocks] Fix NPE when upstream catalogtable table path only have table name part (#6540) | https://github.com/apache/seatunnel/commit/5795b265c | 2.3.5
[Fix][Connector-V2] Fixed doris/starrocks create table sql parse error (#6580) | https://github.com/apache/seatunnel/commit/f2ed1fbde | 2.3.5
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5
[Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5
[Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498) | https://github.com/apache/seatunnel/commit/aded56299 | 2.3.5
[Improve] StarRocksSourceReader use the existing client (#6480) | https://github.com/apache/seatunnel/commit/1a02c571a | 2.3.5
[Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5
[Feature][Connector] add starrocks save_mode (#6029) | https://github.com/apache/seatunnel/commit/66b0f1e1d | 2.3.4
[Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a | 2.3.4
[Improve] StarRocks support create table template with unique key (#5905) | https://github.com/apache/seatunnel/commit/25b01125e | 2.3.4
[Improve][StarRocksSink] add http socket timeout. (#5918) | https://github.com/apache/seatunnel/commit/febdb262b | 2.3.4
[Improve] Support create varchar field type in StarRocks (#5911) | https://github.com/apache/seatunnel/commit/602589516 | 2.3.4
[Improve]Change System.out.println to log output. (#5912) | https://github.com/apache/seatunnel/commit/bbedb07a9 | 2.3.4
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4
[Improve][Connector] Add field name to DataTypeConvertor to improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0 | 2.3.4
[feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized (#5663) | https://github.com/apache/seatunnel/commit/eff17ccbe | 2.3.4
[Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4
[Hotfix][Connector-V2][StarRocks] fix starrocks template sql parser #5071 (#5332) | https://github.com/apache/seatunnel/commit/23d79b0d1 | 2.3.4
[Improve][Connector-V2] Remove scheduler in StarRocks sink (#5269) | https://github.com/apache/seatunnel/commit/cb7b79491 | 2.3.4
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3
Fix StarRocksJsonSerializer will transform array/map/row to string (#5281) | https://github.com/apache/seatunnel/commit/f94195377 | 2.3.3
[Improve] Improve savemode api (#4767) | https://github.com/apache/seatunnel/commit/4acd370d4 | 2.3.3
[Improve][Connector-V2] Improve StarRocks Auto Create Table To Support Use Primary Key Template In Field (#4487) | https://github.com/apache/seatunnel/commit/e601cd4c3 | 2.3.2
Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628) | https://github.com/apache/seatunnel/commit/2d1933195 | 2.3.2
[hotfix][starrocks] fix error on get starrocks source typeInfo (#4619) | https://github.com/apache/seatunnel/commit/f7b094f9e | 2.3.2
[Improve][Catalog] refactor catalog (#4540) | https://github.com/apache/seatunnel/commit/b0a701cb8 | 2.3.2
[Improve][Connector-V2] Throw StarRocks Serialize Error To Client (#4484) | https://github.com/apache/seatunnel/commit/e2c107323 | 2.3.2
[Improve][Connector-V2] Improve StarRocks Serialize Error Message (#4458) | https://github.com/apache/seatunnel/commit/465e75cbf | 2.3.2
[Hotfix][Zeta] Adapt StarRocks With Multi-Table And Single-Table Mode (#4324) | https://github.com/apache/seatunnel/commit/c11c171d3 | 2.3.1
[improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39 | 2.3.1
[Improve][Zeta] Improve Client Job Info Message | https://github.com/apache/seatunnel/commit/56febf011 | 2.3.1
[Fix][Connector-V2] Fix StarRocksSink Without Format Field In Header | https://github.com/apache/seatunnel/commit/463ae6437 | 2.3.1
[Improve] Support StarRocksCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/d00ced6ec | 2.3.1
[Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f | 2.3.1
[Improve] Change StarRocks Sink Default Format To Json | https://github.com/apache/seatunnel/commit/870335783 | 2.3.1
[Fix] Fix StarRocks Default Url Can't Use | https://github.com/apache/seatunnel/commit/67c45d353 | 2.3.1
[hotfix] fixed schema options import error | https://github.com/apache/seatunnel/commit/656805f2d | 2.3.1
[chore] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/291214ad6 | 2.3.1
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1
[Fix] Fix StarRocks Default Url Can't Use (#4229) | https://github.com/apache/seatunnel/commit/ed74d1109 | 2.3.1
[Bug] Remove StarRocks Auto Creat Table Default Value (#4220) | https://github.com/apache/seatunnel/commit/80b5cd40a | 2.3.1
[Feature] Add SaveMode For StarRocks (#4217) | https://github.com/apache/seatunnel/commit/0674f10a5 | 2.3.1
[Improve] Improve StarRocks Catalog Base Url (#4215) | https://github.com/apache/seatunnel/commit/6632a4047 | 2.3.1
[Improve] Improve StarRocks Sink Config (#4212) | https://github.com/apache/seatunnel/commit/8d5712c1d | 2.3.1
[Hotfix][Zeta] keep deleteCheckpoint method synchronized (#4209) | https://github.com/apache/seatunnel/commit/061f9b587 | 2.3.1
[Improve] Improve StarRocks Auto Create Table (#4208) | https://github.com/apache/seatunnel/commit/bc9cd6bf6 | 2.3.1
[hotfix][zeta] fix zeta multi-table parser error (#4193) | https://github.com/apache/seatunnel/commit/98f2ad0c1 | 2.3.1
[feature][starrocks] add StarRocks factories (#4191) | https://github.com/apache/seatunnel/commit/c485d887e | 2.3.1
[Feature] Change StarRocks CreatTable Template (#4184) | https://github.com/apache/seatunnel/commit/4cf07f3be | 2.3.1
[Feature][Connector-V2] StarRocks source connector (#3679) | https://github.com/apache/seatunnel/commit/9681173b1 | 2.3.1
[Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1
[Feature][Connector-v2][StarRocks] Support write cdc changelog event(INSERT/UPDATE/DELETE) (#3865) | https://github.com/apache/seatunnel/commit/8e3d158c0 | 2.3.1
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1
[Improve][Connector-V2][StarRocks] Unified exception for StarRocks source and sink (#3593) | https://github.com/apache/seatunnel/commit/612d0297a | 2.3.0
[Improve][Connector-V2][StarRocks] Delete the Mapper may not be used (#3579) | https://github.com/apache/seatunnel/commit/1e868ecf2 | 2.3.0
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0
[Improve][Connector-V2][StarRocks]Add StarRocks connector option rules (#3402) | https://github.com/apache/seatunnel/commit/5d187f69b | 2.3.0
[Bugfix][Connector-V2][StarRocks]Fix StarRocks StreamLoad retry bug and fix doc (#3406) | https://github.com/apache/seatunnel/commit/071f9aa05 | 2.3.0
[Feature][Connector-V2] Starrocks sink connector (#3164) | https://github.com/apache/seatunnel/commit/3e6caf705 | 2.3.0