Skip to main content
Version: Next

Doris

Doris source connector

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Used to read data from Apache Doris.

Supported DataSource Info

DatasourceSupported versionsDriverUrlMaven
DorisOnly Doris2.0 or later is supported.---

Data Type Mapping

Doris Data typeSeaTunnel Data type
INTINT
TINYINTTINYINT
SMALLINTSMALLINT
BIGINTBIGINT
LARGEINTSTRING
BOOLEANBOOLEAN
DECIMALDECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.)))
FLOATFLOAT
DOUBLEDOUBLE
CHAR
VARCHAR
STRING
TEXT
STRING
DATEDATE
DATETIME
DATETIME(p)
TIMESTAMP
ARRAYARRAY

Source Options

Base configuration:

NameTypeRequiredDefaultDescription
fenodesstringyes-FE address, the format is "fe_host:fe_http_port"
usernamestringyes-User username
passwordstringyes-User password
doris.request.retriesintno3Number of retries to send requests to Doris FE.
doris.request.read.timeout.msintno30000
doris.request.connect.timeout.msintno30000
query-portstringno9030Doris QueryPort
doris.request.query.timeout.sintno3600Timeout period of Doris scan data, expressed in seconds.
table_liststring-table list

Table list configuration:

NameTypeRequiredDefaultDescription
databasestringyes-The name of Doris database
tablestringyes-The name of Doris table
doris.read.fieldstringno-Use the 'doris.read.field' parameter to select the doris table columns to read
doris.filter.querystringno-Data filtering in doris. the format is "field = value",example : doris.filter.query = "F_ID > 2"
doris.batch.sizeintno1024The maximum value that can be obtained by reading Doris BE once.
doris.exec.mem.limitlongno2147483648Maximum memory that can be used by a single be scan request. The default memory is 2G (2147483648).

Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.

Tips

It is not recommended to modify advanced parameters at will

Example

single table

This is an example of reading a Doris table and writing to Console.

env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
}
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Console {}
}

Use the 'doris.read.field' parameter to select the doris table columns to read

env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
}
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Console {}
}

Use 'doris.filter.query' to filter the data, and the parameter values are passed directly to doris

env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.filter.query = "F_ID > 2"
}
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Console {}
}

Multiple table

env{
parallelism = 1
job.mode = "BATCH"
}

source{
Doris {
fenodes = "xxxx:8030"
username = root
password = ""
table_list = [
{
database = "st_source_0"
table = "doris_table_0"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
doris.filter.query = "F_ID >= 50"
},
{
database = "st_source_1"
table = "doris_table_1"
}
]
}
}

transform {}

sink{
Doris {
fenodes = "xxxx:8030"
schema_save_mode = "RECREATE_SCHEMA"
username = root
password = ""
database = "st_sink"
table = "${table_name}"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}