跳到主要内容
版本:Next

Schema 特性简介

为什么我们需要Schema

某些NoSQL数据库或消息队列没有严格限制schema,因此无法通过api获取schema。 这时需要定义一个schema来转换为TableSchema并获取数据。

SchemaOptions

我们可以使用SchemaOptions定义schema, SchemaOptions包含了一些定义schema的配置。 例如:columns, primaryKey, constraintKeys。

schema = {
table = "database.schema.table"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}

constraintKeys {
...
}
}

table

schema所属的表标识符的表全名,包含数据库、schema、表名。 例如 database.schema.tabledatabase.tabletable

schema_first

默认是false。

如果schema_first是true, schema会优先使用, 这意味着如果我们设置 table = "a.b", a 会被解析为schema而不是数据库, 那么我们可以支持写入 table = "schema.table".

comment

schema所属的 CatalogTable 的注释。

Columns

Columns 是用于定义模式中的列的配置列表,每列可以包含名称(name)、类型(type)、是否可空(nullable)、默认值(defaultValue)、注释(comment)字段。

columns = [
{
name = id
type = bigint
nullable = false
columnLength = 20
defaultValue = 0
comment = "primary key id"
}
]
字段是否必须默认值描述
nameYes-列的名称
typeYes-列的数据类型
nullableNotrue列是否可空
columnLengthNo0列的长度,当您需要定义长度时将很有用
columnScaleNo-列的精度,当您需要定义精度时将很有用
defaultValueNonull列的默认值
commentNonull列的注释

目前支持哪些类型

数据类型Java中的值类型描述
stringjava.lang.String字符串
booleanjava.lang.Boolean布尔
tinyintjava.lang.Byte常规-128 至 127 。 0 到 255 无符号*。 指定括号中的最大位数。
smallintjava.lang.Short常规-32768 至 32767。 0 到 65535 无符号*。 指定括号中的最大位数。
intjava.lang.Integer允许从 -2,147,483,648 到 2,147,483,647 的所有数字。
bigintjava.lang.Long允许 -9,223,372,036,854,775,808 和 9,223,372,036,854,775,807 之间的所有数字。
floatjava.lang.Float从-1.79E+308 到 1.79E+308浮点精度数值数据。
doublejava.lang.Double双精度浮点。 处理大多数小数。
decimaljava.math.BigDecimalDouble 类型存储为字符串,允许固定小数点。
nulljava.lang.Voidnull
bytesbyte[]字节。
datejava.time.LocalDate仅存储日期。从0001年1月1日到9999 年 12 月 31 日。
timejava.time.LocalTime仅存储时间。精度为 100 纳秒。
timestampjava.time.LocalDateTime存储不带时区的日期和时间信息,表示事件发生的本地时间。不包含任何偏移量或时区相关信息。
timestamp_tzjava.time.OffsetDateTime存储带有 UTC 偏移量的日期和时间信息,包含本地日期时间和 UTC 偏移量。在处理多时区场景时,可以提供更精确的时间信息。
roworg.apache.seatunnel.api.table.type.SeaTunnelRow行类型,可以嵌套。
mapjava.util.MapMap 是将键映射到值的对象。 键类型包括: int string boolean tinyint smallint bigint float double decimal date time timestamp null , and the value type includes int string boolean tinyint smallint bigint float double decimal date time timestamp null array map row.
arrayValueType[]数组是一种表示元素集合的数据类型。 元素类型包括: int string boolean tinyint smallint bigint float double.

如何声明支持的类型

SeaTunnel 提供了一种简单直接的方式来声明基本类型。基本类型的关键字包括:string, boolean, tinyint, smallint, int, bigint, float, double, date, time, timestamp, 和 null。基本类型的关键字名称可以直接用作类型声明,并且SeaTunnel对类型关键字不区分大小写。 例如,如果您需要声明一个整数类型的字段,您可以简单地将字段定义为int"int"

null 类型声明必须用双引号引起来, 例如:"null"。 这种方法有助于避免与 HOCON 中表示未定义的对象的 null 类型混淆。

