RESTful API
SeaTunnel有一个用于监控的API,可用于查询运行作业的状态和统计信息,以及最近完成的作业。监控API是RESTful风格的,它接受HTTP请求并使用JSON数据格式进行响应。
概述
v2版本的api使用jetty支持,与v1版本的接口规范相同 ,可以通过修改seatunnel.yaml
中的配置项来指定端口和context-path,
同时可以配置 enable-dynamic-port
开启动态端口(默认从 port
开始累加),默认为关闭,
如果enable-dynamic-port
为true
,我们将使用port
和port
+port-range
范围内未使用的端口,默认范围是100。
seatunnel:
engine:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
port-range: 100
同时也可以配置context-path,配置如下:
seatunnel:
engine:
http:
enable-http: true
port: 8080
context-path: /seatunnel
API参考
返回Zeta集群的概览
GET
/overview?tag1=value1&tag2=value2
(Returns an overview over the Zeta engine cluster.)
参数
参数名称 是否必传 参数类型 参数描述 tag键值对 否 字符串 一组标签值, 通过该标签值过滤满足条件的节点信息
响应
{
"projectVersion":"2.3.5-SNAPSHOT",
"gitCommitAbbrev":"DeadD0d0",
"totalSlot":"0",
"unassignedSlot":"0",
"works":"1",
"runningJobs":"0",
"finishedJobs":"0",
"failedJobs":"0",
"cancelledJobs":"0"
}
注意:
- 当你使用
dynamic-slot
时, 返回结果中的totalSlot
和unassignedSlot
将始终为0. 设置为固定的slot值后, 将正确返回集群中总共的slot数量以及未分配的slot数量. - 当添加标签过滤后,
works
,totalSlot
,unassignedSlot
将返回满足条件的节点的相关指标. 注意runningJobs
等job相关指标为集群级别结果, 无法根据标签进行过滤.
返回所有作业及其当前状态的概览
GET
/running-jobs
(返回所有作业及其当前状态的概览。)
参数
响应
[
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false,
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
}
}
]
返回作业的详细信息
GET
/job-info/:jobId
(返回作业的详细信息。)
参数
参数名称 是否必传 参数类型 参数描述 jobId 是 long job id
响应
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": {
"SourceReceivedCount": "",
"SourceReceivedQPS": "",
"SourceReceivedBytes": "",
"SourceReceivedBytesPerSeconds": "",
"SinkWriteCount": "",
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
"TableSourceReceivedQPS": {},
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
"TableSinkWriteBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
jobId
, jobName
, jobStatus
, createTime
, jobDag
, metrics
字段总会返回.
envOptions
, pluginJarsUrls
, isStartWithSavePoint
字段在Job在RUNNING状态时会返回
finishedTime
, errorMsg
字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
当我们查询不到这个Job时,返回结果为:
{
"jobId" : ""
}
返回作业的详细信息
此API已经弃用,请使用/job-info/:jobId替代。
GET
/running-job/:jobId
(返回作业的详细信息。)
参数
参数名称 是否必传 参数类型 参数描述 jobId 是 long job id
响应
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
jobId
, jobName
, jobStatus
, createTime
, jobDag
, metrics
字段总会返回.
envOptions
, pluginJarsUrls
, isStartWithSavePoint
字段在Job在RUNNING状态时会返回
finishedTime
, errorMsg
字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
当我们查询不到这个Job时,返回结果为:
{
"jobId" : ""
}
返回所有已完成的作业信息
GET
/finished-jobs/:state
(返回所有已完成的作业信息。)
参数
参数名称 是否必传 参数类型 参数描述 state optional string finished job status. FINISHED
,CANCELED
,FAILED
,UNKNOWABLE
响应
[
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"errorMsg": null,
"createTime": "",
"finishTime": "",
"jobDag": {
"jobId": "",
"envOptions": [],
"vertexInfoMap": [
{
"vertexId": 1,
"type": "",
"vertexName": "",
"tablePaths": [
""
]
}
],
"pipelineEdges": {}
},
"metrics": ""
}
]
返回系统监控信息
GET
/system-monitoring-information
(返回系统监控信息。)
参数
响应
[
{
"processors":"8",
"physical.memory.total":"16.0G",
"physical.memory.free":"16.3M",
"swap.space.total":"0",
"swap.space.free":"0",
"heap.memory.used":"135.7M",
"heap.memory.free":"440.8M",
"heap.memory.total":"576.5M",
"heap.memory.max":"3.6G",
"heap.memory.used/total":"23.54%",
"heap.memory.used/max":"3.73%",
"minor.gc.count":"6",
"minor.gc.time":"110ms",
"major.gc.count":"2",
"major.gc.time":"73ms",
"load.process":"24.78%",
"load.system":"60.00%",
"load.systemAverage":"2.07",
"thread.count":"117",
"thread.peakCount":"118",
"cluster.timeDiff":"0",
"event.q.size":"0",
"executor.q.async.size":"0",
"executor.q.client.size":"0",
"executor.q.client.query.size":"0",
"executor.q.client.blocking.size":"0",
"executor.q.query.size":"0",
"executor.q.scheduled.size":"0",
"executor.q.io.size":"0",
"executor.q.system.size":"0",
"executor.q.operations.size":"0",
"executor.q.priorityOperation.size":"0",
"operations.completed.count":"10",
"executor.q.mapLoad.size":"0",
"executor.q.mapLoadAllKeys.size":"0",
"executor.q.cluster.size":"0",
"executor.q.response.size":"0",
"operations.running.count":"0",
"operations.pending.invocations.percentage":"0.00%",
"operations.pending.invocations.count":"0",
"proxy.count":"8",
"clientEndpoint.count":"0",
"connection.active.count":"2",
"client.connection.count":"0",
"connection.count":"0"
}
]
提交作业
POST
/submit-job
(如果作业提交成功,返回jobId和jobName。)
参数
参数名称 是否必传 参数类型 参数描述 jobId optional string job id jobName optional string job name isStartWithSavePoint optional string if job is started with save point
请求体
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
响应
{
"jobId": 733584788375666689,
"jobName": "rest_api_test"
}
批量提交作业
POST
/submit-jobs
(如果作业提交成功,返回jobId和jobName。)
参数(在请求体中params字段中添加)
参数名称 是否必传 参数类型 参数描述 jobId optional string job id jobName optional string job name isStartWithSavePoint optional string if job is started with save point
请求体
[
{
"params":{
"jobId":"123456",
"jobName":"SeaTunnel-01"
},
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 1000,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
},
{
"params":{
"jobId":"1234567",
"jobName":"SeaTunnel-02"
},
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"result_table_name": "fake",
"row.num": 1000,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"source_table_name": ["fake"]
}
]
}
]
响应
[
{
"jobId": "123456",
"jobName": "SeaTunnel-01"
},{
"jobId": "1234567",
"jobName": "SeaTunnel-02"
}
]
停止作业
POST
/stop-job
(如果作业成功停止,返回jobId。)
批量停止作业
POST
/stop-jobs
(如果作业成功停止,返回jobId。)
加密配置
POST
/encrypt-config
(如果配置加密成功,则返回加密后的配置。)
请求体
{
"env": {
"parallelism": 1,
"shade.identifier":"base64"
},
"source": [
{
"plugin_name": "MySQL-CDC",
"schema" : {
"fields": {
"name": "string",
"age": "int"
}
},
"result_table_name": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "seatunnel",
"password": "seatunnel_password",
"table-name": "inventory_vwyw0n"
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "seatunnel",
"password": "seatunnel_password"
}
]
}
响应
{
"env": {
"parallelism": 1,
"shade.identifier": "base64"
},
"source": [
{
"plugin_name": "MySQL-CDC",
"schema": {
"fields": {
"name": "string",
"age": "int"
}
},
"result_table_name": "fake",
"parallelism": 1,
"hostname": "127.0.0.1",
"username": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk",
"table-name": "inventory_vwyw0n"
}
],
"transform": [],
"sink": [
{
"plugin_name": "Clickhouse",
"host": "localhost:8123",
"database": "default",
"table": "fake_all",
"username": "c2VhdHVubmVs",
"password": "c2VhdHVubmVsX3Bhc3N3b3Jk"
}
]
}
更新运行节点的tags
POST
/update-tags
因为更新只能针对于某个节点,因此需要用当前节点ip:port用于更新
(如果更新成功,则返回"success"信息)
更新节点tags
请求体
如果请求参数是Map
对象,表示要更新当前节点的tags
{
"tag1": "dev_1",
"tag2": "dev_2"
}
响应
{
"status": "success",
"message": "update node tags done."
}
移除节点tags
请求体
如果参数为空Map
对象,表示要清除当前节点的tags
{}
响应
响应体将为:
{
"status": "success",
"message": "update node tags done."
}
请求参数异常
- 如果请求参数为空
响应
{
"status": "fail",
"message": "Request body is empty."
}
- 如果参数不是
Map
对象
响应
{
"status": "fail",
"message": "Invalid JSON format in request body."
}
获取所有节点日志内容
GET
/logs/:jobId
(返回日志列表。)
请求参数
参数(在请求体中params字段中添加)
参数名称 是否必传 参数类型 参数描述 jobId optional string job id
当jobId
为空时,返回所有节点的日志信息,否则返回指定jobId
在所有节点的的日志列表。
响应
返回请求节点的日志列表、内容
返回所有日志文件列表
如果你想先查看日志列表,可以通过GET
请求获取日志列表,http://localhost:8080/logs?format=json
[
{
"node": "localhost:8080",
"logLink": "http://localhost:8080/logs/job-899485770241277953.log",
"logName": "job-899485770241277953.log"
},
{
"node": "localhost:8080",
"logLink": "http://localhost:8080/logs/job-899470314109468673.log",
"logName": "job-899470314109468673.log"
}
]
当前支持的格式有json
和html
,默认为html
。
例子
获取所有节点jobId为733584788375666689
的日志信息:http://localhost:8080/logs/733584788375666689
获取所有节点日志列表:http://localhost:8080/logs
获取所有节点日志列表以JSON格式返回:http://localhost:8080/logs?format=json
获取日志文件内容:http://localhost:8080/logs/job-898380162133917698.log