跳到主要内容
版本:2.3.7

SQL配置文件

SQL配置文件结构

SQL配置文件类似下面这样:

SQL

/* config
env {
parallelism = 1
job.mode = "BATCH"
}
*/

CREATE TABLE source_table WITH (
'connector'='jdbc',
'type'='source',
'url' = 'jdbc:mysql://localhost:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'query' = 'select * from source',
'properties'= '{
useSSL = false,
rewriteBatchedStatements = true
}'
);

CREATE TABLE sink_table WITH (
'connector'='jdbc',
'type'='sink',
'url' = 'jdbc:mysql://localhost:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'generate_sink_sql' = 'true',
'database' = 'seatunnel',
'table' = 'sink'
);

INSERT INTO sink_table SELECT id, name, age, email FROM source_table;

SQL配置文件说明

通用配置

/* config
env {
parallelism = 1
job.mode = "BATCH"
}
*/

SQL文件中通过 /* config */ 注释定义通用配置部分,内部可以使用hocon格式定义通用的配置,如env等。

SOURCE SQL语法

CREATE TABLE source_table WITH (
'connector'='jdbc',
'type'='source',
'url' = 'jdbc:mysql://localhost:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'query' = 'select * from source',
'properties' = '{
useSSL = false,
rewriteBatchedStatements = true
}'
);
  • 使用 CREATE TABLE ... WITH (...) 语法可创建源端表映射, TABLE表名为源端映射的表名,WITH语法中为源端相关的配置参数
  • 在WITH语法中有两个固定参数:connectortype,分别表示连接器插件名(如:jdbcFakeSource等)和源端类型(固定为:source
  • 其它参数名可以参考对应连接器插件的相关配置参数,但是格式需要改为'key' = 'value',的形式
  • 如果'value'为一个子配置,可以直接使用hocon格式的字符串,注意:如果使用hocon格式的子配置,内部的属性项之间必须用,分隔!如:
'properties' = '{
useSSL = false,
rewriteBatchedStatements = true
}'
  • 如果在'value'中使用到',需要用''进行转义,如:
'query' = 'select * from source where name = ''Joy Ding'''

SINK SQL语法

CREATE TABLE sink_table WITH (
'connector'='jdbc',
'type'='sink',
'url' = 'jdbc:mysql://localhost:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'generate_sink_sql' = 'true',
'database' = 'seatunnel',
'table' = 'sink'
);
  • 使用 CREATE TABLE ... WITH (...) 语法可创建目标端表映射, TABLE表名为目标端映射的表名,WITH语法中为目标端相关的配置参数
  • 在WITH语法中有两个固定参数:connectortype,分别表示连接器插件名(如:jdbcconsole等)和目标端类型(固定为:sink
  • 其它参数名可以参考对应连接器插件的相关配置参数,但是格式需要改为'key' = 'value',的形式

INSERT INTO SELECT语法

INSERT INTO sink_table SELECT id, name, age, email FROM source_table;
  • SELECT FROM 部分为源端映射表的表名,SELECT 部分的语法参考:SQL-transform query 配置项
  • INSERT INTO 部分为目标端映射表的表名
  • 注意:该语法不支持INSERT 中指定字段,如:INSERT INTO sink_table (id, name, age, email) SELECT id, name, age, email FROM source_table;

INSERT INTO SELECT TABLE语法

INSERT INTO sink_table SELECT source_table;
  • SELECT 部分直接使用源端映射表的表名,表示将源端表的所有数据插入到目标端表中
  • 使用该语法不会生成trasform的相关配置,这种语法一般用在多表同步的场景,示例:
CREATE TABLE source_table WITH (
'connector'='jdbc',
'type' = 'source',
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'table_list' = '[
{
table_path = "source.table1"
},
{
table_path = "source.table2",
query = "select * from source.table2"
}
]'
);

CREATE TABLE sink_table WITH (
'connector'='jdbc',
'type' = 'sink',
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = '123456',
'generate_sink_sql' = 'true',
'database' = 'sink'
);

INSERT INTO sink_table SELECT source_table;

CREATE TABLE AS语法

CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table;
  • 该语法可以将一个SELECT查询结果作为一个临时表,用于的INSERT INTO操作
  • SELECT 部分的语法参考:SQL Transform query 配置项
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table;

INSERT INTO sink_table SELECT * FROM temp1;

SQL配置文件任务提交示例

./bin/seatunnel.sh --config ./config/sample.sql