Skip to main content
Version: Next

Sink Architecture

1. Overview

1.1 Problem Background

Writing data to external systems in distributed environments presents critical challenges:

  • Exactly-Once Guarantee: How to ensure each record is written exactly once, not zero or multiple times?
  • Transactional Consistency: How to commit writes atomically across multiple parallel writers?
  • Fault Tolerance: How to recover from failures without data loss or duplication?
  • Backpressure: How to handle slow sinks without overwhelming the system?
  • Idempotency: How to make retries safe?

1.2 Design Goals

SeaTunnel's Sink API aims to:

  1. Provide Verifiable Consistency Semantics: With checkpoint boundaries + 2PC, achieve exactly-once when the external sink supports transactional/idempotent commit
  2. Support Parallel Writes: Scale throughput with multiple writer instances
  3. Enable Global Coordination: Coordinate commits across distributed writers
  4. Ensure Fault Tolerance: Recover from failures without data inconsistency
  5. Provide Flexibility: Support various commit strategies (per-writer, aggregated, none)

1.3 Applicable Scenarios

  • Transactional databases (JDBC with XA transactions)
  • Message queues (Kafka with transactions)
  • File systems (atomic file rename)
  • Data lakes (Iceberg, Hudi, Delta Lake with table transactions)
  • Search engines (Elasticsearch with versioning)

2. Architecture Design

2.1 Overall Architecture

┌────────────────────────────────────────────────────────────────┐
│ TaskExecutionService (Worker Side) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkWriter<IN, CommitInfoT, StateT> │ │
│ │ │ │
│ │ • Receive records from upstream │ │
│ │ • Buffer and write data │ │
│ │ • Produce commitInfo at checkpoint boundary │ │
│ │ • Snapshot writer state │ │
│ │ • Cleanup/rollback on failure (engine-dependent) │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
└────────────────────────────┼─────────────────────────────────────┘
│ (CommitInfo)

┌────────────────────────────────────────────────────────────────┐
│ Coordinator Side (control plane, engine-dependent) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkCommitter<CommitInfoT> (Optional) │ │
│ │ │ │
│ │ • Receive commit infos from multiple writers │ │
│ │ • Commit each writer's changes independently │ │
│ │ • Retry failed commits │ │
│ │ • Must be idempotent │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ │ (Optional: AggregatedCommitInfo) │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkAggregatedCommitter<CommitInfoT, │ │
│ │ AggregatedCommitInfoT> │ │
│ │ (Optional) │ │
│ │ │ │
│ │ • Aggregate commit infos from all writers │ │
│ │ • Perform single global commit operation │ │
│ │ • Single-threaded, global coordinator │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘


External Data Sink
(Database / File / Message Queue)

2.2 Core Components

SeaTunnelSink (Factory Interface)

The top-level interface that serves as a factory for creating writers and committers.

public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
extends Serializable {

/**
* Create SinkWriter (called on worker)
*/
SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context)
throws IOException;

/**
* Restore SinkWriter from checkpoint (called on worker)
*/
default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(
SinkWriter.Context context,
List<StateT> states) throws IOException {
return createWriter(context);
}

/**
* Serializer for writer state (optional).
*/
default Optional<Serializer<StateT>> getWriterStateSerializer() {
return Optional.empty();
}

/**
* Create SinkCommitter (optional, trigger location depends on execution engine)
*/
default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
return Optional.empty();
}

/**
* Serializer for commit info (optional).
*/
default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
return Optional.empty();
}

/**
* Create SinkAggregatedCommitter (optional).
*/
default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>>
createAggregatedCommitter() throws IOException {
return Optional.empty();
}

/**
* Serializer for aggregated commit info (optional).
*/
default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
return Optional.empty();
}

/**
* Get input schema.
*/
default Optional<CatalogTable> getWriteCatalogTable() {
return Optional.empty();
}
}

Key Design Points:

  • Three-tier commit architecture: Writer → Committer → AggregatedCommitter
  • Committer and AggregatedCommitter are optional (depends on sink requirements)
  • Writer is always required (performs actual data writing)

2.3 Interaction Flow

