跳到主要内容
版本:Next

SQL

SQL 转换插件

描述

使用 SQL 来转换给定的输入行。

SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL 引擎的能力来实现转换任务。

属性

名称类型是否必须默认值
plugin_inputstringyes-
plugin_outputstringyes-
querystringyes-

plugin_input [string]

源表名称,查询 SQL 表名称必须与此字段匹配。

query [string]

查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。

查询表达式可以是select [table_name.]column_a,这时会去查询列为column_a的列,table_name为可选项 也可以是select c_row.c_inner_row.column_b,这时会去查询列c_row下的c_inner_rowcolumn_b嵌套结构查询中,不能存在table_name

示例

源端数据读取的表格如下:

idnameage
1Joy Ding20
2May Ding21
3Kin Dom24
4Joy Dom22

我们使用 SQL 查询来转换源数据,类似这样:

transform {
Sql {
plugin_input = "fake"
plugin_output = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0"
}
}

那么结果表 fake1 中的数据将会更新为:

idnameage
1Joy Ding_21
2May Ding_22
3Kin Dom_25
4Joy Dom_23

嵌套结构查询

例如你的上游数据结构是这样:

source {
FakeSource {
plugin_output = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map_1 = "map<string, string>"
c_map_2 = "map<string, map<string,string>>"
}
c_string = "string"
}
}
}
}
}

那么下列所有的查询表达式都是有效的

select 
name,
c_date,
c_row,
c_row.c_inner_row,
c_row.c_string,
c_row.c_inner_row.c_inner_int,
c_row.c_inner_row.c_inner_string,
c_row.c_inner_row.c_inner_timestamp,
c_row.c_inner_row.c_map_1,
c_row.c_inner_row.c_map_1.some_key

但是这个查询语句是无效的

select 
c_row.c_inner_row.c_map_2.some_key.inner_map_key

当查询map结构时,map结构应该为最后一个数据结构,不能查询嵌套map

作业配置示例

env {
job.mode = "BATCH"
}

source {
FakeSource {
plugin_output = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}

transform {
Sql {
plugin_input = "fake"
plugin_output = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0"
}
}

sink {
Console {
plugin_input = "fake1"
}
}

更新日志

新版本

  • 添加SQL转换连接器