Apache Iceberg
Apache Iceberg sink connector
Supported Iceberg version
- 1.6.1
Supported engines
Spark
Flink
SeaTunnel Zeta
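Description
The Apache Iceberg sink connector supports CDC mode, automatic table creation, and table schema evolution.
Key features
Supported data source info
Data source | Dependency | Maven dependency |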
---|---|---|
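Iceberg | hive-exec | Download |
Iceberg | libfb303 | Download |
Database dependency
To stay compatible with different Hadoop and Hive versions, the hive-exec dependency in the project pom file is scoped as provided. If you use the Flink engine, you may therefore need to add the following JARs to the <FLINK_HOME>/lib directory. If you use the Spark engine with Hadoop already integrated, you do not need to add them.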
hive-exec-xxx.jar
libfb303-xxx.jar
Some versions of the hive-exec package do not include libfb303-xxx.jar, so you also need to add that JAR manually.
Data type mapping
SeaTunnel data type | Iceberg data type |
---|---|
BOOLEAN | BOOLEAN |
INT | INTEGER |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
STRING | STRING |
BYTES | FIXED / BINARY |
DECIMAL | DECIMAL |
ROW | STRUCT |
ARRAY | LIST |
MAP | MAP |
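To make the mapping concrete, the following sketch declares a FakeSource schema and annotates each field with the Iceberg type it would map to according to the table above. The field names, row count, table name, and warehouse path are illustrative assumptions, and the type strings follow SeaTunnel's generic schema syntax, so check them against the version you run.

```hocon
# Sketch only: field names, table name, and paths are hypothetical.
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 10
    schema = {
      fields {
        id = "bigint"                       # -> Iceberg LONG
        name = "string"                     # -> Iceberg STRING
        active = "boolean"                  # -> Iceberg BOOLEAN
        amount = "decimal(10, 2)"           # -> Iceberg DECIMAL
        created_at = "timestamp"            # -> Iceberg TIMESTAMP
        tags = "array<string>"              # -> Iceberg LIST
        attributes = "map<string, string>"  # -> Iceberg MAP
      }
    }
  }
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/type-mapping/"
    }
    namespace = "seatunnel_namespace"
    table = "type_mapping_demo"
  }
}
```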
Sink options
Name | Type | Required | Default | Description |
---|---|---|---|---|
catalog_name | string | yes | default | User-specified catalog name. Defaults to default. |
namespace | string | yes | default | Name of the Iceberg database in the backend catalog (the catalog that stores the metadata). Defaults to default. |
table | string | yes | - | Name of the Iceberg table in the backend catalog (the catalog that stores the metadata). |
iceberg.catalog.config | map | yes | - | Properties used to initialize the Iceberg catalog. For the available keys, see "https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
hadoop.config | map | no | - | Properties passed through to the Hadoop configuration. |
iceberg.hadoop-conf-path | string | no | - | Path from which the core-site.xml, hdfs-site.xml, and hive-site.xml files are loaded. |
case_sensitive | boolean | no | false | Whether column-name matching is case sensitive. |
iceberg.table.write-props | map | no | - | Properties passed to the Iceberg writer at initialization. These take the highest priority, for example write.format.default and write.target-file-size-bytes. For the available keys, see 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java' |
iceberg.table.auto-create-props | map | no | - | Configuration applied when Iceberg creates the table automatically. |
iceberg.table.schema-evolution-enabled | boolean | no | false | When set to true, the Iceberg table accepts schema changes during synchronization. |
iceberg.table.primary-keys | string | no | - | List of primary-key columns that identify a row in the table, comma-separated by default. |
iceberg.table.partition-keys | string | no | - | List of partition fields used when creating the table, comma-separated by default. |
iceberg.table.upsert-mode-enabled | boolean | no | false | Set to true to enable upsert mode. Defaults to false. |
schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode. See schema_save_mode below. |
data_save_mode | Enum | no | APPEND_DATA | Data save mode. See data_save_mode below. |
custom_sql | string | no | - | Custom SQL statement for deleting data, used by the data save mode. For example: delete from ... where ... |
iceberg.table.commit-branch | string | no | - | Default branch to commit to. |
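As a rough illustration of how the save-mode options fit together, the sketch below writes out the two defaults from the table explicitly and shows, commented out, a custom delete statement. The value CUSTOM_PROCESSING is an assumption taken from SeaTunnel's general save-mode enum rather than from this page, and the delete statement and its predicate are hypothetical, so verify both against your SeaTunnel version.

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"

    # Defaults from the options table, written out explicitly.
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"

    # Assumed alternative: run a custom delete before writing.
    # CUSTOM_PROCESSING and the SQL text below are assumptions, not taken from this page.
    # data_save_mode = "CUSTOM_PROCESSING"
    # custom_sql = "delete from seatunnel_namespace.iceberg_sink_table where id > 100"
  }
}
```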
Task examples
Simple example
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  MySQL-CDC {
    plugin_output = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}
transform {
}
sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
Hive Catalog
sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
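When the Hive metastore and HDFS settings already live in site files, the iceberg.hadoop-conf-path option from the options table can point the sink at them, and hadoop.config can pass individual Hadoop properties. A minimal sketch, assuming a hypothetical directory /etc/hadoop/conf that contains core-site.xml, hdfs-site.xml, and hive-site.xml; the single Hadoop property shown is only an illustration:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    # Hypothetical directory holding core-site.xml, hdfs-site.xml, and hive-site.xml.
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    # Illustrative Hadoop property passed through to the Hadoop configuration.
    hadoop.config = {
      "dfs.client.use.datanode.hostname" = "true"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
  }
}
```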
Hadoop Catalog
sink {
  Iceberg {
    catalog_name="seatunnel_test"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace="seatunnel_namespace"
    table="iceberg_sink_table"
    iceberg.table.write-props={
      write.format.default="parquet"
      write.target-file-size-bytes=536870912
    }
    iceberg.table.primary-keys="id"
    iceberg.table.partition-keys="f_datetime"
    iceberg.table.upsert-mode-enabled=true
    iceberg.table.schema-evolution-enabled=true
    case_sensitive=true
  }
}
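The iceberg.table.commit-branch option from the options table can be added to any of these sink blocks to direct commits to a named branch. The sketch below uses a hypothetical branch name, audit. Whether the branch has to exist before the job starts is not stated here, so treat that as something to check.

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    # Hypothetical branch name; commits go to this branch.
    iceberg.table.commit-branch = "audit"
  }
}
```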
AWS S3 Tables REST Catalog
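Amazon S3 Tables provides S3 storage optimized for analytics workloads, with features designed to continuously improve query performance and reduce storage costs for tables. S3 Tables is purpose-built for storing tabular data, such as daily purchase transactions, streaming sensor data, or ad impressions. Tabular data represents data in columns and rows, as in a database table.
You can connect an Iceberg REST client to the Amazon S3 Tables Iceberg REST endpoint and make REST API calls to create, update, or query tables in S3 table buckets. The endpoint implements a set of standardized Iceberg REST APIs specified in the Apache Iceberg REST Catalog Open API specification. It works by translating Iceberg REST API operations into the corresponding S3 Tables operations.
Data in S3 Tables is stored in a new bucket type, the table bucket, which stores tables as subresources. Table buckets support storing tables in the Apache Iceberg format. Using standard SQL statements, you can query the tables with Iceberg-compatible query engines such as Amazon Athena, Amazon Redshift, and Apache Spark.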
sink {
  Iceberg {
    catalog_name = "s3_tables_catalog"
    namespace = "s3_tables_catalog"
    table = "user_data"
    iceberg.catalog.config = {
      type: "rest"
      warehouse: "arn:aws:s3tables:<Region>:<accountID>:bucket/<bucketname>"
      uri: "https://s3tables.<Region>.amazonaws.com/iceberg"
      rest.sigv4-enabled: "true"
      rest.signing-name: "s3tables"
      rest.signing-region: "<Region>"
    }
  }
}
Multi-table write
Example 1
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}
transform {
}
sink {
  Iceberg {
    ...
    namespace = "${database_name}_test"
    table = "${table_name}_test"
  }
}
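The placeholders in the sink block are filled in once per upstream table. Assuming ${database_name} and ${table_name} resolve to the database and table parts of each table-names entry (an assumption inferred from how this example is written, not a statement of the exact substitution rules), the three source tables would land as annotated below. The catalog settings and warehouse path are hypothetical stand-ins for the elided part of the example.

```hocon
# Expected resolution under the assumption stated above:
#   seatunnel.role  -> namespace = "seatunnel_test", table = "role_test"
#   seatunnel.user  -> namespace = "seatunnel_test", table = "user_test"
#   galileo.Bucket  -> namespace = "galileo_test",   table = "Bucket_test"
sink {
  Iceberg {
    # Hypothetical catalog settings for illustration only.
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/multi-table/"
    }
    namespace = "${database_name}_test"
    table = "${table_name}_test"
  }
}
```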
Example 2
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword
    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}
transform {
}
sink {
  Iceberg {
    ...
    namespace = "${schema_name}_test"
    table = "${table_name}_test"
  }
}
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Connector-V2] Iceberg add glue catalog support (#9247) | https://github.com/apache/seatunnel/commit/ecff2e8618 | 2.3.11 |
[Improve] Remove useless iceberg sink config iceberg.table.config (#9307) | https://github.com/apache/seatunnel/commit/fbdf39ebf2 | 2.3.11 |
[Improve][connector-iceberg] fix schema change event (#9217) | https://github.com/apache/seatunnel/commit/56669095b7 | 2.3.11 |
[Feature][Transform] Support define sink column type (#9114) | https://github.com/apache/seatunnel/commit/ab7119e507 | 2.3.11 |
[Feat][Connector-v2][Iceberg]support filter conditions in iceberg source (#9095) | https://github.com/apache/seatunnel/commit/0eb72780ee | 2.3.11 |
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
[Fix][API] Fixed not invoke the SinkAggregatedCommitter 's init method (#9070) | https://github.com/apache/seatunnel/commit/df0d11d632 | 2.3.11 |
[Improve] iceberg options (#8967) | https://github.com/apache/seatunnel/commit/82a374ec87 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
[Feature][Iceberg] Support read multi-table (#8524) | https://github.com/apache/seatunnel/commit/2bfb97e502 | 2.3.10 |
[Improve][Iceberg] Filter catalog table primaryKey is empty (#8413) | https://github.com/apache/seatunnel/commit/857aab5e83 | 2.3.9 |
[Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155) | https://github.com/apache/seatunnel/commit/45a7a715a2 | 2.3.9 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
[Feature][Iceberg] Support custom delete sql for sink savemode (#8094) | https://github.com/apache/seatunnel/commit/29ca928c36 | 2.3.9 |
[Improve][Connector-V2] Reduce the request times of iceberg load table (#8149) | https://github.com/apache/seatunnel/commit/555f5eb404 | 2.3.9 |
[Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 |
[Improve][Iceberg] Support table comment for catalog (#7936) | https://github.com/apache/seatunnel/commit/72ab38f317 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Fix][Connector-V2] Fix iceberg throw java: package sun.security.krb5 does not exist when use jdk 11 (#7734) | https://github.com/apache/seatunnel/commit/116af4febc | 2.3.8 |
[Hotfix][Connector-V2] Release resources when task is closed for iceberg sinkwriter (#7729) | https://github.com/apache/seatunnel/commit/ff281183bd | 2.3.8 |
[Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660) | https://github.com/apache/seatunnel/commit/b7be0cb4a1 | 2.3.8 |
[Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634) | https://github.com/apache/seatunnel/commit/23ab3edbbb | 2.3.8 |
[Improve][Iceberg] Add savemode create table primaryKey testcase (#7641) | https://github.com/apache/seatunnel/commit/6b36f90f4d | 2.3.8 |
[Hotfix] Fix iceberg missing column comment when savemode create table (#7608) | https://github.com/apache/seatunnel/commit/b35bd94bfb | 2.3.8 |
[Improve][Connector-V2] Remove hard code iceberg table format version (#7500) | https://github.com/apache/seatunnel/commit/f49b263e65 | 2.3.8 |
[Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a1 | 2.3.8 |
[Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) | https://github.com/apache/seatunnel/commit/e3001207c8 | 2.3.8 |
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
[Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895) | https://github.com/apache/seatunnel/commit/40d2c1b213 | 2.3.6 |
[Feature][Connector-V2] Iceberg-sink supports writing data to branches (#6697) | https://github.com/apache/seatunnel/commit/e3103535cc | 2.3.6 |
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
[Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce224 | 2.3.5 |
[Improve][Zeta] Add classloader cache mode to fix metaspace leak (#6355) | https://github.com/apache/seatunnel/commit/9c3c2f183d | 2.3.5 |
[Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 |
[Feature] Supports iceberg sink #6198 (#6265) | https://github.com/apache/seatunnel/commit/18d3e86194 | 2.3.5 |
[Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
[BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732) | https://github.com/apache/seatunnel/commit/7f3b4be075 | 2.3.4 |
[Dependency]Bump org.apache.avro:avro in /seatunnel-connectors-v2/connector-iceberg (#5582) | https://github.com/apache/seatunnel/commit/13753a927b | 2.3.4 |
[Improve][Pom] Add junit4 to the root pom (#5611) | https://github.com/apache/seatunnel/commit/7b4f7db2a2 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Doc][Iceberg] Improved iceberg documentation (#5335) | https://github.com/apache/seatunnel/commit/659a68a0be | 2.3.4 |
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
[Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638) | https://github.com/apache/seatunnel/commit/64760eed4d | 2.3.2 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[Improve][SourceConnector] Unifie Iceberg source fields to schema (#3959) | https://github.com/apache/seatunnel/commit/20e1255fab | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677) | https://github.com/apache/seatunnel/commit/e24843515f | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046) | https://github.com/apache/seatunnel/commit/b38c50789f | 2.3.0 |
[Feature][Connector V2] expose configurable options in Iceberg (#3394) | https://github.com/apache/seatunnel/commit/bd9a313ded | 2.3.0 |
[Improve][Connector][Iceberg] Improve code. (#3065) | https://github.com/apache/seatunnel/commit/9f38e3da74 | 2.3.0-beta |
[Code-Improve][Iceberg] Use automatic resource management to replace 'try - finally' code block. (#2909) | https://github.com/apache/seatunnel/commit/b7f640724b | 2.3.0-beta |
[Feature][Connector-V2] Add iceberg source connector (#2615) | https://github.com/apache/seatunnel/commit/ffc6088a79 | 2.2.0-beta |