Multi-Table Synchronization Architecture
1. Overview
1.1 Problem Background
Database migration and CDC scenarios often require synchronizing hundreds of tables:
- Resource Efficiency: How to avoid creating one job per table?
- Consistent Snapshot: How to ensure all tables start from the same point in time?
- Schema Routing: How to route data to correct target tables?
- Independent Schemas: How to handle different schemas per table?
- Parallel Writing: How to maximize throughput for multiple tables?
1.2 Design Goals
SeaTunnel's multi-table synchronization aims to:
- Single Job, Multiple Tables: Synchronize hundreds of tables in one job
- Resource Efficiency: Share resources across tables
- Schema Independence: Each table maintains its own schema
- Dynamic Routing: Route records to correct sink based on table identity
- Horizontal Scalability: Support replica writers for high throughput
1.3 Use Cases
Database Migration:
source {
MySQL-CDC {
# Capture all tables in database
database-name = "my_db"
table-name = ".*" # Regex: all tables
}
}
sink {
JDBC {
# Write to PostgreSQL
url = "jdbc:postgresql://..."
}
}
Multi-Table CDC:
source {
MySQL-CDC {
table-name = "order_.*|user_.*|product_.*" # Multiple table patterns
}
}
sink {
Elasticsearch {
# Different indices per table
}
}
2. Core Abstractions
2.1 TablePath
Unique identifier for routing records to tables.
public class TablePath implements Serializable {
private final String databaseName;
private final String schemaName;
private final String tableName;
// Unique string representation
public String getFullName() {
return String.join(".", databaseName, schemaName, tableName);
}
}
Example:
TablePath orderTable = TablePath.of("my_db", "public", "orders");
TablePath userTable = TablePath.of("my_db", "public", "users");
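The routing key above can be exercised with a minimal, runnable stand-in (illustrative only, not the real SeaTunnel class; names mirror the sketch above):

```java
import java.io.Serializable;
import java.util.Objects;

// Minimal stand-in for a TablePath-style identifier used as a routing key.
public class TablePathDemo implements Serializable {
    private final String databaseName;
    private final String schemaName;
    private final String tableName;

    private TablePathDemo(String databaseName, String schemaName, String tableName) {
        this.databaseName = databaseName;
        this.schemaName = schemaName;
        this.tableName = tableName;
    }

    public static TablePathDemo of(String db, String schema, String table) {
        return new TablePathDemo(db, schema, table);
    }

    // Unique string representation, e.g. "my_db.public.orders"
    public String getFullName() {
        return String.join(".", databaseName, schemaName, tableName);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TablePathDemo)) return false;
        return getFullName().equals(((TablePathDemo) o).getFullName());
    }

    @Override
    public int hashCode() {
        return Objects.hash(databaseName, schemaName, tableName);
    }

    public static void main(String[] args) {
        TablePathDemo orders = TablePathDemo.of("my_db", "public", "orders");
        System.out.println(orders.getFullName()); // my_db.public.orders
    }
}
```

Equality on the full name is what makes the path usable as a map key for routing.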
2.2 SeaTunnelRow with TableId
Records carry table identity for routing.
public class SeaTunnelRow {
private final String tableId; // TablePath serialized
private final RowKind rowKind; // INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE
private final Object[] fields;
public TablePath getTablePath() {
return TablePath.deserialize(tableId);
}
}
2.3 SinkIdentifier
Unique identifier for sink writers (table + replica index).
public class SinkIdentifier implements Serializable {
private final TableIdentifier tableIdentifier;
private final int index; // Replica index
// For multi-table: one identifier per table per replica
// Example: (orders, 0), (orders, 1), (users, 0), (users, 1)
}
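The identifier space from the comment above — one identifier per table per replica — can be enumerated with a short sketch (strings stand in for TableIdentifier):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: enumerate (table, replica) sink identifiers for a set of tables.
public class SinkIdentifierDemo {
    static List<String> identifiers(List<String> tables, int replicaNum) {
        List<String> ids = new ArrayList<>();
        for (String table : tables) {
            // One writer identifier per replica of each table
            for (int replica = 0; replica < replicaNum; replica++) {
                ids.add("(" + table + ", " + replica + ")");
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(identifiers(List.of("orders", "users"), 2));
        // [(orders, 0), (orders, 1), (users, 0), (users, 1)]
    }
}
```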
3. MultiTableSource Architecture
3.1 Structure
public class MultiTableSource<T, SplitT, StateT>
implements SeaTunnelSource<T, SplitT, StateT> {
// Underlying sources (one per table)
private final Map<TablePath, SeaTunnelSource<T, SplitT, StateT>> sources;
// Produced catalog tables
private final List<CatalogTable> catalogTables;
}
3.2 Creation
// From configuration
MultiTableSource<SeaTunnelRow, ?, ?> multiSource =
MultiTableSource.builder()
.addSource(orderTablePath, orderSource)
.addSource(userTablePath, userSource)
.addSource(productTablePath, productSource)
.build();
3.3 Enumerator: Unified Split Assignment
public class MultiTableSourceSplitEnumerator {
private final Map<TablePath, SourceSplitEnumerator> enumerators;
@Override
public void handleSplitRequest(int subtaskId) {
// Forward the request to every table enumerator;
// each may assign one of its pending splits to the requesting subtask
for (SourceSplitEnumerator enumerator : enumerators.values()) {
enumerator.handleSplitRequest(subtaskId);
}
}
@Override
public void addReader(int subtaskId) {
// Register reader with all table enumerators
for (SourceSplitEnumerator enumerator : enumerators.values()) {
enumerator.addReader(subtaskId);
}
}
}
3.4 Reader: Multi-Table Data Reading
public class MultiTableSourceReader {
private final Map<TablePath, SourceReader> readers;
private final Queue<TablePath> readOrder; // Round-robin queue
@Override
public void pollNext(Collector<SeaTunnelRow> output) {
if (readOrder.isEmpty()) {
return;
}
// Round-robin read from tables
TablePath currentTable = readOrder.poll();
SourceReader reader = readers.get(currentTable);
// Read from current table
reader.pollNext(new Collector<SeaTunnelRow>() {
@Override
public void collect(SeaTunnelRow row) {
// Tag row with table path
row.setTableId(currentTable.serialize());
output.collect(row);
}
});
// Re-add to queue for next round
readOrder.offer(currentTable);
}
@Override
public void addSplits(List<SplitT> splits) {
// Route splits to correct table readers
for (SplitT split : splits) {
TablePath tablePath = extractTablePath(split);
SourceReader reader = readers.get(tablePath);
reader.addSplits(Collections.singletonList(split));
// Add table to read order if not present
if (!readOrder.contains(tablePath)) {
readOrder.offer(tablePath);
}
}
}
}
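The poll/offer cycle above yields a fair round-robin over tables. A runnable sketch of just the read-order queue (strings stand in for TablePath):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of MultiTableSourceReader's round-robin read order:
// poll a table, read from it, then re-queue it at the tail.
public class ReadOrderDemo {
    static List<String> readRounds(List<String> tables, int rounds) {
        Queue<String> readOrder = new ArrayDeque<>(tables);
        List<String> observed = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            String current = readOrder.poll(); // table read on this pollNext call
            observed.add(current);
            readOrder.offer(current);          // re-add for the next round
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(readRounds(List.of("orders", "users", "products"), 6));
        // [orders, users, products, orders, users, products]
    }
}
```

Each call touches exactly one table, so no single hot table can starve the others.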
4. MultiTableSink Architecture
4.1 Structure
public class MultiTableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
implements SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> {
// Underlying sinks (one per table)
private final Map<TablePath, SeaTunnelSink> sinks;
// Number of writer replicas per table
private final int replicaNum;
// Input catalog tables
private final List<CatalogTable> catalogTables;
}
4.2 Writer: Multi-Table Writing with Replicas
public class MultiTableSinkWriter<IN, CommitInfoT, StateT>
implements SinkWriter<IN, CommitInfoT, StateT> {
// Writers per table (multiple replicas per table)
private final Map<SinkIdentifier, SinkWriter<IN, CommitInfoT, StateT>> writers;
// Replica count per table
private final int replicaNum;
// Context
private final int writerIndex; // This writer's global index
@Override
public void write(IN element) throws IOException {
SeaTunnelRow row = (SeaTunnelRow) element;
// 1. Determine target table
TablePath tablePath = row.getTablePath();
// 2. Select replica for this table (load balancing)
int replicaIndex = selectReplica(tablePath, row);
// 3. Get writer for (table, replica)
SinkIdentifier identifier = new SinkIdentifier(
new TableIdentifier(tablePath),
replicaIndex
);
SinkWriter<IN, CommitInfoT, StateT> writer = writers.get(identifier);
// 4. Write to selected writer
writer.write(element);
}
private int selectReplica(TablePath tablePath, SeaTunnelRow row) {
// If a primary key is available, route stably by primary key hash;
// floorMod avoids a negative index when hashCode() is negative
Optional<Object> primaryKey = extractPrimaryKeyIfPresent(row);
if (primaryKey.isPresent()) {
return Math.floorMod(primaryKey.get().hashCode(), replicaNum);
}
// Otherwise, distribute across replicas (no stable routing guarantee)
return ThreadLocalRandom.current().nextInt(replicaNum);
}
@Override
public Optional<CommitInfoT> prepareCommit(long checkpointId) throws IOException {
// Collect commit info from all writers
List<CommitInfoT> allCommitInfos = new ArrayList<>();
for (SinkWriter<IN, CommitInfoT, StateT> writer : writers.values()) {
Optional<CommitInfoT> commitInfo = writer.prepareCommit(checkpointId);
commitInfo.ifPresent(allCommitInfos::add);
}
// Wrap in multi-table commit info
return Optional.of((CommitInfoT) new MultiTableCommitInfo(allCommitInfos));
}
@Override
public List<StateT> snapshotState(long checkpointId) throws IOException {
// Snapshot all writers
List<StateT> allStates = new ArrayList<>();
for (Map.Entry<SinkIdentifier, SinkWriter> entry : writers.entrySet()) {
List<StateT> states = entry.getValue().snapshotState(checkpointId);
// Tag states with sink identifier for recovery
for (StateT state : states) {
allStates.add(wrapWithIdentifier(entry.getKey(), state));
}
}
return allStates;
}
}
4.3 Committer: Multi-Table Commit Coordination
public class MultiTableSinkCommitter<CommitInfoT>
implements SinkCommitter<CommitInfoT> {
// Committers per table
private final Map<TablePath, SinkCommitter<CommitInfoT>> committers;
@Override
public List<CommitInfoT> commit(List<CommitInfoT> commitInfos) throws IOException {
List<CommitInfoT> failed = new ArrayList<>();
// Group commit infos by table
Map<TablePath, List<CommitInfoT>> groupedInfos = groupByTable(commitInfos);
// Commit per table
for (Map.Entry<TablePath, List<CommitInfoT>> entry : groupedInfos.entrySet()) {
TablePath tablePath = entry.getKey();
List<CommitInfoT> tableCommitInfos = entry.getValue();
SinkCommitter<CommitInfoT> committer = committers.get(tablePath);
// Commit for this table
List<CommitInfoT> tableFailed = committer.commit(tableCommitInfos);
failed.addAll(tableFailed);
}
return failed;
}
private Map<TablePath, List<CommitInfoT>> groupByTable(List<CommitInfoT> commitInfos) {
Map<TablePath, List<CommitInfoT>> grouped = new HashMap<>();
for (CommitInfoT commitInfo : commitInfos) {
TablePath tablePath = extractTablePath(commitInfo);
grouped.computeIfAbsent(tablePath, k -> new ArrayList<>()).add(commitInfo);
}
return grouped;
}
}
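The per-table grouping step can be sketched in isolation, with strings standing in for commit infos and table paths (names illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the committer's groupByTable step: bucket commit infos by
// table path so each table's committer sees only its own infos.
public class GroupByTableDemo {
    // Each entry is {tablePath, commitInfo}
    static Map<String, List<String>> groupByTable(List<String[]> commitInfos) {
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] info : commitInfos) {
            String tablePath = info[0]; // stands in for extractTablePath(commitInfo)
            grouped.computeIfAbsent(tablePath, k -> new ArrayList<>()).add(info[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, List<String>> g = groupByTable(List.of(
                new String[]{"orders", "c1"},
                new String[]{"users", "c2"},
                new String[]{"orders", "c3"}));
        System.out.println(g.get("orders")); // [c1, c3]
    }
}
```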
5. Replica Mechanism
5.1 Why Replicas?
Problem: A single writer per table becomes a bottleneck for high-throughput tables.
Solution: Multiple replica writers per table for parallel writing.
Without Replicas:
orders table (1000 writes/sec) → [Single Writer] → Bottleneck
With Replicas (replicaNum=4):
orders table (1000 writes/sec) → [Writer 0] (250 writes/sec)
→ [Writer 1] (250 writes/sec)
→ [Writer 2] (250 writes/sec)
→ [Writer 3] (250 writes/sec)
5.2 Replica Configuration
sink {
JDBC {
url = "..."
# Multi-table configuration
multi_table_sink_replica = 4 # replicas per table (applies to all tables)
}
}
5.3 Replica Selection Strategies
Hash-Based (when primary key is available):
// Ensures the same primary key always goes to the same replica (order preservation);
// floorMod avoids a negative index when hashCode() is negative
int replica = Math.floorMod(primaryKey.hashCode(), replicaNum);
Random (when primary key is not available):
// Distributes load across replicas (no stable routing guarantee)
int replica = ThreadLocalRandom.current().nextInt(replicaNum);
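A runnable sketch of both strategies (Math.floorMod is used so a negative hashCode() cannot produce a negative replica index):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the two replica-selection strategies described above.
public class ReplicaSelectDemo {
    // Stable: the same primary key always maps to the same replica.
    static int hashReplica(Object primaryKey, int replicaNum) {
        return Math.floorMod(primaryKey.hashCode(), replicaNum);
    }

    // Unstable: spreads load, gives no per-key ordering guarantee.
    static int randomReplica(int replicaNum) {
        return ThreadLocalRandom.current().nextInt(replicaNum);
    }

    public static void main(String[] args) {
        // Same primary key routes to the same replica on every call.
        System.out.println(hashReplica("order-001", 4) == hashReplica("order-001", 4)); // true
        System.out.println(randomReplica(4) < 4); // true
    }
}
```

Stable routing matters when per-key write ordering must be preserved (e.g. an UPDATE must not overtake its preceding INSERT).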
6. Schema Management in Multi-Table
6.1 Independent Schemas
Each table maintains its own schema:
public class MultiTableSink {
// Schema per table
private final Map<TablePath, CatalogTable> catalogTables;
public CatalogTable getCatalogTable(TablePath tablePath) {
return catalogTables.get(tablePath);
}
}
6.2 Schema Evolution Routing
public class MultiTableSinkWriter {
public void handleSchemaChange(SchemaChangeEvent event) {
// Route schema change to correct table writer
TablePath tablePath = event.getTableId().toTablePath();
// Apply to all replicas of this table
for (int i = 0; i < replicaNum; i++) {
SinkIdentifier identifier = new SinkIdentifier(
new TableIdentifier(tablePath),
i
);
SinkWriter writer = writers.get(identifier);
writer.applySchemaChange(event);
}
}
}
7. Data Flow Example
7.1 Full Pipeline
┌──────────────────────────────────────────────────────────────┐
│ MySQL CDC Source │
│ • Captures changes from 100 tables │
│ • Tags each row with TablePath │
└──────────────────────────┬───────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ SeaTunnelRow (with TablePath) │
│ tableId: "my_db.public.orders" │
│ fields: [1, "order-001", 99.99] │
└─────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ MultiTableSinkWriter │
│ • Extracts TablePath from row │
│ • Selects replica (hash or random) │
│ • Routes to correct writer │
└──────────────────────────┬───────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ orders │ │ users │ │ products │
│ Writer 0 │ │ Writer 0 │ │ Writer 0 │
│ Writer 1 │ │ Writer 1 │ │ Writer 1 │
│ Writer 2 │ │ │ │ │
│ Writer 3 │ │ │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ PostgreSQL │ │ PostgreSQL │ │ PostgreSQL │
│ orders │ │ users │ │ products │
└──────────────┘ └──────────────┘ └──────────────┘
7.2 Write Flow
sequenceDiagram
participant Source as MySQL CDC
participant Writer as MultiTableSinkWriter
participant OrderWriter as Order Writer (Replica 0)
participant UserWriter as User Writer (Replica 0)
participant PG as PostgreSQL
Source->>Writer: Row(tableId="orders", data=[...])
Writer->>Writer: Extract TablePath("orders")
Writer->>Writer: Select replica (hash) → 0
Writer->>OrderWriter: write(row)
OrderWriter->>PG: INSERT INTO orders ...
Source->>Writer: Row(tableId="users", data=[...])
Writer->>Writer: Extract TablePath("users")
Writer->>Writer: Select replica (hash) → 0
Writer->>UserWriter: write(row)
UserWriter->>PG: INSERT INTO users ...
7.3 Checkpoint Flow
sequenceDiagram
participant CP as CheckpointCoordinator
participant Writer as MultiTableSinkWriter
participant W1 as Order Writer 0
participant W2 as Order Writer 1
participant W3 as User Writer 0
CP->>Writer: triggerBarrier(checkpointId)
Writer->>W1: prepareCommit(checkpointId)
W1-->>Writer: CommitInfo(orders, replica=0)
Writer->>W2: prepareCommit(checkpointId)
W2-->>Writer: CommitInfo(orders, replica=1)
Writer->>W3: prepareCommit(checkpointId)
W3-->>Writer: CommitInfo(users, replica=0)
Writer->>CP: ACK([CommitInfo1, CommitInfo2, CommitInfo3])
8. Performance Optimization
8.1 Replica Sizing
Rule of Thumb:
replicaNum = ceil(Table Write Rate / Single Writer Throughput)
Example:
orders: 10,000 writes/sec
Single writer: 2,500 writes/sec
replicaNum = ceil(10,000 / 2,500) = 4
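The rule of thumb above, expressed as code (names hypothetical):

```java
// Sketch of the replica-sizing rule of thumb from 8.1.
public class ReplicaSizingDemo {
    static int replicaNum(double tableWriteRate, double singleWriterThroughput) {
        // Round up so the configured replicas can absorb the full write rate
        return (int) Math.ceil(tableWriteRate / singleWriterThroughput);
    }

    public static void main(String[] args) {
        System.out.println(replicaNum(10_000, 2_500)); // 4
    }
}
```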
8.2 Table-Specific Replicas
// Future enhancement: different replicas per table
Map<TablePath, Integer> replicaConfig = Map.of(
TablePath.of("orders"), 4, // High-throughput table
TablePath.of("users"), 2, // Medium-throughput
TablePath.of("config"), 1 // Low-throughput
);
8.3 Batch Writing
public class MultiTableSinkWriter {
private final Map<SinkIdentifier, List<SeaTunnelRow>> buffers;
private static final int BATCH_SIZE = 1000;
@Override
public void write(SeaTunnelRow row) {
SinkIdentifier identifier = selectWriter(row);
List<SeaTunnelRow> buffer = buffers.computeIfAbsent(
identifier,
k -> new ArrayList<>()
);
buffer.add(row);
if (buffer.size() >= BATCH_SIZE) {
flushBuffer(identifier, buffer);
}
}
}
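The flush trigger above can be exercised with a shrunk, runnable sketch (BATCH_SIZE reduced for illustration; flushBuffer replaced by a counter):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the batch trigger in 8.3: buffer rows and flush once
// the buffer reaches BATCH_SIZE.
public class BatchDemo {
    static final int BATCH_SIZE = 3;
    final List<String> buffer = new ArrayList<>();
    int flushes = 0;

    void write(String row) {
        buffer.add(row);
        if (buffer.size() >= BATCH_SIZE) {
            flushes++;      // stands in for flushBuffer(identifier, buffer)
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BatchDemo writer = new BatchDemo();
        for (int i = 0; i < 7; i++) writer.write("row-" + i);
        System.out.println(writer.flushes + " flushes, " + writer.buffer.size() + " buffered");
        // 2 flushes, 1 buffered
    }
}
```

Note that a checkpoint must also flush any partially filled buffers before prepareCommit, or buffered rows would be lost from the committed snapshot.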
9. Monitoring and Observability
9.1 Key Metrics
Per-Table Metrics:
- table.{tableName}.records_written: Records written per table
- table.{tableName}.bytes_written: Bytes written per table
- table.{tableName}.write_latency: Write latency per table
Per-Replica Metrics:
- table.{tableName}.replica.{index}.records: Records per replica
- table.{tableName}.replica.{index}.utilization: Replica utilization
Global Metrics:
- multitable.tables.total: Total number of tables
- multitable.writers.total: Total number of writers (tables × replicas)
- multitable.throughput: Aggregate throughput
9.2 Monitoring Dashboard
Multi-Table Job: mysql-to-postgres
Tables: 100
Writers: 250 (avg 2.5 replicas per table)
Throughput: 50,000 records/sec
Top Tables by Throughput:
1. orders: 15,000 rec/sec (4 replicas)
2. events: 10,000 rec/sec (4 replicas)
3. users: 5,000 rec/sec (2 replicas)
...
Replica Distribution:
orders:
Replica 0: 3,750 rec/sec (25%)
Replica 1: 3,800 rec/sec (25.3%)
Replica 2: 3,700 rec/sec (24.7%)
Replica 3: 3,750 rec/sec (25%)
10. Best Practices
10.1 Table Selection
Table include/exclude patterns are connector-specific. Please refer to the specific Source connector documentation for the supported option keys and formats.
10.2 Replica Configuration
Start Conservative:
sink {
JDBC {
# Start with 1 replica, increase if bottleneck
multi_table_sink_replica = 1
}
}
Monitor and Tune:
# Check if single replica is bottleneck
# If write latency high → increase replicas
multi_table_sink_replica = 2 # Double capacity
10.3 Schema Management
Pre-create Target Tables:
-- Better: pre-create all target tables
CREATE TABLE orders (...);
CREATE TABLE users (...);
CREATE TABLE products (...);
Enable Auto-Create (Carefully):
sink {
JDBC {
# Auto-create missing tables
schema-evolution {
enabled = true
auto-create-table = true
}
}
}
10.4 Error Handling
Error tolerance and retry policies are typically connector-specific. Avoid relying on undocumented multi-table.* option keys unless they are defined by the connector you use.
11. Limitations and Considerations
11.1 Current Limitations
Shared Parallelism:
- All tables share same parallelism
- Cannot set different parallelism per table
Fixed Replicas:
- Same replica count for all tables
- High-throughput and low-throughput tables treated equally
Memory Overhead:
- Each writer maintains separate buffer
- 100 tables × 4 replicas = 400 writers in memory
11.2 Workarounds
High-Throughput Tables:
# Option 1: Separate job for hot tables
job-1 { source { table-name = "orders" } } # Dedicated job
job-2 { source { table-name = "user_.*|product_.*" } } # Rest
Memory Optimization:
# Reduce buffer size per writer
sink {
JDBC {
batch-size = 500 # Smaller batches
}
}
12. Future Enhancements
12.1 Dynamic Replicas
Per-table replica overrides are not supported by the current multi_table_sink_replica option, which applies to all tables; supporting them would require additional connector/framework capabilities.
12.2 Adaptive Replicas
// Auto-adjust replicas based on throughput
if (table.getWriteRate() > threshold) {
increaseReplicas(table);
} else if (table.getWriteRate() < lowThreshold) {
decreaseReplicas(table);
}
13. References
Example Implementations:
- MySQL CDC Source: seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/
- JDBC Sink: seatunnel-connectors-v2/connector-jdbc/