Version: 2.3.4

Apache Iceberg

Apache Iceberg source connector

Supported Iceberg Version

  • 0.14.0

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Key features

  • batch
  • stream
  • column projection

Description

Source connector for Apache Iceberg. It supports both batch and stream modes.

Supported DataSource Info

| Datasource | Dependent           | Maven    |
|------------|---------------------|----------|
| Iceberg    | flink-shaded-hadoop | Download |
| Iceberg    | hive-exec           | Download |
| Iceberg    | libfb303            | Download |

Database Dependency

To stay compatible with different versions of Hadoop and Hive, the scope of hive-exec and flink-shaded-hadoop-2 in the project pom file is provided. If you use the Flink engine, you may first need to add the following Jar packages to the <FLINK_HOME>/lib directory. If you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.

flink-shaded-hadoop-x-xxx.jar
hive-exec-xxx.jar
libfb303-xxx.jar

Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.

Data Type Mapping

| Iceberg Data Type | SeaTunnel Data Type |
|-------------------|---------------------|
| BOOLEAN           | BOOLEAN             |
| INTEGER           | INT                 |
| LONG              | BIGINT              |
| FLOAT             | FLOAT               |
| DOUBLE            | DOUBLE              |
| DATE              | DATE                |
| TIME              | TIME                |
| TIMESTAMP         | TIMESTAMP           |
| STRING            | STRING              |
| FIXED, BINARY     | BYTES               |
| DECIMAL           | DECIMAL             |
| STRUCT            | ROW                 |
| LIST              | ARRAY               |
| MAP               | MAP                 |

Source Options

| Name                     | Type    | Required | Default              | Description |
|--------------------------|---------|----------|----------------------|-------------|
| catalog_name             | string  | yes      | -                    | User-specified catalog name. |
| catalog_type             | string  | yes      | -                    | The optional values are: hive (the Hive metastore catalog), hadoop (the Hadoop catalog). |
| uri                      | string  | no       | -                    | The Hive metastore's thrift URI. |
| warehouse                | string  | yes      | -                    | The location to store metadata files and data files. |
| namespace                | string  | yes      | -                    | The Iceberg database name in the backend catalog. |
| table                    | string  | yes      | -                    | The Iceberg table name in the backend catalog. |
| schema                   | config  | no       | -                    | Use projection to select data columns and column order. |
| case_sensitive           | boolean | no       | false                | If data columns are selected via schema [config], controls whether the match to the schema is done case-sensitively. |
| start_snapshot_timestamp | long    | no       | -                    | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in millis since the Unix epoch. |
| start_snapshot_id        | long    | no       | -                    | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
| end_snapshot_id          | long    | no       | -                    | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id          | long    | no       | -                    | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp   | long    | no       | -                    | Instructs this scan to use the most recent snapshot as of the given timestamp, in millis since the Unix epoch. |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution; defaults to FROM_LATEST_SNAPSHOT if no value is specified. The optional values are: TABLE_SCAN_THEN_INCREMENTAL (do a regular table scan, then switch to incremental mode), FROM_LATEST_SNAPSHOT (start incremental mode from the latest snapshot, inclusive), FROM_EARLIEST_SNAPSHOT (start incremental mode from the earliest snapshot, inclusive), FROM_SNAPSHOT_ID (start incremental mode from a snapshot with a specific id, inclusive), FROM_SNAPSHOT_TIMESTAMP (start incremental mode from a snapshot with a specific timestamp, inclusive). |
| common-options           |         | no       | -                    | Source plugin common parameters; please refer to Source Common Options for details. |
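
As an illustration of the snapshot options above, the following is a minimal sketch of a time-travel read in batch mode: it pins the scan to one snapshot via use_snapshot_id. The catalog settings mirror the task examples below, and the snapshot id is purely a placeholder.

source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    # Placeholder snapshot id: substitute a real id from the table's snapshot history.
    use_snapshot_id = 5864093992878364000
  }
}

Likewise, start_snapshot_id and end_snapshot_id can bound an incremental range, and the *_timestamp variants select snapshots by epoch millis instead of by id.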

Task Example

Simple:

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
        f5 = "float"
        f6 = "double"
        f7 = "date"
        f9 = "timestamp"
        f10 = "timestamp"
        f11 = "string"
        f12 = "bytes"
        f13 = "bytes"
        f14 = "decimal(19,9)"
        f15 = "array<int>"
        f16 = "map<string, int>"
      }
    }
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    result_table_name = "iceberg"
  }
}

transform {
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}
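
The same source also runs in stream mode. Below is a hedged sketch that switches the job to streaming and picks an incremental starting point with stream_scan_strategy; the catalog settings are reused from the batch example above and remain illustrative.

env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    namespace = "database1"
    table = "source"
    # Start incremental reading from the latest snapshot (this is also the default).
    stream_scan_strategy = "FROM_LATEST_SNAPSHOT"
    result_table_name = "iceberg"
  }
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}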

Hive Catalog:

source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hive"
    uri = "thrift://localhost:9083"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}

Column Projection:

source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
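
If projected field names must match the Iceberg schema by exact case, the case_sensitive option described in Source Options can be combined with the projection. A minimal sketch, with illustrative catalog and field names:

source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    # Match projected field names against the Iceberg schema case-sensitively.
    case_sensitive = true

    schema {
      fields {
        f1 = "bigint"
        f2 = "boolean"
      }
    }
  }
}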

Changelog

2.2.0-beta 2022-09-26

  • Add Iceberg Source Connector

next version

  • [Feature] Support Hadoop3.x (3046)
  • [improve][api] Refactoring schema parse (4157)