# Databend

> Databend sink connector
## Supported Engines

- Spark
- Flink
- SeaTunnel Zeta

## Key Features
## Description

A sink connector that writes data to Databend, supporting both batch and streaming processing modes. Internally, the Databend sink performs bulk data import through stage attachment.
## Dependencies

### For Spark/Flink

- You need to download the Databend JDBC driver jar package and add it to the `${SEATUNNEL_HOME}/plugins/` directory.

### For SeaTunnel Zeta

- You need to download the Databend JDBC driver jar package and add it to the `${SEATUNNEL_HOME}/lib/` directory.
## Sink Options

| Name | Type | Required | Default Value | Description |
|------|------|----------|---------------|-------------|
| url | String | Yes | - | Databend JDBC connection URL |
| username | String | Yes | - | Databend database username |
| password | String | Yes | - | Databend database password |
| database | String | No | - | Databend database name; defaults to the database specified in the connection URL |
| table | String | No | - | Databend table name |
| batch_size | Integer | No | 1000 | Number of records written per batch |
| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
| max_retries | Integer | No | 3 | Maximum number of retries on write failure |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout in seconds |
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
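Taken together, here is a hedged sketch of how the tuning options above combine in one sink block; the `jdbc_config` key shown is a hypothetical placeholder, so check the Databend JDBC driver documentation for the parameters it actually accepts:

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"

    # Write tuning (options from the table above)
    batch_size = 5000           # records per flush instead of the default 1000
    max_retries = 3             # retry failed writes up to 3 times
    execute_timeout_sec = 600   # allow longer-running bulk loads
    auto_commit = true

    # Extra driver properties; the key below is a hypothetical example
    jdbc_config = {
      connect_timeout = "10000"
    }
  }
}
```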
### schema_save_mode [Enum]

Before the synchronization task starts, choose how to handle an existing table structure on the target side.

Option descriptions:

- `RECREATE_SCHEMA`: Create the table when it doesn't exist; drop and recreate it when it does.
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the table when it doesn't exist; skip creation when it does.
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Report an error when the table doesn't exist.
- `IGNORE`: Ignore table processing.
### data_save_mode [Enum]

Before the synchronization task starts, choose how to handle existing data on the target side.

Option descriptions:

- `DROP_DATA`: Retain the table structure and delete its data.
- `APPEND_DATA`: Retain both the table structure and its data.
- `CUSTOM_PROCESSING`: User-defined processing (see the sketch below).
- `ERROR_WHEN_DATA_EXISTS`: Report an error when data already exists.
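A hedged sketch of `CUSTOM_PROCESSING`, under the assumption that it executes the statement in `custom_sql` as the user-defined step before writing; neither the pairing nor the statement is confirmed behavior, so treat this as illustrative only:

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"

    # Assumption: CUSTOM_PROCESSING runs custom_sql as the user-defined
    # pre-write step; the DELETE below is purely illustrative
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "DELETE FROM default.target_table WHERE score < 0"
  }
}
```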
## Data Type Mapping

| SeaTunnel Data Type | Databend Data Type |
|---------------------|--------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| STRING | STRING |
| BYTES | VARBINARY |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
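To see the mapping in practice, here is a hedged FakeSource schema sketch whose fields exercise several of the rows above (field names are illustrative); each field lands in the Databend column type noted in the comment:

```hocon
source {
  FakeSource {
    row.num = 5
    schema = {
      fields {
        id         = bigint             # -> Databend BIGINT
        is_active  = boolean            # -> Databend BOOLEAN
        price      = "decimal(10, 2)"   # -> Databend DECIMAL
        name       = string             # -> Databend STRING
        payload    = bytes              # -> Databend VARBINARY
        created_at = timestamp          # -> Databend TIMESTAMP
      }
    }
  }
}
```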
## Task Examples

### Simple Example

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        name = string
        age = int
        score = double
      }
    }
  }
}

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    batch_size = 1000
  }
}
```
### Writing with Custom SQL

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
  }
}
```
### Using Schema Save Mode

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    schema_save_mode = "RECREATE_SCHEMA"
    data_save_mode = "APPEND_DATA"
  }
}
```
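### Streaming Write Example

As noted in the description, the sink also supports streaming mode. A minimal hedged sketch, reusing FakeSource and switching `job.mode` to STREAMING:

```hocon
env {
  execution.parallelism = 1
  job.mode = "STREAMING"   # continuous write instead of a one-shot batch
}

source {
  FakeSource {
    row.num = 100
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    batch_size = 1000      # records buffered before each flush
  }
}
```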
## Related Links
## Changelog
| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Connector-V2] Support Databend sink/source (#9331) | https://github.com/apache/seatunnel/pull/9331 | TODO |