Cloudberry
JDBC Cloudberry Sink Connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Description
Writes data to Apache Cloudberry through JDBC. Cloudberry does not currently ship its own JDBC driver, so the connector uses the PostgreSQL JDBC driver and follows the PostgreSQL connector's implementation.
Supports batch and streaming modes, concurrent writing, and exactly-once semantics (guaranteed by XA transactions).
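The fragment below sketches streaming mode with concurrent writing. The env block turns the job into a streaming job with two parallel sink writers. The parallelism and checkpoint interval values are illustrative assumptions, not recommendations. checkpoint.interval is given in milliseconds. Any source block can be plugged in, for example the FakeSource from the Simple example.

```hocon
env {
  # two sink writer instances write to Cloudberry concurrently (illustrative value)
  parallelism = 2
  job.mode = "STREAMING"
  # checkpoint every 10 seconds, in milliseconds (illustrative value)
  checkpoint.interval = 10000
}

# source { ... }  reuse a source block such as the FakeSource from the Simple example

sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
  }
}
```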
Using Dependency
For Spark/Flink Engine
- Ensure that the JDBC driver jar has been placed in ${SEATUNNEL_HOME}/plugins/.
For SeaTunnel Zeta Engine
- Ensure that the JDBC driver jar has been placed in ${SEATUNNEL_HOME}/lib/.
Key Features
Exactly-once delivery is implemented with XA transactions, so it is only available for databases that support XA transactions. Set is_exactly_once = true to enable it.
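Below is a minimal sketch of an exactly-once sink block. It assumes the same connection details as the task examples further down. max_commit_attempts and transaction_timeout_sec are JDBC sink options for tuning XA commits, and the values shown are illustrative. With the PostgreSQL driver, XA relies on prepared transactions (PREPARE TRANSACTION), which PostgreSQL disables by default (max_prepared_transactions = 0). Check whether your Cloudberry deployment allows prepared transactions before enabling this mode.

```hocon
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
    # exactly-once writes through XA transactions
    is_exactly_once = true
    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
    # retry a failed XA commit up to 3 times (illustrative value)
    max_commit_attempts = 3
    # -1 means the transaction never times out (illustrative value)
    transaction_timeout_sec = -1
  }
}
```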
Supported DataSource Info
Datasource | Supported Versions | Driver | Url | Maven |
---|---|---|---|---|
Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download |
Database Dependency
Download the PostgreSQL JDBC driver jar and copy it to the $SEATUNNEL_HOME/plugins/jdbc/lib/ directory.
For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
Data Type Mapping
Cloudberry uses PostgreSQL's data types, so the PostgreSQL connector's data type mapping applies. Refer to the PostgreSQL connector documentation for the full mapping.
Options
The Cloudberry connector accepts the same options as the PostgreSQL connector. Refer to the PostgreSQL connector documentation for the complete list.
Key options include:
- url (required): the JDBC connection URL
- driver (required): the driver class name (org.postgresql.Driver)
- user / password: authentication credentials
- query, or database and table together with generate_sink_sql = true: either the insert statement to execute, or the target table from which the sink generates the statement
- is_exactly_once: enables exactly-once semantics with XA transactions (requires xa_data_source_class_name as well)
- batch_size: the number of buffered rows that triggers a flush to the database
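The key options above can be combined into a single sink block, sketched below. The batch_size value is an illustrative assumption. A larger batch means fewer round trips to the database, but more rows are held in memory before each flush.

```hocon
sink {
  jdbc {
    # connection settings
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    # what to write: an explicit insert statement
    query = "insert into test_table(name,age) values(?,?)"
    # flush after 500 buffered rows (illustrative value)
    batch_size = 500
  }
}
```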
Task Example
Simple:
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
  }
}
Generate Sink SQL
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
  }
}
Exactly-once:
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    query = "insert into test_table(name,age) values(?,?)"
    is_exactly_once = "true"
    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
  }
}
CDC (Change Data Capture) Event
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "sink_table"
    primary_keys = ["id","name"]
    field_ide = UPPERCASE
  }
}
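The JDBC sink also documents an enable_upsert option, which defaults to true. Per that documentation, setting it to false can speed up imports when the input has no duplicate primary keys. The sketch below assumes the same target table as the CDC example above. Use it only when duplicate keys are ruled out.

```hocon
sink {
  jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "sink_table"
    primary_keys = ["id","name"]
    # only safe when the input contains no duplicate primary keys
    enable_upsert = false
  }
}
```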
Save mode function
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
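To replace the table's contents instead of appending, data_save_mode can be set to DROP_DATA. This keeps the table structure but deletes the existing rows before the job writes. The sketch below assumes the same target table as the save mode example above. Use it with care, because existing data is removed. Other save mode values include ERROR_WHEN_DATA_EXISTS for data_save_mode, and RECREATE_SCHEMA and ERROR_WHEN_SCHEMA_NOT_EXIST for schema_save_mode.

```hocon
sink {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
    driver = "org.postgresql.Driver"
    user = "dbadmin"
    password = "password"
    generate_sink_sql = true
    database = "mydb"
    table = "public.test_table"
    # create the table if it does not exist yet
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # keep the table structure but delete existing rows before writing
    data_save_mode = "DROP_DATA"
  }
}
```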
For more detailed examples and options, please refer to the PostgreSQL connector documentation.
Changelog
Change | Commit | Version |
---|---|---|
[Feature][Connector] Add Apache Cloudberry Support (#8985) | https://github.com/apache/seatunnel/commit/b6f82c1 | dev |