Skip to main content
Version: 2.3.5

SQL

SQL transform plugin

Description​

Use SQL to transform given input row.

SQL transform use memory SQL engine, we can via SQL functions and ability of SQL engine to implement the transform task.

Options​

nametyperequireddefault value
source_table_namestringyes-
result_table_namestringyes-
querystringyes-

source_table_name [string]​

The source table name, the query SQL table name must match this field.

query [string]​

The query SQL, it's a simple SQL supported base function and criteria filter operation. But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like.

the query expression can be select [table_name.]column_a to query the column that named column_a. and the table name is optional.
or select c_row.c_inner_row.column_b to query the inline struct column that named column_b within c_row column and c_inner_row column. In this query expression, can't have table name.

Example​

The data read from source is a table like this:

idnameage
1Joy Ding20
2May Ding21
3Kin Dom24
4Joy Dom22

We use SQL query to transform the source data like this:

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
}
}

Then the data in result table fake1 will update to

idnameage
1Joy Ding_21
2May Ding_22
3Kin Dom_25
4Joy Dom_23

Struct query​

if your upstream data schema is like this:

source {
FakeSource {
result_table_name = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map_1 = "map<string, string>"
c_map_2 = "map<string, map<string,string>>"
}
c_string = "string"
}
}
}
}
}

Those query all are valid:

select 
name,
c_date,
c_row,
c_row.c_inner_row,
c_row.c_string,
c_row.c_inner_row.c_inner_int,
c_row.c_inner_row.c_inner_string,
c_row.c_inner_row.c_inner_timestamp,
c_row.c_inner_row.c_map_1,
c_row.c_inner_row.c_map_1.some_key

But this query are not valid:

select 
c_row.c_inner_row.c_map_2.some_key.inner_map_key

The map must be the latest struct, can't query the nesting map.

Job Config Example​

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
}
}

sink {
Console {
source_table_name = "fake1"
}
}

Changelog​

  • Support struct query

new version​

  • Add SQL Transform Connector