Normal Write Flow (with Two-Phase Commit)

sequenceDiagram
participant CP as CheckpointCoordinator
participant Writer1 as SinkWriter 1
participant Writer2 as SinkWriter 2
participant Committer as SinkCommitter
participant Sink as External Sink

Writer1->>Writer1: write(record)
Writer2->>Writer2: write(record)

CP->>Writer1: triggerBarrier(checkpointId)
CP->>Writer2: triggerBarrier(checkpointId)

Writer1->>Writer1: prepareCommit(checkpointId)
Writer1->>CP: ack(commitInfo1)
Writer2->>Writer2: prepareCommit(checkpointId)
Writer2->>CP: ack(commitInfo2)

CP->>CP: All writers acked
CP->>CP: Persist checkpoint

CP->>Committer: commit([commitInfo1, commitInfo2])
Committer->>Sink: Commit writer1 changes
Committer->>Sink: Commit writer2 changes
Committer->>CP: ack()

Note over Writer1,Writer2: Framework may notify checkpoint completion for cleanup (engine-dependent)

Failure and Retry Flow

sequenceDiagram
participant CP as CheckpointCoordinator
participant Writer as SinkWriter
participant Committer as SinkCommitter
participant Sink as External Sink

Writer->>Writer: prepareCommit(checkpointId)
Writer->>CP: ack(commitInfo)

CP->>Writer: [Failure - writer crashes]

CP->>CP: Checkpoint fails
CP->>CP: Restore from previous checkpoint

CP->>Writer: restoreWriter(previousState)
Writer->>Writer: Replay records from checkpoint

Writer->>Writer: prepareCommit(checkpointId)
Writer->>CP: ack(commitInfo)

CP->>Committer: commit([commitInfo])
Committer->>Sink: Commit (idempotent)
Committer-->>Sink: [Commit fails due to network]
Committer->>Committer: Retry
Committer->>Sink: Commit (idempotent)
Sink-->>Committer: Success

Note over Writer,Committer: Framework may notify checkpoint completion for cleanup (engine-dependent)

3. Key Implementations

3.1 SinkWriter Interface

The writer runs on workers and performs actual data writing.

public interface SinkWriter<IN, CommitInfoT, StateT> {

/**
* Write single record
*/
void write(IN element) throws IOException;

/**
* Prepare commit info during checkpoint.
*
* Guideline: do not make data externally visible in this phase.
*/
Optional<CommitInfoT> prepareCommit(long checkpointId) throws IOException;

/**
* Abort prepared commit if checkpoint fails
*/
void abortPrepare();

/**
* Snapshot writer state for checkpoint
*/
List<StateT> snapshotState(long checkpointId) throws IOException;

/**
* Close writer
*/
void close() throws IOException;

/**
* Context for interacting with framework
*/
interface Context {
int getIndexOfSubtask();
MetricsContext getMetricsContext();
}
}

Critical Requirements:

  • prepareCommit(checkpointId) should not make data externally visible (commit is done in SinkCommitter / SinkAggregatedCommitter)
  • prepareCommit(checkpointId) returns commit info that will be passed to committer
  • State returned by snapshotState() must capture all uncommitted writes
  • abortPrepare() is only used by Spark when prepareCommit(...) fails by throwing an exception

Implementation Example (JDBC with XA Transactions):

