Kafka to Iceberg
Use this recipe when you want to land streaming events from Kafka into an Iceberg table for downstream analytics.
Prerequisites
Finish Run your first job.
Install the plugins required by this recipe. Follow Deployment > Download The Connector Plugins, then keep only the plugins below in
config/plugin_config:
--seatunnel-connectors--
connector-kafka
connector-iceberg
--end--
cd "${SEATUNNEL_HOME}"
sh bin/install-plugin.sh
ls connectors | rg 'connector-(kafka|iceberg)'
If you use Flink or Spark, add the Iceberg dependencies required by your environment, including
hive-execandlibfb303when needed.Choose an empty, writable Iceberg warehouse path. This tutorial uses a local Hadoop catalog at
file:///tmp/seatunnel/iceberg/warehouse-demo:
mkdir -p /tmp/seatunnel/iceberg/warehouse-demo
- Create the Kafka topic and produce a few JSON messages before starting the job:
kafka-topics.sh \
--create \
--if-not-exists \
--topic orders \
--bootstrap-server kafka:9092 \
--partitions 1 \
--replication-factor 1
kafka-console-producer.sh --topic orders --bootstrap-server kafka:9092 <<'EOF'
{"id":1001,"customer_id":2001,"total_amount":19.99,"event_date":"2026-06-12"}
{"id":1002,"customer_id":2002,"total_amount":29.99,"event_date":"2026-06-12"}
EOF
Minimal configuration
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Kafka {
plugin_output = "orders_kafka"
topic = "orders"
bootstrap.servers = "kafka:9092"
consumer.group = "seatunnel-orders"
start_mode = "earliest"
format = "json"
schema = {
fields {
id = bigint
customer_id = bigint
total_amount = "decimal(16, 2)"
event_date = string
}
}
}
}
sink {
Iceberg {
plugin_input = "orders_kafka"
catalog_name = "seatunnel_demo"
namespace = "lakehouse"
table = "orders"
iceberg.catalog.config = {
type = "hadoop"
warehouse = "file:///tmp/seatunnel/iceberg/warehouse-demo"
}
iceberg.table.primary-keys = "id"
iceberg.table.partition-keys = "event_date"
iceberg.table.upsert-mode-enabled = true
iceberg.table.schema-evolution-enabled = true
case_sensitive = true
}
}
Run the job
Save the config as config/kafka-to-iceberg.conf, then start SeaTunnel in local mode:
cd "${SEATUNNEL_HOME}"
./bin/seatunnel.sh --config ./config/kafka-to-iceberg.conf -m local
Keep the job running while Kafka messages are being consumed, because this is a streaming pipeline.
Validation result
- Verify that Iceberg metadata and data files appear under the warehouse path.
- Query the Iceberg table with Spark, Trino, or another Iceberg-compatible engine.
ls /tmp/seatunnel/iceberg/warehouse-demo/lakehouse/orders
spark-sql \
--conf spark.sql.catalog.seatunnel_demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.seatunnel_demo.type=hadoop \
--conf spark.sql.catalog.seatunnel_demo.warehouse=file:///tmp/seatunnel/iceberg/warehouse-demo \
-e "SELECT COUNT(*) FROM seatunnel_demo.lakehouse.orders"
If the table can be queried and the row count matches the Kafka messages you produced, the pipeline is working. With the two sample messages above, the count should be 2.
Common pitfalls
- JSON messages in Kafka do not match the schema defined in the source block.
- Checkpointing is disabled in a streaming pipeline, which weakens restart and consistency behavior.
- The Iceberg catalog type is correct, but the warehouse path is not writable by the engine process.
- Upsert mode is enabled even though the incoming records do not have stable primary keys.