跳到主要内容
版本:Next

精确一次语义

1. 概述

1.1 问题背景

分布式数据处理面临基本的交付保证挑战:

  • 至多一次: 记录可能丢失(对关键数据不可接受)
  • 至少一次: 记录可能重复(导致计数错误、重复收费)
  • 精确一次: 每条记录恰好处理一次(理想但复杂)

实际影响:

场景: 金融交易处理

至少一次:
交易 $100 处理两次 → 用户被收费 $200 ❌

精确一次:
交易 $100 处理一次 → 用户被收费 $100 ✅

1.2 设计目标

SeaTunnel 的精确一次语义旨在:

  1. 端到端语义: 在启用 checkpoint 且外部系统支持事务/幂等提交等前提下,尽量提供可验证的一致性语义(避免丢失或重复可见)
  2. 透明实现: 框架处理复杂性,用户最少配置
  3. 性能效率: 在维护保证的同时最小化开销
  4. 故障弹性: 在任务/工作节点/主节点故障时维护保证
  5. 广泛适用性: 支持事务型和非事务型目标端

1.3 一致性级别

级别保证用例实现
至多一次无重复,可能丢失非关键日志无重试
至少一次无丢失,可能重复幂等处理重试但无事务
精确一次无丢失,无重复金融、计费、审计检查点 + 两阶段提交

2. 理论基础

2.1 Chandy-Lamport 算法

概念: 无需停止整个系统的分布式快照。

机制:

  1. 协调器向数据流注入屏障(标记)
  2. 收到屏障后,每个算子:
    • 快照其本地状态
    • 将屏障转发到下游
  3. 当所有算子都完成快照时,我们有一个一致的全局快照

关键属性: 快照表示跨分布式系统状态的一致切割。

2.2 两阶段提交协议

概念: 跨分布式参与者的原子提交。

阶段:

  1. 准备阶段: 所有参与者准备(尚无副作用)
  2. 提交阶段: 协调器决定提交/中止,所有参与者执行

在 SeaTunnel 中:

  • 准备: 检查点期间的 SinkWriter.prepareCommit(...)
  • 提交: 检查点完成后的 SinkCommitter.commit()

3. 精确一次架构

3.1 端到端流水线

┌──────────────────────────────────────────────────────────────┐
│ 数据源 │
│ • 从外部系统读取 │
│ • 跟踪偏移量/位置 │
│ • 在检查点中快照偏移量 │
└──────────────────────────────┬───────────────────────────────┘

▼ 检查点屏障
┌──────────────────────────────────────────────────────────────┐
│ 转换器 │
│ • 处理记录 │
│ • 快照转换器状态(如果有) │
└──────────────────────────────┬───────────────────────────────┘

▼ 检查点屏障
┌──────────────────────────────────────────────────────────────┐
│ 目标端写入器 │
│ • 缓冲写入 │
│ • prepareCommit(checkpointId) → 生成 CommitInfo (阶段 1) │
│ • 快照写入器状态 │
└──────────────────────────────┬───────────────────────────────┘

│ CommitInfo

┌──────────────────────────────────────────────────────────────┐
│ CheckpointCoordinator │
│ • 收集所有 CommitInfos │
│ • 持久化 CompletedCheckpoint │
│ • 触发提交/回调(触发点取决于执行引擎实现) │
└──────────────────────────────┬───────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│ 目标端提交器 │
│ • commit(CommitInfos) → 应用变更 (阶段 2) │
│ • 必须是幂等的 │
└──────────────────────────────┬───────────────────────────────┘


外部目标端
(变更可见)

3.2 关键组件

数据 Source 源偏移量管理:

Source 侧要想参与端到端精确一次,通常需要满足:

  • 可追踪进度: 读取过程持续维护“已处理到哪里”(如 Kafka offset、文件 position、CDC LSN 等)
  • 可快照: 在 checkpoint 时将进度写入状态后端(属于检查点状态的一部分)
  • 可提交/可确认: 在 checkpoint 成功后再将进度提交到外部系统(例如提交 offset)
  • 幂等提交: 由于重试、故障转移可能触发重复提交,提交动作必须可重放且结果一致

目标端两阶段提交:

Sink 侧两阶段提交(2PC)的语义拆分:

  • Writer(阶段 1 / prepare)
    • 将写入先落到“暂不可见”的位置(事务缓冲、临时文件、暂存表/分区等)
    • 在 barrier 到达时执行 prepare:封存本轮写入,并产出 CommitInfo(例如事务 ID、临时路径、批次号)
    • 将 CommitInfo 上报给协调器并随 CompletedCheckpoint 一起持久化
  • Committer(阶段 2 / commit)
    • 仅在 checkpoint 完成后运行 commit(CommitInfos),使外部副作用“变得可见”(提交事务、原子重命名、发布 batch)
    • 必须幂等:重复提交同一 CommitInfo 不能产生重复数据;典型做法是利用外部系统的事务 ID / 唯一键 / 幂等 API

