Doris
Doris 源连接器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要功能
描述
用于 Apache Doris 的源连接器。
支持的数据源信息
数据源 | 支持版本 | 驱动 | Url | Maven |
---|---|---|---|---|
Doris | 仅支持Doris2.0及以上版本. | - | - | - |
数据类型映射
Doris 数据类型 | SeaTunnel 数据类型 |
---|---|
INT | INT |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
BIGINT | BIGINT |
LARGEINT | STRING |
BOOLEAN | BOOLEAN |
DECIMAL | DECIMAL((Get the designated column's specified column size)+1, (Gets the designated column's number of digits to right of the decimal point.))) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
CHAR VARCHAR STRING TEXT | STRING |
DATE | DATE |
DATETIME DATETIME(p) | TIMESTAMP |
ARRAY | ARRAY |
源选项
基础配置:
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
fenodes | string | yes | - | FE 地址, 格式:"fe_host:fe_http_port" |
username | string | yes | - | 用户名 |
password | string | yes | - | 密码 |
doris.request.retries | int | no | 3 | 请求Doris FE的重试次数 |
doris.request.read.timeout.ms | int | no | 30000 | |
doris.request.connect.timeout.ms | int | no | 30000 | |
query-port | string | no | 9030 | Doris查询端口 |
doris.request.query.timeout.s | int | no | 3600 | Doris扫描数据的超时时间,单位秒 |
table_list | string | 否 | - | 表清单 |
表清单配置:
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
database | string | yes | - | 数据库 |
table | string | yes | - | 表名 |
doris.read.field | string | no | - | 选择要读取的Doris表字段 |
doris.filter.query | string | no | - | 数据过滤. 格式:"字段 = 值", 例如:doris.filter.query = "F_ID > 2" |
doris.batch.size | int | no | 1024 | 每次能够从BE中读取到的最大行数 |
doris.exec.mem.limit | long | no | 2147483648 | 单个be扫描请求可以使用的最大内存。默认内存为2G(2147483648) |
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
提示
不建议随意修改高级参数
例子
单表
这是一个从doris读取数据后,输出到控制台的例子:
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
使用doris.read.field
参数来选择需要读取的Doris表字段:
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
使用doris.filter.query
来过滤数据,参数值将作为过滤条件直接传递到doris:
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.filter.query = "F_ID > 2"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
多表
env{
parallelism = 1
job.mode = "BATCH"
}
source{
Doris {
fenodes = "xxxx:8030"
username = root
password = ""
table_list = [
{
database = "st_source_0"
table = "doris_table_0"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
doris.filter.query = "F_ID >= 50"
},
{
database = "st_source_1"
table = "doris_table_1"
}
]
}
}
transform {}
sink{
Doris {
fenodes = "xxxx:8030"
schema_save_mode = "RECREATE_SCHEMA"
username = root
password = ""
database = "st_sink"
table = "${table_name}"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}