Jdbc
JDBC sink connector
Description
Write data through jdbc
Engine Supported and plugin name
- Spark: Jdbc
- Flink: Jdbc
Options
- Spark
- Flink
| name | type | required | default value |
|---|---|---|---|
| driver | string | yes | - |
| url | string | yes | - |
| user | string | yes | - |
| password | string | yes | - |
| dbTable | string | yes | - |
| saveMode | string | no | update |
| useSsl | string | no | false |
| customUpdateStmt | string | no | - |
| duplicateIncs | string | no | - |
| showSql | string | no | true |
url [string]
The URL of the JDBC connection. Refer to a case: jdbc:mysql://localhost/dbName
user [string]
username
password [string]
user password
dbTable [string]
Sink table name, if the table does not exist, it will be created.
saveMode [string]
Storage mode, add mode update , perform data overwrite in a specified way when inserting data key conflicts
Basic mode, currently supports overwrite , append , ignore and error . For the specific meaning of each mode, see save-modes
useSsl [string]
Configure when saveMode is specified as update , whether to enable ssl, the default value is false
isolationLevel [string]
The transaction isolation level, which applies to current connection. The default value is READ_UNCOMMITTED
customUpdateStmt [string]
Configure when saveMode is specified as update , which is used to specify the update statement template for key conflicts.
If customUpdateStmt is empty, the sql will auto-generate for all columns, else use the sql which refer to the usage of
INSERT INTO table (...) values (...) ON DUPLICATE KEY UPDATE... of mysql , use placeholders or fixed values in values
tips: the tableName of sql should be consistent with the dbTable.
duplicateIncs [string]
Configure when saveMode is specified as update , and when the specified key conflicts, the value is updated to the existing value plus the original value
showSql
Configure when saveMode is specified as update , whether to show sql
| name | type | required | default value |
|---|---|---|---|
| driver | string | yes | - |
| url | string | yes | - |
| username | string | yes | - |
| password | string | no | - |
| query | string | yes | - |
| batch_size | int | no | - |
| source_table_name | string | yes | - |
| common-options | string | no | - |
| parallelism | int | no | - |
| pre_sql | string | no | - |
| post_sql | string | no | - |
| ignore_post_sql_exceptions | boolean | no | - |
driver [string]
Driver name, such as com.mysql.cj.jdbc.Driver for MySQL.
Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to $FLINK_HOME/lib for Standalone.
url [string]
The URL of the JDBC connection. Such as: jdbc:mysql://localhost:3306/test
username [string]
username
password [string]
password
query [string]
Insert statement
batch_size [int]
Number of writes per batch
parallelism [int]
The parallelism of an individual operator, for JdbcSink.
pre_sql [string]
This sql can be executed before output.
post_sql [string]
This sql can be executed after output, and just supports for batch job.
ignore_post_sql_exceptions [boolean]
Whether to ignore post_sql exceptions.
common options [string]
Sink plugin common parameters, please refer to Sink Plugin for details
Examples
- Spark
- Flink
jdbc {
driver = "com.mysql.cj.jdbc.Driver",
saveMode = "update",
url = "jdbc:mysql://ip:3306/database",
user = "userName",
password = "***********",
dbTable = "tableName",
customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
}
Insert data through JDBC
jdbc {
driver = "com.mysql.cj.jdbc.Driver",
saveMode = "update",
truncate = "true",
url = "jdbc:mysql://ip:3306/database",
user = "userName",
password = "***********",
dbTable = "tableName",
customUpdateStmt = "INSERT INTO tableName (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
jdbc.connect_timeout = 10000
jdbc.socket_timeout = 10000
}
Timeout config
JdbcSink {
source_table_name = fake
driver = com.mysql.cj.jdbc.Driver
url = "jdbc:mysql://localhost/test"
username = root
query = "insert into test(name,age) values(?,?)"
batch_size = 2
}