Skip to main content
Version: Next

CDC Production Cookbook: Full + Incremental Synchronization

Overview

This cookbook covers production-grade configuration and operation of Change Data Capture (CDC) connectors in SeaTunnel. It supplements the per-connector parameter references with end-to-end examples for the most common target systems and a troubleshooting checklist.

Supported CDC sources covered in this guide:

SourceConnector
MySQLMySQL-CDC
PostgreSQLPostgreSQL-CDC
OracleOracle-CDC

1. Full + Incremental Synchronization Lifecycle

A CDC job runs in two sequential phases:

Phase 1: Full Snapshot (batch read)
──> Read existing rows from every table in parallel splits
──> Write all rows to sink (no 2PC required for snapshot rows)
──> Record the binlog/WAL/SCN position at snapshot start

Phase 2: Incremental CDC (streaming)
──> Tail binlog/WAL/logminer from the snapshot-start position
──> Convert INSERT/UPDATE/DELETE events to SeaTunnelRow with RowKind
──> Write to sink via 2PC (if sink supports exactly-once)

This lifecycle is controlled by startup.mode:

startup.modeBehavior
initialRun full snapshot, then switch to incremental (recommended for production)
earliestSkip snapshot, start from the oldest available binlog/WAL position
latestSkip snapshot, start from the current binlog/WAL position
specificStart from a user-specified binlog file+offset or LSN/SCN
timestampStart from the binlog/WAL position at a given timestamp

Production recommendation: always use startup.mode=initial unless you are resuming a failed incremental job. Starting from latest silently skips existing data.


2. Prerequisites by Database

MySQL CDC

1. Enable binlog:
binlog_format = ROW
binlog_row_image = FULL

2. Create a CDC user with replication privileges:
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';

3. (Optional but recommended) Enable GTID for automatic failover:
gtid_mode = ON
enforce_gtid_consistency = ON

server-id requirement: MySQL treats each replication client as a unique server. The server-id in your job config must not conflict with any other MySQL replica or SeaTunnel CDC job connecting to the same MySQL instance. Assign a unique range (e.g., 5400–5499) per job.

PostgreSQL CDC

1. Set the WAL level:
wal_level = logical

2. Create a replication slot:
SELECT pg_create_logical_replication_slot('seatunnel_slot', 'pgoutput');

3. Grant replication privilege:
ALTER ROLE cdc_user REPLICATION LOGIN;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;

4. Publication (pgoutput decoder):
CREATE PUBLICATION seatunnel_pub FOR ALL TABLES;

Oracle CDC

1. Enable supplemental logging:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

2. Enable LogMiner:
EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS);

3. Create LogMiner user:
CREATE USER logminer_user IDENTIFIED BY password;
GRANT CREATE SESSION, LOGMINING, SELECT ANY TRANSACTION TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;

3. Production Examples

3.1 MySQL CDC → Apache Doris

env {
parallelism = 4
job.mode = "STREAMING"
checkpoint.interval = 30000
}

source {
MySQL-CDC {
base-url = "jdbc:mysql://mysql-host:3306/orders_db"
username = "cdc_user"
password = "password"
database-names = ["orders_db"]
table-names = ["orders_db.orders", "orders_db.order_items"]
startup.mode = "initial"
server-id = "5400-5404"
# Enable exactly-once: coordinate 2PC with Doris sink
exactly-once = true
}
}

sink {
Doris {
fenodes = "doris-fe:8030"
username = "root"
password = ""
database = "orders_dw"
table = "${table_name}"
sink.enable-2pc = true
sink.label-prefix = "seatunnel_cdc_orders"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}

Key points:

  • sink.enable-2pc = true coordinates with SeaTunnel's exactly-once checkpoint
  • checkpoint.interval controls both the CDC commit frequency and the 2PC commit interval
  • A shorter interval reduces latency but increases Doris label metadata overhead

3.2 MySQL CDC → StarRocks

env {
parallelism = 4
job.mode = "STREAMING"
checkpoint.interval = 30000
}

source {
MySQL-CDC {
base-url = "jdbc:mysql://mysql-host:3306/inventory"
username = "cdc_user"
password = "password"
database-names = ["inventory"]
table-names = ["inventory.products", "inventory.customers"]
startup.mode = "initial"
server-id = "5410-5414"
exactly-once = true
}
}

sink {
StarRocks {
nodeUrls = ["starrocks-fe:8030"]
username = "root"
password = ""
database = "inventory_dw"
table = "${table_name}"
sink.properties.format = "json"
sink.properties.strip_outer_array = "true"
# Enable transaction for exactly-once
enable_upsert_delete = true
}
}

3.3 MySQL CDC → Kafka

env {
parallelism = 4
job.mode = "STREAMING"
checkpoint.interval = 10000
}

source {
MySQL-CDC {
base-url = "jdbc:mysql://mysql-host:3306/app_db"
username = "cdc_user"
password = "password"
database-names = ["app_db"]
table-names = ["app_db.users", "app_db.events"]
startup.mode = "initial"
server-id = "5420-5424"
format = "COMPATIBLE_DEBEZIUM_JSON"
}
}

sink {
Kafka {
topic = "cdc.${database_name}.${table_name}"
bootstrap.servers = "kafka:9092"
kafka.config {
acks = "all"
# Enable Kafka transactions for exactly-once
transactional.id = "seatunnel-cdc-${table_name}"
}
}
}

Debezium-compatible format: setting format = COMPATIBLE_DEBEZIUM_JSON produces Kafka messages that downstream consumers (Flink, Kafka Connect, etc.) can process with standard Debezium schemas.


3.4 PostgreSQL CDC → JDBC / Doris / StarRocks

env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 20000
}

