Skip to main content
Version: 2.3.2

JDBC

JDBC sink connector

Description​

Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics (using XA transaction guarantee).

tip

Warn: for license compliance, you have to provide database driver yourself, copy to $SEATNUNNEL_HOME/lib/ directory in order to make them work.

e.g. If you use MySQL, should download and copy mysql-connector-java-xxx.jar to $SEATNUNNEL_HOME/lib/. For Spark/Flink, you should also copy it to $SPARK_HOME/jars/ or $FLINK_HOME/lib/.

Key features​

Use Xa transactions to ensure exactly-once. So only support exactly-once for the database which is support Xa transactions. You can set is_exactly_once=true to enable it.

Options​

nametyperequireddefault value
urlStringYes-
driverStringYes-
userStringNo-
passwordStringNo-
queryStringNo-
databaseStringNo-
tableStringNo-
primary_keysArrayNo-
support_upsert_by_query_primary_key_existBooleanNofalse
connection_check_timeout_secIntNo30
max_retriesIntNo0
batch_sizeIntNo1000
batch_interval_msIntNo1000
is_exactly_onceBooleanNofalse
generate_sink_sqlBooleanNofalse
xa_data_source_class_nameStringNo-
max_commit_attemptsIntNo3
transaction_timeout_secIntNo-1
auto_commitBooleanNotrue
common-optionsno-

driver [string]​

The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.

user [string]​

userName

password [string]​

password

url [string]​

The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/test

query [string]​

Use this sql write upstream input datas to database. e.g INSERT ...

database [string]​

Use this database and table-name auto-generate sql and receive upstream input datas write to database.

This option is mutually exclusive with query and has a higher priority.

table [string]​

Use database and this table-name auto-generate sql and receive upstream input datas write to database.

This option is mutually exclusive with query and has a higher priority.

primary_keys [array]​

This option is used to support operations such as insert, delete, and update when automatically generate sql.

support_upsert_by_query_primary_key_exist [boolean]​

Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupported upsert syntax. Note: that this method has low performance

connection_check_timeout_sec [int]​

The time in seconds to wait for the database operation used to validate the connection to complete.

max_retries[int]​

The number of retries to submit failed (executeBatch)

batch_size[int]​

For batch writing, when the number of buffered records reaches the number of batch_size or the time reaches batch_interval_ms , the data will be flushed into the database

batch_interval_ms[int]​

For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms , the data will be flushed into the database

is_exactly_once[boolean]​

Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set xa_data_source_class_name.

generate_sink_sql[boolean]​

Generate sql statements based on the database table you want to write to

xa_data_source_class_name[string]​

The xa data source class name of the database Driver, for example, mysql is com.mysql.cj.jdbc.MysqlXADataSource, and please refer to appendix for other data sources

max_commit_attempts[int]​

The number of retries for transaction commit failures

transaction_timeout_sec[int]​

The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect exactly-once semantics

auto_commit [boolean]​

Automatic transaction commit is enabled by default

common options​

Sink plugin common parameters, please refer to Sink Common Options for details

tips​

In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup : 1 postgres needs to set max_prepared_transactions > 1 such as ALTER SYSTEM set max_prepared_transactions to 10. 2 mysql version need >= 8.0.29 and Non-root users need to grant XA_RECOVER_ADMIN permissions. such as grant XA_RECOVER_ADMIN on test_db.* to 'user1'@'%'. 3 mysql can try to add rewriteBatchedStatements=true parameter in url for better performance.

appendix​

there are some reference value for params above.

datasourcedriverurlxa_data_source_class_namemaven
MySQLcom.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306/testcom.mysql.cj.jdbc.MysqlXADataSourcehttps://mvnrepository.com/artifact/mysql/mysql-connector-java
PostgreSQLorg.postgresql.Driverjdbc:postgresql://localhost:5432/postgresorg.postgresql.xa.PGXADataSourcehttps://mvnrepository.com/artifact/org.postgresql/postgresql
DMdm.jdbc.driver.DmDriverjdbc:dm://localhost:5236dm.jdbc.driver.DmdbXADataSourcehttps://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
Phoenixorg.apache.phoenix.queryserver.client.Driverjdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF/https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
SQL Servercom.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433com.microsoft.sqlserver.jdbc.SQLServerXADataSourcehttps://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
Oracleoracle.jdbc.OracleDriverjdbc:oracle:thin:@localhost:1521/xepdb1oracle.jdbc.xa.OracleXADataSourcehttps://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
sqliteorg.sqlite.JDBCjdbc:sqlite:test.db/https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
GBase8acom.gbase.jdbc.Driverjdbc:gbase://e2e_gbase8aDb:5258/test/https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
StarRockscom.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306/test/https://mvnrepository.com/artifact/mysql/mysql-connector-java
db2com.ibm.db2.jcc.DB2Driverjdbc:db2://localhost:50000/testdbcom.ibm.db2.jcc.DB2XADataSourcehttps://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
saphanacom.sap.db.jdbc.Driverjdbc:sap://localhost:39015/https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
Doriscom.mysql.cj.jdbc.Driverjdbc:mysql://localhost:3306/test/https://mvnrepository.com/artifact/mysql/mysql-connector-java
teradatacom.teradata.jdbc.TeraDriverjdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test/https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
Redshiftcom.amazon.redshift.jdbc42.Driverjdbc:redshift://localhost:5439/testdbcom.amazon.redshift.xa.RedshiftXADataSourcehttps://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
Snowflakenet.snowflake.client.jdbc.SnowflakeDriverjdbc❄️//<account_name>.snowflakecomputing.com/https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
Verticacom.vertica.jdbc.Driverjdbc:vertica://localhost:5433/https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar

Example​

Simple

jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}

Exactly-once

jdbc {

url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"

max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"

is_exactly_once = "true"

xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
}

CDC(Change data capture) event

sink {
jdbc {
url = "jdbc:mysql://localhost:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"

database = "sink_database"
table = "sink_table"
primary_keys = ["key1", "key2", ...]
}
}

Changelog​

2.2.0-beta 2022-09-26​

  • Add Console Sink Connector

2.3.0-beta 2022-10-20​

  • [BugFix] Fix JDBC split exception (2904)
  • [Feature] Support Phoenix JDBC Sink (2499)
  • [Feature] Support SQL Server JDBC Sink (2646)
  • [Feature] Support Oracle JDBC Sink (2550)
  • [Feature] Support StarRocks JDBC Sink (3060)
  • [Feature] Support DB2 JDBC Sink (2410)

next version​

  • [Feature] Support CDC write DELETE/UPDATE/INSERT events (3378)
  • [Feature] Support Teradata JDBCγ€€Sink (3362)
  • [Feature] Support Sqlite JDBC Sink (3089)
  • [Feature] Support CDC write DELETE/UPDATE/INSERT events (3378)
  • [Feature] Support Doris JDBC Sink
  • [Feature] Support Redshift JDBC Sink(#3615)
  • [Improve] Add config item enable upsert by query(#3708)
  • [Improve] Add database field to sink config(#4199)
  • [Improve] Add Vertica connector(#4303)