Version: 2.3.3

Doris

Doris sink connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Used to send data to Doris. Both streaming and batch modes are supported. Internally, the Doris sink connector caches data and imports it into Doris in batches via Stream Load.
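The task examples on this page all run in BATCH mode. As a minimal sketch, assuming the standard SeaTunnel env options, a streaming job would differ only in job.mode; the source and sink blocks stay as in those examples. The parallelism and checkpoint interval below are illustrative values, not recommendations.

```hocon
env {
  parallelism = 1
  # Switch from "BATCH" to "STREAMING" for a long-running job
  job.mode = "STREAMING"
  # Checkpoint interval in milliseconds (illustrative value)
  checkpoint.interval = 10000
}
```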

Supported DataSource Info

Tip

Supported Versions

  • Exactly-once and CDC require Doris version >= 1.1.x
  • The Array data type requires Doris version >= 1.2.x
  • The Map data type will be supported in Doris version 2.x

Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| fenodes | String | Yes | - | Doris cluster FE node addresses, in the format "fe_ip:fe_http_port, ..." |
| username | String | Yes | - | Doris username |
| password | String | Yes | - | Doris user password |
| table.identifier | String | Yes | - | The name of the Doris table |
| sink.label-prefix | String | Yes | - | The label prefix used by Stream Load imports. In the 2pc scenario, it must be globally unique to ensure the EOS (exactly-once) semantics of SeaTunnel. |
| sink.enable-2pc | bool | No | - | Whether to enable two-phase commit (2pc) to ensure exactly-once semantics. The default is true. For details on two-phase commit, refer to the Doris documentation. |
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires the batch delete function to be enabled on the Doris table (enabled by default in version 0.15+) and only supports the Unique model. For details, refer to the Doris documentation on batch delete. |
| sink.check-interval | int | No | 10000 | The interval at which exceptions are checked while loading |
| sink.max-retries | int | No | 3 | The maximum number of retries if writing records to the database fails |
| sink.buffer-size | int | No | 256 * 1024 | The size of the buffer used to cache data for Stream Load |
| sink.buffer-count | int | No | 3 | The number of buffers used to cache data for Stream Load |
| doris.config | map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and to specify the import format (see Supported import data formats). |
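The optional settings in the table can be spelled out explicitly. The following is a minimal sketch that sets each one to its documented default and lists two FE nodes. The host names fe1 and fe2 and the label prefix are hypothetical placeholders. The table and doris.config values are borrowed from the task examples on this page.

```hocon
sink {
  Doris {
    # Hypothetical FE hosts; multiple FE addresses are separated by commas
    fenodes = "fe1:8030,fe2:8030"
    username = root
    password = ""
    table.identifier = "test.e2e_table_sink"
    # Hypothetical prefix; must be globally unique when 2pc is enabled
    sink.label-prefix = "test-tuning"
    # Optional settings, shown with the defaults listed in the table
    sink.check-interval = 10000
    sink.max-retries = 3
    sink.buffer-size = 262144   # 256 * 1024
    sink.buffer-count = 3
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```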

Data Type Mapping

| Doris Data Type | SeaTunnel Data Type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
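To make the mapping concrete, here is a sketch of how the SeaTunnel side of a schema might be declared for a hypothetical Doris table. The field names are invented for illustration; the comments give the Doris column type each field is assumed to target, following the table above.

```hocon
schema = {
  fields {
    id = bigint                 # Doris BIGINT
    name = string               # Doris VARCHAR, CHAR, or STRING
    amount = "decimal(16, 1)"   # Doris DECIMAL
    created_at = timestamp      # Doris DATETIME
    birthday = date             # Doris DATE
    payload = string            # Doris JSON is written from a STRING field
    tags = "array<int>"         # Doris ARRAY (requires Doris >= 1.2.x)
  }
}
```

Fields destined for HLL, BITMAP, QUANTILE_STATE, or STRUCT columns have no counterpart, since those types are not supported yet.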

Supported import data formats

The supported import formats are CSV and JSON.

Task Example

Simple:

The following example writes multiple data types to Doris. Users must create the corresponding table in Doris beforehand.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    table.identifier = "test.e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

CDC (Change Data Capture) Event:

This example defines a SeaTunnel synchronization task that generates data through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data with a schema that includes a score field (int type). A table named test.e2e_table_sink must be created in Doris beforehand to receive the data.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
        sex = boolean
        number = tinyint
        height = float
        sight = double
        create_time = date
        update_time = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      }
    ]
  }
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    table.identifier = "test.e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Use JSON format to import data

sink {
  Doris {
    fenodes = "e2e_dorisdb:8030"
    username = root
    password = ""
    table.identifier = "test.e2e_table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Use CSV format to import data

sink {
  Doris {
    fenodes = "e2e_dorisdb:8030"
    username = root
    password = ""
    table.identifier = "test.e2e_table_sink"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_csv"
    doris.config = {
      format = "csv"
      column_separator = ","
    }
  }
}

Changelog

2.3.0-beta 2022-10-20

  • Add Doris Sink Connector

Next version

  • [Improve] Change Doris Config Prefix (PR 3856)

  • [Improve] Refactor some Doris Sink code and add support for 2pc and CDC (PR 4235)

Tip

PR 4235 is incompatible with the changes made in PR 3856. Please refer to PR 4235 when using the new Doris connector.