跳到主要内容
版本:Next

JsonPath

JSONPath 转换插件

描述

支持使用 JSONPath 选择数据

属性

名称类型是否必须默认值
columnsArrayYes
row_error_handle_wayEnumNoFAIL

common options [string]

转换插件的常见参数, 请参考 Transform Plugin 了解详情

row_error_handle_way [Enum]

该选项用于指定当该行发生错误时的处理方式,默认值为 FAIL

  • FAIL:选择FAIL时,数据格式错误会阻塞并抛出异常。
  • SKIP:选择SKIP时,数据格式错误会跳过该行数据。

columns[array]

属性

名称类型是否必须默认值
src_fieldStringYes
dest_fieldStringYes
pathStringYes
dest_typeStringNoString
column_error_handle_wayEnumNo

src_field

要解析的 JSON 源字段

支持的Seatunnel数据类型

  • STRING
  • BYTES
  • ARRAY
  • MAP
  • ROW

dest_field

使用 JSONPath 后的输出字段

dest_type

目标字段的类型

path

Jsonpath

column_error_handle_way [Enum]

该选项用于指定当列发生错误时的处理方式。

  • FAIL:选择FAIL时,数据格式错误会阻塞并抛出异常。
  • SKIP:选择SKIP时,数据格式错误会跳过此列数据。
  • SKIP_ROW:选择SKIP_ROW时,数据格式错误会跳过此行数据。

读取 JSON 示例

从源读取的数据是像这样的 JSON

{
"data": {
"c_string": "this is a string",
"c_boolean": true,
"c_integer": 42,
"c_float": 3.14,
"c_double": 3.14,
"c_decimal": 10.55,
"c_date": "2023-10-29",
"c_datetime": "16:12:43.459",
"c_array":["item1", "item2", "item3"]
}
}

假设我们想要使用 JsonPath 提取属性。

transform {
JsonPath {
plugin_input = "fake"
plugin_output = "fake1"
columns = [
{
"src_field" = "data"
"path" = "$.data.c_string"
"dest_field" = "c1_string"
},
{
"src_field" = "data"
"path" = "$.data.c_boolean"
"dest_field" = "c1_boolean"
"dest_type" = "boolean"
},
{
"src_field" = "data"
"path" = "$.data.c_integer"
"dest_field" = "c1_integer"
"dest_type" = "int"
},
{
"src_field" = "data"
"path" = "$.data.c_float"
"dest_field" = "c1_float"
"dest_type" = "float"
},
{
"src_field" = "data"
"path" = "$.data.c_double"
"dest_field" = "c1_double"
"dest_type" = "double"
},
{
"src_field" = "data"
"path" = "$.data.c_decimal"
"dest_field" = "c1_decimal"
"dest_type" = "decimal(4,2)"
},
{
"src_field" = "data"
"path" = "$.data.c_date"
"dest_field" = "c1_date"
"dest_type" = "date"
},
{
"src_field" = "data"
"path" = "$.data.c_datetime"
"dest_field" = "c1_datetime"
"dest_type" = "time"
},
{
"src_field" = "data"
"path" = "$.data.c_array"
"dest_field" = "c1_array"
"dest_type" = "array<string>"
}
]
}
}

那么数据结果表 fake1 将会像这样

datac1_stringc1_booleanc1_integerc1_floatc1_doublec1_decimalc1_datec1_datetimec1_array
too much content not to showthis is a stringtrue423.143.1410.552023-10-2916:12:43.459["item1", "item2", "item3"]

读取 SeatunnelRow 示例

假设数据行中的一列的类型是 SeatunnelRow,列的名称为 col

SeatunnelRow(col)other
nameage....
a18....

JsonPath 转换将 seatunnel 的值转换为一个数组。

transform {
JsonPath {
plugin_input = "fake"
plugin_output = "fake1"

row_error_handle_way = FAIL
columns = [
{
"src_field" = "col"
"path" = "$[0]"
"dest_field" = "name"
"dest_type" = "string"
},
{
"src_field" = "col"
"path" = "$[1]"
"dest_field" = "age"
"dest_type" = "int"
}
]
}
}

那么数据结果表 fake1 将会像这样:

nameagecolother
a18["a",18]...

配置异常数据处理策略

您可以配置 row_error_handle_waycolumn_error_handle_way 来处理异常数据,两者都是非必填项。

row_error_handle_way 配置对行数据内所有数据异常进行处理,column_error_handle_way 配置对某列数据异常进行处理,优先级高于 row_error_handle_way

跳过异常数据行

配置跳过任意列有异常的整行数据

transform {
JsonPath {

row_error_handle_way = SKIP

columns = [
{
"src_field" = "json_data"
"path" = "$.f1"
"dest_field" = "json_data_f1"
},
{
"src_field" = "json_data"
"path" = "$.f2"
"dest_field" = "json_data_f2"
}
]
}
}

跳过部分异常数据列

配置仅对 json_data_f1 列数据异常跳过,填充空值,其他列数据异常继续抛出异常中断处理程序

transform {
JsonPath {

row_error_handle_way = FAIL

columns = [
{
"src_field" = "json_data"
"path" = "$.f1"
"dest_field" = "json_data_f1"

"column_error_handle_way" = "SKIP"
},
{
"src_field" = "json_data"
"path" = "$.f2"
"dest_field" = "json_data_f2"
}
]
}
}

部分列异常跳过整行

配置仅对 json_data_f1 列数据异常跳过整行数据,其他列数据异常继续抛出异常中断处理程序

transform {
JsonPath {

row_error_handle_way = FAIL

columns = [
{
"src_field" = "json_data"
"path" = "$.f1"
"dest_field" = "json_data_f1"

"column_error_handle_way" = "SKIP_ROW"
},
{
"src_field" = "json_data"
"path" = "$.f2"
"dest_field" = "json_data_f2"
}
]
}
}

更新日志

  • 添加 JsonPath 转换