4. 实现模式

4.1 事务型目标端(XA)

典型场景: 支持 XA/2PC 的事务型数据库等

实现:

实现要点:

  • Writer 使用 XA/事务能力将写入暂存于事务中
  • 在 prepareCommit 阶段产出可被提交器识别的事务标识(CommitInfo)
  • Committer 在 checkpoint 完成后提交事务,并对重复 commit 做幂等处理

优点:

  • 强一致性保证
  • 失败时自动回滚

缺点:

  • 需要数据库 XA 支持
  • 更高延迟(2PC 开销)
  • 准备阶段期间锁争用

4.2 幂等目标端(Upsert)

典型场景: 支持 upsert/merge 或自然幂等写入的目标端(例如按主键覆盖写入的存储)

实现:

实现要点:

  • 为每条记录选择一个确定性的幂等键(通常来自主键/业务唯一键)
  • 外部系统使用“按键覆盖/更新”(Upsert)语义:同一幂等键多次写入,最终只保留一个结果
  • prepareCommit 只需要保证批次边界(例如 flush 缓冲),不一定需要单独的 commit 阶段

关键: 相同主键 → 相同文档 → 幂等更新

优点:

  • 无事务开销
  • 更低延迟

缺点:

  • 需要唯一键
  • 无法处理复杂事务

4.3 基于日志的目标端(Kafka)

实现:

实现要点:

  • 使用 Kafka 事务能力将一个 checkpoint 边界内的写入纳入同一个事务
  • prepareCommit 阶段完成 flush 并产出事务标识(CommitInfo)
  • commit 阶段提交事务,使消息对下游消费者可见
  • 对故障恢复时的重复提交,需要依赖 Kafka 事务/幂等机制保证不会产生重复可见结果

4.4 文件目标端(原子重命名)

实现:

实现要点:

  • Writer 将数据写入临时路径/临时文件(对外不可见)
  • prepareCommit 阶段封存临时文件并产出 CommitInfo(临时路径 + 目标路径)
  • Committer 只做“原子可见化”动作(例如原子重命名/原子移动)
  • 需要确认底层文件系统对 rename/move 的原子性语义;在对象存储上往往需要额外设计(否则不能直接宣称精确一次)

关键: 原子重命名确保文件要么完全可见要么不可见。

5. 故障场景和恢复

5.1 检查点前任务故障

时间线:
t0: 检查点 N 完成
t1: 处理记录 [1000-2000]
t2: 任务失败 ❌
t3: 从检查点 N 恢复
t4: 重新处理记录 [1000-2000]

结果:
✅ 无数据丢失(记录重新处理)
✅ 无重复(故障前未提交任何内容)

5.2 prepareCommit 后任务故障

时间线:
t0: 检查点 N 进行中
t1: SinkWriter.prepareCommit(...) → XID-123 已准备
t2: 任务失败 ❌ (提交前)
t3: 从检查点 N-1 恢复
t4: 重新处理记录
t5: 新的 prepareCommit(...) → XID-124 已准备
t6: 提交器提交 XID-124

结果:
✅ XID-123 从未提交(超时后自动回滚)
✅ XID-124 已提交(正确数据)

5.3 提交期间提交器故障

时间线:
t0: 检查点 N 完成
t1: 提交器开始提交 [XID-100, XID-101, XID-102]
t2: 提交 XID-100 ✅
t3: 提交器失败 ❌ (XID-101, XID-102 未提交)
t4: 新提交器重试 [XID-100, XID-101, XID-102]
t5: 提交 XID-100 (已提交,幂等) ✅
t6: 提交 XID-101 ✅
t7: 提交 XID-102 ✅

结果:
✅ 所有 XID 最终提交
✅ 无重复(幂等提交)

5.4 网络分区

时间线:
t0: SinkWriter 准备 XID-200
t1: 检查点完成
t2: 提交器发送 commit(XID-200)
t3: 网络分区 ⚠️ (提交成功,但 ACK 丢失)
t4: 提交器重试 commit(XID-200)
t5: XID-200 已提交(幂等)

结果:
✅ 数据恰好提交一次
✅ 幂等性防止重复

6. 幂等性要求

6.1 为什么幂等性很重要

问题: 网络故障、重试和故障转移可能导致重复的提交尝试。

解决方案: 提交器操作必须是幂等的。

典型对比:

  • 非幂等提交: 重试一次就会额外插入一份数据(产生重复)
  • 幂等提交: 重试多次与提交一次效果一致(例如使用唯一键约束/Upsert/事务 ID 去重)

6.2 实现幂等性

策略 1: 检查后执行

要点:

  • 提交前先查询“该 CommitInfo 是否已完成提交”(通过事务表、元数据表、外部系统 API)
  • 已提交则直接返回成功;未提交则提交并记录结果

