# Databend

Databend sink connector
## Supported Engines

Spark
Flink
SeaTunnel Zeta
## Key Features

## Description

A sink connector for writing data to Databend. It supports both batch and streaming processing modes. Internally, the sink performs bulk data loading through Databend's stage attachment mechanism.
## Dependencies

### For Spark/Flink

- You need to download the Databend JDBC driver jar package and add it to the `${SEATUNNEL_HOME}/plugins/` directory.
### For SeaTunnel Zeta

- You need to download the Databend JDBC driver jar package and add it to the `${SEATUNNEL_HOME}/lib/` directory.
## Sink Options

| Name | Type | Required | Default Value | Description |
|---|---|---|---|---|
| url | String | Yes | - | Databend JDBC connection URL |
| username | String | Yes | - | Databend database username |
| password | String | Yes | - | Databend database password |
| database | String | No | - | Databend database name, defaults to the database name specified in the connection URL |
| table | String | No | - | Databend table name |
| batch_size | Integer | No | 1000 | Number of records for batch writing |
| auto_commit | Boolean | No | true | Whether to auto-commit transactions |
| max_retries | Integer | No | 3 | Maximum retry attempts on write failure |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
| custom_sql | String | No | - | Custom write SQL, typically used for complex write scenarios |
| execute_timeout_sec | Integer | No | 300 | SQL execution timeout (seconds) |
| jdbc_config | Map | No | - | Additional JDBC connection configuration, such as connection timeout parameters |
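For reference, the retry, timeout, and JDBC tuning options can be combined in a single sink block. The sketch below is illustrative only; the keys inside `jdbc_config` (such as `connect_timeout`) are assumptions for this example, not a documented list, so check the Databend JDBC driver documentation for the properties it actually accepts.

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    batch_size = 1000
    max_retries = 3
    execute_timeout_sec = 300
    # Hypothetical JDBC properties passed through to the driver.
    jdbc_config = {
      connect_timeout = "10000"
    }
  }
}
```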
### schema_save_mode [Enum]

Before the synchronization task starts, choose how to handle an existing table structure on the target side (see the "Using Schema Save Mode" example below).

Option descriptions:

- `RECREATE_SCHEMA`: create the table when it does not exist; drop and recreate it when it already exists.
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: create the table when it does not exist; skip creation when it already exists.
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: report an error when the table does not exist.
- `IGNORE`: ignore table handling.
### data_save_mode [Enum]

Before the synchronization task starts, choose how to handle existing data on the target side.

Option descriptions:

- `DROP_DATA`: keep the table structure and delete its data.
- `APPEND_DATA`: keep the table structure and its data.
- `CUSTOM_PROCESSING`: user-defined processing (see the sketch after this list).
- `ERROR_WHEN_DATA_EXISTS`: report an error when data already exists.
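As a rough illustration of `CUSTOM_PROCESSING`, the sketch below pairs it with `custom_sql`. Treat the pairing as an assumption to verify against your SeaTunnel version rather than documented behavior; the SQL statement itself is taken from the "Writing with Custom SQL" example further down.

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    data_save_mode = "CUSTOM_PROCESSING"
    # Assumed to supply the user-defined write statement.
    custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
  }
}
```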
## Data Type Mapping

| SeaTunnel Data Type | Databend Data Type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| STRING | STRING |
| BYTES | VARBINARY |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
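To illustrate the mapping, a FakeSource schema such as the sketch below (the field names are arbitrary, chosen only for this example) would be written to Databend as columns of the corresponding types from the table above.

```hocon
source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        c_boolean = boolean              # -> BOOLEAN
        c_int = int                      # -> INT
        c_bigint = bigint                # -> BIGINT
        c_double = double                # -> DOUBLE
        c_decimal = "decimal(10, 2)"     # -> DECIMAL
        c_string = string                # -> STRING
        c_date = date                    # -> DATE
        c_timestamp = timestamp          # -> TIMESTAMP
      }
    }
  }
}
```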
## Task Examples

### Simple Example

```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        name = string
        age = int
        score = double
      }
    }
  }
}

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    batch_size = 1000
  }
}
```
### Writing with Custom SQL

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    custom_sql = "INSERT INTO default.target_table(name, age, score) VALUES(?, ?, ?)"
  }
}
```
### Using Schema Save Mode

```hocon
sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "target_table"
    schema_save_mode = "RECREATE_SCHEMA"
    data_save_mode = "APPEND_DATA"
  }
}
```
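### Streaming Example

The description notes that the sink also works in streaming mode, but the examples above are batch-only. The following is a minimal sketch of how a streaming job could be wired up; the MySQL-CDC block is only a placeholder for a streaming-capable source and its options are illustrative, and how non-insert change events are handled by the Databend sink is not covered here.

```hocon
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  # Placeholder streaming source; substitute any streaming-capable source.
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/test"
    username = "root"
    password = "password"
    table-names = ["test.orders"]
  }
}

sink {
  Databend {
    url = "jdbc:databend://localhost:8000"
    username = "root"
    password = ""
    database = "default"
    table = "orders"
    batch_size = 1000
  }
}
```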
## Related Links

## Changelog

| Change | Commit | Version |
|---|---|---|
| [Feature][Connector-V2] Support databend source/sink connector (#9331) | https://github.com/apache/seatunnel/commit/2f96f2e46c | 2.3.12 |