跳到主要内容
版本:Next

Snowflake

JDBC Snowflake Sink连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

通过JDBC写入数据。支持批处理模式和流处理模式,支持并发写入。

支持的数据源列表

数据源支持的版本驱动类URLMaven
Snowflake不同依赖版本对应不同的驱动类。net.snowflake.client.jdbc.SnowflakeDriverjdbc❄️//<account_name>.snowflakecomputing.com下载

数据库依赖

请下载支持列表中对应的'Maven'依赖,并将其复制到'$SEATUNNEL_HOME/plugins/jdbc/lib/'工作目录下
例如Snowflake数据源:cp snowflake-connector-java-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

Snowflake 数据类型SeaTunnel 数据类型
BOOLEANBOOLEAN
TINYINT
SMALLINT
BYTEINT
SHORT_TYPE
INT
INTEGER
INT
BIGINTLONG
DECIMAL
NUMERIC
NUMBER
DECIMAL(x,y)
DECIMAL(x,y)(获取指定列的大小>38)DECIMAL(38,18)
REAL
FLOAT4
FLOAT
DOUBLE
DOUBLE PRECISION
FLOAT8
FLOAT
DOUBLE
CHAR
CHARACTER
VARCHAR
STRING
TEXT
VARIANT
OBJECT
STRING
DATEDATE
TIMETIME
DATETIME
TIMESTAMP
TIMESTAMP_LTZ
TIMESTAMP_NTZ
TIMESTAMP_TZ
TIMESTAMP
BINARY
VARBINARY
GEOGRAPHY
GEOMETRY
BYTES

配置选项

名称类型必填默认值描述
urlString-JDBC连接的URL。参考示例:jdbc:snowflake://<account_name>.snowflakecomputing.com
driverString-用于连接远程数据源的JDBC类名,
如果使用Snowflake,值为net.snowflake.client.jdbc.SnowflakeDriver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-使用此SQL将上游输入数据写入数据库。例如INSERT ...query具有更高的优先级
databaseString-使用此databasetable-name自动生成SQL并接收上游输入数据写入数据库。
此选项与query互斥,且具有更高的优先级。
tableString-使用database和此table-name自动生成SQL并接收上游输入数据写入数据库。
此选项与query互斥,且具有更高的优先级。
primary_keysArray-此选项用于在自动生成SQL时支持insertdeleteupdate等操作。
support_upsert_by_query_primary_key_existBooleanfalse选择使用INSERT SQL、UPDATE SQL来处理更新事件(INSERT, UPDATE_AFTER),基于查询主键是否存在。此配置仅在数据库不支持upsert语法时使用。注意:此方法性能较低。
connection_check_timeout_secInt30用于验证连接的操作的等待时间(秒)。
max_retriesInt0提交失败(executeBatch)的重试次数
batch_sizeInt1000对于批处理写入,当缓冲的记录数达到batch_size或时间达到checkpoint.interval时,
数据将被刷新到数据库中
max_commit_attemptsInt3事务提交失败的重试次数
transaction_timeout_secInt-1事务打开后的超时时间,默认为-1(永不超时)。注意,设置超时可能会影响
精确一次语义
auto_commitBooleantrue默认启用自动事务提交
propertiesMap-额外的连接配置参数,当properties和URL中有相同参数时,优先级由驱动程序的
具体实现决定。例如,在MySQL中,properties优先于URL。
common-options-接收器插件通用参数,详情请参考接收器通用选项
enable_upsertBooleantrue通过主键存在启用upsert,如果任务没有键重复数据,将此参数设置为false可以加快数据导入速度

提示

如果未设置partition_column,将以单并发运行,如果设置了partition_column,将根据任务的并发度并行执行。

任务示例

简单示例:

此示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送到JDBC Sink。FakeSource总共生成16行数据(row.num=16),每行有两个字段,name(字符串类型)和age(int类型)。最终目标表test_table中也将有16行数据。在运行此作业之前,您需要在Snowflake数据库中创建数据库test和表test_table。如果您尚未安装和部署SeaTunnel,请按照安装SeaTunnel中的说明进行安装和部署。然后按照使用SeaTunnel Engine快速入门中的说明运行此作业。

# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}
source {
# 这是一个示例源插件,**仅用于测试和演示功能源插件**
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的源插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source
}
transform {

# 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2
}
sink {
jdbc {
url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
driver = "net.snowflake.client.jdbc.SnowflakeDriver"
user = "root"
password = "123456"
query = "insert into test_table(name,age) values(?,?)"
}
# 如果您想了解更多关于如何配置SeaTunnel的信息,并查看完整的接收器插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}

CDC(变更数据捕获)事件

我们也支持CDC变更数据。在这种情况下,您需要配置databasetableprimary_keys

sink {
jdbc {
url = "jdbc:snowflake://<account_name>.snowflakecomputing.com"
driver = "net.snowflake.client.jdbc.SnowflakeDriver"
user = "root"
password = "123456"
generate_sink_sql = true


# 您需要同时配置database和table
database = test
table = sink_table
primary_keys = ["id","name"]
}
}