Assert
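Assert sink connector
Description
The Assert sink checks whether incoming data conforms to user-defined rules. Users configure rules that describe the expected data; if the data violates a rule, the sink throws an exception.
Key features
Options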
| Name | Type | Required | Default | 
|---|---|---|---|
| rules | ConfigMap | yes | - | 
| rules.field_rules | ConfigList | yes | - | 
| rules.field_rules.field_name | string | yes | - | 
| rules.field_rules.field_type | string or ConfigMap | no | - | 
| rules.field_rules.field_value | ConfigList | no | - | 
| rules.field_rules.field_value.rule_type | string | no | - | 
| rules.field_rules.field_value.rule_value | numeric | no | - | 
| rules.field_rules.field_value.equals_to | boolean, numeric, string, ConfigList, or ConfigMap | no | - | 
| rules.row_rules | ConfigList | yes | - | 
| rules.row_rules.rule_type | string | no | - | 
| rules.row_rules.rule_value | string | no | - | 
| rules.catalog_table_rule | ConfigMap | no | - | 
| rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - | 
| rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - | 
| rules.catalog_table_rule.primary_key_rule.primary_key_columns | ConfigList | no | - | 
| rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - | 
| rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - | 
| rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - | 
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - | 
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - | 
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - | 
| rules.catalog_table_rule.column_rule | ConfigList | no | - | 
| rules.catalog_table_rule.column_rule.name | string | no | - | 
| rules.catalog_table_rule.column_rule.type | string | no | - | 
| rules.catalog_table_rule.column_rule.column_length | int | no | - | 
| rules.catalog_table_rule.column_rule.nullable | boolean | no | - | 
| rules.catalog_table_rule.column_rule.default_value | string | no | - | 
| rules.catalog_table_rule.column_rule.comment | string | no | - | 
| rules.table-names | ConfigList | no | - | 
| rules.tables_configs | ConfigList | no | - | 
| rules.tables_configs.table_path | String | no | - | 
| common-options | | no | - | 
rules [ConfigMap]
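The rules that the data must satisfy. Each rule represents one field validation or one row-count validation.
field_rules [ConfigList]
Field rules validate individual fields.
field_name [string]
The field name.
field_type [string | ConfigMap]
The field type. Field types should follow this guide.
field_value [ConfigList]
Field value rules define the checks applied to the field's value.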
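rule_type [string]
The rule type. The following rules are currently supported:
- NOT_NULL: the value must not be null
- NULL: the value can be null
- MIN: defines the minimum value of the data
- MAX: defines the maximum value of the data
- MIN_LENGTH: defines the minimum length of string data
- MAX_LENGTH: defines the maximum length of string data
- MIN_ROW: defines the minimum number of rows
- MAX_ROW: defines the maximum number of rows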
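rule_value [numeric]
The value associated with the rule type. When rule_type is MIN, MAX, MIN_LENGTH, MAX_LENGTH, MIN_ROW, or MAX_ROW, the user must assign a value to rule_value.
equals_to [boolean | numeric | string | ConfigList | ConfigMap]
equals_to checks whether the field value equals the configured expected value. Values of any supported type can be assigned to equals_to; the types are described in detail here.
For example, if a field is a row with three fields and the row type is declared as {a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}, the user can assign the value [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]] to equals_to.
Field values are written the same way as in FakeSource.
equals_to cannot be applied to fields of type null. To validate such a field, use the rule type NULL instead, for example {rule_type = NULL}.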
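As a minimal sketch of the row example above, the rule might be written as follows. The field name c_row is hypothetical, and the nested-object form of field_type is assumed to behave the same way as in a FakeSource schema; the type declaration and the expected value are taken from the text.

```hocon
field_rules = [
  {
    # c_row is a hypothetical field name used only for illustration
    field_name = c_row
    # Row type with three fields: an array, a map, and a nested row
    field_type = {
      a = "array<string>"
      b = "map<string, decimal(30, 2)>"
      c = { c_0 = int, b = string }
    }
    field_value = [
      {
        rule_type = NOT_NULL
        # One entry per row field, in declaration order: a, b, c
        equals_to = [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]]
      }
    ]
  }
]
```

Types that contain angle brackets or commas are quoted so that HOCON reads them as a single string.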
catalog_table_rule [ConfigMap]
catalog_table_rule asserts that the catalog table is identical to the table defined by the user.
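The option table lists column attributes (column_length, nullable, default_value, comment) that the examples in this document do not exercise. The fragment below is a sketch of how they might be combined; the option names come from the table, while the column itself and all of its values are made up for illustration.

```hocon
catalog_table_rule {
  column_rule = [
    {
      # Hypothetical column; only the option names are taken from the option table
      name = "name"
      type = string
      column_length = 64
      nullable = true
      default_value = "unknown"
      comment = "user name"
    }
  ]
}
```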
table-names [ConfigList]
Asserts that the listed tables are present in the data.
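A minimal sketch, assuming an upstream source that emits two tables named test.table1 and test.table2 (both names are hypothetical):

```hocon
sink {
  Assert {
    rules {
      # Expect exactly these table names to appear in the incoming data (assumed semantics)
      table-names = ["test.table1", "test.table2"]
    }
  }
}
```

Per-table row and field rules are configured under tables_configs instead.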
tables_configs [ConfigList]
Asserts that multiple tables are present in the data.
table_path [String]
The path of the table.
common options
Common parameters of sink plugins. See Sink Common Options for details.
Examples
Simple
The whole config follows HOCON style.
Assert {
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 10
          },
          {
            rule_type = MIN_ROW
            rule_value = 5
          }
        ],
        field_rules = [{
          field_name = name
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            },
            {
              rule_type = MIN_LENGTH
              rule_value = 5
            },
            {
              rule_type = MAX_LENGTH
              rule_value = 10
            }
          ]
        }, {
          field_name = age
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 23
            },
            {
              rule_type = MIN
              rule_value = 32767
            },
            {
              rule_type = MAX
              rule_value = 2147483647
            }
          ]
        }
        ]
        catalog_table_rule {
            primary_key_rule = {
                primary_key_name = "primary key"
                primary_key_columns = ["id"]
            }
            constraint_key_rule = [
                {
                    constraint_key_name = "unique_name"
                    constraint_key_type = UNIQUE_KEY
                    constraint_key_columns = [
                        {
                            constraint_key_column_name = "id"
                            constraint_key_sort_type = ASC
                        }
                    ]
                }
            ]
            column_rule = [
                {
                    name = "id"
                    type = bigint
                },
                {
                    name = "name"
                    type = string
                },
                {
                    name = "age"
                    type = int
                }
            ]
        }
      }
  }
Complex
Here is a more complex example that uses equals_to.
source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_date = date
        c_timestamp = timestamp
        c_time = time
        c_bytes = bytes
        c_array = "array<int>"
        c_map = "map<time, string>"
        c_map_nest = "map<string, {c_int = int, c_string = string}>"
        c_row = {
          c_null = "null"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_date = date
          c_timestamp = timestamp
          c_time = time
          c_bytes = bytes
          c_array = "array<int>"
          c_map = "map<string, string>"
        }
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
          "bWlJWmo=",
          [0, 1, 2],
          "{ 12:01:26 = v0 }",
          { k1 = [123, "BBB-BB"]},
          [
            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
            "bWlJWmo=",
            [0, 1, 2],
            { k0 = v0 }
          ]
        ]
      }
    ]
    plugin_output = "fake"
  }
}
sink{
  Assert {
    plugin_input = "fake"
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 1
          },
          {
            rule_type = MIN_ROW
            rule_value = 1
          }
        ],
        field_rules = [
            {
                field_name = c_null
                field_type = "null"
                field_value = [
                    {
                        rule_type = NULL
                    }
                ]
            },
            {
                field_name = c_string
                field_type = string
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "AAA"
                    }
                ]
            },
            {
                field_name = c_boolean
                field_type = boolean
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = false
                    }
                ]
            },
            {
                field_name = c_tinyint
                field_type = tinyint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_smallint
                field_type = smallint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_int
                field_type = int
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 333
                    }
                ]
            },
            {
                field_name = c_bigint
                field_type = bigint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 323232
                    }
                ]
            },
            {
                field_name = c_float
                field_type = float
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 3.1
                    }
                ]
            },
            {
                field_name = c_double
                field_type = double
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 9.33333
                    }
                ]
            },
            {
                field_name = c_decimal
                field_type = "decimal(30, 8)"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 99999.99999999
                    }
                ]
            },
            {
                field_name = c_date
                field_type = date
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21"
                    }
                ]
            },
            {
                field_name = c_timestamp
                field_type = timestamp
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21T12:34:56"
                    }
                ]
            },
            {
                field_name = c_time
                field_type = time
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "12:34:56"
                    }
                ]
            },
            {
                field_name = c_bytes
                field_type = bytes
                field_value = [
                      {
                          rule_type = NOT_NULL
                          equals_to = "bWlJWmo="
                      }
                ]
            },
            {
                field_name = c_array
                field_type = "array<int>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [0, 1, 2]
                    }
                ]
            },
            {
                field_name = c_map
                field_type = "map<time, string>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "{ 12:01:26 = v0 }"
                    }
                ]
            },
            {
                field_name = c_map_nest
                field_type = "map<string, {c_int = int, c_string = string}>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = { k1 = [123, "BBB-BB"] }
                    }
                ]
            },
            {
                field_name = c_row
                field_type = {
                    c_null = "null"
                    c_string = string
                    c_boolean = boolean
                    c_tinyint = tinyint
                    c_smallint = smallint
                    c_int = int
                    c_bigint = bigint
                    c_float = float
                    c_double = double
                    c_decimal = "decimal(30, 8)"
                    c_date = date
                    c_timestamp = timestamp
                    c_time = time
                    c_bytes = bytes
                    c_array = "array<int>"
                    c_map = "map<string, string>"
                }
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [
                           null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
                           "bWlJWmo=",
                           [0, 1, 2],
                           { k0 = v0 }
                        ]
                    }
                ]
            }
        ]
    }
  }
}
Multi-table validation
The following job validates multiple tables.
env {
  parallelism = 1
  job.mode = BATCH
}
source {
  FakeSource {
    tables_configs = [
      {
        row.num = 16
        schema {
          table = "test.table1"
          fields {
            c_int = int
            c_bigint = bigint
          }
        }
      },
      {
        row.num = 17
        schema {
          table = "test.table2"
          fields {
            c_string = string
            c_tinyint = tinyint
          }
        }
      }
    ]
  }
}
transform {
}
sink {
  Assert {
    rules =
      {
        tables_configs = [
          {
            table_path = "test.table1"
            row_rules = [
              {
                rule_type = MAX_ROW
                rule_value = 16
              },
              {
                rule_type = MIN_ROW
                rule_value = 16
              }
            ],
            field_rules = [{
              field_name = c_int
              field_type = int
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }, {
              field_name = c_bigint
              field_type = bigint
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }]
          },
          {
            table_path = "test.table2"
            row_rules = [
              {
                rule_type = MAX_ROW
                rule_value = 17
              },
              {
                rule_type = MIN_ROW
                rule_value = 17
              }
            ],
            field_rules = [{
              field_name = c_string
              field_type = string
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }, {
              field_name = c_tinyint
              field_type = tinyint
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }]
          }
        ]
      }
  }
}
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [improve] add assert options (#8620) | https://github.com/apache/seatunnel/commit/b159cc0c7 | 2.3.10 | 
| [Feature][API] Support timestamp with timezone offset (#8367) | https://github.com/apache/seatunnel/commit/e18bfeabd | 2.3.9 | 
| [fix][connector-v2][connector-assert] Optimize Assert Sink verification method (#8356) | https://github.com/apache/seatunnel/commit/5c9159d7c | 2.3.9 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 | 
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 | 
| [Feature][Transform-V2] Support transform with multi-table (#7628) | https://github.com/apache/seatunnel/commit/72c9c4576 | 2.3.9 | 
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 | 
| [Fix][API] Fix column length can not be long (#8039) | https://github.com/apache/seatunnel/commit/16cf632d3 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Feature][Connector-V2] Assert support multi-table check (#7687) | https://github.com/apache/seatunnel/commit/c4778a249 | 2.3.8 | 
| [Feature][Transform] Add embedding transform (#7534) | https://github.com/apache/seatunnel/commit/3310cfcd3 | 2.3.8 | 
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 | 
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 | 
| [Hotfix] fix http source can not read yyyy-MM-dd HH:mm:ss format bug & Improve DateTime Utils (#6601) | https://github.com/apache/seatunnel/commit/19888e796 | 2.3.5 | 
| [Feature][Connector-V2][Assert] Support field type assert and field value equality assert for full data types (#6275) | https://github.com/apache/seatunnel/commit/576919bfa | 2.3.4 | 
| [Feature][Connector-V2][Assert] Support check the precision and scale of Decimal type. (#6110) | https://github.com/apache/seatunnel/commit/dd64ed52d | 2.3.4 | 
| [Hotfix][SQL Transform] Fix cast to timestamp, date, time bug (#5812) | https://github.com/apache/seatunnel/commit/de181de02 | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 | 
| [Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 | 
| [Fix] Fix log error when multi-table sink close (#5683) | https://github.com/apache/seatunnel/commit/fea4b6f26 | 2.3.4 | 
| Support config tableIdentifier for schema (#5628) | https://github.com/apache/seatunnel/commit/652921fb7 | 2.3.4 | 
| [Feature] Add table-names from FakeSource/Assert to produce/assert multi-table (#5604) | https://github.com/apache/seatunnel/commit/2c67cd8f3 | 2.3.4 | 
| [Improve] Remove useless ReadonlyConfig flatten feature (#5612) | https://github.com/apache/seatunnel/commit/243edfef3 | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 | 
| [Improve][connector-assert]support 'DECIMAL' type and fix 'Number' type precision issue (#5479) | https://github.com/apache/seatunnel/commit/d308e2773 | 2.3.4 | 
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 | 
| [Feature][Transform] Add SimpleSQL transform plugin (#4148) | https://github.com/apache/seatunnel/commit/b914d49ab | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Improve][Connector-V2][Assert] Unified exception for assert connector (#3331) | https://github.com/apache/seatunnel/commit/e74c9bc6f | 2.3.0 | 
| [improve][connector] The Factory#factoryIdentifier must be consistent with PluginIdentifierInterface#getPluginName (#3328) | https://github.com/apache/seatunnel/commit/d9519d696 | 2.3.0 | 
| [Improve][Connector-V2] Add Clickhouse and Assert Source/Sink Factory (#3306) | https://github.com/apache/seatunnel/commit/9e4a12838 | 2.3.0 | 
| [Feature][Connector-v2] improve assert sink connector (#2844) | https://github.com/apache/seatunnel/commit/967fec0e9 | 2.3.0-beta | 
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta | 
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta | 
| [checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f36 | 2.2.0-beta | 
| [API-DRAFT][MERGE] update license and pom.xml | https://github.com/apache/seatunnel/commit/5ae8865b7 | 2.2.0-beta | 
| add assert sink to Api draft (#2071) | https://github.com/apache/seatunnel/commit/fc640b52b | 2.2.0-beta |