public class JdbcExactlyOnceSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, Void> {

private final XAConnection xaConnection;
private final XAResource xaResource;
private final Connection connection;
private final PreparedStatement statement;
private final List<Xid> pendingXids = new ArrayList<>();

@Override
public void write(SeaTunnelRow element) throws IOException {
try {
// Start XA transaction if needed
if (currentXid == null) {
currentXid = generateXid();
xaResource.start(currentXid, XAResource.TMNOFLAGS);
}

// Execute INSERT (buffered in transaction)
setParameters(statement, element);
statement.executeUpdate();

} catch (SQLException e) {
throw new IOException("Failed to write record", e);
}
}

@Override
public Optional<XidInfo> prepareCommit(long checkpointId) throws IOException {
if (currentXid == null) {
return Optional.empty(); // No data written
}

try {
// End XA transaction
xaResource.end(currentXid, XAResource.TMSUCCESS);

// Prepare XA transaction (FIRST PHASE - no side effects yet)
xaResource.prepare(currentXid);

// Return XID for committer
XidInfo xidInfo = new XidInfo(currentXid);
pendingXids.add(currentXid);
currentXid = null;

return Optional.of(xidInfo);

} catch (XAException e) {
throw new IOException("Failed to prepare XA transaction", e);
}
}

@Override
public void abortPrepare() {
// Rollback prepared transaction
if (currentXid != null) {
try {
xaResource.rollback(currentXid);
} catch (XAException e) {
LOG.error("Failed to rollback XA transaction", e);
}
}
}

@Override
public List<Void> snapshotState(long checkpointId) {
// For XA, state is managed by database
return Collections.emptyList();
}
}

Implementation Example (File Sink with Atomic Rename):

public class FileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileWriterState> {

private final String tempFilePath;
private final String finalFilePath;
private final OutputStream outputStream;
private long bytesWritten = 0;

@Override
public void write(SeaTunnelRow element) throws IOException {
// Write to temporary file
byte[] bytes = serialize(element);
outputStream.write(bytes);
bytesWritten += bytes.length;
}

@Override
public Optional<FileCommitInfo> prepareCommit(long checkpointId) throws IOException {
// Flush and close temp file (no rename yet!)
outputStream.flush();
outputStream.close();

// Return commit info for committer to rename file
return Optional.of(new FileCommitInfo(tempFilePath, finalFilePath));
}

@Override
public void abortPrepare() {
// Delete temporary file
new File(tempFilePath).delete();
}

@Override
public List<FileWriterState> snapshotState(long checkpointId) {
// Save current write position
return Collections.singletonList(new FileWriterState(bytesWritten));
}
}

3.2 SinkCommitter Interface

The committer runs on master and coordinates commits from multiple writers.

public interface SinkCommitter<CommitInfoT> extends Closeable {

/**
* Commit multiple commit infos (from multiple writers or retries)
* MUST be idempotent - may be called multiple times with same commitInfo
*/
List<CommitInfoT> commit(List<CommitInfoT> commitInfos) throws IOException;

/**
* Abort commit infos (optional)
*/
default void abort(List<CommitInfoT> commitInfos) throws IOException {}

/**
* Close committer
*/
void close() throws IOException;
}

Critical Requirements:

  • commit() MUST be idempotent (calling twice with same commitInfo should be safe)
  • Returns list of failed commitInfos (will be retried)
  • Should handle partial failures gracefully

Implementation Example (JDBC XA Committer):

public class JdbcSinkCommitter implements SinkCommitter<XidInfo> {

private final XADataSource xaDataSource;

@Override
public List<XidInfo> commit(List<XidInfo> commitInfos) throws IOException {
List<XidInfo> failed = new ArrayList<>();

for (XidInfo xidInfo : commitInfos) {
try {
XAConnection xaConn = xaDataSource.getXAConnection();
XAResource xaResource = xaConn.getXAResource();

// SECOND PHASE: Commit prepared transaction
xaResource.commit(xidInfo.getXid(), false);

xaConn.close();

} catch (XAException e) {
if (e.errorCode == XAException.XAER_NOTA) {
// Transaction already committed (idempotent)
LOG.info("XA transaction already committed: {}", xidInfo.getXid());
} else {
// Commit failed, will retry
LOG.error("Failed to commit XA transaction: {}", xidInfo.getXid(), e);
failed.add(xidInfo);
}
}
}

return failed; // Framework will retry failed commits
}

@Override
public void abort(List<XidInfo> commitInfos) {
// Rollback prepared transactions
for (XidInfo xidInfo : commitInfos) {
try {
XAConnection xaConn = xaDataSource.getXAConnection();
xaConn.getXAResource().rollback(xidInfo.getXid());
xaConn.close();
} catch (Exception e) {
LOG.error("Failed to rollback XA transaction", e);
}
}
}
}

Implementation Example (File Committer with Atomic Rename):

