Version: 2.3.10

Apache Iceberg

Apache Iceberg sink connector

Supported Iceberg version

  • 1.4.2

Supported engines

Spark
Flink
SeaTunnel Zeta

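Description

The Apache Iceberg sink connector supports CDC mode, automatic table creation, and table schema evolution.

Key features

Supported data source info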

| Datasource | Dependency | Maven    |
|------------|------------|----------|
| Iceberg    | hive-exec  | Download |
| Iceberg    | libfb303   | Download |

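Database dependencies

To stay compatible with different versions of Hadoop and Hive, the hive-exec dependency in the project pom file is scoped as provided. As a result, if you use the Flink engine, you may need to add the following Jar packages to the <FLINK_HOME>/lib directory. If you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.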

hive-exec-xxx.jar
libfb303-xxx.jar

Some versions of the hive-exec package do not include libfb303-xxx.jar, so you also need to import that Jar package manually.

Data type mapping

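| SeaTunnel data type | Iceberg data type |
|---------------------|-------------------|
| BOOLEAN             | BOOLEAN           |
| INT                 | INTEGER           |
| BIGINT              | LONG              |
| FLOAT               | FLOAT             |
| DOUBLE              | DOUBLE            |
| DATE                | DATE              |
| TIME                | TIME              |
| TIMESTAMP           | TIMESTAMP         |
| STRING              | STRING            |
| BYTES               | FIXED / BINARY    |
| DECIMAL             | DECIMAL           |
| ROW                 | STRUCT            |
| ARRAY               | LIST              |
| MAP                 | MAP               |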
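
As a rough illustration of the mapping above, the fragment below declares an upstream schema that uses several SeaTunnel types. The comments note the Iceberg type each column would be expected to get according to the table. The FakeSource connector, the row count, and every field name are assumptions picked for illustration only.

```hocon
source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        # Hypothetical columns; the comments show the Iceberg type from the mapping table
        id = "bigint"                  # LONG
        is_active = "boolean"          # BOOLEAN
        age = "int"                    # INTEGER
        score = "double"               # DOUBLE
        created_on = "date"            # DATE
        updated_at = "timestamp"       # TIMESTAMP
        name = "string"                # STRING
        amount = "decimal(18, 2)"      # DECIMAL
        tags = "array<string>"         # LIST
        attrs = "map<string, string>"  # MAP
      }
    }
  }
}
```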

Sink options

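| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| catalog_name | string | yes | default | User-specified catalog name. Defaults to default. |
| namespace | string | yes | default | Name of the Iceberg database in the backend catalog (the catalog that stores the metadata). Defaults to default. |
| table | string | yes | - | Name of the Iceberg table in the backend catalog (the catalog that stores the metadata). |
| iceberg.catalog.config | map | yes | - | Properties used to initialize the Iceberg catalog. For the available properties, see "https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
| hadoop.config | map | no | - | Properties passed to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string | no | - | Path from which the core-site.xml, hdfs-site.xml, and hive-site.xml files are loaded. |
| case_sensitive | boolean | no | false | Whether column-name matching is case sensitive. |
| iceberg.table.write-props | map | no | - | Properties passed to the initialization of the Iceberg writer. These properties take the highest priority, for example write.format.default and write.target-file-size-bytes. For the available parameters, see 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java' |
| iceberg.table.auto-create-props | map | no | - | Configuration applied when Iceberg creates the table automatically. |
| iceberg.table.schema-evolution-enabled | boolean | no | false | When set to true, the Iceberg table can support schema changes during synchronization. |
| iceberg.table.primary-keys | string | no | - | List of primary-key columns that identify a row in the table, comma-separated by default. |
| iceberg.table.partition-keys | string | no | - | List of partition fields used when creating the table, comma-separated by default. |
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to true to enable upsert mode. Defaults to false. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How the target schema is handled (schema save mode). |
| data_save_mode | Enum | no | APPEND_DATA | How data is written (data save mode). |
| custom_sql | string | no | - | Custom SQL statement for deleting data, used by the data save mode. Example: delete from ... where ... |
| iceberg.table.commit-branch | string | no | - | Default branch for commits. |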
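
The save-mode and branch options above could be combined as in the following sketch. It reuses the catalog settings from the task examples in this page. Two things are assumptions rather than facts stated in the options table: the value CUSTOM_PROCESSING for data_save_mode (taken from SeaTunnel's common save-mode enum, and assumed to be what makes custom_sql take effect) and the branch name audit_branch, which is purely hypothetical.

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"

    # Default value from the options table, written out explicitly
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"

    # Assumption: CUSTOM_PROCESSING is the mode under which custom_sql is executed
    data_save_mode = "CUSTOM_PROCESSING"
    # Pattern copied from the options table; replace it with a real delete statement
    custom_sql = "delete from ... where ..."

    # Hypothetical branch name; commits go to this branch instead of the default one
    iceberg.table.commit-branch = "audit_branch"
  }
}
```

Leaving out data_save_mode and custom_sql keeps the default APPEND_DATA behavior listed in the table.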

Task examples

Simple example:

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    plugin_output = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}

Hive Catalog:

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}

Hadoop catalog:

sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}

Multiple tables (multi-table write)

Example 1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${database_name}_test"
    table = "${table_name}_test"
  }
}
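
To make the placeholders in Example 1 concrete: assuming ${database_name} and ${table_name} are replaced with the upstream database and table name of each source table, the upstream table seatunnel.role would be written as if the sink had been configured like the sketch below. The resolved values are an inference from the placeholder names, not output captured from a real run.

```hocon
sink {
  Iceberg {
    # Other options are unchanged from Example 1 and omitted here.
    # Assumed resolution for the upstream table "seatunnel.role":
    namespace = "seatunnel_test"  # from "${database_name}_test"
    table = "role_test"           # from "${table_name}_test"
  }
}
```

Under the same assumption, galileo.Bucket would land in namespace galileo_test as table Bucket_test.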

Example 2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Iceberg {
    ...
    namespace = "${schema_name}_test"
    table = "${table_name}_test"
  }
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Improve] iceberg options (#8967) | https://github.com/apache/seatunnel/commit/82a374ec8 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [Feature][Iceberg] Support read multi-table (#8524) | https://github.com/apache/seatunnel/commit/2bfb97e50 | 2.3.10 |
| [Improve][Iceberg] Filter catalog table primaryKey is empty (#8413) | https://github.com/apache/seatunnel/commit/857aab5e8 | 2.3.9 |
| [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155) | https://github.com/apache/seatunnel/commit/45a7a715a | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Feature][Iceberg] Support custom delete sql for sink savemode (#8094) | https://github.com/apache/seatunnel/commit/29ca928c3 | 2.3.9 |
| [Improve][Connector-V2] Reduce the request times of iceberg load table (#8149) | https://github.com/apache/seatunnel/commit/555f5eb40 | 2.3.9 |
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281e | 2.3.9 |
| [Improve][Iceberg] Support table comment for catalog (#7936) | https://github.com/apache/seatunnel/commit/72ab38f31 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Fix][Connector-V2] Fix iceberg throw java: package sun.security.krb5 does not exist when use jdk 11 (#7734) | https://github.com/apache/seatunnel/commit/116af4feb | 2.3.8 |
| [Hotfix][Connector-V2] Release resources when task is closed for iceberg sinkwriter (#7729) | https://github.com/apache/seatunnel/commit/ff281183b | 2.3.8 |
| [Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660) | https://github.com/apache/seatunnel/commit/b7be0cb4a | 2.3.8 |
| [Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634) | https://github.com/apache/seatunnel/commit/23ab3edbb | 2.3.8 |
| [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641) | https://github.com/apache/seatunnel/commit/6b36f90f4 | 2.3.8 |
| [Hotfix] Fix iceberg missing column comment when savemode create table (#7608) | https://github.com/apache/seatunnel/commit/b35bd94bf | 2.3.8 |
| [Improve][Connector-V2] Remove hard code iceberg table format version (#7500) | https://github.com/apache/seatunnel/commit/f49b263e6 | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 |
| [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) | https://github.com/apache/seatunnel/commit/e3001207c | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895) | https://github.com/apache/seatunnel/commit/40d2c1b21 | 2.3.6 |
| [Feature][Connector-V2] Iceberg-sink supports writing data to branches (#6697) | https://github.com/apache/seatunnel/commit/e3103535c | 2.3.6 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5 |
| [Improve][Zeta] Add classloader cache mode to fix metaspace leak (#6355) | https://github.com/apache/seatunnel/commit/9c3c2f183 | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5 |
| [Feature] Supports iceberg sink #6198 (#6265) | https://github.com/apache/seatunnel/commit/18d3e8619 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732) | https://github.com/apache/seatunnel/commit/7f3b4be07 | 2.3.4 |
| [Dependency]Bump org.apache.avro:avro in /seatunnel-connectors-v2/connector-iceberg (#5582) | https://github.com/apache/seatunnel/commit/13753a927 | 2.3.4 |
| [Improve][Pom] Add junit4 to the root pom (#5611) | https://github.com/apache/seatunnel/commit/7b4f7db2a | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Doc][Iceberg] Improved iceberg documentation (#5335) | https://github.com/apache/seatunnel/commit/659a68a0b | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3 |
| [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638) | https://github.com/apache/seatunnel/commit/64760eed4 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| [Improve][SourceConnector] Unifie Iceberg source fields to schema (#3959) | https://github.com/apache/seatunnel/commit/20e1255fa | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677) | https://github.com/apache/seatunnel/commit/e24843515 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046) | https://github.com/apache/seatunnel/commit/b38c50789 | 2.3.0 |
| [Feature][Connector V2] expose configurable options in Iceberg (#3394) | https://github.com/apache/seatunnel/commit/bd9a313de | 2.3.0 |
| [Improve][Connector][Iceberg] Improve code. (#3065) | https://github.com/apache/seatunnel/commit/9f38e3da7 | 2.3.0-beta |
| [Code-Improve][Iceberg] Use automatic resource management to replace 'try - finally' code block. (#2909) | https://github.com/apache/seatunnel/commit/b7f640724 | 2.3.0-beta |
| [Feature][Connector-V2] Add iceberg source connector (#2615) | https://github.com/apache/seatunnel/commit/ffc6088a7 | 2.2.0-beta |