SQL
SQL 转换插件
描述
使用 SQL 来转换给定的输入行。
SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL 引擎的能力来实现转换任务。
属性
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
plugin_input | string | yes | - |
plugin_output | string | yes | - |
query | string | yes | - |
plugin_input [string]
源表名称,查询 SQL 表名称必须与此字段匹配。
query [string]
查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。
查询表达式可以是select [table_name.]column_a
,这时会去查询列为column_a
的列,table_name
为可选项
也可以是select c_row.c_inner_row.column_b
,这时会去查询列c_row
下的c_inner_row
的column_b
。嵌套结构查询中,不能存在table_name
示例
源端数据读取的表格如下:
id | name | age |
---|---|---|
1 | Joy Ding | 20 |
2 | May Ding | 21 |
3 | Kin Dom | 24 |
4 | Joy Dom | 22 |
我们使用 SQL 查询来转换源数据,类似这样:
transform {
Sql {
plugin_input = "fake"
plugin_output = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0"
}
}
那么结果表 fake1
中的数据将会更新为:
id | name | age |
---|---|---|
1 | Joy Ding_ | 21 |
2 | May Ding_ | 22 |
3 | Kin Dom_ | 25 |
4 | Joy Dom_ | 23 |
嵌套结构查询
例如你的上游数据结构是这样:
source {
FakeSource {
plugin_output = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map_1 = "map<string, string>"
c_map_2 = "map<string, map<string,string>>"
}
c_string = "string"
}
}
}
}
}
那么下列所有的查询表达式都是有效的
select
name,
c_date,
c_row,
c_row.c_inner_row,
c_row.c_string,
c_row.c_inner_row.c_inner_int,
c_row.c_inner_row.c_inner_string,
c_row.c_inner_row.c_inner_timestamp,
c_row.c_inner_row.c_map_1,
c_row.c_inner_row.c_map_1.some_key
但是这个查询语句是无效的
select
c_row.c_inner_row.c_map_2.some_key.inner_map_key
当查询map结构时,map结构应该为最后一个数据结构,不能查询嵌套map
作业配置示例
env {
job.mode = "BATCH"
}
source {
FakeSource {
plugin_output = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}
transform {
Sql {
plugin_input = "fake"
plugin_output = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from dual where id>0"
}
}
sink {
Console {
plugin_input = "fake1"
}
}
更新日志
新版本
- 添加SQL转换连接器