跳到主要内容
版本:Next

Http

Http 源连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

用于从 Http 读取数据。

支持的数据源信息

为了使用 Http 连接器,需要以下依赖项。 可以通过 install-plugin.sh 或从 Maven 中央仓库下载。

数据源支持的版本依赖
Http通用下载

源选项

名称类型是否必须默认值描述
urlString-Http 请求 URL。
schemaConfig-Http 和 seatunnel 数据结构映射
schema.fieldsConfig-上游数据的 schema 字段
json_fieldConfig-此参数帮助您配置 schema,因此此参数必须与 schema 一起使用。
pageingConfig-此参数用于分页查询
pageing.page_fieldString-此参数用于指定请求中的页面字段名称。它可以在 headers、params 或 body 中使用占位符,如 ${page_field}。
pageing.use_placeholder_replacementBooleanfalse如果为 true,则使用占位符替换(${field})用于 headers、parameters 和 body 值,否则使用基于键的替换。
pageing.total_page_sizeInt-此参数用于控制总页数
pageing.batch_sizeInt-每个请求返回的批量大小,用于在总页数未知时确定是否继续
pageing.start_page_numberInt1指定同步开始的页码
pageing.page_typeStringPageNumber此参数用于指定页面类型,如果未设置则为 PageNumber,仅支持 PageNumberCursor
pageing.cursor_fieldString-此参数用于指定请求参数中的游标字段名称。
pageing.cursor_response_fieldString-此参数指定从中检索游标的响应字段。
content_jsonString-此参数可以获取一些 json 数据。如果您只需要 'book' 部分的数据,配置 content_field = "$.store.book.*"
formatStringtext上游数据的格式,目前仅支持 json text,默认为 text
methodStringgetHttp 请求方法,仅支持 GET、POST 方法。
headersMap-Http 头信息。
paramsMap-Http 参数。
bodyString-Http 请求体,程序将自动添加 http header application/json,body 是 jsonbody。
poll_interval_millisInt-流模式下请求 http api 的间隔(毫秒)。
retryInt-如果请求 http 返回 IOException 的最大重试次数。
retry_backoff_multiplier_msInt100请求 http 失败时的重试退避时间(毫秒)乘数。
retry_backoff_max_msInt10000请求 http 失败时的最大重试退避时间(毫秒)
enable_multi_linesBooleanfalse
connect_timeout_msInt12000连接超时设置,默认 12 秒。
socket_timeout_msInt60000Socket 超时设置,默认 60 秒。
common-options-源插件通用参数,请参考 Source Common Options 获取详细信息
keep_params_as_formBooleanfalse是否按照表单提交参数,用于兼容旧行为。当为 true 时,params 参数的值通过表单提交。
keep_page_param_as_http_paramBooleanfalse是否将分页参数设置为 params。用于兼容旧行为。

如何创建 Http 数据同步作业

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Http {
plugin_output = "http"
url = "http://mockserver:1080/example/http"
method = "GET"
format = "json"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
}

# 控制台打印读取的 Http 数据
sink {
Console {
parallelism = 1
}
}

参数解释

format

当您指定 format 为 json 时,您还应该指定 schema 选项,例如:

上游数据如下:

{
"code": 200,
"data": "get success",
"success": true
}

您应该指定 schema 如下:


schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将生成如下数据:

codedatasuccess
200get successtrue

当您指定 format 为 text 时,连接器不会对上游数据做任何处理,例如:

上游数据如下:

{
"code": 200,
"data": "get success",
"success": true
}

连接器将生成如下数据:

content
{"code": 200, "data": "get success", "success": true}

keep_params_as_form

为了兼容旧版本的 http。 当设置为 true 时,<params><pageing> 将以表单形式提交。 当设置为 false 时,<params> 将添加到 url 路径中,而 <pageing> 不会添加到 body 或表单中。它将替换 params 和 body 中的占位符。

keep_page_param_as_http_param

是否将分页参数设置为 params。 当设置为 true 时,<pageing> 设置为 <params>。 当设置为 false 时,当页面字段存在于 <body><params> 中时,替换值。

当设置为 false 时,配置示例:

body="""{"id":1,"page":"${page}"}"""
params={
page: "${page}"
}

params

默认情况下,参数将添加到 url 路径中。 如果您需要保持旧版本行为,请检查 keep_params_as_form。

body

HTTP body 用于在请求或响应中携带实际数据,包括 JSON、表单提交。

参考格式如下:

body="{"id":1,"name":"setunnel"}"

对于表单提交,请按如下设置 content-type。

headers {
Content-Type = "application/x-www-form-urlencoded"
}

content_json

此参数可以获取一些 json 数据。如果您只需要 'book' 部分的数据,配置 content_field = "$.store.book.*"

如果您的返回数据看起来像这样。

{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
},
"expensive": 10
}

