数据写入 Sink 架构
1. 概述
1.1 问题背景
在分布式环境中向外部系统写入数据面临关键挑战:
- 精确一次保证:如何确保每条记录精确写入一次,而不是零次或多次?
- 事务一致性:如何在多个并行写入器之间原子性地提交写入操作?
- 容错:如何从失败中恢复而不丢失数据或产生重复?
- 反压:如何处理慢速数据 Sink而不使系统过载?
- 幂等性:如何使重试操作安全?
1.2 设计目标
SeaTunnel 的数据 Sink 旨在:
- 提供可验证的一致性语义:在外部系统支持事务/幂等提交的前提下,通过两阶段提交与检查点边界实现端到端一致性
- 支持并行写入:通过多个写入器实例扩展吞吐量
- 启用全局协调:协调分布式写入器之间的提交
- 确保容错:从失败中恢复而不产生数据不一致
- 提供灵活性:支持各种提交策略
1.3 适用场景
- 事务性数据库(JDBC 与 XA 事务)
- 消息队列(Kafka 与事务)
- 文件系统(原子文件重命名)
- 数据湖(Iceberg、Hudi、Paimon 与表事务)
- 搜索引擎(Elasticsearch 与版本控制)
2. 架构设计
2.1 整体架构
┌────────────────────────────────────────────────────────────────┐
│ 执行引擎任务侧(数据面) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkWriter<IN, CommitInfoT, StateT> │ │
│ │ │ │
│ │ • 从上游接收记录 │ │
│ │ • 缓冲并写入数据 │ │
│ │ • 在 checkpoint 边界产出 commitInfo │ │
│ │ • 快照写入器状态 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ │ checkpoint 完成通知触发 │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkCommitter<CommitInfoT>(可选) │ │
│ │ │ │
│ │ • 使 prepare 的变更对外可见 │ │
│ │ • 失败可重试,要求幂等 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
│
│ (可选:聚合提交任务,单实例)
▼
┌────────────────────────────────────────────────────────────────┐
│ 执行引擎协调侧(控制面) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SinkAggregatedCommitter<CommitInfoT, │ │
│ │ AggregatedCommitInfoT>(可选)│ │
│ │ │ │
│ │ • 聚合多个 writer 的 commitInfo │ │
│ │ • 执行一次全局提交(单线程语义) │
│ └──────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
│
▼
外部数据系统
(数据库 / 文件 / 消息队列)
2.2 核心组件
SeaTunnelSink(工厂接口)
作为创建写入器和提交器的工厂的顶层接口。
契约要点(概念级):
- 创建 writer:在工作节点(Task)侧创建
SinkWriter,负责接收记录并写入 - 恢复 writer:在 failover 后用 checkpoint 中的 writerState 恢复未完成写入
- 创建 committer(可选):当数据 Sink 需要两阶段提交时使用。它负责在 checkpoint 成功后提交
prepareCommit(checkpointId)产生的提交信息;运行位置取决于执行引擎实现(例如在 SeaTunnel Engine 中由 Sink 任务在notifyCheckpointComplete回调中触发) - 创建 aggregated committer(可选):当外部系统需要“全局单点提交”(如表级提交/单次元数据提交)时使用。该提交器按单线程语义执行,通常与 committer 二选一;如果同时提供两者,需要确保语义不会重复提交/发生冲突
- 描述写入 schema:通过
CatalogTable明确输入字段、投影与类型约束
这组工厂方法的核心目的是把“写入(数据面)”与“提交(控制面)”解耦,使得 checkpoint 成为全局一致性边界。
关键设计点:
- 两阶段提交扩展点:写入器(必需)+(committer 或 aggregated committer,按需求选择)
- committer 与 aggregated committer 在很多场景下应视为互斥选项:前者提交每个 writer 的变更,后者先聚合再做一次全局提交
- 写入器始终是必需的(执行实际的数据写入)
2.3 交互流程
正常写入流程(带两阶段提交)
sequenceDiagram
participant CP as 框架(Checkpoint/回调)
participant Writer1 as SinkWriter 1
participant Writer2 as SinkWriter 2
participant Committer as SinkCommitter
participant Sink as 数据 Sink
Writer1->>Writer1: write(record)
Writer2->>Writer2: write(record)
CP->>Writer1: triggerBarrier(checkpointId)
CP->>Writer2: triggerBarrier(checkpointId)
Writer1->>Writer1: prepareCommit(checkpointId)
Writer1->>CP: ack(commitInfo1)
Writer2->>Writer2: prepareCommit(checkpointId)
Writer2->>CP: ack(commitInfo2)
CP->>CP: 所有写入器已确认
CP->>CP: 持久化检查点
Note over CP,Committer: checkpoint 成功后,框架触发提交(触发点/运行位置取决于执行引擎实现)
CP->>Committer: commit([commitInfo1, commitInfo2])
Committer->>Sink: 提交 writer1 的变更
Committer->>Sink: 提交 writer2 的变更
Committer->>CP: ack()
CP->>Writer1: notifyCheckpointComplete(checkpointId)
CP->>Writer2: notifyCheckpointComplete(checkpointId)
失败和重试流程
sequenceDiagram
participant CP as 框架(Checkpoint/回调)
participant Writer as SinkWriter
participant Committer as SinkCommitter
participant Sink as 数据 Sink
Note over Writer: 写入进行中(事务/临时文件)
CP->>Writer: triggerBarrier(checkpointId)
Writer->>Writer: prepareCommit(checkpointId)
Writer->>CP: ack(commitInfo)
alt Checkpoint 成功
Note over CP,Committer: checkpoint 成功后,框架/引擎会触发提交(触发点/运行位置取决于执行引擎实现)
CP->>Committer: commit([commitInfo])
Committer->>Sink: 提交变更(幂等)
Committer->>CP: ack()
CP->>Writer: notifyCheckpointComplete(checkpointId)
else Checkpoint 失败/中止
CP->>Writer: notifyCheckpointAborted(checkpointId)
Note over Writer,Committer: 引擎可选调用 commit/abort 相关回调进行清理;\n务必保证 commit 幂等,避免只依赖 abort 完成回滚
end
Note over Committer: commit 失败由框架重试\n必须保证幂等
核心职责:
write(element):接收上游记录并写入外部系统的“临时/事务内”区域(避免对外可见)prepareCommit(checkpointId):在 checkpoint 边界生成提交信息(commitInfo),要求“无副作用”(不让数据对外可见)snapshotState(checkpointId):把“已写入但未提交”的可恢复状态写入 checkpoint(事务句柄、文件清单、位点等)abortPrepare():用于回滚prepareCommit阶段产生的副作用(是否会被调用取决于执行引擎/实现路径)notifyCheckpointAborted():checkpoint 失败/中止回调(若 writer 或运行时实现了 CheckpointListener,可在此做清理)notifyCheckpointComplete():checkpoint 成功且提交完成后做清理(释放事务、删除临时文件/状态等)
关键要求:
prepareCommit(...)必须无副作用;真正让数据对外可见的动作应发生在 committer 的commit()阶段snapshotState()必须覆盖所有“已写入但未提交”的中间结果,否则恢复会丢数据或重复写- 清理路径必须可重试且幂等:同一 checkpoint 的 abort/cleanup 可能被调用多次
典型实现形态(不绑定具体源码):
- 事务型数据 Sink :writer 在事务内写入,prepare 阶段产出事务句柄/提交 token,commit 阶段统一提交
- 文件型数据 Sink :writer 写临时文件并产出“文件清单/元数据”,commit 阶段做原子 rename/元数据提交
3.2 SinkCommitter 接口
提交器由执行引擎在 checkpoint 成功后触发执行,用于使本次 checkpoint 对应的“准备写入”对外可见(运行位置取决于具体执行引擎实现)。
契约要点:
commit(commitInfos):对一批提交信息执行提交;必须支持重试,因此要求幂等- 返回值语义:返回“仍需重试/未完成”的提交信息集合(框架会在后续 checkpoint 或恢复路径中重试)
abort(commitInfos)(可选):放弃提交并做资源清理(例如回滚事务、删除临时文件)
关键要求:
commit()必须是幂等的(使用相同的 commitInfo 调用两次应该是安全的)- 返回失败的 commitInfos 列表(将被重试)
- 应优雅地处理部分失败
实现提示:
- 需要明确幂等键(例如事务 id、文件清单版本、外部系统的去重 key)
- 需要能区分“可重试失败”(网络抖动)与“不可重试失败”(权限/数据非法),避免无意义重试
3.3 SinkAggregatedCommitter 接口
聚合提交器为所有写入器执行单个全局提交。
契约要点:
combine(commitInfos):把多个 writer 的提交信息聚合成“全局一次提交”所需的元数据commit(aggregatedCommitInfos):对聚合后的信息做全局提交;同样必须幂等restoreCommit(...):恢复聚合提交器状态,确保 failover 后仍可完成/重试“全局提交”
使用场景:
- Hive 表提交(所有分区的单个 COMMIT TRANSACTION)
- Iceberg 表提交(单个表快照)
- 全局索引更新(为所有写入更新一次索引)
实现示例(语义级,以 Hive 为例):
combine: Sink 总所有 writer 产生的文件/分区元数据,形成一次表级提交所需的“全量变更集”commit:对外部 metastore/表事务执行一次全局原子提交;失败后需要可重试且不重复(幂等)
4. 设计考量
4.1 设计权衡
两阶段提交
优点:
- 强一致性保证(精确一次)
- 自动失败恢复
- 准备和提交之间的清晰分离
缺点:
- 增加延迟(数据仅在提交后可见)
- 需要数据 Sink 中的事务支持
- 提交信息的额外状态
- 更复杂的实现
何时使用:
- 金融交易、计费、审计日志
- 外部系统支持事务/幂等提交,并且业务需要端到端精确一次的场景
何时不使用:
- 至少一次可接受(日志、指标)
- 数据 Sink 不支持事务
- 需要超低延迟
两层提交 vs 聚合提交
两层(写入器 → 提交器):
- 每个写入器的提交独立处理
- 并行提交操作
- 适用于大多数数据 Sink
聚合提交(写入器 → 聚合提交器):
- 所有写入器的提交信息先被聚合
- 执行一次全局提交操作(单线程语义)
- 适用于需要“单点表级提交/元数据提交”的外部系统(Hive、Iceberg 等)
4.2 性能考量
批量写入
将多条记录合并为一次外部写入(JDBC batch / bulk API / multi-put)。
好处:
- 摊销每条记录的开销
- 减少网络往返
- 更好的吞吐量
异步写入
将外部 I/O 下沉到后台线程/异步客户端,以降低 write() 的尾延迟。但需要明确:
- 如果采用异步写入,
prepareCommit(...)需要等待所有“已接收记录”的异步写入完成,才能生成可靠的 commitInfo - 需要有背压/限流策略,避免异步积压导致 OOM
连接池
对 JDBC/HTTP 等短连接成本高的外部系统,优先使用连接池/长连接以减少握手与认证开销。
4.3 幂等性模式
1. 自然幂等性(Upsert)
利用外部系统提供的 Upsert/Merge 语义,使“重复提交同一业务键”不会产生重复数据。
2. 去重键
为每条写入生成可重复的幂等键(业务主键、事件 id、事务 id),并让外部系统/协议基于该键实现去重。
3. 外部去重表
在外部系统维护“已提交记录表/去重索引”,提交前先检查是否已提交;这种方式通用但会引入额外写放大与一致性成本。
5. 最佳实践
5.1 使用建议
1. 选择适当的提交级别
- 仅 writer:适合至少一次(数据写入立即可见,恢复会重放,需外部幂等)
- writer + committer:适合两阶段提交(checkpoint 边界产出 commitInfo,并在 checkpoint 成功后触发 commit;触发位置取决于执行引擎实现)
- writer + aggregated committer:适合表级事务/全局单点提交(先聚合多个 writer 的 commitInfo,再执行一次全局提交)
2. 正确的状态管理
- 状态里只放“恢复必需信息”(事务句柄/临时文件清单/最后一致性偏移量等),避免把大批数据放进状态
- 恢复时要能把状态回放到 writer 内部,并确保 prepare/commit 的幂等性仍成立
3. 资源管理
- 明确资源生命周期:writer/committer 的
close()必须可重复调用且不抛出不可恢复异常 - 尽量做到“按创建逆序关闭”,并确保失败时也能释放外部资源(连接/事务/临时文件)
5.2 常见陷阱
1. prepareCommit(...) 中的副作用
prepareCommit(...)只能生成“提交所需的凭据/元数据”,不能让数据对外可见- 一旦在 prepare 阶段产生外部副作用,failover 重放会导致重复写入
2. 非幂等提交
commit()需要支持相同 commitInfo 的重复调用(网络抖动/主节点重启会发生)- 优先依赖外部系统的幂等语义(upsert/merge/幂等事务 id),否则需要自建去重机制
3. 大状态
- 避免把大量缓冲记录放进 checkpoint 状态,状态越大越容易导致 checkpoint 超时与恢复变慢
- 把大数据留在外部系统(临时文件/事务日志),状态里只保留引用与必要元数据
5.3 调试技巧
1. 启用 XA 事务日志
- 记录关键生命周期事件:事务开始/prepare/commit/rollback、checkpointId、writerIndex
- 避免记录敏感数据(凭据/明文 SQL/用户数据),以可追踪的事务 id 为主
2. 跟踪提交进度
- 输出/采集提交指标:提交耗时、失败率、重试次数、单次提交大小
- 重点关注“提交堆积”与“commitInfo 重试风暴”,它们通常意味着幂等设计或外部系统稳定性问题
3. 测试失败场景
- 覆盖典型故障:writer 崩溃、committer 崩溃、commit 超时、重复提交、checkpoint 超时
- 验证点:不丢数据、不重复可见(或重复可见但幂等)、恢复后可继续推进 checkpoint
6. 相关资源
7. 参考资料
示例连接器
- 简单数据 Sink :ConsoleSink(输出到标准输出)
- 文件数据 Sink :FileSink(原子文件重命名)
- 数据库数据 Sink :JdbcSink(XA 事务)
- 流式数据 Sink :KafkaSink(Kafka 事务)
- 表数据 Sink :IcebergSink(表提交)