public class FileSinkCommitter implements SinkCommitter<FileCommitInfo> {

private final FileSystem fileSystem;

@Override
public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) {
List<FileCommitInfo> failed = new ArrayList<>();

for (FileCommitInfo commitInfo : commitInfos) {
try {
Path tempPath = new Path(commitInfo.getTempFilePath());
Path finalPath = new Path(commitInfo.getFinalFilePath());

// Atomic rename (commit)
if (fileSystem.exists(finalPath)) {
// File already committed (idempotent)
LOG.info("File already exists, skipping: {}", finalPath);
fileSystem.delete(tempPath, false); // Clean up temp file
} else {
boolean success = fileSystem.rename(tempPath, finalPath);
if (!success) {
failed.add(commitInfo);
}
}

} catch (IOException e) {
LOG.error("Failed to commit file: {}", commitInfo, e);
failed.add(commitInfo);
}
}

return failed;
}
}

3.3 SinkAggregatedCommitter Interface

The aggregated committer performs single global commit for all writers.

public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
extends Closeable {

/**
* Combine commit infos from multiple writers into single aggregated info
*/
AggregatedCommitInfoT combine(List<CommitInfoT> commitInfos);

/**
* Commit aggregated info (single global operation)
* MUST be idempotent
*/
List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfos)
throws IOException;

/**
* Abort aggregated commit infos
*/
default void abort(List<AggregatedCommitInfoT> aggregatedCommitInfos) throws IOException {}

/**
* Restore committer state from checkpoint
*/
default void restoreCommit(List<AggregatedCommitInfoT> aggregatedCommitInfos)
throws IOException {}

/**
* Close committer
*/
void close() throws IOException;
}

Use Cases:

  • Hive table commit (single COMMIT TRANSACTION for all partitions)
  • Iceberg table commit (single table snapshot)
  • Global index updates (update index once for all writes)

Implementation Example (Hive Sink):

public class HiveAggregatedCommitter
implements SinkAggregatedCommitter<HiveWriteInfo, HiveCommitInfo> {

@Override
public HiveCommitInfo combine(List<HiveWriteInfo> commitInfos) {
// Collect all written files across all writers
List<String> allFiles = new ArrayList<>();
for (HiveWriteInfo writeInfo : commitInfos) {
allFiles.addAll(writeInfo.getWrittenFiles());
}
return new HiveCommitInfo(allFiles);
}

@Override
public List<HiveCommitInfo> commit(List<HiveCommitInfo> aggregatedCommitInfos) {
List<HiveCommitInfo> failed = new ArrayList<>();

for (HiveCommitInfo commitInfo : aggregatedCommitInfos) {
try {
// Single global commit for entire table
hiveMetastore.beginTransaction();

for (String file : commitInfo.getAllFiles()) {
hiveMetastore.addPartitionFile(tableName, file);
}

hiveMetastore.commitTransaction(); // Global atomic commit

} catch (Exception e) {
LOG.error("Failed to commit to Hive", e);
hiveMetastore.rollbackTransaction();
failed.add(commitInfo);
}
}

return failed;
}
}

3.4 Code References

API Interfaces:

Example Implementations:

  • JDBC Sink: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/
  • Kafka Sink: seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/
  • File Sink: seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/

4. Design Considerations

4.1 Design Trade-offs

Two-Phase Commit

Pros:

  • Strong consistency guarantee (exactly-once)
  • Automatic failure recovery
  • Clear separation between prepare and commit

Cons:

  • Increased latency (data visible only after commit)
  • Requires transactional support in sink
  • Additional state for commit info
  • More complex implementation

When to Use:

  • Financial transactions, billing, audit logs
  • Any scenario requiring exactly-once guarantee

When Not to Use:

  • At-least-once is acceptable (logging, metrics)
  • Sink doesn't support transactions
  • Ultra-low latency required

Three-Tier vs Two-Tier Commit

Two-Tier (Writer → Committer):

  • Each writer's commit handled independently
  • Parallel commit operations
  • Suitable for most sinks

Three-Tier (Writer → Committer → AggregatedCommitter):

  • All writers' commits aggregated into single operation
  • Single global commit point
  • Required for table-level transactions (Hive, Iceberg)

