Intro To Config File
In SeaTunnel, the config file is the most important piece: it is where users define their data synchronization requirements and make full use of SeaTunnel's capabilities. This page explains how to write one.
The main format of the config file is hocon; for more details, refer to HOCON-GUIDE. The json format is also supported, but keep in mind that the name of the config file must then end with .json.
The SQL format is supported as well; please refer to SQL configuration for more details.
Example
Before you read on, you can find example config files here, in the config directory of the binary package.
Config File Structure
A config file looks like the following:
:::warn
The old configuration names source_table_name/result_table_name are deprecated; please migrate to the new names plugin_input/plugin_output as soon as possible.
:::
env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 100
    schema = {
      fields {
        name = "string"
        age = "int"
        card = "int"
      }
    }
  }
}

transform {
  Filter {
    plugin_input = "fake"
    plugin_output = "fake1"
    fields = [name, card]
  }
}

sink {
  Clickhouse {
    host = "clickhouse:8123"
    database = "default"
    table = "seatunnel_console"
    fields = ["name", "card"]
    username = "default"
    password = ""
    plugin_input = "fake1"
  }
}
As you can see, the config file contains four sections: env, source, transform, and sink. Each module serves a different purpose; once you understand them, you will understand how SeaTunnel works.
env
The env section holds optional engine parameters. Whichever engine you use (Zeta, Spark, or Flink), its optional parameters belong here.
Note that parameters are separated by engine, while common parameters are configured as before. For the Flink and Spark engines, refer to JobEnvConfig for the specific configuration rules of their parameters.
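As a minimal sketch of an env block, the fragment below sets only options that already appear elsewhere on this page (job.mode, job.name, parallelism). The values are illustrative assumptions, and engine-specific options are left to JobEnvConfig rather than shown here.

```hocon
env {
  # Job mode used by the examples on this page.
  job.mode = "BATCH"
  # Illustrative job name (an assumption, not a required value).
  job.name = "example_job"
  # Illustrative parallelism; tune it for your own workload.
  parallelism = 2
}
```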
source
The source section defines where SeaTunnel fetches data; the fetched data is then passed to the next step.
Multiple sources can be defined at the same time. The supported sources are listed in Source of SeaTunnel. Each source has its own specific parameters that define how to fetch data. SeaTunnel also extracts the parameters that every source uses, such as plugin_output, which names the data generated by the current source so that later modules can refer to it.
transform
Once we have a data source, we may need to process the data further; that is what the transform module is for. Note the word 'may': the transform section is optional, and data can flow directly from source to sink, as shown below.
env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 100
    schema = {
      fields {
        name = "string"
        age = "int"
        card = "int"
      }
    }
  }
}

sink {
  Clickhouse {
    host = "clickhouse:8123"
    database = "default"
    table = "seatunnel_console"
    fields = ["name", "age", "card"]
    username = "default"
    password = ""
    plugin_input = "fake"
  }
}
As with sources, each transform has its own specific parameters. The supported transforms are listed in Transform V2 of SeaTunnel.
sink
The purpose of SeaTunnel is to synchronize data from one place to another, so it is critical to define how and where data is written. The sink module provided by SeaTunnel lets you do this quickly and efficiently. Sinks are configured much like sources; the difference is that a source reads data while a sink writes it. For the available connectors, please check out Supported Sinks.
Other Information
When multiple sources and multiple sinks are defined, which data does each sink read, and which data does each transform read? Two key configurations answer this question: plugin_output and plugin_input. Each source module is configured with a plugin_output that names the dataset it generates, and transform and sink modules use plugin_input to refer to that name, indicating which data they want to read and process. A transform, as an intermediate processing module, can use both plugin_input and plugin_output at the same time. These two parameters are optional: by default convention, if they are not configured, a module uses the data generated by the last module of the previous node. This is much more convenient when there is only one source.
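To make the wiring concrete, here is a hedged sketch in which one source feeds two sinks, one directly and one through a transform. It reuses the FakeSource, Filter, Clickhouse, and Console plugins that appear on this page; routing the unfiltered data to Console is an assumption made purely for illustration.

```hocon
env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    # Name the dataset so later modules can refer to it.
    plugin_output = "fake"
    row.num = 100
    schema = {
      fields {
        name = "string"
        age = "int"
        card = "int"
      }
    }
  }
}

transform {
  Filter {
    # Reads "fake" and publishes the filtered result as "fake1".
    plugin_input = "fake"
    plugin_output = "fake1"
    fields = [name, card]
  }
}

sink {
  # This sink reads the raw source data.
  Console {
    plugin_input = "fake"
  }
  # This sink reads the transform's output.
  Clickhouse {
    host = "clickhouse:8123"
    database = "default"
    table = "seatunnel_console"
    fields = ["name", "card"]
    username = "default"
    password = ""
    plugin_input = "fake1"
  }
}
```

Each plugin_input value must match a plugin_output declared by an earlier module; that name is the only link between the blocks.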
Multi-line Support
In hocon, multiline strings are supported, which allows you to include extended passages of text without worrying about newline characters or special formatting. This is achieved by enclosing the text within triple quotes ("""). For example:
var = """
Apache SeaTunnel is a
next-generation high-performance,
distributed, massive data integration tool.
"""
sql = """ select * from "table" """
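As an illustration, a triple-quoted string can keep a longer SQL query readable. This sketch reuses the sql transform that appears in the variable substitution example on this page; the dataset names fake and fake_adults and the query itself are assumptions made for the example.

```hocon
transform {
  sql {
    plugin_input = "fake"
    plugin_output = "fake_adults"   # hypothetical dataset name
    # Triple quotes let the query span several lines.
    query = """
      select name, age
      from fake
      where age >= 18
    """
  }
}
```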
Json Format Support
Before writing the config file, please make sure that its name ends with .json.
{
  "env": {
    "job.mode": "BATCH"
  },
  "source": [
    {
      "plugin_name": "FakeSource",
      "plugin_output": "fake",
      "row.num": 100,
      "schema": {
        "fields": {
          "name": "string",
          "age": "int",
          "card": "int"
        }
      }
    }
  ],
  "transform": [
    {
      "plugin_name": "Filter",
      "plugin_input": "fake",
      "plugin_output": "fake1",
      "fields": ["name", "card"]
    }
  ],
  "sink": [
    {
      "plugin_name": "Clickhouse",
      "host": "clickhouse:8123",
      "database": "default",
      "table": "seatunnel_console",
      "fields": ["name", "card"],
      "username": "default",
      "password": "",
      "plugin_input": "fake1"
    }
  ]
}
Config Variable Substitution
In a config file, you can define variables and have them replaced at runtime. Note, however, that this is supported only for HOCON format files.
Usage of Variables:
- ${varName}: If the variable is not provided, an exception will be thrown.
- ${varName:default}: If the variable is not provided, the default value will be used. If you set a default value, it should be enclosed in double quotes.
- ${varName:}: If the variable is not provided, an empty string will be used.
If you do not set the variable value through -i, you can also pass it through a system environment variable; variable substitution supports obtaining values from environment variables.
For example, you can set the environment variable in the shell script as follows:
export varName="value with space"
Then you can use the variable in the config file.
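For instance, with varName exported as above, a config entry could reference it as in the sketch below. Using job.name as the target option is only an illustration.

```hocon
env {
  job.mode = "BATCH"
  # Taken from the environment variable varName when no -i varName=... is passed.
  job.name = ${varName}
}
```

Under these assumptions, job.name would receive the exported value, spaces included.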
If you set a variable without a default value in the configuration file but do not pass it during execution, the variable text is retained as-is and the system does not throw an exception. In that case, please ensure that the downstream process can parse the variable correctly. For example, Elasticsearch's index option needs to support a format like '${xxx}' to specify the index dynamically. If the downstream process does not support this, the program may not run properly.
Example:
env {
  job.mode = "BATCH"
  job.name = ${jobName}
  parallelism = 2
}

source {
  FakeSource {
    plugin_output = "${resName:fake_test}_table"
    row.num = "${rowNum:50}"
    string.template = ${strTemplate}
    int.template = [20, 21]
    schema = {
      fields {
        name = "${nameType:string}"
        age = ${ageType}
      }
    }
  }
}

transform {
  sql {
    plugin_input = "${resName:fake_test}_table"
    plugin_output = "sql"
    query = "select * from ${resName:fake_test}_table where name = '${nameVal}' "
  }
}

sink {
  Console {
    plugin_input = "sql"
    username = ${username}
    password = ${password}
  }
}
In the configuration above, we have defined several variables, such as ${rowNum} and ${resName}. We can replace these parameters using the following shell command:
./bin/seatunnel.sh -c <this_config_file> \
  -i jobName='this_is_a_job_name' \
  -i strTemplate=['abc','d~f','hi'] \
  -i ageType=int \
  -i nameVal=abc \
  -i username=seatunnel=2.3.1 \
  -i password='$a^b%c.d~e0*9(' \
  -m local
In this case, resName, rowNum, and nameType are not set, so they will take their default values.
The final submitted configuration would be:
env {
  job.mode = "BATCH"
  job.name = "this_is_a_job_name"
  parallelism = 2
}

source {
  FakeSource {
    plugin_output = "fake_test_table"
    row.num = 50
    string.template = ['abc','d~f','hi']
    int.template = [20, 21]
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  sql {
    plugin_input = "fake_test_table"
    plugin_output = "sql"
    query = "select * from fake_test_table where name = 'abc' "
  }
}

sink {
  Console {
    plugin_input = "sql"
    username = "seatunnel=2.3.1"
    password = "$a^b%c.d~e0*9("
  }
}
Important Notes:
- If a value contains special characters like (, enclose it in single quotes (').
- If the substitution variable contains double or single quotes (e.g., "resName" or "nameVal"), you need to include them with the value.
- The value cannot contain spaces (' '). For example, -i jobName='this is a job name' will be replaced with job.name = "this". You can use environment variables to pass values with spaces.
- For dynamic parameters, you can use the following format: -i date=$(date +"%Y%m%d").
- System-reserved placeholders cannot be used as variable names; they will not be replaced by -i. These include: ${database_name}, ${schema_name}, ${table_name}, ${schema_full_name}, ${table_full_name}, ${primary_key}, ${unique_key}, ${field_names}. For details, please refer to Sink Parameter Placeholders.