source {
Postgres-CDC {
base-url = "jdbc:postgresql://pg-host:5432/mydb"
username = "cdc_user"
password = "password"
database-names = ["mydb"]
schema-names = ["public"]
table-names = ["public.orders", "public.customers"]
startup.mode = "initial"
slot.name = "seatunnel_slot"
decoding.plugin.name = "pgoutput"
}
}

sink {
Doris {
fenodes = "doris-fe:8030"
username = "root"
password = ""
database = "mydb_ods"
table = "${table_name}"
sink.enable-2pc = true
sink.label-prefix = "seatunnel_pg_cdc"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}

3.5 Oracle CDC → Doris / StarRocks / JDBC

env {
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 60000
}

source {
Oracle-CDC {
base-url = "jdbc:oracle:thin:@oracle-host:1521:ORCL"
username = "logminer_user"
password = "password"
database-names = ["ORCL"]
schema-names = ["HR"]
table-names = ["HR.EMPLOYEES", "HR.DEPARTMENTS"]
startup.mode = "initial"
}
}

sink {
Doris {
fenodes = "doris-fe:8030"
username = "root"
password = ""
database = "hr_ods"
table = "${table_name}"
sink.enable-2pc = true
sink.label-prefix = "seatunnel_oracle_cdc"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}

4. Checkpoint and 2PC Interaction

The checkpoint.interval setting controls both fault-tolerance frequency and the 2PC commit cadence for sinks like Doris and StarRocks:

Checkpoint N triggers:
1. Source: flush current binlog offset to checkpoint storage
2. Sink: commit all pending micro-batch transactions (2PC phase 2)
3. Sink: begin new transaction for next batch (2PC phase 1)

On failure and restore:
1. Engine restores binlog offset from last completed checkpoint
2. Sink rolls back any in-flight (uncommitted) transactions
3. CDC replay continues from checkpoint offset — no data loss, no duplicates

Choosing checkpoint.interval:

ScenarioRecommended interval
Low-latency CDC (< 5 s freshness required)5 000–10 000 ms
Standard production CDC30 000–60 000 ms
Oracle LogMiner (heavy query overhead)60 000–120 000 ms
High-throughput bulk CDC60 000–300 000 ms

5. Schema Evolution and DDL Support Boundaries

DDL OperationMySQL CDCPostgreSQL CDCOracle CDC
ADD COLUMNSupported (since 2.3.x)SupportedLimited
DROP COLUMNNot propagated by defaultNot propagatedNot propagated
RENAME COLUMNNot supportedNot supportedNot supported
ALTER COLUMN TYPERisky — may cause deserialization errorsRiskyRisky
TRUNCATE TABLENot capturedNot capturedNot captured
DROP TABLENot capturedNot capturedNot captured

Production recommendation: avoid ALTER TABLE on tables being synced by CDC without a coordinated schema migration plan. For complex DDL changes, use the following procedure:

  1. Stop the CDC job gracefully with a savepoint
  2. Apply DDL to both source and target
  3. Update the job config if schema mapping changes
  4. Restart the job from the savepoint

6. Observing CDC Delay

Query running job metrics via REST API

# Get current CDC lag (record count behind)
curl http://<master>:8080/hazelcast/rest/maps/running-job-metrics/<job-id>

Key metrics to watch

MetricMeaning
SourceReceivedCountTotal records read from source (snapshot + incremental)
SinkWriteCountTotal records written to sink
lagBinlog offset lag in MySQL terms

Lag monitoring script

#!/bin/bash
JOB_ID=$1
MASTER=$2
while true; do
echo "$(date) --- CDC lag:"
curl -s http://$MASTER:8080/hazelcast/rest/maps/running-job-metrics/$JOB_ID | \
python3 -c "import sys,json; d=json.load(sys.stdin); print(d)"
sleep 30
done

7. Troubleshooting Checklist

Permission errors

ErrorFix
Access denied for user ... REPLICATIONGrant REPLICATION SLAVE and REPLICATION CLIENT to CDC user
pg_hba.conf replication rejectedAdd host replication cdc_user 0.0.0.0/0 md5 to pg_hba.conf
Oracle ORA-01031: insufficient privilegesGrant LOGMINING, SELECT ANY TRANSACTION to LogMiner user

Network issues

ErrorFix
Connection refused on binlog portVerify MySQL bind-address is not 127.0.0.1; check firewall
PostgreSQL replication slot not createdVerify wal_level = logical and restart PG; slot creation requires superuser or replication role

Replication / offset issues

ErrorFix
Could not find first log file name in binary logMySQL binlog rotation deleted the starting position; reset job with startup.mode=initial
Replication slot ... does not existSlot was dropped; re-create it and restart job with startup.mode=initial
SCN not found in redo logOracle redo log recycled the starting SCN; restart job with startup.mode=initial

Checkpoint failures

ErrorFix
Checkpoint timeout exceededIncrease checkpoint.timeout; reduce parallelism to reduce checkpoint overhead
Checkpoint storage path not accessibleVerify HDFS / S3 connectivity from all workers; check storage credentials
state not found on restoreCheckpoint was deleted or path changed; submit fresh job

2PC sink issues

ErrorFix
Doris Label already existsDuplicate label due to restart without savepoint; change sink.label-prefix
StarRocks transaction timeoutIncrease sink.properties.timeout; check StarRocks FE connection pooling

See Also