4.2 Performance Considerations

Batch Writing

public class BatchSinkWriter {
private final List<SeaTunnelRow> batch = new ArrayList<>();
private static final int BATCH_SIZE = 1000;

@Override
public void write(SeaTunnelRow element) {
batch.add(element);
if (batch.size() >= BATCH_SIZE) {
flushBatch();
}
}

private void flushBatch() {
// Write entire batch in single operation
statement.executeBatch();
batch.clear();
}
}

Benefits:

  • Amortize per-record overhead
  • Reduce network round-trips
  • Better throughput

Async Writes

public class AsyncSinkWriter {
private final BlockingQueue<CompletableFuture<Void>> pendingWrites = new LinkedBlockingQueue<>();

@Override
public void write(SeaTunnelRow element) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Async write operation
actualWrite(element);
}, executorService);

pendingWrites.add(future);
}

@Override
public Optional<CommitInfo> prepareCommit(long checkpointId) {
// Wait for all pending writes to complete
for (CompletableFuture<Void> future : pendingWrites) {
future.join();
}
pendingWrites.clear();

return Optional.of(createCommitInfo());
}
}

Connection Pooling

public class JdbcSinkWriter {
private final HikariDataSource dataSource;

@Override
public void write(SeaTunnelRow element) {
try (Connection conn = dataSource.getConnection()) {
// Reuse pooled connections
PreparedStatement stmt = conn.prepareStatement(sql);
stmt.executeUpdate();
}
}
}

4.3 Idempotency Patterns

1. Natural Idempotency (Upsert)

// INSERT ON DUPLICATE KEY UPDATE (MySQL)
String sql = "INSERT INTO table (id, name) VALUES (?, ?) " +
"ON DUPLICATE KEY UPDATE name = VALUES(name)";

// MERGE INTO (Oracle, SQL Server)
String sql = "MERGE INTO table USING (SELECT ? as id, ? as name FROM dual) src " +
"ON (table.id = src.id) " +
"WHEN MATCHED THEN UPDATE SET table.name = src.name " +
"WHEN NOT MATCHED THEN INSERT (id, name) VALUES (src.id, src.name)";

2. Deduplication Key

public class KafkaSinkWriter {
@Override
public void write(SeaTunnelRow element) {
ProducerRecord<String, String> record = new ProducerRecord<>(
topic,
element.getField(0).toString(), // Key for deduplication
element.toString()
);

// Kafka deduplicates based on (topic, partition, offset, idempotent producer)
producer.send(record);
}
}

3. External Deduplication Table

public class JdbcCommitter {
@Override
public List<XidInfo> commit(List<XidInfo> commitInfos) {
for (XidInfo xidInfo : commitInfos) {
String xidString = xidInfo.getXid().toString();

// Check if already committed
boolean exists = checkCommitTable(xidString);
if (exists) {
LOG.info("XID already committed: {}", xidString);
continue; // Idempotent
}

// Commit transaction
xaResource.commit(xidInfo.getXid(), false);

// Record commit
insertCommitTable(xidString, System.currentTimeMillis());
}
}
}

5. Best Practices

5.1 Usage Recommendations

1. Choose Appropriate Commit Level

// Simple sink: Writer only (at-least-once)
public class SimpleSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new SimpleWriter(); }
// No committer - data written directly
}

// Transactional sink: Writer + Committer (exactly-once)
public class TransactionalSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new TransactionalWriter(); }
Optional<SinkCommitter> createCommitter() { return Optional.of(new Committer()); }
}

// Table sink: Writer + Committer + AggregatedCommitter
public class TableSink implements SeaTunnelSink<...> {
SinkWriter createWriter(...) { return new TableWriter(); }
Optional<SinkCommitter> createCommitter() { return Optional.of(new Committer()); }
Optional<SinkAggregatedCommitter> createAggregatedCommitter() {
return Optional.of(new AggregatedCommitter());
}
}

2. Proper State Management

