
Apache Iceberg

Apache Iceberg sink connector

Supported Iceberg Version

  • 1.4.2

Supported Engine

Spark
Flink
SeaTunnel Zeta

Description

Sink connector for Apache Iceberg. It supports CDC mode, automatic table creation, and table schema evolution.

Key Features

Supported DataSource Info

| Datasource | Dependent | Maven    |
|------------|-----------|----------|
| Iceberg    | hive-exec | Download |
| Iceberg    | libfb303  | Download |

Database Dependency

To stay compatible with different versions of Hadoop and Hive, the scope of the hive-exec dependency in the project pom file is set to provided. Therefore, if you use the Flink engine, you may need to add the following Jar packages to the <FLINK_HOME>/lib directory; if you use the Spark engine and Hadoop is already integrated, you do not need to add them.

hive-exec-xxx.jar
libfb303-xxx.jar

Some versions of the hive-exec package do not include libfb303-xxx.jar, so you also need to import that Jar package manually.

Data Type Mapping

| SeaTunnel Data Type | Iceberg Data Type |
|---------------------|-------------------|
| BOOLEAN             | BOOLEAN           |
| INT                 | INTEGER           |
| BIGINT              | LONG              |
| FLOAT               | FLOAT             |
| DOUBLE              | DOUBLE            |
| DATE                | DATE              |
| TIME                | TIME              |
| TIMESTAMP           | TIMESTAMP         |
| STRING              | STRING            |
| BYTES               | FIXED, BINARY     |
| DECIMAL             | DECIMAL           |
| ROW                 | STRUCT            |
| ARRAY               | LIST              |
| MAP                 | MAP               |
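
To see the mapping end to end, here is a minimal sketch that is not part of the original examples: it assumes the built-in FakeSource connector and a local Hadoop catalog (the warehouse path is illustrative), and each declared SeaTunnel field type is written to Iceberg as the corresponding type from the table above.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 16
    schema = {
      fields {
        id = "bigint"              # written as Iceberg LONG
        name = "string"            # written as Iceberg STRING
        price = "decimal(10, 2)"   # written as Iceberg DECIMAL
        create_time = "timestamp"  # written as Iceberg TIMESTAMP
      }
    }
  }
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
  }
}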

Sink Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| catalog_name | string | yes | default | User-specified catalog name, defaults to default |
| namespace | string | yes | default | Name of the Iceberg database in the backend catalog (where table metadata is stored), defaults to default |
| table | string | yes | - | Name of the Iceberg table in the backend catalog |
| iceberg.catalog.config | map | yes | - | Properties used to initialize the Iceberg Catalog; see "https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
| hadoop.config | map | no | - | Properties passed to the Hadoop configuration |
| iceberg.hadoop-conf-path | string | no | - | Path from which the core-site.xml, hdfs-site.xml, and hive-site.xml files are loaded |
| case_sensitive | boolean | no | false | Whether column name matching is case sensitive |
| iceberg.table.write-props | map | no | - | Properties passed to Iceberg writer initialization; these take the highest priority, e.g. write.format.default and write.target-file-size-bytes. See 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java' |
| iceberg.table.auto-create-props | map | no | - | Configuration applied when Iceberg automatically creates a table |
| iceberg.table.schema-evolution-enabled | boolean | no | false | When set to true, the Iceberg table supports schema changes during synchronization |
| iceberg.table.primary-keys | string | no | - | Comma-separated list of primary key columns used to identify a row in the table |
| iceberg.table.partition-keys | string | no | - | Comma-separated list of partition fields used when creating the table |
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to true to enable upsert mode; defaults to false |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode; see schema_save_mode below |
| data_save_mode | Enum | no | APPEND_DATA | Data save mode; see data_save_mode below |
| custom_sql | string | no | - | Custom SQL statement for deleting data, used by the data save mode, e.g. delete from ... where ... |
| iceberg.table.commit-branch | string | no | - | Default branch for commits |
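
The task examples below only exercise part of this option list. The following sink-only sketch (not from the original document) shows how several of the remaining options could be combined; the Hadoop configuration path, the fs.defaultFS value, the format-version auto-create property, and the branch name are illustrative assumptions, while schema_save_mode and data_save_mode are simply spelled out with their documented defaults.

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"

    # Load core-site.xml / hdfs-site.xml / hive-site.xml from this directory (hypothetical path)
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    # Extra properties passed through to the Hadoop configuration (illustrative key/value)
    hadoop.config = {
      "fs.defaultFS" = "hdfs://your_cluster"
    }

    # Documented defaults, written out explicitly
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"

    # Table properties applied only when the sink auto-creates the table
    # ("format-version" = "2" is an assumed example of an Iceberg table property)
    iceberg.table.auto-create-props = {
      "format-version" = "2"
    }

    # Commit to a branch other than the default (branch name is illustrative)
    iceberg.table.commit-branch = "audit-branch"
  }
}

Options that are omitted keep the defaults listed in the table above.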

Task Example

Simple example:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    plugin_output = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}

Hive Catalog:

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}

Hadoop Catalog:

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}

Multiple table (multi-table write)

Example 1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role", "seatunnel.user", "galileo.Bucket"]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${database_name}_test"
    table = "${table_name}_test"
  }
}

Example 2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${schema_name}_test"
    table = "${table_name}_test"
  }
}

Changelog

2.3.4-SNAPSHOT 2024-01-18

  • Add Iceberg Sink Connector

next version