策略 2: 数据库级幂等性

要点:

  • 使用唯一约束/唯一索引来承载“去重键”(事务 ID / 批次 ID / checkpointId)
  • 将“写入去重标记”和“应用外部副作用”放在同一事务或同一原子语义内,避免部分成功导致的不一致

策略 3: 自然幂等性(XA)

要点:

  • 依赖 XA 协议本身对重复 commit 的处理语义
  • 对“已提交/不存在”的错误码进行兼容处理,将其视为幂等成功

7. 性能考虑

7.1 检查点间隔权衡

短间隔(10-30s):
✅ 快速恢复(重新处理更少)
❌ 更高开销(频繁快照)
❌ 更多提交操作

长间隔(5-10分钟):
✅ 更低开销(快照更少)
❌ 恢复更慢(重新处理更多)
✅ 更少提交操作

建议: 大多数工作负载 60-120 秒

7.2 批量大小优化

优化思路:

  • 使用批量写入将外部系统交互的固定开销摊薄(例如每 1000 条 flush 一次)
  • 批量过大可能增加延迟与内存占用;批量过小会增加外部 I/O 次数

影响: 1000x 批量 → ~10x 吞吐量提升

7.3 异步检查点

优化思路:

  • 在 barrier 到达时尽快做“轻量快照”(例如复制状态引用/增量快照元数据)
  • 将序列化与上传等重 I/O 工作放到异步线程执行,减少对主处理线程的阻塞
  • 需要权衡:异步快照会增加内存峰值(需要暂存 snapshot),并要求正确处理并发可见性

影响: 快照上传时数据处理继续

8. 配置

8.1 启用精确一次

env {
# 检查点配置
checkpoint.interval = 60000 # 60 秒
checkpoint.timeout = 600000 # 10 分钟

# 精确一次模式(vs 至少一次)
# 使用事务型目标端时这是隐式的
}

8.2 数据源配置

Kafka:

source {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "my_topic"

# Kafka 消费者偏移量提交
commit_on_checkpoint = true # 检查点后提交偏移量
}
}

JDBC:

source {
JDBC {
url = "jdbc:mysql://..."

# 基于查询的数据源(幂等重新处理)
query = "SELECT * FROM table WHERE id >= ? AND id < ?"
}
}

8.3 目标端配置

JDBC (XA):

sink {
JDBC {
url = "jdbc:mysql://..."

# 启用 XA 事务
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
is_exactly_once = true
}
}

Kafka (事务):

sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "output_topic"

# Kafka 事务
transaction.id = "seatunnel-kafka-sink"
enable.idempotence = true
}
}

9. 测试精确一次

9.1 功能测试

建议的功能测试步骤:

  1. 向数据源注入固定集合的记录(可重复、可计数、最好带主键)
  2. 触发/等待至少一个 checkpoint 完成
  3. 在关键窗口注入故障(例如 prepareCommit 之后、commit 之前;或 barrier 对齐期间)
  4. 恢复后继续运行并结束作业
  5. 验证输出端:输入计数 = 输出计数,且基于主键/去重键无重复

9.2 混沌测试

建议的混沌测试维度:

  • 随机杀任务/杀 worker/重启 master
  • 注入网络延迟、短暂网络分区、外部存储抖动
  • 暂停/延迟 checkpoint 触发,模拟对齐与上传压力

验收标准:

  • 输入计数与输出计数一致
  • 输出端无重复(主键/去重键唯一)
  • 对关键失败窗口(prepareCommit/commit)覆盖到位

9.3 监控验证

要跟踪的指标:

source.records_read = 1,000,000
sink.records_written = 1,000,000
sink.records_committed = 1,000,000

✅ 所有计数匹配 → 精确一次验证

10. 最佳实践

10.1 选择适当的目标端

使用事务型目标端(XA)用于:

  • 金融交易
  • 计费系统
  • 审计日志
  • 关键数据

使用幂等目标端用于:

  • 高吞吐量场景
  • 可接受最终一致性
  • 无事务支持

10.2 处理有毒记录

处理建议:

  • 明确“有毒记录”的判定范围(格式错误/约束冲突/不可恢复的业务异常)
  • 选择策略:写入死信队列(DLQ)并告警、跳过并计数、或触发失败(强一致场景)
  • 与精确一次语义的关系:跳过会破坏端到端“无丢失”,但可能是可接受的业务权衡;需在文档/配置中显式声明

10.3 监控检查点健康

关键指标:

  • checkpoint.duration: 应 < 间隔的 10%
  • checkpoint.failure_rate: 应 < 1%
  • checkpoint.size: 监控随时间增长

警报:

如果 checkpoint.duration > 300s 则告警
如果 checkpoint.failure_rate > 5% 则告警
如果在 2x 间隔内无检查点则告警

11. 相关资源

12. 参考资料

学术论文

进一步阅读