跳到主要内容
版本:Next

SeaTunnel Python SDK

SeaTunnel Python SDK 允许开发者使用 Python 与 SeaTunnel Engine 进行交互。它提供了围绕 SeaTunnel REST API 的封装,用于作业提交、管理和集群监控。

安装

SeaTunnel Python SDK 目前还没有发布到 PyPI,需要直接从 SeaTunnel 源码目录安装:

git clone https://github.com/apache/seatunnel.git
cd seatunnel
python -m pip install ./tools/seatunnel-python-sdk

需要 Python 3.9 或更高版本。

使用方法

作业管理

您可以使用客户端的 jobs 属性来管理作业。

提交作业

from seatunnel import SeaTunnelClient, SubmitJobQueryParams

config = """
env {
job.mode = "batch"
}
source {
FakeSource {
plugin_output = "fake"
row.num = 100
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
transform {}
sink {
Console {}
}
"""

with SeaTunnelClient(base_url="http://localhost:8080") as client:
query_params = SubmitJobQueryParams()
response = client.jobs.submit_job(conf=config, params=query_params)
print(response)

获取运行中的作业

with SeaTunnelClient(base_url="http://localhost:8080") as client:
running_jobs = client.jobs.get_running_jobs()
print(running_jobs)

获取作业详情

with SeaTunnelClient(base_url="http://localhost:8080") as client:
job_id = 12345 # 替换为实际的 Job ID
job_details = client.jobs.get_job_details(jobId=job_id)
print(job_details)

集群监控

您可以使用 cluster 属性来获取集群信息。

获取集群概览

with SeaTunnelClient(base_url="http://localhost:8080") as client:
overview = client.cluster.get_overview({"jobId": "12345"})
print(overview)

API 参考

Client (客户端)

SeaTunnelClient(base_url: str)

  • base_url: SeaTunnel Engine 的地址 (例如:http://127.0.0.1:8080)。

Jobs (client.jobs)

  • submit_job(conf: str, params: SubmitJobQueryParams): 使用配置字符串提交作业。
  • submit_job_from_file(filePath: str, params: SubmitJobFileQueryParams): 使用配置文件路径提交作业。
  • submit_jobs(confs: str): 批量提交多个作业。
  • stop_job(params: StopJobQueryParams): 停止特定作业。
  • stop_jobs(params: list[StopJobQueryParams]): 停止多个作业。
  • get_running_jobs(): 获取当前正在运行的作业列表。
  • get_job_details(jobId: int): 获取特定作业的详细信息。
  • get_finished_jobs_info(state: Optional[JobStatus] = None): 获取已完成作业的信息,可按状态过滤。

Cluster (client.cluster)

  • get_overview(params: Optional[dict[str, str]] = None): 获取集群概览,并将过滤标签作为查询参数透传给 REST API。
  • get_metrics(): 获取集群指标。
  • get_log(): 获取单个节点的日志。
  • get_logs(jobId: Optional[int] = None): 获取所有节点的日志,可按 Job ID 过滤。

Helper Classes (辅助类)

  • SubmitJobQueryParams: 提交作业的参数。
  • SubmitJobFileQueryParams: 从文件提交作业的参数。
  • StopJobQueryParams: 停止作业的参数。
  • JobStatus: 作业状态枚举。