Version: Next

Doris

Doris source connector

Supported engines

Spark
Flink
SeaTunnel Zeta

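Key features

Description

Source connector for Apache Doris.

Supported data source info

| Data source | Supported versions | Driver | Url | Maven |
|---|---|---|---|---|
| Doris | Only Doris 2.0 and later are supported. | - | - | - |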

Data type mapping

| Doris data type | SeaTunnel data type |
|---|---|
| INT | INT |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL(p + 1, s), where p is the column's specified size (precision) and s is the number of digits to the right of the decimal point (scale) |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| CHAR, VARCHAR, STRING, TEXT | STRING |
| DATE | DATE |
| DATETIME, DATETIME(p) | TIMESTAMP |
| ARRAY | ARRAY |
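Read literally, the DECIMAL row adds one digit of precision and keeps the scale unchanged. As a worked example (the column type DECIMAL(10, 2) is only an illustrative assumption):

$$
\mathrm{DECIMAL}(p,\ s) \;\mapsto\; \mathrm{DECIMAL}(p + 1,\ s), \qquad \mathrm{DECIMAL}(10,\ 2) \;\mapsto\; \mathrm{DECIMAL}(11,\ 2)
$$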

Source options

Basic configuration:

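| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| fenodes | string | yes | - | FE address, in the format "fe_host:fe_http_port" |
| username | string | yes | - | User name |
| password | string | yes | - | Password |
| doris.request.retries | int | no | 3 | Number of retries for requests sent to Doris FE |
| doris.request.read.timeout.ms | int | no | 30000 | Read timeout for requests sent to Doris, in milliseconds |
| doris.request.connect.timeout.ms | int | no | 30000 | Connection timeout for requests sent to Doris, in milliseconds |
| query-port | string | no | 9030 | Doris query port |
| doris.request.query.timeout.s | int | no | 3600 | Timeout for Doris data scans, in seconds |
| table_list | string | | - | List of tables to read; see the table list options below |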
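As a sketch, a single-table source block that sets every basic option explicitly could look like the following. The advanced request options carry only the defaults from the table above, so this should behave the same as leaving them out. `fe_host` is a hypothetical placeholder for your FE host; the database and table names are borrowed from the examples on this page.

```hocon
source {
  Doris {
    # FE HTTP address in "fe_host:fe_http_port" form (fe_host is a placeholder)
    fenodes = "fe_host:8030"
    username = root
    password = ""
    # Doris query port; 9030 is the documented default
    query-port = "9030"
    # Advanced request options, set to their documented defaults
    doris.request.retries = 3
    doris.request.connect.timeout.ms = 30000
    doris.request.read.timeout.ms = 30000
    doris.request.query.timeout.s = 3600
    database = "e2e_source"
    table = "doris_e2e_table"
  }
}
```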

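Table list options:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| database | string | yes | - | Database name |
| table | string | yes | - | Table name |
| doris.read.field | string | no | - | Columns of the Doris table to read |
| doris.filter.query | string | no | - | Filter condition such as "column = value", e.g. doris.filter.query = "F_ID > 2" |
| doris.batch.size | int | no | 1024 | Maximum number of rows read from a BE in each batch |
| doris.exec.mem.limit | long | no | 2147483648 | Maximum memory, in bytes, that a single BE scan request may use; the default is 2 GB (2147483648) |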
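`doris.exec.mem.limit` is given in bytes, so the 2 GB default works out to 2 × 1024³ = 2,147,483,648. A sketch that spells out both read-tuning options at their defaults, in the flattened single-table form used elsewhere on this page (`fe_host` is a hypothetical placeholder):

```hocon
source {
  Doris {
    fenodes = "fe_host:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    # Maximum rows read from a BE per batch (documented default)
    doris.batch.size = 1024
    # Per-scan memory cap in bytes: 2 * 1024^3 = 2147483648 (documented default)
    doris.exec.mem.limit = 2147483648
  }
}
```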

Note: When the configuration covers a single table, you can move the options from table_list up to the outer level.
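To illustrate the note above, here are two alternative source blocks that should be equivalent: the first declares one table through `table_list`, the second flattens the same options into the `Doris` block. They are alternatives, not parts of one file. `fe_host` is a hypothetical placeholder; the other values come from the examples on this page.

```hocon
# Alternative 1: a single table declared through table_list
source {
  Doris {
    fenodes = "fe_host:8030"
    username = root
    password = ""
    table_list = [
      {
        database = "e2e_source"
        table = "doris_e2e_table"
        doris.filter.query = "F_ID > 2"
      }
    ]
  }
}
```

```hocon
# Alternative 2: the same table-level options flattened to the outer level
source {
  Doris {
    fenodes = "fe_host:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    doris.filter.query = "F_ID > 2"
  }
}
```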

Tip

Avoid changing the advanced parameters without a specific reason.

Examples

Single table

This example reads data from Doris and prints it to the console:

env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
  }
}

transform {
  # For more information on configuring SeaTunnel and the full list of transform plugins,
  # see https://seatunnel.apache.org/docs/transform/sql
}

sink {
  Console {}
}

Use the doris.read.field option to select which Doris table columns to read:

env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
  }
}

transform {
  # For more information on configuring SeaTunnel and the full list of transform plugins,
  # see https://seatunnel.apache.org/docs/transform/sql
}

sink {
  Console {}
}

Use doris.filter.query to filter data. The value is passed to Doris directly as the filter condition:

env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    doris.filter.query = "F_ID > 2"
  }
}

transform {
  # For more information on configuring SeaTunnel and the full list of transform plugins,
  # see https://seatunnel.apache.org/docs/transform/sql
}

sink {
  Console {}
}
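Column selection and filtering can also be combined in one source block, as the multi-table example does per table. A sketch of the single-table form, reusing the host, database, table, and column names from the examples above; only the source block is shown, and the env, transform, and sink blocks would stay as before:

```hocon
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    # Read only these columns
    doris.read.field = "F_ID,F_INT,F_BIGINT"
    # Passed to Doris as the filter condition
    doris.filter.query = "F_ID > 2"
  }
}
```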

Multiple tables

This example reads two tables through table_list and writes them to Doris:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Doris {
    fenodes = "xxxx:8030"
    username = root
    password = ""
    table_list = [
      {
        database = "st_source_0"
        table = "doris_table_0"
        doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
        doris.filter.query = "F_ID >= 50"
      },
      {
        database = "st_source_1"
        table = "doris_table_1"
      }
    ]
  }
}

transform {}

sink {
  Doris {
    fenodes = "xxxx:8030"
    schema_save_mode = "RECREATE_SCHEMA"
    username = root
    password = ""
    database = "st_sink"
    table = "${table_name}"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}