资源管理
1. 概述
1.1 问题背景
分布式执行引擎必须高效管理计算资源:
- 资源分配: 如何公平高效地将任务分配给工作节点?
- 负载均衡: 如何在工作节点之间均匀分布工作负载?
- 资源隔离: 如何防止作业之间的资源争用?
- 动态扩缩容: 如何在不中断作业的情况下添加/删除工作节点?
- 异构资源: 如何处理具有不同能力的工作节点?
1.2 设计目标
SeaTunnel 的资源管理系统旨在:
- 细粒度控制: 基于槽位的分配实现精确资源管理
- 灵活策略: 针对不同场景的多种分配策略
- 基于标签的过滤: 将任务分配给特定的工作节点组
- 高可用性: 容忍工作节点故障并自动重新分配
- 可观测性: 实时跟踪资源使用和可用性
1.3 架构概览
┌──────────────────────────────────────────────────────────────┐
│ JobMaster │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 请求资源 │ │
│ │ • 计算所需槽位 │ │
│ │ • (可选)表达资源需求(以当前引擎实现为准) │ │
│ │ • 应用标签过滤器(可选) │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ ResourceManager │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 工作节点注册表 │ │
│ │ • WorkerProfile (每个工作节点) │ │
│ │ - 总资源 │ │
│ │ - 可用资源 │ │
│ │ - 已分配槽位 │ │
│ │ - 未分配槽位 │ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 分配策略 │ │
│ │ • RandomStrategy / SlotRatioStrategy / SystemLoadStrategy│ │
│ └────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 槽位管理 │ │
│ │ • 分配槽位 │ │
│ │ • 释放槽位 │ │
│ │ • 跟踪槽位使用 │ │
│ └────────────────────────────────────────────────────┘ │
└──────────────────────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ 工作节点 │
│ │
│ Worker 1 Worker 2 Worker N │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Slot 1 │ │ Slot 1 │ │ Slot 1 │ │
│ │ Slot 2 │ │ Slot 2 │ │ Slot 2 │ │
│ │ ... │ │ ... │ │ ... │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└──────────────────────────────────────────────────────────────┘
2. 核心概念
2.1 槽位(Slot)
槽位是资源分配的基本单位。
一个槽位通常由以下信息描述:
- slotID: 槽位唯一标识
- worker: 槽位所在工作节点地址
- resourceProfile: 槽位可提供的资源容量(CPU/内存等)
关键属性:
- 粒度化: 每个槽位可以托管一个或多个任务(任务融合)
- 类型化: 槽位具有资源配置文件(CPU、内存)
- 有状态: 槽位跟踪分配状态(已分配/未分配)
示例:
- slotID = 1001
- worker = worker-1:5801
- resourceProfile = cpu.cores / heapMemory.bytes(字段以引擎实现为准)
2.2 ResourceProfile
描述资源需求或容量。
一个资源配置文件(ResourceProfile)通常包括:
- cpu.cores: CPU 核心数(当前实现为整数 core)
- heap-memory.bytes: JVM 堆内存(字节)
说明:当前资源调度在很多场景下以“slot 是否可用”为主要约束;ResourceProfile 作为扩展点存在,但是否支持按 CPU/内存精细调度取决于具体版本实现。
用途:
- 任务需求: 引擎在申请槽位时携带资源需求(当前实现常为默认/空需求,更多能力视版本而定)
- 槽位容量: 每个槽位公布其可用资源
- 匹配: ResourceManager 将任务需求与槽位容量匹配
2.3 WorkerProfile
表示工作节点的资源和槽位清单。
工作节点画像(WorkerProfile)通常包含:
- address: 工作节点地址
- totalResourceProfile: 节点总资源(常由槽位资源汇总得到)
- availableResourceProfile: 当前可用资源
- assignedSlots/unassignedSlots: 已分配/未分配槽位清单
- tags: 节点标签(用于过滤、隔离、数据局部性)
生命周期:
- 注册: 工作节点启动时向 ResourceManager 注册
- 心跳: 工作节点定期发送心跳及更新的资源信息
- 分配: ResourceManager 从未分配池中分配槽位
- 释放: 完成的任务释放槽位,将其移回未分配池
- 注销: 工作节点离开集群(优雅或故障)
3. ResourceManager
3.1 接口
ResourceManager 对外暴露的关键能力可以概括为:
- applyResources(jobId, resourceProfiles, tagFilters): 为作业申请一组满足资源需求的槽位;当资源不足时返回失败(例如抛出 NoEnoughResourceException 或以失败的 Future 表达)
- releaseResources(jobId, slots): 作业完成/失败后释放槽位,回收至可分配池
- heartbeat(workerProfile): 接收工作节点心跳并更新其资源/槽位信息
- memberRemoved(event): 处理成员移除事件(故障或优雅下线),触发资源回收与作业侧重调度
3.2 实现: AbstractResourceManager
典型实现会维护以下状态与策略:
- registerWorker: 已注册工作节点到 WorkerProfile 的映射(由心跳持续刷新)
- slotAllocationStrategy: 选择 worker 的分配策略(随机/比例/系统负载等)
- 故障检测: 结合 worker 心跳上报与 Hazelcast 成员事件判定节点失联(具体阈值以配置/实现为准)
申请资源的关键流程:
- 根据 tagFilters 过滤候选工作节点
- 针对每个 ResourceProfile 需求,使用策略选择一个满足容量约束的未分配槽位
- 将槽位从“未分配池”标记为“已分配”,并同步更新 WorkerProfile
- 返回分配结果;如任一需求无法满足,则整体失败并由 JobMaster 决定重试/降级
释放资源的关键流程:
- 将 slots 标记为未分配并回收到可分配池
- 更新工作节点可用资源与槽位统计
4. 槽位分配策略
4.1 RandomStrategy
随机选择具有可用槽位的工作节点。
核心思路:
- 过滤出“资源满足 requiredProfile 且存在未分配槽位”的工作节点集合
- 在集合中随机选择一个工作节点
- 从该节点的未分配槽位中挑选一个满足容量约束的槽位返回
优点:
- 简单快速
- 无协调开销
- 适用于同构集群
缺点:
- 无负载均衡
- 可能造成热点
4.2 SlotRatioStrategy
优先选择可用槽位比率更高的工作节点。
核心思路:
- 过滤出资源满足 requiredProfile 的工作节点
- 计算并选择“可用槽位比率 = unassigned / (assigned + unassigned)”最高的节点
- 从该节点的未分配槽位中选择一个满足容量约束的槽位
优点:
- 更好的负载均衡
- 均匀分布任务
- 防止工作节点过载
缺点:
- 计算稍多
- 可能不考虑实际 CPU/内存负载
4.3 SystemLoadStrategy
选择系统负载(CPU/内存使用)最低的工作节点。
核心思路:
- 基于心跳上报的资源使用情况计算节点负载(例如 CPU/内存利用率的加权)
- 在满足 requiredProfile 的候选节点中选择负载最低者
- 从该节点挑选一个满足容量约束的未分配槽位
负载计算的关键在于:
- 依赖指标的时效性与稳定性(过旧会导致误判,过抖会导致分配抖动)
- 需要明确权重与采样窗口,避免频繁迁移/重分配
优点:
- 考虑实际资源使用
- 最适合异构集群
- 优化集群利用率
缺点:
- 需要实时指标
- 计算成本更高
- 如果负载快速变化可能抖动
5. 基于标签的槽位过滤
5.1 用例
数据局部性:
env {
# 作业级 worker 标签过滤(key/value 全量匹配)
tag_filter = {
zone = "us-west-1"
}
}
资源专业化:
env {
tag_filter = {
resource = "gpu"
}
}
多租户:
env {
job.name = "tenant-a-job"
tag_filter = {
tenant = "a"
}
}
5.2 TagFilter
TagFilter 可以视为一个简单的键值匹配条件:
- key/value 需要同时匹配工作节点的 attributes(标签由集群部署侧维护)
- 多个 TagFilter 之间通常按“与(AND)”组合:任一不匹配则该节点被过滤
过滤过程:
过滤过程通常为:
- 枚举所有已注册工作节点
- 对每个节点依次校验 filters;全部匹配则保留
- 得到候选节点集合,交给槽位分配策略继续挑选
6. 资源分配流程
6.1 正常分配
sequenceDiagram
participant JM as JobMaster
participant RM as ResourceManager
participant Worker as Worker Node
JM->>JM: Generate PhysicalPlan
JM->>JM: Calculate required resources
JM->>RM: applyResources(profiles, tags)
RM->>RM: Filter workers by tags
RM->>RM: Select workers by strategy
RM->>RM: Allocate slots
RM-->>JM: Return SlotProfiles
JM->>JM: Assign slots to PhysicalVertices
loop For each task
JM->>Worker: DeployTaskOperation(task, slot)
Worker->>Worker: Execute task in slot
Worker-->>JM: ACK
end
6.2 资源不足
sequenceDiagram
participant JM as JobMaster
participant RM as ResourceManager
JM->>RM: applyResources(100 slots)
RM->>RM: Check available slots
Note over RM: Only 50 slots available
RM-->>JM: NoEnoughResourceException
JM->>JM: Retry with backoff
Note over JM: Wait for resources to free up
JM->>RM: applyResources(100 slots)
RM-->>JM: Success (after resources freed)
6.3 资源释放
sequenceDiagram
participant Task as SeaTunnelTask
participant JM as JobMaster
participant RM as ResourceManager
Task->>Task: Task completes/fails
Task->>JM: Task finished
JM->>RM: releaseResources(slots)
RM->>RM: Mark slots as unassigned
RM->>RM: Update WorkerProfile
Note over RM: Slots available for<br/>new allocations
7. 故障处理
7.1 工作节点故障
检测:
- worker 心跳/资源上报异常或停止(阈值以配置/实现为准)
- Hazelcast 成员移除事件
恢复:
ResourceManager 侧的典型处理步骤:
- 从注册表中移除失联/下线的工作节点
- 识别该节点上“已分配”的槽位集合(即可能承载了正在运行的任务)
- 将槽位丢失事件通知到对应的 JobMaster(或由 Coordinator 统一转发)
- 由作业侧触发 failover:标记任务失败、从检查点恢复、重新申请新槽位并重新部署
JobMaster 响应:
- 标记失败槽位上的任务为 FAILED
- 从最新检查点恢复
- 从 ResourceManager 请求新槽位
- 重新部署任务
7.2 ResourceManager 故障
高可用性:
- ResourceManager 状态是无状态的(工作节点注册表从心跳重建)
- 新的 ResourceManager 实例在主节点故障转移时启动
- 工作节点通过心跳机制重新注册
恢复:
恢复要点:
- ResourceManager 需要能够重新建立“工作节点注册表”:工作节点通过心跳主动上报其 address、资源、槽位与标签
- ResourceManager 需要定期清理超时心跳的节点,避免将任务分配给已失联节点
- 由于注册表可由心跳重建,故障转移后的新实例可以在短时间内恢复资源视图(视心跳间隔与超时参数而定)
8. 配置
8.1 槽位配置
seatunnel {
engine {
slot-service {
# 是否启用动态槽位
dynamic-slot = true
# 固定槽位数(仅在 dynamic-slot = false 时生效)
slot-num = 2
}
}
}
8.2 资源策略
seatunnel {
engine {
slot-service {
# worker 选择策略(取值需能映射到 AllocateStrategy 枚举)
# 选项: random / slot_ratio / system_load
slot-allocate-strategy = slot_ratio
}
}
}
8.3 资源配置说明
资源相关的可配置项以 config/seatunnel.yaml 与当前引擎实现为准;在没有稳定对外能力前,不建议在文档中给出“每槽位 CPU/内存”等固定配置样例,避免与实际实现不一致。
9. 监控和指标
9.1 关键指标
集群级别:
cluster.workers.total: 已注册工作节点总数cluster.workers.active: 最近有心跳的工作节点cluster.slots.total: 所有工作节点的槽位总数cluster.slots.available: 未分配的槽位cluster.slots.assigned: 使用中的槽位
每个工作节点:
worker.cpu.available: 可用 CPU 核心worker.memory.available: 可用内存(MB)worker.slots.total: 工作节点上的总槽位数worker.slots.assigned: 已分配的槽位worker.heartbeat.last: 最后一次心跳时间戳
每个作业:
job.slots.requested: 作业请求的槽位数job.slots.allocated: 成功分配的槽位数job.resource.wait_time: 等待资源的时间
9.2 可观测性
资源仪表板示例:
集群资源:
工作节点: 10 (全部健康)
总槽位: 20
可用槽位: 8
利用率: 60%
资源消费者排名:
job-123: 6 个槽位 (mysql-cdc → elasticsearch)
job-456: 4 个槽位 (kafka → jdbc)
job-789: 2 个槽位 (file → s3)
工作节点分布:
worker-1: 2/2 槽位 (100%)
worker-2: 1/2 槽位 (50%)
worker-3: 2/2 槽位 (100%)
...
10. 最佳实践
10.1 槽位大小设置
一般指南:
每个工作节点的槽位数 = CPU 核心数 - 1 (为操作系统保留 1 个)
示例:
8 核机器 → 6-7 个槽位
16 核机器 → 14-15 个槽位
每个槽位的内存:
堆内存 = 总内存 * 0.7 / 槽位数
示例:
32GB 机器, 6 个槽位
每个槽位的堆内存 = 32GB * 0.7 / 6 ≈ 3.7GB
10.2 策略选择
使用 RandomStrategy 当:
- 同构集群(所有工作节点相同)
- 简单部署
- 快速分配比完美平衡更重要
使用 SlotRatioStrategy 当:
- 需要良好的负载均衡
- 混合作业大小
- 中等集群规模(< 100 个工作节点)
使用 SystemLoadStrategy 当:
- 异构集群
- 工作节点具有不同的 CPU/内存
- 优化资源利用率至关重要
10.3 标签使用
数据局部性:
# 按区域/可用区标记工作节点(部署侧:Hazelcast member attributes,示意)
# worker-1.attributes.zone = "us-west-1a"
# worker-2.attributes.zone = "us-east-1b"
# 将作业分配到与数据相同的区域(作业级过滤)
env {
tag_filter = {
zone = "us-west-1a"
}
}
资源隔离:
# 为关键作业分配专用工作节点(部署侧 attributes,示意)
# worker-1.attributes.priority = "high"
# worker-4.attributes.priority = "normal"
env {
job.name = "critical-job"
tag_filter = {
priority = "high"
}
}
11. 相关资源
12. 参考资料
进一步阅读
- Google Borg - 大规模集群管理
- Apache YARN - Hadoop 中的资源管理
- Kubernetes - 容器编排和调度