Skip to main content
Version: Next

Apache Iceberg

Apache Iceberg sink connector

Support Iceberg Version​

  • 1.4.2

Support Those Engines​

Spark
Flink
SeaTunnel Zeta

Description​

Sink connector for Apache Iceberg. It can support cdc mode 、auto create table and table schema evolution.

Supported DataSource Info​

DatasourceDependentMaven
Iceberghive-execDownload
Iceberglibfb303Download

Database Dependency​

In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file are provided, so if you use the Flink engine, first you may need to add the following Jar packages to <FLINK_HOME>/lib directory, if you are using the Spark engine and integrated with Hadoop, then you do not need to add the following Jar packages.

hive-exec-xxx.jar
libfb303-xxx.jar

Some versions of the hive-exec package do not have libfb303-xxx.jar, so you also need to manually import the Jar package.

Data Type Mapping​

SeaTunnel Data typeIceberg Data type
BOOLEANBOOLEAN
INTINTEGER
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
STRINGSTRING
BYTESFIXED
BINARY
DECIMALDECIMAL
ROWSTRUCT
ARRAYLIST
MAPMAP

Sink Options​

NameTypeRequiredDefaultDescription
catalog_namestringyesdefaultUser-specified catalog name. default is default
namespacestringyesdefaultThe iceberg database name in the backend catalog. default is default
tablestringyes-The iceberg table name in the backend catalog.
iceberg.catalog.configmapyes-Specify the properties for initializing the Iceberg catalog, which can be referenced in this file:"https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java"
hadoop.configmapno-Properties passed through to the Hadoop configuration
iceberg.hadoop-conf-pathstringno-The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files.
case_sensitivebooleannofalseIf data columns where selected via schema [config], controls whether the match to the schema will be done with case sensitivity.
iceberg.table.write-propsmapno-Properties passed through to Iceberg writer initialization, these take precedence, such as 'write.format.default', 'write.target-file-size-bytes', and other settings, can be found with specific parameters at 'https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java'.
iceberg.table.auto-create-propsmapno-Configuration specified by Iceberg during automatic table creation.
iceberg.table.schema-evolution-enabledbooleannofalseSetting to true enables Iceberg tables to support schema evolution during the synchronization process
iceberg.table.primary-keysstringno-Default comma-separated list of columns that identify a row in tables (primary key)
iceberg.table.partition-keysstringno-Default comma-separated list of partition fields to use when creating tables
iceberg.table.upsert-mode-enabledbooleannofalseSet to true to enable upsert mode, default is false
schema_save_modeEnumnoCREATE_SCHEMA_WHEN_NOT_EXISTthe schema save mode, please refer to schema_save_mode below
data_save_modeEnumnoAPPEND_DATAthe data save mode, please refer to data_save_mode below
iceberg.table.commit-branchstringno-Default branch for commits

Task Example​

Simple:​

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc_iceberg"
server-id = 5652
username = "st_user"
password = "seatunnel"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}
}

transform {
}

sink {
Iceberg {
catalog_name="seatunnel_test"
iceberg.catalog.config={
"type"="hadoop"
"warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
}
namespace="seatunnel_namespace"
table="iceberg_sink_table"
iceberg.table.write-props={
write.format.default="parquet"
write.target-file-size-bytes=536870912
}
iceberg.table.primary-keys="id"
iceberg.table.partition-keys="f_datetime"
iceberg.table.upsert-mode-enabled=true
iceberg.table.schema-evolution-enabled=true
case_sensitive=true
}
}

Hive Catalog:​

sink {
Iceberg {
catalog_name="seatunnel_test"
iceberg.catalog.config={
type = "hive"
uri = "thrift://localhost:9083"
warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
}
namespace="seatunnel_namespace"
table="iceberg_sink_table"
iceberg.table.write-props={
write.format.default="parquet"
write.target-file-size-bytes=536870912
}
iceberg.table.primary-keys="id"
iceberg.table.partition-keys="f_datetime"
iceberg.table.upsert-mode-enabled=true
iceberg.table.schema-evolution-enabled=true
case_sensitive=true
}
}

Hadoop catalog:​

sink {
Iceberg {
catalog_name="seatunnel_test"
iceberg.catalog.config={
type = "hadoop"
warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
}
namespace="seatunnel_namespace"
table="iceberg_sink_table"
iceberg.table.write-props={
write.format.default="parquet"
write.target-file-size-bytes=536870912
}
iceberg.table.primary-keys="id"
iceberg.table.partition-keys="f_datetime"
iceberg.table.upsert-mode-enabled=true
iceberg.table.schema-evolution-enabled=true
case_sensitive=true
}
}

Changelog​

2.3.4-SNAPSHOT 2024-01-18​

  • Add Iceberg Sink Connector

next version​