跳到主要内容
版本:2.3.4

Intro to schema feature

Why we need schema

Some NoSQL databases or message queue are not strongly limited schema, so the schema cannot be obtained through the api. At this time, a schema needs to be defined to convert to TableSchema and obtain data.

SchemaOptions

We can use SchemaOptions to define schema, the SchemaOptions contains some config to define the schema. e.g. columns, primaryKey, constraintKeys.

schema = {
table = "database.schema.table"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}

constraintKeys {
...
}
}

table

The table full name of the table identifier which the schema belongs to, it contains database, schema, table name. e.g. database.schema.table, database.table, table.

schema_first

Default is false.

If the schema_first is true, the schema will be used first, this means if we set table = "a.b", a will be parsed as schema rather than database, then we can support write table = "schema.table".

comment

The comment of the CatalogTable which the schema belongs to.

Columns

Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.

columns = [
{
name = id
type = bigint
nullable = false
columnLength = 20
defaultValue = 0
comment = "primary key id"
}
]
FieldRequiredDefault ValueDescription
nameYes-The name of the column
typeYes-The data type of the column
nullableNotrueIf the column can be nullable
columnLengthNo0The length of the column which will be useful when you need to define the length
defaultValueNonullThe default value of the column
commentNonullThe comment of the column

What type supported at now

Data typeValue type in JavaDescription
stringjava.lang.Stringstring
booleanjava.lang.Booleanboolean
tinyintjava.lang.Byte-128 to 127 regular. 0 to 255 unsigned*. Specify the maximum number of digits in parentheses.
smallintjava.lang.Short-32768 to 32767 General. 0 to 65535 unsigned*. Specify the maximum number of digits in parentheses.
intjava.lang.IntegerAll numbers from -2,147,483,648 to 2,147,483,647 are allowed.
bigintjava.lang.LongAll numbers between -9,223,372,036,854,775,808 and 9,223,372,036,854,775,807 are allowed.
floatjava.lang.FloatFloat-precision numeric data from -1.79E+308 to 1.79E+308.
doublejava.lang.DoubleDouble precision floating point. Handle most decimals.
decimaljava.math.BigDecimalDOUBLE type stored as a string, allowing a fixed decimal point.
nulljava.lang.Voidnull
bytesbyte[]bytes.
datejava.time.LocalDateOnly the date is stored. From January 1, 0001 to December 31, 9999.
timejava.time.LocalTimeOnly store time. Accuracy is 100 nanoseconds.
timestampjava.time.LocalDateTimeStores a unique number that is updated whenever a row is created or modified. timestamp is based on the internal clock and does not correspond to real time. There can only be one timestamp variable per table.
roworg.apache.seatunnel.api.table.type.SeaTunnelRowRow type,can be nested.
mapjava.util.MapA Map is an object that maps keys to values. The key type includes int string boolean tinyint smallint bigint float double decimal date time timestamp null , and the value type includes int string boolean tinyint smallint bigint float double decimal date time timestamp null array map row.
arrayValueType[]A array is a data type that represents a collection of elements. The element type includes int string boolean tinyint smallint bigint float double.

How to declare type supported

SeaTunnel provides a simple and direct way to declare basic types. Basic type keywords include string, boolean, tinyint, smallint, int, bigint, float, double, date, time, timestamp, and null. The keyword names for basic types can be used directly as type declarations, and SeaTunnel is case-insensitive to type keywords. For example, if you need to declare a field with integer type, you can simply define the field as int or "int".

The null type declaration must be enclosed in double quotes, like "null". This approach helps avoid confusion with HOCON's null type which represents undefined object.

When declaring complex types (such as decimal, array, map, and row), pay attention to specific considerations.

  • When declaring a decimal type, precision and scale settings are required, and the type definition follows the format decimal(precision, scale). It's essential to emphasize that the declaration of the decimal type must be enclosed in "; you cannot use the type name directly, as with basic types. For example, when declaring a decimal field with precision 10 and scale 2, you specify the field type as "decimal(10,2)".
  • When declaring an array type, you need to specify the element type, and the type definition follows the format array<T>, where T represents the element type. The element type includes int,string,boolean,tinyint,smallint,bigint,float and double. Similar to the decimal type declaration, it also be enclosed in ". For example, when declaring a field with an array of integers, you specify the field type as "array<int>".
  • When declaring a map type, you need to specify the key and value types. The map type definition follows the format map<K,V>, where K represents the key type and V represents the value type. K can be any basic type and decimal type, and V can be any type supported by SeaTunnel. Similar to previous type declarations, the map type declaration must be enclosed in double quotes. For example, when declaring a field with map type, where the key type is string and the value type is integer, you can declare the field as "map<string, int>".
  • When declaring a row type, you need to define a HOCON object to describe the fields and their types. The field types can be any type supported by SeaTunnel. For example, when declaring a row type containing an integer field a and a string field b, you can declare it as {a = int, b = string}. Enclosing the definition in " as a string is also acceptable, so "{a = int, b = string}" is equivalent to {a = int, c = string}. Since HOCON is compatible with JSON, "{\"a\":\"int\", \"b\":\"string\"}" is equivalent to "{a = int, b = string}".

Here is an example of complex type declarations:

schema {
fields {
c_decimal = "decimal(10, 2)"
c_array = "array<int>"
c_row = {
c_int = int
c_string = string
c_row = {
c_int = int
}
}
# Hocon style declare row type in generic type
map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int = int}}>"
# Json style declare row type in generic type
map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\", \"c_row\":{\"c_int\":\"int\"}}>"
}
}

PrimaryKey

Primary key is a config used to define the primary key in schema, it contains name, columns field.

primaryKey {
name = id
columns = [id]
}
FieldRequiredDefault ValueDescription
nameYes-The name of the primaryKey
columnsYes-The column list in the primaryKey

ConstraintKeys

Constraint keys is a list of config used to define the constraint keys in schema, it contains constraintName, constraintType, constraintColumns field.

constraintKeys = [
{
constraintName = "id_index"
constraintType = KEY
constraintColumns = [
{
columnName = "id"
sortType = ASC
}
]
},
]
FieldRequiredDefault ValueDescription
constraintNameYes-The name of the constraintKey
constraintTypeNoKEYThe type of the constraintKey
constraintColumnsYes-The column list in the primaryKey, each column should contains constraintType and sortType, sortType support ASC and DESC, default is ASC

What constraintType supported at now

ConstraintTypeDescription
INDEX_KEYkey
UNIQUE_KEYunique key

How to use schema

source {
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema {
table = "FakeDatabase.FakeTable"
columns = [
{
name = id
type = bigint
nullable = false
defaultValue = 0
comment = "primary key id"
},
{
name = name
type = "string"
nullable = true
comment = "name"
},
{
name = age
type = int
nullable = true
comment = "age"
}
]
primaryKey {
name = "id"
columnNames = [id]
}
constraintKeys = [
{
constraintName = "unique_name"
constraintType = UNIQUE_KEY
constraintColumns = [
{
columnName = "name"
sortType = ASC
}
]
},
]
}
}
}

Deprecated

If you only need to define the column, you can use fields to define the column, this is a simple way but will be remove in the future.

source {
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

When we should use it or not

If there is a schema configuration project in Options,the connector can then customize the schema. Like Fake Pulsar Http source connector etc.