您可以配置 content_field = "$.store.book.*" 并且返回的结果看起来像这样:

[
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
}
]

然后您可以使用更简单的 schema 获取所需的结果,如

Http {
url = "http://mockserver:1080/contentjson/mock"
method = "GET"
format = "json"
content_field = "$.store.book.*"
schema = {
fields {
category = string
author = string
title = string
price = string
}
}
}

这里是一个示例:

json_field

此参数帮助您配置 schema,因此此参数必须与 schema 一起使用。

如果您的数据看起来像这样:

{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
},
"expensive": 10
}

您可以通过如下配置任务来获取 'book' 的内容:

source {
Http {
url = "http://mockserver:1080/jsonpath/mock"
method = "GET"
format = "json"
json_field = {
category = "$.store.book[*].category"
author = "$.store.book[*].author"
title = "$.store.book[*].title"
price = "$.store.book[*].price"
}
schema = {
fields {
category = string
author = string
title = string
price = string
}
}
}
}

pageing

当前支持的分页类型是 PageNumberCursor。 如果您需要使用分页,您需要配置 pageing。默认分页类型是 PageNumber

1. PageNumber

使用 PageNumber 分页时,您可以在 HTTP 请求的不同部分包含页面参数:

  • 在 URL 参数中:将页面参数添加到 params 部分
  • 在请求体中:在 body JSON 中包含页面参数
  • 在头信息中:将页面参数添加到 headers 部分

您可以使用占位符如 ${page}use_placeholder_replacement = true 来动态更新这些值。占位符可以以各种格式使用:

  • 作为独立值:"${page}"
  • 带前缀/后缀:"10${page}""page-${page}"
  • 作为不带引号的数字:${page}(在 JSON 体中)
  • 在嵌套 JSON 结构中:{"pagination":{"page":${page}}}
示例 1:在 body 和 params 中使用页面参数
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "POST"
format = "json"
body="""{"id":1,"page":"${page}"}"""
content_field = "$.data.*"
params={
page: "${page}"
}
pageing={
#你可以不设置此参数,默认值是 PageNumber
page_type="PageNumber"
total_page_size=20
page_field=page
use_placeholder_replacement=true
#当不知道 total_page_size 时使用 batch_size,如果读取大小<batch_size 则完成,否则继续
#batch_size=10
}
schema = {
fields {
name = string
age = string
}
}
}
}
示例 2:在 headers 中使用页面参数
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "GET"
format = "json"
headers={
Page-Number = "${pageNo}"
Authorization = "Bearer token-123"
}
pageing={
page_field = pageNo
start_page_number = 1
batch_size = 10
use_placeholder_replacement = true
}
schema = {
fields {
name = string
age = string
}
}
}
}
示例 3:使用基于键的替换(不使用占位符)
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "GET"
format = "json"
params={
page = "1"
}
pageing={
page_field = page
start_page_number = 1
batch_size = 10
use_placeholder_replacement = false
}
schema = {
fields {
name = string
age = string
}
}
}
}
示例 4:在 headers 中使用带前缀的页码
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "GET"
format = "json"
headers = {
Page-Number = "10${page}" # 当 page=5 时将变为 "105"
Authorization = "Bearer token-123"
}
pageing = {
page_field = page
start_page_number = 5
batch_size = 10
use_placeholder_replacement = true
}
schema = {
fields {
name = string
age = string
}
}
}
}
示例 5:在 body 中使用不带引号的页码
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "POST"
format = "json"
body = """{"a":${page},"limit":10}""" # 不带引号的数字
pageing = {
page_field = page
start_page_number = 1
batch_size = 10
use_placeholder_replacement = true
}
schema = {
fields {
name = string
age = string
}
}
}
}
示例 6:使用带页面参数的嵌套 JSON 结构
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "POST"
format = "json"
body = """{"pagination":{"page":${page},"size":10},"filters":{"active":true}}""" # 嵌套结构
pageing = {
page_field = page
start_page_number = 1
total_page_size = 20
use_placeholder_replacement = true
}
schema = {
fields {
name = string
age = string
}
}
}
}

2. Cursor

pageing.page_type 参数必须设置为 Cursorcursor_field 是请求参数中游标的字段名称。 cursor_response_field 是响应数据中分页令牌字段的名称,我们应该将其添加到请求的分页字段中。


source {
Http {
plugin_output = "http"
url = "http://localhost:8080/mock/cursor_data"
method = "GET"
format = "json"
content_field = "$.data.*"
keep_page_param_as_http_param = true
pageing ={
page_type="Cursor"
cursor_field ="cursor"
cursor_response_field="$.paging.cursors.next"
}
schema = {
fields {
content=string
id=int
name=string
}
}
json_field = {
content = "$.data[*].content"
id = "$.data[*].id"
name = "$.data[*].name"
}
}
}

```

## 变更日志

<ChangeLog />