public class StatefulSinkWriter {
private long recordsWritten = 0;
private long bytesWritten = 0;

@Override
public List<WriterState> snapshotState(long checkpointId) {
return Collections.singletonList(
new WriterState(recordsWritten, bytesWritten)
);
}

public StatefulSinkWriter restoreState(List<WriterState> states) {
if (!states.isEmpty()) {
WriterState state = states.get(0);
this.recordsWritten = state.getRecordsWritten();
this.bytesWritten = state.getBytesWritten();
}
return this;
}
}

3. Resource Management

@Override
public void close() throws IOException {
// Close in reverse order of creation
if (statement != null) statement.close();
if (connection != null) connection.close();
if (dataSource != null) dataSource.close();
}

5.2 Common Pitfalls

1. Side Effects in prepareCommit(checkpointId)

// ❌ BAD: Actual commit in prepareCommit(checkpointId)
public Optional<CommitInfo> prepareCommit(long checkpointId) {
connection.commit(); // WRONG! This is a side effect!
return Optional.of(new CommitInfo());
}

// ✅ GOOD: Only prepare, no side effects
public Optional<CommitInfo> prepareCommit(long checkpointId) {
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid); // Prepare only, no commit yet
return Optional.of(new XidInfo(xid));
}

2. Non-Idempotent Commit

// ❌ BAD: Direct INSERT (not idempotent)
public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
for (CommitInfo info : commitInfos) {
executeInsert(info); // May fail if called twice!
}
}

// ✅ GOOD: UPSERT (idempotent)
public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
for (CommitInfo info : commitInfos) {
executeUpsert(info); // Safe to call multiple times
}
}

3. Large State

// ❌ BAD: Buffer all records in state
public class BadWriter {
private List<SeaTunnelRow> bufferedRows = new ArrayList<>(); // May be huge!

public List<State> snapshotState() {
return Collections.singletonList(new State(bufferedRows));
}
}

// ✅ GOOD: Flush before checkpoint, track metadata only
public class GoodWriter {
private long lastCommittedOffset = 0;

public Optional<CommitInfo> prepareCommit(long checkpointId) {
flushBufferedRows(); // Write to external system
return Optional.of(new CommitInfo(lastCommittedOffset));
}
}

5.3 Debugging Tips

1. Enable XA Transaction Logging

// Log XA operations for debugging
LOG.info("Starting XA transaction: {}", xid);
xaResource.start(xid, XAResource.TMNOFLAGS);

LOG.info("Preparing XA transaction: {}", xid);
xaResource.prepare(xid);

LOG.info("Committing XA transaction: {}", xid);
xaResource.commit(xid, false);

2. Track Commit Progress

public class MonitoredCommitter {
private final Counter commitAttempts = metricGroup.counter("commit_attempts");
private final Counter commitSuccesses = metricGroup.counter("commit_successes");
private final Counter commitFailures = metricGroup.counter("commit_failures");

public List<CommitInfo> commit(List<CommitInfo> commitInfos) {
commitAttempts.inc(commitInfos.size());

List<CommitInfo> failed = new ArrayList<>();
for (CommitInfo info : commitInfos) {
try {
doCommit(info);
commitSuccesses.inc();
} catch (Exception e) {
commitFailures.inc();
failed.add(info);
}
}
return failed;
}
}

3. Test Failure Scenarios

@Test
public void testCheckpointFailureRecovery() {
// Write data
writer.write(row1);
writer.write(row2);

// Prepare commit
Optional<CommitInfo> commitInfo = writer.prepareCommit(checkpointId);

// Simulate checkpoint failure
writer.abortPrepare();

// Verify no data committed
assertFalse(dataExistsInSink());

// Restore and retry
writer.write(row1);
writer.write(row2);
commitInfo = writer.prepareCommit(checkpointId);

// Commit should succeed
committer.commit(Collections.singletonList(commitInfo.get()));
assertTrue(dataExistsInSink());
}

7. References

Example Connectors

  • Simple Sink: ConsoleSink (logs to stdout)
  • File Sink: FileSink (atomic file rename)
  • Database Sink: JdbcSink (XA transactions)
  • Streaming Sink: KafkaSink (Kafka transactions)
  • Table Sink: IcebergSink (table commits)

Further Reading