Http
Http 源连接器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
描述
用于从 Http 读取数据。
支持的数据源信息
为了使用 Http 连接器,需要以下依赖项。 可以通过 install-plugin.sh 或从 Maven 中央仓库下载。
| 数据源 | 支持的版本 | 依赖 | 
|---|---|---|
| Http | 通用 | 下载 | 
源选项
| 名称 | 类型 | 是否必须 | 默认值 | 描述 | 
|---|---|---|---|---|
| url | String | 是 | - | Http 请求 URL。 | 
| schema | Config | 否 | - | Http 和 seatunnel 数据结构映射 | 
| schema.fields | Config | 否 | - | 上游数据的 schema 字段 | 
| json_field | Config | 否 | - | 此参数帮助您配置 schema,因此此参数必须与 schema 一起使用。 | 
| pageing | Config | 否 | - | 此参数用于分页查询 | 
| pageing.page_field | String | 否 | - | 此参数用于指定请求中的页面字段名称。它可以在 headers、params 或 body 中使用占位符,如 ${page_field}。 | 
| pageing.use_placeholder_replacement | Boolean | 否 | false | 如果为 true,则使用占位符替换(${field})用于 headers、parameters 和 body 值,否则使用基于键的替换。 | 
| pageing.total_page_size | Int | 否 | - | 此参数用于控制总页数 | 
| pageing.batch_size | Int | 否 | - | 每个请求返回的批量大小,用于在总页数未知时确定是否继续 | 
| pageing.start_page_number | Int | 否 | 1 | 指定同步开始的页码 | 
| pageing.page_type | String | 否 | PageNumber | 此参数用于指定页面类型,如果未设置则为 PageNumber,仅支持 PageNumber和Cursor。 | 
| pageing.cursor_field | String | 否 | - | 此参数用于指定请求参数中的游标字段名称。 | 
| pageing.cursor_response_field | String | 否 | - | 此参数指定从中检索游标的响应字段。 | 
| content_json | String | 否 | - | 此参数可以获取一些 json 数据。如果您只需要 'book' 部分的数据,配置 content_field = "$.store.book.*"。 | 
| format | String | 否 | text | 上游数据的格式,目前仅支持 jsontext,默认为text。 | 
| method | String | 否 | get | Http 请求方法,仅支持 GET、POST 方法。 | 
| headers | Map | 否 | - | Http 头信息。 | 
| params | Map | 否 | - | Http 参数。 | 
| body | String | 否 | - | Http 请求体,程序将自动添加 http header application/json,body 是 jsonbody。 | 
| poll_interval_millis | Int | 否 | - | 流模式下请求 http api 的间隔(毫秒)。 | 
| retry | Int | 否 | - | 如果请求 http 返回 IOException的最大重试次数。 | 
| retry_backoff_multiplier_ms | Int | 否 | 100 | 请求 http 失败时的重试退避时间(毫秒)乘数。 | 
| retry_backoff_max_ms | Int | 否 | 10000 | 请求 http 失败时的最大重试退避时间(毫秒) | 
| enable_multi_lines | Boolean | 否 | false | |
| connect_timeout_ms | Int | 否 | 12000 | 连接超时设置,默认 12 秒。 | 
| socket_timeout_ms | Int | 否 | 60000 | Socket 超时设置,默认 60 秒。 | 
| common-options | 否 | - | 源插件通用参数,请参考 Source Common Options 获取详细信息 | |
| keep_params_as_form | Boolean | 否 | false | 是否按照表单提交参数,用于兼容旧行为。当为 true 时,params 参数的值通过表单提交。 | 
| keep_page_param_as_http_param | Boolean | 否 | false | 是否将分页参数设置为 params。用于兼容旧行为。 | 
| json_filed_missed_return_null | Boolean | 否 | false | 当 JSON 字段缺失时,设置为 true 并返回 null,否则返回错误。 | 
如何创建 Http 数据同步作业
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Http {
    plugin_output = "http"
    url = "http://mockserver:1080/example/http"
    method = "GET"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          C_MAP = "map<string, string>"
          C_ARRAY = "array<int>"
          C_STRING = string
          C_BOOLEAN = boolean
          C_TINYINT = tinyint
          C_SMALLINT = smallint
          C_INT = int
          C_BIGINT = bigint
          C_FLOAT = float
          C_DOUBLE = double
          C_BYTES = bytes
          C_DATE = date
          C_DECIMAL = "decimal(38, 18)"
          C_TIMESTAMP = timestamp
        }
      }
    }
  }
}
# 控制台打印读取的 Http 数据
sink {
  Console {
    parallelism = 1
  }
}
参数解释
format
当您指定 format 为 json 时,您还应该指定 schema 选项,例如:
上游数据如下:
{
  "code": 200,
  "data": "get success",
  "success": true
}
您应该指定 schema 如下:
schema {
  fields {
    code = int
    data = string
    success = boolean
  }
}
连接器将生成如下数据:
| code | data | success | 
|---|---|---|
| 200 | get success | true | 
当您指定 format 为 text 时,连接器不会对上游数据做任何处理,例如:
上游数据如下:
{
  "code": 200,
  "data": "get success",
  "success": true
}
连接器将生成如下数据:
| content | 
|---|
| {"code": 200, "data": "get success", "success": true} | 
keep_params_as_form
为了兼容旧版本的 http。
当设置为 true 时,<params> 和 <pageing> 将以表单形式提交。
当设置为 false 时,<params> 将添加到 url 路径中,而 <pageing> 不会添加到 body 或表单中。它将替换 params 和 body 中的占位符。
keep_page_param_as_http_param
是否将分页参数设置为 params。
当设置为 true 时,<pageing> 设置为 <params>。
当设置为 false 时,当页面字段存在于 <body> 或 <params> 中时,替换值。
当设置为 false 时,配置示例:
body="""{"id":1,"page":"${page}"}"""
params={
 page: "${page}"
}
params
默认情况下,参数将添加到 url 路径中。 如果您需要保持旧版本行为,请检查 keep_params_as_form。
body
HTTP body 用于在请求或响应中携带实际数据,包括 JSON、表单提交。
参考格式如下:
body="{"id":1,"name":"setunnel"}"
对于表单提交,请按如下设置 content-type。
headers {
    Content-Type = "application/x-www-form-urlencoded"
}
content_json
此参数可以获取一些 json 数据。如果您只需要 'book' 部分的数据,配置 content_field = "$.store.book.*"。
如果您的返回数据看起来像这样。
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
您可以配置 content_field = "$.store.book.*" 并且返回的结果看起来像这样:
[
  {
    "category": "reference",
    "author": "Nigel Rees",
    "title": "Sayings of the Century",
    "price": 8.95
  },
  {
    "category": "fiction",
    "author": "Evelyn Waugh",
    "title": "Sword of Honour",
    "price": 12.99
  }
]
然后您可以使用更简单的 schema 获取所需的结果,如
Http {
  url = "http://mockserver:1080/contentjson/mock"
  method = "GET"
  format = "json"
  content_field = "$.store.book.*"
  schema = {
    fields {
      category = string
      author = string
      title = string
      price = string
    }
  }
}
这里是一个示例:
- 测试数据可以在此链接找到 mockserver-config.json
- 任务配置请参考此链接 http_contentjson_to_assert.conf。
json_field
此参数帮助您配置 schema,因此此参数必须与 schema 一起使用。
如果您的数据看起来像这样:
{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  },
  "expensive": 10
}
您可以通过如下配置任务来获取 'book' 的内容:
source {
  Http {
    url = "http://mockserver:1080/jsonpath/mock"
    method = "GET"
    format = "json"
    json_field = {
      category = "$.store.book[*].category"
      author = "$.store.book[*].author"
      title = "$.store.book[*].title"
      price = "$.store.book[*].price"
    }
    schema = {
      fields {
        category = string
        author = string
        title = string
        price = string
      }
    }
  }
}
- 测试数据可以在此链接找到 mockserver-config.json
- 任务配置请参考此链接 http_jsonpath_to_assert.conf。
pageing
当前支持的分页类型是 PageNumber 和 Cursor。
如果您需要使用分页,您需要配置 pageing。默认分页类型是 PageNumber。
1. PageNumber
使用 PageNumber 分页时,您可以在 HTTP 请求的不同部分包含页面参数:
- 在 URL 参数中:将页面参数添加到 params部分
- 在请求体中:在 bodyJSON 中包含页面参数
- 在头信息中:将页面参数添加到 headers部分
您可以使用占位符如 ${page} 与 use_placeholder_replacement = true 来动态更新这些值。占位符可以以各种格式使用:
- 作为独立值:"${page}"
- 带前缀/后缀:"10${page}"或"page-${page}"
- 作为不带引号的数字:${page}(在 JSON 体中)
- 在嵌套 JSON 结构中:{"pagination":{"page":${page}}}
示例 1:在 body 和 params 中使用页面参数
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "POST"
      format = "json"
      body="""{"id":1,"page":"${page}"}"""
      content_field = "$.data.*"
      params={
       page: "${page}"
      }
      pageing={
       #你可以不设置此参数,默认值是 PageNumber
       page_type="PageNumber"
       total_page_size=20
       page_field=page
       use_placeholder_replacement=true
       #当不知道 total_page_size 时使用 batch_size,如果读取大小<batch_size 则完成,否则继续
       #batch_size=10
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
示例 2:在 headers 中使用页面参数
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "GET"
      format = "json"
      headers={
        Page-Number = "${pageNo}"
        Authorization = "Bearer token-123"
      }
      pageing={
        page_field = pageNo
        start_page_number = 1
        batch_size = 10
        use_placeholder_replacement = true
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
示例 3:使用基于键的替换(不使用占位符)
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "GET"
      format = "json"
      params={
        page = "1"
      }
      pageing={
        page_field = page
        start_page_number = 1
        batch_size = 10
        use_placeholder_replacement = false
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
示例 4:在 headers 中使用带前缀的页码
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "GET"
      format = "json"
      headers = {
        Page-Number = "10${page}"  # 当 page=5 时将变为 "105"
        Authorization = "Bearer token-123"
      }
      pageing = {
        page_field = page
        start_page_number = 5
        batch_size = 10
        use_placeholder_replacement = true
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
示例 5:在 body 中使用不带引号的页码
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "POST"
      format = "json"
      body = """{"a":${page},"limit":10}"""  # 不带引号的数字
      pageing = {
        page_field = page
        start_page_number = 1
        batch_size = 10
        use_placeholder_replacement = true
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
示例 6:使用带页面参数的嵌套 JSON 结构
source {
    Http {
      url = "http://localhost:8080/mock/queryData"
      method = "POST"
      format = "json"
      body = """{"pagination":{"page":${page},"size":10},"filters":{"active":true}}"""  # 嵌套结构
      pageing = {
        page_field = page
        start_page_number = 1
        total_page_size = 20
        use_placeholder_replacement = true
      }
      schema = {
        fields {
          name = string
          age = string
        }
      }
    }
}
2. Cursor
pageing.page_type 参数必须设置为 Cursor。
cursor_field 是请求参数中游标的字段名称。
cursor_response_field 是响应数据中分页令牌字段的名称,我们应该将其添加到请求的分页字段中。
source {
    Http {
      plugin_output = "http"
      url = "http://localhost:8080/mock/cursor_data"
      method = "GET"
      format = "json"
      content_field = "$.data.*"
      keep_page_param_as_http_param = true
      pageing ={
        page_type="Cursor"
        cursor_field ="cursor"
        cursor_response_field="$.paging.cursors.next"
      }
    schema = {
      fields {
        content=string
        id=int
        name=string
      }
    }
   json_field = {
    content = "$.data[*].content"
    id = "$.data[*].id"
    name = "$.data[*].name"
   }
  }
}
```
## 变更日志
<ChangeLog />