Checkpoint Mechanism
1. Overview
1.1 Problem Background
Distributed data processing systems face several fault-tolerance challenges:
- State Loss: How to preserve processing state across failures?
- Exactly-Once: How to ensure each record is processed exactly once?
- Distributed Consistency: How to create consistent snapshots across distributed tasks?
- Performance: How to checkpoint without blocking data processing?
- Recovery: How to efficiently restore state after failures?
1.2 Design Goals
SeaTunnel's checkpoint mechanism aims to:
- Guarantee Exactly-Once Semantics: Consistent state snapshots + two-phase commit
- Minimize Overhead: Asynchronous checkpoint, no data processing blocking
- Fast Recovery: Restore from latest checkpoint in seconds
- Distributed Coordination: Coordinate checkpoints across hundreds of tasks
- Pluggable Storage: Support multiple storage backends (HDFS, S3, Local, OSS)
1.3 Theoretical Foundation
SeaTunnel's checkpoint is based on the Chandy-Lamport distributed snapshot algorithm:
Key Idea: Insert special markers (barriers) into the data streams. When a task receives a barrier, it:
- Snapshots its local state
- Forwards the barrier downstream
- Continues processing
Result: A globally consistent snapshot without pausing the entire system.
Reference: "Distributed Snapshots: Determining Global States of Distributed Systems" (Chandy & Lamport, 1985)
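The following minimal, single-threaded Java sketch shows the marker rule in action. It is an illustration, not SeaTunnel code. `BarrierSnapshotDemo`, `SumOperator` and the `BARRIER` sentinel are hypothetical names, and a real engine runs its operators concurrently over network channels. When the barrier reaches an operator, the operator snapshots its running sum and then forwards the barrier. As a result, every snapshot covers exactly the records that came before the barrier.

```java
import java.util.ArrayList;
import java.util.List;

public class BarrierSnapshotDemo {
    // Hypothetical encoding: -1 marks a barrier (records are positive in this toy)
    public static final long BARRIER = -1;

    // A stateful operator that sums the records it sees and snapshots on barriers
    static final class SumOperator {
        long sum = 0;
        final List<Long> snapshots = new ArrayList<>(); // state captured at each barrier

        // Returns the elements to emit downstream
        List<Long> process(long element) {
            if (element == BARRIER) {
                snapshots.add(sum);      // 1. snapshot local state
                return List.of(BARRIER); // 2. forward the barrier downstream
            }
            sum += element;              // 3. otherwise keep processing records
            return List.of(element * 2); // emit a transformed (doubled) record
        }
    }

    // Runs stream -> first -> second and returns both operators' snapshots
    public static List<List<Long>> run(List<Long> stream) {
        SumOperator first = new SumOperator();
        SumOperator second = new SumOperator();
        for (long element : stream) {
            for (long out : first.process(element)) {
                second.process(out);
            }
        }
        return List.of(first.snapshots, second.snapshots);
    }

    public static void main(String[] args) {
        // Records 1..4, barrier, records 5..6, barrier
        List<Long> stream = List.of(1L, 2L, 3L, 4L, BARRIER, 5L, 6L, BARRIER);
        System.out.println(run(stream)); // [[10, 21], [20, 42]]
    }
}
```

The downstream snapshot (20) is exactly twice the upstream one (10). Both reflect records 1 to 4 and nothing after them, and this agreement across operators is what "globally consistent" means here.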
2. Architecture Design
2.1 Checkpoint Architecture
┌─────────────────────────────────────────────────────────────────┐
│ JobMaster (per job) │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ CheckpointCoordinator (per pipeline) │ │
│ │ │ │
│ │ • Trigger checkpoint (periodic/manual) │ │
│ │ • Generate checkpoint ID │ │
│ │ • Track pending checkpoints │ │
│ │ • Collect task acknowledgements │ │
│ │ • Persist completed checkpoints │ │
│ │ • Cleanup old checkpoints │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ │ (Trigger Barrier) │
│ ▼ │
└─────────────────────────────────────────────────────────────────┘
│
│ (CheckpointBarrier)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Worker Nodes │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SourceTask 1 │ │ SourceTask 2 │ │ SourceTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ (Barrier Propagation) │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Transform 1 │ │ Transform 2 │ │ Transform N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ │ 4. Forward │ │ 4. Forward │ │ 4. Forward │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SinkTask 1 │ │ SinkTask 2 │ │ SinkTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Prepare │ │ 2. Prepare │ │ 2. Prepare │ │
│ │ Commit │ │ Commit │ │ Commit │ │
│ │ 3. Snapshot │ │ 3. Snapshot │ │ 3. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 4. ACK │ │ 4. ACK │ │ 4. ACK │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ (All ACKs received)
▼
┌─────────────────────────────────────────────────────────────────┐
│ CheckpointStorage │
│ (HDFS / S3 / Local / OSS) │
│ │
│ CompletedCheckpoint { │
│ checkpointId: 123 │
│ taskStates: { │
│ SourceTask-1: { splits: [...], offsets: [...] } │
│ SinkTask-1: { commitInfo: XidInfo(...) } │
│ ... │
│ } │
│ } │
└─────────────────────────────────────────────────────────────────┘
2.2 Key Data Structures
CheckpointCoordinator
public class CheckpointCoordinator {
    // Checkpoint ID generator
    private final CheckpointIDCounter checkpointIdCounter;
    // Checkpoint execution plan
    private final CheckpointPlan checkpointPlan;
    // Pending checkpoints (in progress)
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    // Completed checkpoints (success)
    private final ArrayDeque<String> completedCheckpointIds;
    // Latest completed checkpoint
    private CompletedCheckpoint latestCompletedCheckpoint;
    // Checkpoint storage
    private final CheckpointStorage checkpointStorage;
    // Configuration
    private final long checkpointInterval;        // Trigger interval (ms)
    private final long checkpointTimeout;         // Timeout (ms)
    private final int minPauseBetweenCheckpoints; // Min pause (ms)
}
PendingCheckpoint
Represents an in-progress checkpoint.
public class PendingCheckpoint {
    private final long checkpointId;
    private final CheckpointType checkpointType; // CHECKPOINT or SAVEPOINT
    private final long triggerTimestamp;
    // Tasks that haven't acknowledged yet
    private final Set<Long> notYetAcknowledgedTasks;
    // Collected action states (from task ACKs)
    private final Map<ActionStateKey, ActionState> actionStates;
    // Task statistics (records processed, bytes, etc.)
    private final Map<Long, TaskStatistics> taskStatistics;
    // Future completed when all tasks ACK
    private final CompletableFuture<CompletedCheckpoint> completableFuture;

    /**
     * Called when a task acknowledges the checkpoint.
     * Synchronized because ACKs from many tasks can arrive concurrently.
     */
    public synchronized void acknowledgeTask(long taskId, List<ActionSubtaskState> states,
                                             TaskStatistics statistics) {
        // Ignore duplicate or unexpected ACKs so they are not counted twice
        if (!notYetAcknowledgedTasks.remove(taskId)) {
            return;
        }
        // Collect states
        for (ActionSubtaskState state : states) {
            actionStates.computeIfAbsent(state.getKey(), k -> new ActionState())
                    .putSubtaskState(state);
        }
        // Collect statistics
        taskStatistics.put(taskId, statistics);
        // Check if all tasks acknowledged
        if (notYetAcknowledgedTasks.isEmpty()) {
            completeCheckpoint();
        }
    }

    private void completeCheckpoint() {
        CompletedCheckpoint completed = new CompletedCheckpoint(
                checkpointId, actionStates, taskStatistics, System.currentTimeMillis()
        );
        completableFuture.complete(completed);
    }
}
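The following sketch exercises the acknowledgement bookkeeping of `PendingCheckpoint` in isolation. `PendingCheckpointSketch` is a hypothetical, simplified class. It stores one `byte[]` per task in place of `ActionSubtaskState`/`ActionState`. It also ignores duplicate ACKs instead of counting them twice.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<Long> notYetAcknowledgedTasks;
    private final Map<Long, byte[]> taskStates = new HashMap<>();
    private final CompletableFuture<Map<Long, byte[]>> completion = new CompletableFuture<>();

    public PendingCheckpointSketch(long checkpointId, Set<Long> tasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledgedTasks = new HashSet<>(tasks);
    }

    // Synchronized because ACKs arrive concurrently from many tasks
    public synchronized boolean acknowledgeTask(long taskId, byte[] state) {
        // Ignore duplicate or unknown ACKs instead of double-counting them
        if (!notYetAcknowledgedTasks.remove(taskId)) {
            return false;
        }
        taskStates.put(taskId, state);
        if (notYetAcknowledgedTasks.isEmpty()) {
            completion.complete(Map.copyOf(taskStates)); // the last ACK completes the checkpoint
        }
        return true;
    }

    public CompletableFuture<Map<Long, byte[]>> future() {
        return completion;
    }

    public long checkpointId() {
        return checkpointId;
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending = new PendingCheckpointSketch(123, Set.of(1L, 2L, 3L));
        pending.future().thenAccept(states ->
                System.out.println("checkpoint 123 complete with " + states.size() + " task states"));
        pending.acknowledgeTask(1, new byte[] {1});
        pending.acknowledgeTask(1, new byte[] {1}); // duplicate, ignored
        pending.acknowledgeTask(2, new byte[] {2});
        pending.acknowledgeTask(3, new byte[] {3}); // prints: checkpoint 123 complete with 3 task states
    }
}
```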
CompletedCheckpoint
Persisted checkpoint data.
public class CompletedCheckpoint implements Serializable {
    private final long checkpointId;
    private final Map<ActionStateKey, ActionState> taskStates;
    private final Map<Long, TaskStatistics> taskStatistics;
    private final long completedTimestamp;
}

public class ActionState implements Serializable {
    private final ActionStateKey key; // (pipelineId, actionId)
    private final Map<Integer, ActionSubtaskState> subtaskStates;
}

public class ActionSubtaskState implements Serializable {
    private final int subtaskIndex;
    private final byte[] state; // Serialized state
}
2.3 CheckpointStorage
Abstraction for checkpoint persistence.
import java.io.IOException;
import java.util.Optional;

public interface CheckpointStorage {
    /**
     * Store completed checkpoint
     */
    void storeCheckpoint(CompletedCheckpoint checkpoint) throws IOException;

    /**
     * Get latest checkpoint
     */
    Optional<CompletedCheckpoint> getLatestCheckpoint() throws IOException;

    /**
     * Get specific checkpoint by ID
     */
    Optional<CompletedCheckpoint> getCheckpoint(long checkpointId) throws IOException;

    /**
     * Delete old checkpoint
     */
    void deleteCheckpoint(long checkpointId) throws IOException;
}
Implementations:
- LocalFileStorage: Local file system (testing)
- HdfsStorage: Hadoop FileSystem-based backend; can work with HDFS, S3A, etc., depending on the Hadoop configuration
Note: S3 and OSS support are provided through Hadoop FileSystem configuration (e.g., fs.s3a.impl) rather than separate CheckpointStorage implementations.
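To show what a backend has to provide, here is a hypothetical in-memory storage. It offers the same four operations as the `CheckpointStorage` interface above. It also adds a `retainLatest` helper for the "retain last N" cleanup. `InMemoryCheckpointStorage`, `Checkpoint` and `MapStorage` are illustrative names, not SeaTunnel classes. A real backend would serialize checkpoints to a file system.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

public class InMemoryCheckpointStorage {
    // Minimal stand-in for CompletedCheckpoint (hypothetical, for this sketch only)
    public record Checkpoint(long checkpointId, byte[] payload) {}

    // Same operations as the CheckpointStorage interface
    public interface Storage {
        void storeCheckpoint(Checkpoint checkpoint) throws IOException;
        Optional<Checkpoint> getLatestCheckpoint() throws IOException;
        Optional<Checkpoint> getCheckpoint(long checkpointId) throws IOException;
        void deleteCheckpoint(long checkpointId) throws IOException;
    }

    // Checkpoints kept sorted by ID, so "latest" is simply the highest key
    public static final class MapStorage implements Storage {
        private final ConcurrentSkipListMap<Long, Checkpoint> checkpoints = new ConcurrentSkipListMap<>();

        @Override
        public void storeCheckpoint(Checkpoint checkpoint) {
            checkpoints.put(checkpoint.checkpointId(), checkpoint);
        }

        @Override
        public Optional<Checkpoint> getLatestCheckpoint() {
            var last = checkpoints.lastEntry();
            return last == null ? Optional.empty() : Optional.of(last.getValue());
        }

        @Override
        public Optional<Checkpoint> getCheckpoint(long checkpointId) {
            return Optional.ofNullable(checkpoints.get(checkpointId));
        }

        @Override
        public void deleteCheckpoint(long checkpointId) {
            checkpoints.remove(checkpointId);
        }

        // Retention helper: delete everything except the newest `retain` checkpoints
        public void retainLatest(int retain) {
            while (checkpoints.size() > retain) {
                checkpoints.pollFirstEntry();
            }
        }
    }

    public static void main(String[] args) {
        MapStorage storage = new MapStorage();
        for (long id = 1; id <= 5; id++) {
            storage.storeCheckpoint(new Checkpoint(id, new byte[0]));
        }
        storage.retainLatest(3);
        System.out.println(storage.getLatestCheckpoint().map(Checkpoint::checkpointId)); // Optional[5]
        System.out.println(storage.getCheckpoint(2).isPresent());                        // false
    }
}
```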
3. Checkpoint Flow
3.1 Trigger Checkpoint
sequenceDiagram
participant Timer as Periodic Timer
participant Coord as CheckpointCoordinator
participant Plan as CheckpointPlan
participant Task as Starting Tasks
Timer->>Coord: Trigger (every 60s)
Coord->>Coord: Check conditions
Note over Coord: • Min pause elapsed?<br/>• Max concurrent not exceeded?<br/>• Previous checkpoint complete?
Coord->>Coord: Generate checkpointId (123)
Coord->>Coord: Create PendingCheckpoint(123)
Coord->>Plan: Get starting tasks
loop For each starting task
Coord->>Task: Send CheckpointBarrierTriggerOperation(123)
end
Coord->>Coord: Start timeout timer (10 minutes)
Trigger Conditions:
- Checkpoint interval elapsed (e.g., 60 seconds)
- Minimum pause between checkpoints elapsed (e.g., 10 seconds)
- Number of concurrent checkpoints < max (e.g., 1)
- No checkpoint in progress (for single concurrent)
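The trigger conditions above reduce to a simple predicate. The class and parameter names in this sketch are hypothetical. Measuring the minimum pause from the completion of the previous checkpoint is an assumption about how the pause is defined. The numbers match the examples: a 60 s interval, a 10 s pause and at most 1 concurrent checkpoint.

```java
public class CheckpointTriggerPolicy {
    private final long intervalMs;   // e.g. 60_000
    private final long minPauseMs;   // e.g. 10_000
    private final int maxConcurrent; // e.g. 1

    public CheckpointTriggerPolicy(long intervalMs, long minPauseMs, int maxConcurrent) {
        this.intervalMs = intervalMs;
        this.minPauseMs = minPauseMs;
        this.maxConcurrent = maxConcurrent;
    }

    /**
     * Returns true if a new checkpoint may be triggered now.
     *
     * @param nowMs            current time
     * @param lastTriggerMs    when the previous checkpoint was triggered
     * @param lastCompletionMs when the previous checkpoint completed (assumed pause reference)
     * @param inProgress       number of checkpoints currently pending
     */
    public boolean canTrigger(long nowMs, long lastTriggerMs, long lastCompletionMs, int inProgress) {
        boolean intervalElapsed = nowMs - lastTriggerMs >= intervalMs;
        boolean minPauseElapsed = nowMs - lastCompletionMs >= minPauseMs;
        boolean belowConcurrencyLimit = inProgress < maxConcurrent; // with max 1: none in progress
        return intervalElapsed && minPauseElapsed && belowConcurrencyLimit;
    }

    public static void main(String[] args) {
        CheckpointTriggerPolicy policy = new CheckpointTriggerPolicy(60_000, 10_000, 1);
        System.out.println(policy.canTrigger(60_000, 0, 55_000, 0)); // false: only 5 s since last completion
        System.out.println(policy.canTrigger(70_000, 0, 55_000, 0)); // true
        System.out.println(policy.canTrigger(70_000, 0, 55_000, 1)); // false: one already in progress
    }
}
```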
3.2 Barrier Propagation
sequenceDiagram
participant Coord as Coordinator
participant Source as SourceTask
participant Transform as TransformTask
participant Sink as SinkTask
Coord->>Source: Trigger barrier(123)
Source->>Source: Receive barrier
Source->>Source: snapshotState() → splits, offsets
Source->>Coord: ACK(state)
Source->>Transform: Forward barrier(123)
Transform->>Transform: Receive barrier
Transform->>Transform: snapshotState() → transform state
Transform->>Coord: ACK(state)
Transform->>Sink: Forward barrier(123)
Sink->>Sink: Receive barrier
Sink->>Sink: prepareCommit(checkpointId) → commitInfo
Sink->>Sink: snapshotState() → writer state
Sink->>Coord: ACK(commitInfo + state)
Coord->>Coord: All ACKs received
Coord->>Coord: Create CompletedCheckpoint
Barrier Flow Rules:
- Source Tasks: Start of pipeline; receive the barrier trigger from the coordinator, snapshot, forward downstream
- Transform Tasks: Receive from upstream, snapshot, forward downstream
- Sink Tasks: End of pipeline; receive from upstream, prepare commit, snapshot, no forward
Barrier Alignment (for tasks with multiple inputs):
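// Task with 2 inputs
Input 1: ──data──data──[barrier-123]──data──data──
                            │ Wait! Data after the barrier is held back
Input 2: ──data──data──data──data──[barrier-123]──
                                        │
                                        ▼
Barrier 123 received on both inputs: snapshot state, forward the barrier, then release the held-back data from Input 1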
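Barrier alignment can be sketched as per-input bookkeeping. Once an input has delivered the barrier, its later records are held back until every input has delivered it. `BarrierAligner` is a hypothetical, single-threaded illustration. It assumes only one checkpoint is in flight, and it releases held-back records one input at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class BarrierAligner {
    private final int numInputs;
    private final Set<Integer> barrierReceived = new HashSet<>();   // inputs that already delivered the barrier
    private final List<Queue<String>> heldBack = new ArrayList<>(); // per-input records held back after the barrier
    private final List<String> processed = new ArrayList<>();       // records handed to the operator, in order
    private final List<Long> snapshots = new ArrayList<>();         // checkpoint IDs snapshotted so far

    public BarrierAligner(int numInputs) {
        this.numInputs = numInputs;
        for (int i = 0; i < numInputs; i++) {
            heldBack.add(new ArrayDeque<>());
        }
    }

    public void onRecord(int input, String record) {
        if (barrierReceived.contains(input)) {
            heldBack.get(input).add(record); // this input is past the barrier: hold the record back
        } else {
            processed.add(record);           // pre-barrier record: process normally
        }
    }

    public void onBarrier(int input, long checkpointId) {
        barrierReceived.add(input);
        if (barrierReceived.size() == numInputs) {
            snapshots.add(checkpointId); // aligned: snapshot here (a real task would also forward the barrier)
            barrierReceived.clear();
            for (Queue<String> queue : heldBack) { // release held-back records, one input after another
                while (!queue.isEmpty()) {
                    processed.add(queue.poll());
                }
            }
        }
    }

    public List<String> processed() {
        return processed;
    }

    public List<Long> snapshots() {
        return snapshots;
    }

    public static void main(String[] args) {
        BarrierAligner task = new BarrierAligner(2);
        task.onRecord(0, "a1");
        task.onRecord(1, "b1");
        task.onBarrier(0, 123); // Input 0 reaches barrier-123 first
        task.onRecord(0, "a2"); // held back
        task.onRecord(1, "b2"); // still processed
        task.onBarrier(1, 123); // Input 1 catches up: snapshot, then release a2
        System.out.println(task.processed()); // [a1, b1, b2, a2]
        System.out.println(task.snapshots()); // [123]
    }
}
```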
3.3 State Snapshot
Each task type snapshots different state:
SourceTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot SourceReader state (splits + offsets)
    List<byte[]> states = sourceFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK to coordinator
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier downstream
    forwardBarrierToDownstream(checkpointId);
}
TransformTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot Transform state (usually stateless, empty state)
    List<byte[]> states = transformFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier
    forwardBarrierToDownstream(checkpointId);
}
SinkTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Prepare commit (TWO-PHASE COMMIT)
    Optional<CommitInfoT> commitInfo = sinkWriter.prepareCommit(checkpointId);
    // 2. Snapshot writer state
    List<StateT> writerStates = sinkWriter.snapshotState(checkpointId);
    // 3. Create ActionSubtaskState (includes both commit info and state)
    ActionSubtaskState state = new ActionSubtaskState(
            subtaskIndex,
            serialize(writerStates),
            commitInfo.orElse(null)
    );
    // 4. Send ACK (NO forwarding - end of pipeline)
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
}
3.4 Checkpoint Completion
sequenceDiagram
participant Coord as CheckpointCoordinator
participant Pending as PendingCheckpoint
participant Storage as CheckpointStorage
participant Committer as SinkCommitter
participant Tasks as All Tasks
Pending->>Pending: All tasks ACKed
Pending->>Coord: notifyCheckpointComplete()
Coord->>Coord: Create CompletedCheckpoint
Coord->>Storage: Persist checkpoint
Storage-->>Coord: Success
Coord->>Committer: commit(commitInfos)
Committer-->>Coord: Success
Coord->>Tasks: notifyCheckpointComplete(123)
Tasks->>Tasks: Cleanup resources
Coord->>Storage: Delete old checkpoints
Completion Steps:
- All tasks acknowledged
- Create CompletedCheckpoint from PendingCheckpoint
- Persist checkpoint to storage
- Trigger sink commit (two-phase commit)
- Notify all tasks of completion
- Cleanup old checkpoints (retain last N)
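The completion steps follow the usual two-phase commit ordering. Sink writers only stage data when the barrier arrives. The committer makes that data visible after the checkpoint has been persisted. The sketch below simulates this ordering with an in-memory `ExternalStore`. `TwoPhaseCommitSketch`, `ExternalStore` and `Writer` are hypothetical names, and they are not SeaTunnel's `SinkWriter`/`SinkCommitter` API. The commit is idempotent, so retrying it after recovery is safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseCommitSketch {
    // Simulated external system: staged (prepared) transactions and visible output
    public static final class ExternalStore {
        final Map<String, List<String>> prepared = new HashMap<>();
        public final List<String> visibleRows = new ArrayList<>();

        public void prepare(String xid, List<String> rows) {
            prepared.put(xid, new ArrayList<>(rows));
        }

        // Idempotent commit: an unknown or already-committed XID is a no-op
        public void commit(String xid) {
            List<String> rows = prepared.remove(xid);
            if (rows != null) {
                visibleRows.addAll(rows);
            }
        }
    }

    // Writer side: buffers rows and stages them under one XID per checkpoint
    public static final class Writer {
        private final ExternalStore store;
        private final List<String> buffer = new ArrayList<>();

        public Writer(ExternalStore store) {
            this.store = store;
        }

        public void write(String row) {
            buffer.add(row);
        }

        // Phase 1, on the barrier: stage buffered rows, return the commit info for the ACK
        public String prepareCommit(long checkpointId) {
            String xid = "XID-" + checkpointId;
            store.prepare(xid, buffer);
            buffer.clear();
            return xid;
        }
    }

    public static void main(String[] args) {
        ExternalStore store = new ExternalStore();
        Writer writer = new Writer(store);
        writer.write("r1");
        writer.write("r2");
        String xid = writer.prepareCommit(123);
        System.out.println(store.visibleRows); // [] : prepared but not yet visible
        // Phase 2, only after CompletedCheckpoint(123) has been persisted
        store.commit(xid);
        store.commit(xid); // a retried commit (e.g. after recovery) changes nothing
        System.out.println(store.visibleRows); // [r1, r2]
    }
}
```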
3.5 Checkpoint Timeout
// CheckpointCoordinator
private void startCheckpointTimeout(long checkpointId, long timeoutMs) {
    scheduledExecutor.schedule(() -> {
        PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
        if (pending != null && !pending.isCompleted()) {
            LOG.warn("Checkpoint {} timed out after {}ms; tasks not yet acknowledged: {}",
                    checkpointId, timeoutMs, pending.getNotYetAcknowledgedTasks());
            // Fail checkpoint
            pending.abort();
            pendingCheckpoints.remove(checkpointId);
            // Trigger job failover if needed
            handleCheckpointFailure(checkpointId);
        }
    }, timeoutMs, TimeUnit.MILLISECONDS);
}
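Timeout Handling:
- Default timeout: 10 minutes
- On timeout, the pending checkpoint is aborted and discarded
- The last completed checkpoint remains the recovery point; handleCheckpointFailure decides whether the job keeps running or fails over and restores from it
- If the job keeps running, the next checkpoint is triggered on schedule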
4. Recovery Process
4.1 Restore from Checkpoint
sequenceDiagram
participant JM as JobMaster
participant Storage as CheckpointStorage
participant Source as SourceTask
participant Sink as SinkTask
JM->>Storage: getLatestCheckpoint()
Storage-->>JM: CompletedCheckpoint(123)
JM->>JM: Extract states per task
JM->>Source: Deploy with NotifyTaskRestoreOperation
activate Source
Source->>Source: restoreState(splits, offsets)
Source->>Source: Seek to checkpointed offset
Source-->>JM: Ready
deactivate Source
JM->>Sink: Deploy with NotifyTaskRestoreOperation
activate Sink
Sink->>Sink: restoreWriter(writerState)
Sink->>Sink: Restore uncommitted transactions
Sink-->>JM: Ready
deactivate Sink
JM->>Source: Start execution
JM->>Sink: Start execution
Restore Steps:
- JobMaster retrieves the latest CompletedCheckpoint from storage
- Extract state for each task (by ActionStateKey and subtaskIndex)
- Deploy tasks with NotifyTaskRestoreOperation containing their state
- Tasks restore state:
  - SourceReader: Restore splits and offsets, seek to position
  - Transform: Restore transform state (usually none)
  - SinkWriter: Restore writer state, may have uncommitted transactions
- Tasks transition to READY_START state
- Job resumes execution
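During restore, per-task state is extracted with a keyed lookup: first by action, then by subtask index. In this sketch, `ActionStateKey` and `ActionState` are simplified, hypothetical stand-ins and not SeaTunnel's classes. They are Java records that hold a `List<byte[]>` per subtask, and `RestoreStateLookup` is also a hypothetical name. Records provide value-based `equals`/`hashCode`. This is what allows a key rebuilt after a restart to find the state saved before it.

```java
import java.util.List;
import java.util.Map;

public class RestoreStateLookup {
    // Hypothetical stand-ins: key = (pipelineId, actionId), state = subtaskIndex -> serialized state
    public record ActionStateKey(long pipelineId, long actionId) {}
    public record ActionState(ActionStateKey key, Map<Integer, List<byte[]>> subtaskStates) {}

    // State for one subtask of one action; empty if the checkpoint holds nothing for it
    public static List<byte[]> stateFor(Map<ActionStateKey, ActionState> checkpoint,
                                        ActionStateKey key, int subtaskIndex) {
        ActionState actionState = checkpoint.get(key);
        if (actionState == null) {
            return List.of();
        }
        return actionState.subtaskStates().getOrDefault(subtaskIndex, List.of());
    }

    public static void main(String[] args) {
        ActionStateKey source = new ActionStateKey(1, 10);
        Map<ActionStateKey, ActionState> checkpoint = Map.of(
                source, new ActionState(source, Map.of(0, List.of(new byte[] {42}))));
        // Record keys compare by value, so a freshly built key finds the saved state
        System.out.println(stateFor(checkpoint, new ActionStateKey(1, 10), 0).get(0)[0]); // 42
        System.out.println(stateFor(checkpoint, source, 1).isEmpty());                    // true
    }
}
```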
Example: JDBC Source Recovery:
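public class JdbcSourceReader {
    // Splits waiting to be read, each paired with the offset to resume from
    private final Deque<JdbcSourceState> pendingSplits = new ArrayDeque<>();

    @Override
    public void restoreState(List<JdbcSourceState> states) {
        // Re-queue every checkpointed split together with its offset
        pendingSplits.addAll(states);
    }

    // Hypothetical helper: when a split is processed, resume from its checkpointed offset.
    // OFFSET only resumes correctly if the split query has a deterministic ORDER BY.
    private String buildResumeQuery(JdbcSourceState state) {
        JdbcSourceSplit split = state.getSplit();
        return split.getQuery() + " OFFSET " + state.getCurrentOffset();
    }
}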
4.2 Exactly-Once Recovery
Combination of checkpoint restore + sink two-phase commit ensures exactly-once:
Checkpoint N (completed):
Source offsets: [100, 200, 300]
Sink prepared commits: [XID-1, XID-2, XID-3]
Sink committer commits XID-1, XID-2, XID-3
↓ [Failure]
Recovery from Checkpoint N:
1. Restore source offsets: [100, 200, 300]
2. Sources start reading from offset 100, 200, 300
3. Sink writers restore state (may have uncommitted XIDs)
4. Sink committer retries committing XIDs (idempotent)
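Result: Everything each source emitted before its restored offset (source 1: < 100, source 2: < 200, source 3: < 300) is committed exactly once through checkpoint N's transactions. Re-committing an already committed XID is a no-op.
Records after those offsets are re-read and re-written. The transactions that the failed run opened for them after checkpoint N were never committed, so the replayed records appear in the output only once.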
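A toy simulation can check the recovery argument above. The sketch assumes a single source whose records are simply their offsets. It also collapses "persist checkpoint" and "commit transaction" into a single step. `ExactlyOnceReplayDemo` and `runUntil` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class ExactlyOnceReplayDemo {
    /**
     * Reads records [from, to) into an open transaction. Every `interval` records, a checkpoint
     * persists the next offset and its transaction is committed. Stopping at `to` models a crash
     * (or end of input): anything left in the open transaction is never committed.
     * Returns the source offset stored in the last completed checkpoint.
     */
    public static int runUntil(int from, int to, int interval, List<Integer> committed) {
        List<Integer> openTx = new ArrayList<>();
        int checkpointedOffset = from;
        for (int offset = from; offset < to; offset++) {
            openTx.add(offset); // "write" record `offset` inside the open transaction
            if ((offset + 1) % interval == 0) {
                checkpointedOffset = offset + 1; // checkpoint persisted with the next offset
                committed.addAll(openTx);        // then the prepared transaction is committed
                openTx.clear();
            }
        }
        return checkpointedOffset;
    }

    public static void main(String[] args) {
        List<Integer> committed = new ArrayList<>();
        // First run: checkpoint every 100 records, crash after reading records 0..249
        int restored = runUntil(0, 250, 100, committed); // 200; records 200..249 never committed
        // Recovery: restore offset 200 and continue to the end of a 300-record input
        runUntil(restored, 300, 100, committed);
        System.out.println(committed.size());                // 300
        System.out.println(new HashSet<>(committed).size()); // 300, i.e. no duplicates
    }
}
```

Records 200 to 249 are read twice, but only the second read is committed, so each record appears in the output once.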
5. Configuration and Tuning
5.1 Checkpoint Configuration
env {
# Enable checkpoint
checkpoint.interval = 60000 # Trigger every 60 seconds
# Checkpoint timeout
checkpoint.timeout = 600000 # 10 minutes
# Min pause between checkpoints
min-pause = 10000 # 10 seconds
}
Checkpoint storage is configured on the engine side (e.g., config/seatunnel.yaml under seatunnel.engine.checkpoint.storage), rather than as job-level env options.
5.2 Tuning Guidelines
Checkpoint Interval:
- Shorter interval: less data to replay after a failure (faster recovery), but more frequent snapshot I/O and higher storage cost
- Longer interval: lower overhead, but more data to replay and a longer recovery time
Rule of Thumb: Set the interval no larger than the reprocessing window you can tolerate. After a failure, everything since the last completed checkpoint is re-read. With exactly-once sinks, this data is reprocessed, not lost or duplicated.
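A back-of-the-envelope estimate helps apply this rule. The sketch below uses a hypothetical `ReplayWindowEstimate` class. It assumes a steady input rate and a failure just before the next checkpoint completes. It ignores the minimum pause and the restart time itself, so treat the result as a rough estimate of the records re-read, not a guarantee.

```java
public class ReplayWindowEstimate {
    /**
     * Rough worst case for records re-read after a failure: everything since the last completed
     * checkpoint's barrier, i.e. one interval plus the duration of the checkpoint that was about
     * to complete. Ignores the min pause and restart time.
     */
    public static long worstCaseReplayRecords(long recordsPerSecond, long intervalMs,
                                              long checkpointDurationMs) {
        return recordsPerSecond * (intervalMs + checkpointDurationMs) / 1000;
    }

    public static void main(String[] args) {
        // 10,000 records/s, 60 s interval, checkpoints taking about 5 s
        System.out.println(worstCaseReplayRecords(10_000, 60_000, 5_000)); // 650000
    }
}
```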
Checkpoint Timeout:
- Should be >> checkpoint interval (the example above uses 10 minutes vs. 60 seconds)
- Size it from state size, checkpoint storage throughput, and end-to-end latency (including barrier alignment time)
Storage Selection (SeaTunnel Engine):
- localfile (LocalFileStorage): local filesystem, non-HA
- hdfs (HdfsStorage): Hadoop FileSystem-based backend; can work with HDFS, S3A, etc., depending on the Hadoop configuration
6. Performance Optimization
6.1 Async Checkpoint
Only the in-memory copy of the state is taken on the processing path. Serialization and upload run in the background:
public class AsyncSnapshotSupport {
    public CompletableFuture<Void> snapshotState(long checkpointId) {
        // 1. Synchronous part: create snapshot of current state (fast, in-memory copy)
        StateSnapshot snapshot = createSnapshot();
        // 2. Asynchronous part: serialize and upload on a background executor
        CompletableFuture<Void> upload = CompletableFuture.runAsync(() -> {
            byte[] serialized = serialize(snapshot);
            checkpointStorage.upload(checkpointId, serialized);
        }, executorService);
        // 3. Return immediately so data processing continues; the ACK to the
        //    coordinator must only be sent once `upload` completes successfully
        return upload;
    }
}
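The split between a fast synchronous copy and a slow asynchronous upload can be shown with standard `java.util.concurrent` classes. In this hypothetical `AsyncSnapshotDemo`, rendering the copy as a string stands in for serialization and upload. The point is that updates made after the copy do not leak into the snapshot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSnapshotDemo {
    private final Map<String, Long> state = new HashMap<>(); // live state, touched only by the task thread
    private final ExecutorService uploader = Executors.newSingleThreadExecutor();

    public void update(String key, long value) {
        state.put(key, value);
    }

    // Synchronous part: copy the state. Asynchronous part: "serialize and upload" the copy.
    public CompletableFuture<String> snapshot(long checkpointId) {
        // A shallow copy is enough here because the Long values are immutable
        Map<String, Long> copy = new HashMap<>(state);
        // Stand-in for serialize + upload: render the copy as a sorted string
        return CompletableFuture.supplyAsync(() -> "chk-" + checkpointId + ":" + new TreeMap<>(copy), uploader);
    }

    public void shutdown() {
        uploader.shutdown();
    }

    public static void main(String[] args) {
        AsyncSnapshotDemo task = new AsyncSnapshotDemo();
        task.update("a", 1);
        CompletableFuture<String> upload = task.snapshot(1);
        task.update("a", 2);               // processing continues while the upload runs
        System.out.println(upload.join()); // chk-1:{a=1}
        task.shutdown();
    }
}
```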
6.2 Incremental Checkpoint (Future)
Only checkpoint changed state:
// Full checkpoint (first)
Checkpoint 1: State = 1GB → Upload 1GB
// Incremental checkpoints (subsequent)
Checkpoint 2: State = 1.1GB → Upload 100MB (delta)
Checkpoint 3: State = 1.05GB → Upload ~0MB (only small deletion markers; deleted data itself isn't uploaded)
Benefits:
- Reduce checkpoint time
- Lower storage I/O
- Faster checkpoint completion
Challenges:
- More complex state management
- Need to track state changes
- Restore requires chain of deltas
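The delta idea can be sketched with plain maps, including why deletions still need a small marker. `IncrementalCheckpointSketch` and its `Delta` record are hypothetical names. A real implementation would track changes inside the state backend rather than diffing full maps.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class IncrementalCheckpointSketch {
    // Changes since the previous checkpoint: new/updated entries plus deleted keys (tombstones)
    public record Delta(Map<String, String> upserts, Set<String> deletions) {}

    public static Delta diff(Map<String, String> previous, Map<String, String> current) {
        Map<String, String> upserts = new HashMap<>();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            if (!Objects.equals(previous.get(entry.getKey()), entry.getValue())) {
                upserts.put(entry.getKey(), entry.getValue());
            }
        }
        // Keys present before but gone now; without these tombstones a restore would resurrect them
        Set<String> deletions = new HashSet<>(previous.keySet());
        deletions.removeAll(current.keySet());
        return new Delta(upserts, deletions);
    }

    // Restore = full base checkpoint + every delta applied in order (the "chain of deltas")
    public static Map<String, String> restore(Map<String, String> base, List<Delta> chain) {
        Map<String, String> state = new HashMap<>(base);
        for (Delta delta : chain) {
            state.putAll(delta.upserts());
            state.keySet().removeAll(delta.deletions());
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> cp1 = Map.of("a", "1", "b", "2", "c", "3");
        Map<String, String> cp2 = Map.of("a", "1", "b", "20", "c", "3", "d", "4"); // b updated, d added
        Map<String, String> cp3 = Map.of("a", "1", "b", "20", "d", "4");           // c deleted
        Delta d2 = diff(cp1, cp2);
        Delta d3 = diff(cp2, cp3);
        System.out.println(d2.upserts().size() + " upserts, " + d3.deletions() + " deleted"); // 2 upserts, [c] deleted
        System.out.println(restore(cp1, List.of(d2, d3)).equals(cp3));                       // true
    }
}
```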
6.3 Local State Backend (Future)
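Keep hot state in a fast local store and put only a reference to a local store snapshot into the checkpoint:
// RocksDB local state backend (sketch using the RocksJava API)
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class RocksDBStateBackend {
    private RocksDB rocksDB;      // Fast local SSD (opened elsewhere)
    private String checkpointDir; // Local directory for RocksDB checkpoints

    public void put(String key, byte[] value) throws RocksDBException {
        rocksDB.put(key.getBytes(), value); // Local write (fast)
    }

    public byte[] snapshotState(long checkpointId) throws RocksDBException {
        String path = checkpointDir + "/chk-" + checkpointId;
        // Hard-links SST files when on the same filesystem, so the local checkpoint is cheap
        try (Checkpoint checkpoint = Checkpoint.create(rocksDB)) {
            checkpoint.createCheckpoint(path);
        }
        // Only a reference goes into the checkpoint; the files must still reach durable storage
        return path.getBytes();
    }
}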
7. Best Practices
7.1 State Size Optimization
1. Keep State Small:
// ❌ BAD: Buffer entire dataset
class BadSourceReader {
    private List<SeaTunnelRow> bufferedRows = new ArrayList<>(); // May be huge!
    List<byte[]> snapshotState(long checkpointId) {
        return Collections.singletonList(serialize(bufferedRows)); // Huge state
    }
}
// ✅ GOOD: Track offset only
class GoodSourceReader {
    private long currentOffset = 0;
    List<byte[]> snapshotState(long checkpointId) {
        return Collections.singletonList(serialize(currentOffset)); // Small state
    }
}
2. Use Efficient Serialization:
- Prefer Protobuf, Kryo over Java serialization
- Compress large state (gzip, snappy)
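Compression needs nothing beyond the JDK: `java.util.zip` provides gzip, while Snappy would need a third-party library. The hypothetical `StateCompression` helper below compresses a serialized state blob before upload and restores it on recovery. Repetitive state, such as many similar split/offset entries, compresses well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StateCompression {
    public static byte[] compress(byte[] state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read only after the gzip stream is closed, so the trailer has been written
        return bytes.toByteArray();
    }

    public static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] state = "split-7:offset=42;".repeat(1_000).getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(state);
        System.out.println(state.length + " -> " + packed.length + " bytes");
        System.out.println(Arrays.equals(state, decompress(packed))); // true
    }
}
```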
7.2 Monitoring
Key Metrics:
- checkpoint_duration: Time from trigger to completion
- checkpoint_size: Size of persisted checkpoint
- checkpoint_failure_rate: Percentage of failed checkpoints
- checkpoint_alignment_duration: Time spent aligning barriers
Alerting:
- Alert if checkpoint_duration > threshold (e.g., 5 minutes)
- Alert if checkpoint_failure_rate > 10%
- Alert if no checkpoint completed in 2x interval
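The three alert rules can be evaluated over a window of recent checkpoint attempts. `CheckpointAlerts` and its `Attempt` record are hypothetical names. In practice, the inputs would come from whatever metrics system collects the checkpoint metrics listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public class CheckpointAlerts {
    // One finished checkpoint attempt (hypothetical shape for this sketch)
    public record Attempt(long finishedAtMs, long durationMs, boolean succeeded) {}

    // Applies the three alert rules to a window of recent attempts
    public static List<String> evaluate(List<Attempt> recent, long nowMs, long intervalMs, long maxDurationMs) {
        List<String> alerts = new ArrayList<>();
        if (recent.stream().anyMatch(a -> a.durationMs() > maxDurationMs)) {
            alerts.add("checkpoint duration above threshold");
        }
        long failures = recent.stream().filter(a -> !a.succeeded()).count();
        if (failures * 10 > recent.size()) { // failure rate > 10%
            alerts.add("failure rate above 10%");
        }
        OptionalLong lastSuccess = recent.stream()
                .filter(Attempt::succeeded)
                .mapToLong(Attempt::finishedAtMs)
                .max();
        if (lastSuccess.isEmpty() || nowMs - lastSuccess.getAsLong() > 2 * intervalMs) {
            alerts.add("no checkpoint completed in 2x interval");
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Attempt> recent = List.of(
                new Attempt(400_000, 350_000, true), // slow but successful
                new Attempt(500_000, 2_000, false)); // failed
        // now = 600 s, interval = 60 s, duration threshold = 5 min
        System.out.println(evaluate(recent, 600_000, 60_000, 300_000));
        // [checkpoint duration above threshold, failure rate above 10%, no checkpoint completed in 2x interval]
    }
}
```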
7.3 Troubleshooting
Problem: Checkpoint timeout
Possible Causes:
- Task stuck (slow data processing)
- Large state (slow serialization/upload)
- Slow storage (network/disk I/O)
- Barrier alignment slow (skewed data)
Solutions:
- Increase checkpoint timeout
- Optimize state size
- Use faster storage
- Tune parallelism
Problem: High checkpoint overhead
Possible Causes:
- Checkpoint interval too short
- Large state size
- Slow storage
Solutions:
- Increase checkpoint interval
- Optimize state size
- Enable incremental checkpoint (when available)
8. References
Academic Papers
- Chandy, K. M., & Lamport, L. (1985). "Distributed Snapshots: Determining Global States of Distributed Systems"
- Carbone, P., et al. (2017). "State Management in Apache Flink"