声明复杂类型(例如 decimalarraymaprow)时,请注意具体注意事项。

  • 声明decimal类型时,需要设置精度(precision)和小数位数(scale),类型定义遵循“decimal(precision, scale)”格式。 需要强调的是,十进制类型的声明必须用 " 括起来;不能像基本类型一样直接使用类型名称。例如,当声明精度为 10、小数位数为 2 的十进制字段时,您可以指定字段类型为"decimal(10,2)"
  • 声明array类型时,需要指定元素类型,类型定义遵循 array<T> 格式,其中 T 代表元素类型。元素类型包括int,string,boolean,tinyint,smallint,bigint,floatdouble。与十进制类型声明类似,它也用 " 括起来。例如,在声明具有整数数组的字段时,将字段类型指定为 "array<int>"
  • 声明map类型时,需要指定键和值类型。map类型定义遵循map<K,V>格式,其中K表示键类型,V表示值类型。 K可以是任何基本类型和十进制类型,V可以是 SeaTunnel 支持的任何类型。 与之前的类型声明类似,map类型声明必须用双引号引起来。 例如,当声明一个map类型的字段时,键类型为字符串,值类型为整数,则可以将该字段声明为"map<string, int>"
  • 声明row类型时,需要定义一个 HOCON 对象来描述字段及其类型。 字段类型可以是 SeaTunnel 支持的任何类型。 例如,当声明包含整数字段“a”和字符串字段“b”的行类型时,可以将其声明为“{a = int, b = string}”。 将定义作为字符串括在 " 中也是可以接受的,因此 "{a = int, b = string}" 相当于 {a = int, c = string}。由于 HOCON 与 JSON 兼容, "{\"a\":\"int\", \"b\":\"string\"}" 等价于 "{a = int, b = string}"

以下是复杂类型声明的示例:

schema {
fields {
c_decimal = "decimal(10, 2)"
c_array = "array<int>"
c_row = {
c_int = int
c_string = string
c_row = {
c_int = int
}
}
# 在泛型中Hocon风格声明行类型
map0 = "map<string, {c_int = int, c_string = string, c_row = {c_int = int}}>"
# 在泛型中Json风格声明行类型
map1 = "map<string, {\"c_int\":\"int\", \"c_string\":\"string\", \"c_row\":{\"c_int\":\"int\"}}>"
}
}

主键(PrimaryKey)

主键是用于定义模式中主键的配置,它包含name、columns字段。

primaryKey {
name = id
columns = [id]
}
字段是否必须默认值描述
name-主键名称
columns-主键中的列列表

约束键(constraintKeys)

约束键是用于定义模式中约束键的配置列表,它包含constraintName,constraintType,constraintColumns字段。

constraintKeys = [
{
constraintName = "id_index"
constraintType = KEY
constraintColumns = [
{
columnName = "id"
sortType = ASC
}
]
},
]
字段是否必须默认值描述
constraintName-约束键的名称
constraintTypeKEY约束键的类型
constraintColumns-PrimaryKey中的列列表,每列应包含constraintType和sortType,sortType支持ASC和DESC,默认为ASC

目前支持哪些约束类型

约束类型描述
INDEX_KEY
UNIQUE_KEY唯一键

多表Schema

tables_configs = [
{
schema {
table = "database.schema.table1"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
},
{
schema = {
table = "database.schema.table2"
schema_first = false
comment = "comment"
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
}
]

如何使用schema

推荐

source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema {
table = "FakeDatabase.FakeTable"
columns = [
{
name = id
type = bigint
nullable = false
defaultValue = 0
comment = "primary key id"
},
{
name = name
type = "string"
nullable = true
comment = "name"
},
{
name = age
type = int
nullable = true
comment = "age"
}
]
primaryKey {
name = "id"
columnNames = [id]
}
constraintKeys = [
{
constraintName = "unique_name"
constraintType = UNIQUE_KEY
constraintColumns = [
{
columnName = "name"
sortType = ASC
}
]
},
]
}
}
}

已弃用

如果你只需要定义列,你可以使用字段来定义列,这是一种简单的方式,但将来会被删除。

source {
FakeSource {
parallelism = 2
plugin_output = "fake"
row.num = 16
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

我们什么时候应该使用它,什么时候不应该使用它

如果选项中有schema配置项目,则连接器可以自定义schema。 比如 Fake Pulsar Http 源连接器等。