跳到主要内容
版本:Next

Vertica

JDBC Vertica Sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

描述

通过 JDBC 写入数据。支持批处理和流处理模式,支持并发写入,支持精确一次语义(使用 XA 事务保证)。

使用依赖

  1. 需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/plugins/ 中。

对于 SeaTunnel Zeta 引擎

  1. 需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/lib/ 中。

主要特性

使用 Xa 事务 来保证 精确一次。因此仅支持支持 Xa 事务 的数据库。可以通过设置 is_exactly_once=true 来启用。

支持的数据源信息

数据源支持的版本驱动类名URL 格式Maven 依赖
Vertica不同依赖版本有不同的驱动类名com.vertica.jdbc.Driverjdbc:vertica://localhost:5433/vertica下载

数据库依赖

请下载支持列表中对应的 'Maven' 依赖,并将其复制到 $SEATUNNEL_HOME/plugins/jdbc/lib/ 工作目录中。
例如 Vertica 数据源:cp vertica-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

Vertica 数据类型SeaTunnel 数据类型
BIT(1)
INT UNSIGNED
BOOLEAN
TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR
INT
INT UNSIGNED
INTEGER UNSIGNED
BIGINT
BIGINT
BIGINT UNSIGNEDDECIMAL(20,0)
DECIMAL(x,y)(获取指定列的列大小 <38)DECIMAL(x,y)
DECIMAL(x,y)(获取指定列的列大小 >38)DECIMAL(38,18)
DECIMAL UNSIGNEDDECIMAL((获取指定列的列大小)+1,
(获取指定列的小数点右侧的位数)))
FLOAT
FLOAT UNSIGNED
FLOAT
DOUBLE
DOUBLE UNSIGNED
DOUBLE
CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON
STRING
DATEDATE
TIMETIME
DATETIME
TIMESTAMP
TIMESTAMP
TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n)
BYTES
GEOMETRY
UNKNOWN
尚未支持

接收器选项

名称类型是否必填默认值描述
urlString-JDBC 连接的 URL。参考示例:jdbc:vertica://localhost:5433/vertica
driverString-用于连接远程数据源的 JDBC 类名,如果使用 Vertica,值为 com.vertica.jdbc.Driver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 优先级更高。
databaseString-使用此 databasetable-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,且优先级更高。
tableString-使用 database 和此 table-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,且优先级更高。
primary_keysArray-此选项用于在自动生成 SQL 时支持 insertdeleteupdate 等操作。
support_upsert_by_query_primary_key_existBooleanfalse选择使用 INSERT SQL、UPDATE SQL 来处理更新事件(INSERT, UPDATE_AFTER),基于查询主键是否存在。此配置仅在数据库不支持 upsert 语法时使用。注意:此方法性能较低。
connection_check_timeout_secInt30用于验证连接完成的数据库操作的等待时间(秒)。
max_retriesInt0提交失败(executeBatch)的重试次数。
batch_sizeInt1000对于批量写入,当缓冲的记录数达到 batch_size 或时间达到 checkpoint.interval 时,数据将被刷新到数据库中。
is_exactly_onceBooleanfalse是否启用精确一次语义,将使用 Xa 事务。如果启用,需要设置 xa_data_source_class_name
generate_sink_sqlBooleanfalse根据要写入的数据库表生成 SQL 语句。
xa_data_source_class_nameString-数据库驱动的 XA 数据源类名,例如 Vertica 为 com.vertical.cj.jdbc.VerticalXADataSource,其他数据源请参考附录。
max_commit_attemptsInt3事务提交失败的重试次数。
transaction_timeout_secInt-1事务打开后的超时时间,默认为 -1(永不超时)。注意:设置超时可能会影响精确一次语义。
auto_commitBooleantrue默认启用自动事务提交。
propertiesMap-额外的连接配置参数,当 properties 和 URL 中有相同的参数时,优先级由驱动的具体实现决定。例如,在 MySQL 中,properties 优先于 URL。
common-options-接收器插件通用参数,详情请参考 Sink Common Options
enable_upsertBooleantrue通过主键存在启用 upsert。如果任务中没有键重复数据,将此参数设置为 false 可以加快数据导入速度。

提示

如果未设置 partition_column,将以单并发运行;如果设置了 partition_column,将根据任务的并发度并行执行。

任务示例

简单示例:

此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并发送到 JDBC Sink。FakeSource 总共生成 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(int 类型)。最终目标表 test_table 中也将有 16 行数据。在运行此任务之前,您需要在 Vertica 中创建数据库 test 和表 test_table。如果您尚未安装和部署 SeaTunnel,请按照 安装 SeaTunnel 中的说明进行安装和部署。然后按照 使用 SeaTunnel Engine 快速开始 中的说明运行此任务。

# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}

source {
# 这是一个示例源插件,**仅用于测试和演示功能**
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的源插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2
}

sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的接收器插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}

生成接收器 SQL

此示例不需要编写复杂的 SQL 语句,您可以通过配置数据库名称和表名称自动生成插入语句。

sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
user = "root"
password = "123456"
# 根据数据库表名自动生成 SQL 语句
generate_sink_sql = true
database = test
table = test_table
}
}

精确一次:

对于精确写入场景,我们保证精确一次语义。

sink {
jdbc {
url = "jdbc:vertica://localhost:5433/vertica"
driver = "com.vertica.jdbc.Driver"
max_retries = 0
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
is_exactly_once = "true"
xa_data_source_class_name = "com.vertical.cj.jdbc.VerticalXADataSource"
}
}