CatalogTable and Metadata Management
1. Overview
1.1 Problem Background
Data integration requires explicit schema management:
- Schema Definition: How to define and validate table schemas?
- Schema Propagation: How to pass schema through Source → Transform → Sink?
- Schema Evolution: How to handle runtime DDL changes (ADD/DROP columns)?
- Type Mapping: How to map types between different data sources?
- Metadata Completeness: How to capture complete table metadata (constraints, partitions)?
1.2 Design Goals
SeaTunnel's metadata management aims to:
- Type Safety: Explicit schema validation at job submission
- Completeness: Capture all table metadata (columns, constraints, partitions, options)
- Evolution Support: Handle runtime schema changes (DDL synchronization)
- Engine Independence: Schema representation independent of execution engine
- Ease of Use: Simple API for schema creation and transformation
2. Core Concepts
2.1 CatalogTable
Complete representation of a table with all metadata.
public class CatalogTable implements Serializable {
// Table identifier
private final TableIdentifier tableId;
// Schema definition
private final TableSchema tableSchema;
// Table options (connector-specific configuration)
private final Map<String, String> options;
// Partition keys
private final List<String> partitionKeys;
// Comment
private final String comment;
// Catalog name
private final String catalogName;
}
Key Components:
- TableIdentifier: Unique table identity (catalog.database[.schema].table)
- TableSchema: Schema with columns, primary key, constraints
- options: Connector-specific settings (e.g., Kafka topic, JDBC table name)
- partitionKeys: Partition columns for partitioned tables
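The `catalog.database[.schema].table` form has an optional schema level. It is present for databases such as PostgreSQL and absent for MySQL. As a minimal sketch, assuming a hypothetical standalone `TableId` record rather than SeaTunnel's actual `TableIdentifier`, parsing and printing both shapes could look like this:

```java
import java.util.Optional;

// Hypothetical stand-in for TableIdentifier: catalog.database[.schema].table
// Quoted identifiers that themselves contain dots are not handled in this sketch.
record TableId(String catalog, String database, Optional<String> schema, String table) {

    // Parse a 3-part (no schema) or 4-part (with schema) dotted identifier
    static TableId parse(String qualified) {
        String[] parts = qualified.split("\\.");
        if (parts.length == 3) {
            return new TableId(parts[0], parts[1], Optional.empty(), parts[2]);
        }
        if (parts.length == 4) {
            return new TableId(parts[0], parts[1], Optional.of(parts[2]), parts[3]);
        }
        throw new IllegalArgumentException(
                "Expected catalog.database[.schema].table but got: " + qualified);
    }

    // Render back to dotted form, omitting the schema level when absent
    @Override
    public String toString() {
        return catalog + "." + database + schema.map(s -> "." + s).orElse("") + "." + table;
    }

    public static void main(String[] args) {
        System.out.println(parse("pg.shop.public.orders").schema().orElse("<none>")); // public
        System.out.println(parse("mysql.shop.orders"));                               // mysql.shop.orders
    }
}
```

Keeping the schema as an `Optional` rather than an empty string makes "this database has no schema level" explicit in the type.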
2.2 TableSchema
Schema definition with columns and constraints.
public class TableSchema implements Serializable {
// Column definitions
private final List<Column> columns;
// Primary key
private final PrimaryKey primaryKey;
// Unique/foreign key constraints
private final List<ConstraintKey> constraintKeys;
}
2.3 Column
Column definition with type and constraints.
public class Column implements Serializable {
private final String name;
private final SeaTunnelDataType<?> dataType;
private final String comment;
// Column options
private final Map<String, Object> options;
// Constraints
private final boolean nullable;
private final Object defaultValue;
}
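Sections 4.3, 4.4 and 7.3 look columns up by name and by position, and a primary key only makes sense over existing, non-nullable columns. A minimal sketch of those invariants and lookups, assuming hypothetical simplified `SimpleColumn` and `SimpleSchema` records (types as plain strings) instead of SeaTunnel's real classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplified column; the type is a plain string for illustration
record SimpleColumn(String name, String type, boolean nullable, Object defaultValue) {}

// Hypothetical simplified schema: ordered columns plus primary-key column names
record SimpleSchema(List<SimpleColumn> columns, List<String> primaryKey) {

    // Compact constructor: validate invariants once, when the schema is created
    SimpleSchema {
        columns = List.copyOf(columns);
        primaryKey = List.copyOf(primaryKey);
        Map<String, SimpleColumn> byName = new HashMap<>();
        for (SimpleColumn c : columns) {
            if (byName.putIfAbsent(c.name(), c) != null) {
                throw new IllegalArgumentException("Duplicate column: " + c.name());
            }
        }
        for (String pk : primaryKey) {
            SimpleColumn c = byName.get(pk);
            if (c == null) {
                throw new IllegalArgumentException("Unknown primary key column: " + pk);
            }
            if (c.nullable()) {
                throw new IllegalArgumentException("Primary key column must be NOT NULL: " + pk);
            }
        }
    }

    // Position of a column in the row, or -1 if absent
    int indexOf(String name) {
        for (int i = 0; i < columns.size(); i++) {
            if (columns.get(i).name().equals(name)) {
                return i;
            }
        }
        return -1;
    }

    Optional<SimpleColumn> getColumn(String name) {
        int i = indexOf(name);
        return i < 0 ? Optional.empty() : Optional.of(columns.get(i));
    }

    public static void main(String[] args) {
        SimpleSchema users = new SimpleSchema(
                List.of(new SimpleColumn("id", "BIGINT", false, null),
                        new SimpleColumn("name", "STRING", true, null)),
                List.of("id"));
        System.out.println(users.indexOf("name")); // 1
    }
}
```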
2.4 SeaTunnelDataType
Unified type system across connectors.
Basic Types:
// Numeric
DataTypes.TINYINT()
DataTypes.SMALLINT()
DataTypes.INT()
DataTypes.BIGINT()
DataTypes.FLOAT()
DataTypes.DOUBLE()
DataTypes.DECIMAL(precision, scale)
// String
DataTypes.STRING()
DataTypes.CHAR(length)
DataTypes.VARCHAR(length)
// Binary
DataTypes.BYTES()
// Date/Time
DataTypes.DATE()
DataTypes.TIME()
DataTypes.TIMESTAMP()
// Boolean
DataTypes.BOOLEAN()
Complex Types:
// Array
DataTypes.ARRAY(elementType)
// Map
DataTypes.MAP(keyType, valueType)
// Row (Struct)
DataTypes.ROW(fields)
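Complex types nest arbitrarily, for example a MAP whose values are ARRAYs of ROWs. A minimal sketch of such a recursive type model, assuming a hypothetical `DType` interface with record variants (not SeaTunnel's actual `SeaTunnelDataType` classes), that renders any nesting to a readable string:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical recursive type model; each variant renders itself
interface DType {
    String render();
}

// Atomic types such as INT, STRING, or a parameterized DECIMAL(10, 2)
record Atomic(String name) implements DType {
    public String render() { return name; }
}

record ArrayOf(DType element) implements DType {
    public String render() { return "ARRAY<" + element.render() + ">"; }
}

record MapOf(DType key, DType value) implements DType {
    public String render() { return "MAP<" + key.render() + ", " + value.render() + ">"; }
}

// A ROW field pairs a name with a (possibly nested) type
record RowField(String name, DType type) {}

record RowOf(List<RowField> fields) implements DType {
    public String render() {
        return fields.stream()
                .map(f -> f.name() + " " + f.type().render())
                .collect(Collectors.joining(", ", "ROW<", ">"));
    }
}

class TypeDemo {
    public static void main(String[] args) {
        DType address = new RowOf(List.of(
                new RowField("city", new Atomic("STRING")),
                new RowField("zip", new Atomic("INT"))));
        DType t = new MapOf(new Atomic("STRING"), new ArrayOf(address));
        System.out.println(t.render()); // MAP<STRING, ARRAY<ROW<city STRING, zip INT>>>
    }
}
```

Because every variant holds other `DType`s, the recursion in `render()` follows the nesting with no special cases.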
3. Schema Creation
3.1 Builder Pattern
CatalogTable catalogTable = CatalogTable.of(
TableIdentifier.of("my_catalog", "my_db", "my_table"),
TableSchema.builder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("created_at", DataTypes.TIMESTAMP())
.primaryKey("id")
.build(),
Map.of("connector", "jdbc"),
Collections.emptyList(), // No partitions
"User table"
);
3.2 Column Builder
Column column = Column.builder()
.name("user_id")
.dataType(DataTypes.BIGINT())
.nullable(false)
.defaultValue(0L)
.comment("User identifier")
.build();
3.3 Primary Key and Constraints
TableSchema schema = TableSchema.builder()
.column("id", DataTypes.BIGINT())
.column("email", DataTypes.STRING())
.column("username", DataTypes.STRING())
// Primary key
.primaryKey("id")
// Unique constraint
.constraint(ConstraintKey.of(
ConstraintKey.ConstraintType.UNIQUE_KEY,
"uk_email",
Arrays.asList(
ConstraintKey.ConstraintKeyColumn.of("email", null)
)
))
.build();
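Because builders are where a schema is assembled, `build()` is a natural place to enforce the "Type Safety" goal from section 1.2. A minimal sketch, assuming a hypothetical `SchemaBuilder` (not SeaTunnel's `TableSchema.Builder`) that returns the validated column names in place of a full schema and rejects key constraints referring to undeclared columns:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical builder: key references are validated when build() is called
class SchemaBuilder {
    private final Map<String, String> columns = new LinkedHashMap<>();         // name -> type
    private final List<String> primaryKey = new ArrayList<>();
    private final Map<String, List<String>> uniqueKeys = new LinkedHashMap<>(); // constraint -> columns

    SchemaBuilder column(String name, String type) {
        if (columns.putIfAbsent(name, type) != null) {
            throw new IllegalArgumentException("Duplicate column: " + name);
        }
        return this;
    }

    SchemaBuilder primaryKey(String... names) {
        primaryKey.addAll(List.of(names));
        return this;
    }

    SchemaBuilder uniqueKey(String constraintName, String... names) {
        uniqueKeys.put(constraintName, List.of(names));
        return this;
    }

    // Returns the column names in declaration order (standing in for a TableSchema)
    List<String> build() {
        requireDeclared("primary key", primaryKey);
        uniqueKeys.forEach((name, cols) -> requireDeclared("constraint " + name, cols));
        return List.copyOf(columns.keySet());
    }

    private void requireDeclared(String what, List<String> names) {
        for (String n : names) {
            if (!columns.containsKey(n)) {
                throw new IllegalStateException(what + " references unknown column: " + n);
            }
        }
    }

    public static void main(String[] args) {
        List<String> cols = new SchemaBuilder()
                .column("id", "BIGINT")
                .column("email", "STRING")
                .primaryKey("id")
                .uniqueKey("uk_email", "email")
                .build();
        System.out.println(cols); // [id, email]
    }
}
```

Deferring the check to `build()` lets callers declare keys before or after the columns, while still failing before the schema is used.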
4. Schema Propagation
4.1 Source → Transform → Sink Flow
┌──────────────┐
│    Source    │
│              │
│   produces   │
│ CatalogTable │
└──────┬───────┘
       │
       ▼ (Input Schema)
┌──────────────┐
│  Transform   │
│              │
│   modifies   │
│ CatalogTable │
└──────┬───────┘
       │
       ▼ (Output Schema)
┌──────────────┐
│     Sink     │
│              │
│  validates   │
│ CatalogTable │
└──────────────┘
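This flow means the sink's input schema is known before any row moves: each transform acts as a function from input schema to output schema. A minimal sketch, assuming schemas are simplified to hypothetical ordered `name → type` maps, that folds the transforms over the source schema and validates the result against the sink target:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Schemas simplified to ordered column-name -> type-name maps (hypothetical)
class SchemaPipeline {

    // Fold each transform's schema mapping over the source schema: Source -> T1 -> ... -> Sink input
    static Map<String, String> propagate(Map<String, String> sourceSchema,
                                         List<UnaryOperator<Map<String, String>>> transforms) {
        Map<String, String> current = sourceSchema;
        for (UnaryOperator<Map<String, String>> transform : transforms) {
            current = transform.apply(current);
        }
        return current;
    }

    // Sink-side check: every incoming column must exist in the target with the same type
    static void validate(Map<String, String> input, Map<String, String> target) {
        input.forEach((name, type) -> {
            String targetType = target.get(name);
            if (targetType == null) {
                throw new IllegalStateException("Column not found in target: " + name);
            }
            if (!targetType.equals(type)) {
                throw new IllegalStateException(
                        "Type mismatch for " + name + ": " + type + " vs " + targetType);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("id", "BIGINT");
        source.put("name", "STRING");
        // A transform that renames "name" to "name_upper" and keeps its type
        UnaryOperator<Map<String, String>> rename = in -> {
            Map<String, String> out = new LinkedHashMap<>();
            in.forEach((k, v) -> out.put(k.equals("name") ? "name_upper" : k, v));
            return out;
        };
        Map<String, String> sinkInput = propagate(source, List.of(rename));
        validate(sinkInput, Map.of("id", "BIGINT", "name_upper", "STRING"));
        System.out.println(sinkInput); // {id=BIGINT, name_upper=STRING}
    }
}
```

Since only schemas flow through `propagate`, the whole chain can be checked at job submission without reading data.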
4.2 Source Schema Production
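public class JdbcSource implements SeaTunnelSource<...> {
// connection, catalog, database, schema and table are initialized from the job config
@Override
public List<CatalogTable> getProducedCatalogTables() {
// Build schema from database metadata
TableSchema.Builder builder = TableSchema.builder();
try (ResultSet columns = connection.getMetaData().getColumns(null, schema, table, null)) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
int jdbcType = columns.getInt("DATA_TYPE");
// COLUMN_SIZE is the precision for numeric types and the length for character types
int columnSize = columns.getInt("COLUMN_SIZE");
int decimalDigits = columns.getInt("DECIMAL_DIGITS");
SeaTunnelDataType<?> type = JdbcTypeConverter.convert(jdbcType, columnSize, decimalDigits);
builder.column(columnName, type);
}
} catch (SQLException e) {
throw new RuntimeException("Failed to read schema of table " + table, e);
}
return Collections.singletonList(
CatalogTable.of(
TableIdentifier.of(catalog, database, schema, table),
builder.build(),
Collections.emptyMap(), // No connector options
Collections.emptyList(), // No partitions
null // No comment
)
);
}
}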
4.3 Transform Schema Transformation
public class SqlTransform implements SeaTunnelTransform {
@Override
public CatalogTable getProducedCatalogTable() {
CatalogTable inputTable = getInputCatalogTable();
// Parse SQL to infer output schema
// Example: SELECT id, UPPER(name) as name_upper, age FROM input
TableSchema outputSchema = TableSchema.builder()
.column("id", inputTable.getColumn("id").getDataType())
.column("name_upper", DataTypes.STRING()) // Transformed
.column("age", inputTable.getColumn("age").getDataType())
.build();
return inputTable.copy(outputSchema);
}
}
4.4 Sink Schema Validation
public class JdbcSink implements SeaTunnelSink<...> {
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
// Validate input schema matches target table
CatalogTable inputTable = getInputCatalogTable();
CatalogTable targetTable = readTargetTableSchema();
// Check column compatibility
for (Column inputColumn : inputTable.getColumns()) {
Column targetColumn = targetTable.getColumn(inputColumn.getName());
if (targetColumn == null) {
throw new SchemaException("Column not found: " + inputColumn.getName());
}
if (!isCompatible(inputColumn.getDataType(), targetColumn.getDataType())) {
throw new SchemaException("Incompatible types for " + inputColumn.getName());
}
}
return Optional.of(targetTable);
}
}
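The loop above only checks the input → target direction. The reverse direction matters too: a target column that is NOT NULL, has no default, and is absent from the input will make every INSERT fail. A minimal sketch of both checks, assuming a hypothetical `TargetCol` record and types compared by exact name (a fuller check would apply the widening rules of section 8.3):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical description of a column in the sink's target table
record TargetCol(String type, boolean nullable, boolean hasDefault) {}

class SinkSchemaCheck {
    // Collects every problem instead of stopping at the first, so the job fails with a full report
    static List<String> problems(Map<String, String> input, Map<String, TargetCol> target) {
        List<String> out = new ArrayList<>();
        input.forEach((name, type) -> {
            TargetCol t = target.get(name);
            if (t == null) {
                out.add("Column not found in target: " + name);
            } else if (!t.type().equals(type)) {
                out.add("Incompatible types for " + name + ": " + type + " -> " + t.type());
            }
        });
        // Reverse direction: required target columns the input never fills
        target.forEach((name, t) -> {
            if (!input.containsKey(name) && !t.nullable() && !t.hasDefault()) {
                out.add("Required target column missing from input: " + name);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, TargetCol> target = Map.of(
                "id", new TargetCol("BIGINT", false, false),
                "tenant", new TargetCol("INT", false, false));
        System.out.println(problems(Map.of("id", "BIGINT"), target));
        // [Required target column missing from input: tenant]
    }
}
```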
5. Schema Evolution
5.1 SchemaChangeEvent
Represents DDL changes captured by CDC sources.
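public abstract class SchemaChangeEvent implements Serializable {
private final TableIdentifier tableId;
protected SchemaChangeEvent(TableIdentifier tableId) { this.tableId = tableId; }
public TableIdentifier getTableId() { return tableId; }
}
// ADD COLUMN: carries the full definition of the new column
public class AlterTableAddColumnEvent extends SchemaChangeEvent {
private final Column column;
public AlterTableAddColumnEvent(TableIdentifier tableId, Column column) { super(tableId); this.column = column; }
public Column getColumn() { return column; }
}
// DROP COLUMN: only the column name is needed
public class AlterTableDropColumnEvent extends SchemaChangeEvent {
private final String columnName;
public AlterTableDropColumnEvent(TableIdentifier tableId, String columnName) { super(tableId); this.columnName = columnName; }
public String getColumnName() { return columnName; }
}
// MODIFY COLUMN: carries the column's new definition (type, nullability, ...)
public class AlterTableModifyColumnEvent extends SchemaChangeEvent {
private final Column column;
public AlterTableModifyColumnEvent(TableIdentifier tableId, Column column) { super(tableId); this.column = column; }
public Column getColumn() { return column; }
}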
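Every downstream operator must keep its view of the schema in step with these events. A minimal sketch, assuming hypothetical records that mirror the three event classes (table id omitted) and a schema simplified to an ordered `name → type` map, of folding one event into a new immutable schema version:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mirrors of the event classes above (table id omitted for brevity)
interface SchemaChange {}
record AddColumn(String name, String type) implements SchemaChange {}
record DropColumn(String name) implements SchemaChange {}
record ModifyColumn(String name, String newType) implements SchemaChange {}

class SchemaEvolver {
    // Returns a new schema version; the input map is never mutated
    static Map<String, String> apply(Map<String, String> schema, SchemaChange change) {
        Map<String, String> next = new LinkedHashMap<>(schema);
        if (change instanceof AddColumn add) {
            if (next.putIfAbsent(add.name(), add.type()) != null) {
                throw new IllegalStateException("Column already exists: " + add.name());
            }
        } else if (change instanceof DropColumn drop) {
            if (next.remove(drop.name()) == null) {
                throw new IllegalStateException("No such column: " + drop.name());
            }
        } else if (change instanceof ModifyColumn mod) {
            // replace() keeps the column's position and returns null if it was absent
            if (next.replace(mod.name(), mod.newType()) == null) {
                throw new IllegalStateException("No such column: " + mod.name());
            }
        } else {
            throw new IllegalArgumentException("Unknown change: " + change);
        }
        return Collections.unmodifiableMap(next);
    }

    public static void main(String[] args) {
        Map<String, String> v1 = Map.of("id", "BIGINT");
        Map<String, String> v2 = apply(v1, new AddColumn("email", "STRING"));
        Map<String, String> v3 = apply(v2, new ModifyColumn("email", "VARCHAR(255)"));
        System.out.println(v3); // {id=BIGINT, email=VARCHAR(255)}
    }
}
```

Producing a new version per event, rather than mutating in place, keeps older versions valid for rows already in flight.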
5.2 CDC Source Schema Evolution
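public class MysqlCDCSource {
private void handleDDL(String ddl) {
// Simplified detection for illustration only: MySQL also accepts "ADD c INT"
// (the COLUMN keyword is optional), so a real source must parse the DDL properly
if (ddl.toUpperCase(Locale.ROOT).contains("ADD COLUMN")) {
Column newColumn = parseDDL(ddl);
// Create schema change event
SchemaChangeEvent event = new AlterTableAddColumnEvent(
tableId,
newColumn
);
// Emit event downstream
collector.collect(event);
}
}
}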
5.3 Transform Schema Evolution Mapping
public class SqlTransform implements SeaTunnelTransform {
@Override
public SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent event) {
if (event instanceof AlterTableAddColumnEvent) {
AlterTableAddColumnEvent addEvent = (AlterTableAddColumnEvent) event;
// Map column through transform logic
Column transformedColumn = transformColumn(addEvent.getColumn());
return new AlterTableAddColumnEvent(
event.getTableId(),
transformedColumn
);
}
return event; // Pass through
}
}
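Not every upstream change should reach the sink. With an explicit projection such as `SELECT id, UPPER(name) AS name_upper`, a newly added upstream column is not selected, so its event should be dropped. With `SELECT *`, it passes through, possibly renamed. A minimal sketch, assuming a hypothetical `AddCol` record and an `Optional` return to mean "no event" (the method above returns a `SchemaChangeEvent` instead):

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical add-column event (table id omitted for brevity)
record AddCol(String name, String type) {}

class ProjectionEventMapper {
    private final Map<String, String> renames;  // input column -> output column
    private final boolean passThroughUnlisted;  // true for SELECT *, false for an explicit column list

    ProjectionEventMapper(Map<String, String> renames, boolean passThroughUnlisted) {
        this.renames = Map.copyOf(renames);
        this.passThroughUnlisted = passThroughUnlisted;
    }

    // Empty result: the change is invisible downstream and should not be forwarded
    Optional<AddCol> map(AddCol event) {
        String renamed = renames.get(event.name());
        if (renamed != null) {
            return Optional.of(new AddCol(renamed, event.type()));
        }
        return passThroughUnlisted ? Optional.of(event) : Optional.empty();
    }

    public static void main(String[] args) {
        ProjectionEventMapper explicitList = new ProjectionEventMapper(Map.of(), false);
        System.out.println(explicitList.map(new AddCol("email", "STRING")).isPresent()); // false
    }
}
```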
5.4 Sink Schema Evolution Application
public class JdbcSink {
private void applySchemaChange(SchemaChangeEvent event) {
if (event instanceof AlterTableAddColumnEvent) {
AlterTableAddColumnEvent addEvent = (AlterTableAddColumnEvent) event;
Column column = addEvent.getColumn();
// Generate DDL
String ddl = String.format(
"ALTER TABLE %s ADD COLUMN %s %s",
event.getTableId().getTableName(),
column.getName(),
toSqlType(column.getDataType())
);
// Execute DDL
statement.execute(ddl);
LOG.info("Applied schema change: {}", ddl);
}
}
}
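Formatting raw names into the DDL breaks on reserved words or names containing spaces, and `toSqlType` is left undefined above. A minimal sketch for a MySQL-flavoured dialect, assuming type names as plain strings and a small, hypothetical, non-exhaustive type mapping (other databases need their own quote character and type names):

```java
import java.util.Map;

class AddColumnDdl {
    // Illustrative SeaTunnel-type -> MySQL-type mapping; not exhaustive
    private static final Map<String, String> MYSQL_TYPES = Map.of(
            "INT", "INT",
            "BIGINT", "BIGINT",
            "STRING", "LONGTEXT",
            "BOOLEAN", "TINYINT(1)",
            "TIMESTAMP", "DATETIME(6)");

    // MySQL quotes identifiers with backticks; an embedded backtick is escaped by doubling it
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    static String toSqlType(String seaTunnelType) {
        String sqlType = MYSQL_TYPES.get(seaTunnelType);
        if (sqlType == null) {
            throw new IllegalArgumentException("No MySQL mapping for " + seaTunnelType);
        }
        return sqlType;
    }

    static String addColumn(String database, String table, String column, String type, boolean nullable) {
        return String.format("ALTER TABLE %s.%s ADD COLUMN %s %s%s",
                quote(database), quote(table), quote(column), toSqlType(type),
                nullable ? " NULL" : " NOT NULL");
    }

    public static void main(String[] args) {
        // "order" is a reserved word in MySQL; quoting keeps the statement valid
        System.out.println(addColumn("shop", "orders", "order", "BIGINT", true));
        // ALTER TABLE `shop`.`orders` ADD COLUMN `order` BIGINT NULL
    }
}
```

Qualifying the table with its database also avoids depending on the connection's current default database.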
6. Type Mapping
6.1 JDBC Type Mapping
public class JdbcTypeConverter {
// columnSize and decimalDigits are the COLUMN_SIZE and DECIMAL_DIGITS values
// reported by DatabaseMetaData.getColumns
public static SeaTunnelDataType<?> convert(int jdbcType, int columnSize, int decimalDigits) {
switch (jdbcType) {
case Types.TINYINT:
return DataTypes.TINYINT();
case Types.SMALLINT:
return DataTypes.SMALLINT();
case Types.INTEGER:
return DataTypes.INT();
case Types.BIGINT:
return DataTypes.BIGINT();
case Types.REAL:
// JDBC REAL is single precision
return DataTypes.FLOAT();
case Types.FLOAT:
case Types.DOUBLE:
// JDBC FLOAT is double precision, a synonym for DOUBLE
return DataTypes.DOUBLE();
case Types.DECIMAL:
case Types.NUMERIC:
return DataTypes.DECIMAL(columnSize, decimalDigits);
case Types.CHAR:
return DataTypes.CHAR(columnSize);
case Types.VARCHAR:
return DataTypes.VARCHAR(columnSize);
case Types.LONGVARCHAR:
return DataTypes.STRING();
case Types.DATE:
return DataTypes.DATE();
case Types.TIME:
return DataTypes.TIME();
case Types.TIMESTAMP:
return DataTypes.TIMESTAMP();
case Types.BOOLEAN:
return DataTypes.BOOLEAN();
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
return DataTypes.BYTES();
default:
throw new UnsupportedTypeException("Unsupported JDBC type: " + jdbcType);
}
}
}
6.2 Kafka (Avro) Type Mapping
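public class AvroTypeConverter {
public static SeaTunnelDataType<?> convert(Schema avroSchema) {
// Logical types (decimal, date, timestamp-millis, ...) are not inspected here,
// so e.g. a timestamp-millis LONG maps to BIGINT
switch (avroSchema.getType()) {
case INT:
return DataTypes.INT();
case LONG:
return DataTypes.BIGINT();
case FLOAT:
return DataTypes.FLOAT();
case DOUBLE:
return DataTypes.DOUBLE();
case BOOLEAN:
return DataTypes.BOOLEAN();
case STRING:
case ENUM:
return DataTypes.STRING();
case BYTES:
case FIXED:
return DataTypes.BYTES();
case ARRAY:
return DataTypes.ARRAY(convert(avroSchema.getElementType()));
case MAP:
// Avro map keys are always strings
return DataTypes.MAP(
DataTypes.STRING(),
convert(avroSchema.getValueType())
);
case RECORD:
// Convert to ROW type
List<Column> fields = new ArrayList<>();
for (Schema.Field field : avroSchema.getFields()) {
fields.add(Column.builder()
.name(field.name())
.dataType(convert(field.schema()))
.build());
}
return DataTypes.ROW(fields);
case UNION:
// Nullable fields are encoded as ["null", T]: map them to T
List<Schema> nonNull = avroSchema.getTypes().stream()
.filter(s -> s.getType() != Schema.Type.NULL)
.collect(Collectors.toList());
if (nonNull.size() == 1) {
return convert(nonNull.get(0));
}
throw new UnsupportedTypeException("Unsupported Avro union: " + avroSchema);
default:
throw new UnsupportedTypeException("Unsupported Avro type: " + avroSchema.getType());
}
}
}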
7. Partitioned Tables
7.1 Partition Definition
CatalogTable catalogTable = CatalogTable.of(
tableId,
schema,
options,
Arrays.asList("year", "month", "day"), // Partition keys
comment
);
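Partition keys are column names, so they must refer to columns of the schema, and a writer derives each row's partition from those columns' values (as section 7.3 does). A minimal sketch, assuming a hypothetical row represented as `Object[]` in schema column order:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionExtractor {
    private final List<String> partitionKeys;
    private final int[] keyIndexes; // position of each partition key in the row

    // Resolve key positions once, up front, instead of per row
    PartitionExtractor(List<String> columnNames, List<String> partitionKeys) {
        this.partitionKeys = List.copyOf(partitionKeys);
        this.keyIndexes = new int[partitionKeys.size()];
        for (int i = 0; i < partitionKeys.size(); i++) {
            int idx = columnNames.indexOf(partitionKeys.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Partition key is not a column: " + partitionKeys.get(i));
            }
            keyIndexes[i] = idx;
        }
    }

    // Ordered partition-key -> value map for one row
    Map<String, Object> partitionOf(Object[] row) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (int i = 0; i < keyIndexes.length; i++) {
            values.put(partitionKeys.get(i), row[keyIndexes[i]]);
        }
        return values;
    }

    public static void main(String[] args) {
        PartitionExtractor p = new PartitionExtractor(
                List.of("id", "amount", "year", "month", "day"), List.of("year", "month", "day"));
        System.out.println(p.partitionOf(new Object[] {1L, 9.5, 2024, 5, 17}));
        // {year=2024, month=5, day=17}
    }
}
```

Validating in the constructor means a misspelled partition key fails when the table is set up, not on the first row written.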
7.2 Partition-Aware Source
public class HiveSource implements SeaTunnelSource<...> {
@Override
public List<CatalogTable> getProducedCatalogTables() {
// Read Hive table metadata
Table hiveTable = hiveMetastore.getTable(dbName, tableName);
// Extract partition keys; Hive stores partition columns separately from the
// data columns (getSd().getCols()), so the schema must include both
List<String> partitionKeys = hiveTable.getPartitionKeys().stream()
.map(FieldSchema::getName)
.collect(Collectors.toList());
return Collections.singletonList(
CatalogTable.of(
tableId,
schema,
options,
partitionKeys,
comment
)
);
}
}
7.3 Partition-Aware Sink
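public class IcebergSink {
// Build the partition spec once (e.g. when creating the table) from CatalogTable's partition keys
private PartitionSpec toPartitionSpec(Schema icebergSchema, CatalogTable table) {
PartitionSpec.Builder spec = PartitionSpec.builderFor(icebergSchema);
for (String partitionKey : table.getPartitionKeys()) {
spec.identity(partitionKey);
}
return spec.build();
}
private void write(SeaTunnelRow row, CatalogTable table) {
// Extract partition values from row
Map<String, Object> partitionValues = new HashMap<>();
for (String partitionKey : table.getPartitionKeys()) {
int index = table.getTableSchema().indexOf(partitionKey);
partitionValues.put(partitionKey, row.getField(index));
}
// Buffer the row in the open data file for its partition
writeToPartition(partitionValues, row);
}
// Called on checkpoint: commit all finished data files in one snapshot,
// instead of creating one commit per row
private void commit(List<DataFile> completedFiles) {
AppendFiles append = icebergTable.newAppend();
completedFiles.forEach(append::appendFile);
append.commit();
}
}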
8. Best Practices
8.1 Schema Definition
Prefer Explicit Schema:
// ✅ GOOD: Explicit schema
TableSchema schema = TableSchema.builder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.build();
// ❌ BAD: Implicit schema (relies on inference)
// Schema inferred from first row - risky!
Use Appropriate Types:
// ✅ GOOD: Use specific types
.column("price", DataTypes.DECIMAL(10, 2))
.column("created_at", DataTypes.TIMESTAMP())
// ❌ BAD: Overly generic types
.column("price", DataTypes.STRING()) // Should be DECIMAL
.column("created_at", DataTypes.STRING()) // Should be TIMESTAMP
8.2 Schema Validation
Validate Early:
// In Source
@Override
public void open() {
CatalogTable catalogTable = getProducedCatalogTables().get(0);
validateSchema(catalogTable); // Fail fast
}
// In Sink
@Override
public void open() {
CatalogTable inputTable = getInputCatalogTable();
CatalogTable targetTable = getWriteCatalogTable().orElseThrow(IllegalStateException::new);
validateCompatibility(inputTable, targetTable); // Fail fast
}
8.3 Type Compatibility
Type Widening (Safe):
// INT → BIGINT (safe)
// FLOAT → DOUBLE (safe)
// VARCHAR(10) → VARCHAR(20) (safe)
Type Narrowing (Unsafe):
// BIGINT → INT (may overflow)
// DOUBLE → FLOAT (precision loss)
// VARCHAR(20) → VARCHAR(10) (truncation)
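The widening rules above can be encoded as a small lookup, which an `isCompatible` check like the one in section 4.4 could use. A minimal sketch, assuming types are hypothetical strings such as `INT` or `VARCHAR(20)` and covering only the cases listed here (plus the equally safe TINYINT/SMALLINT promotions); anything unrecognised counts as incompatible unless the types are identical:

```java
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TypeWidening {
    // Safe widenings: each source type maps to the wider types it may be written into
    private static final Map<String, Set<String>> WIDENS_TO = Map.of(
            "TINYINT", Set.of("SMALLINT", "INT", "BIGINT"),
            "SMALLINT", Set.of("INT", "BIGINT"),
            "INT", Set.of("BIGINT"),
            "FLOAT", Set.of("DOUBLE"));

    private static final Pattern VARCHAR = Pattern.compile("VARCHAR\\((\\d+)\\)");

    // True if a value of type `from` can be written to a column of type `to` without loss
    static boolean isSafe(String from, String to) {
        if (from.equals(to)) {
            return true;
        }
        Matcher f = VARCHAR.matcher(from);
        Matcher t = VARCHAR.matcher(to);
        if (f.matches() && t.matches()) {
            return Integer.parseInt(f.group(1)) <= Integer.parseInt(t.group(1));
        }
        return WIDENS_TO.getOrDefault(from, Set.of()).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(isSafe("INT", "BIGINT"));              // true
        System.out.println(isSafe("BIGINT", "INT"));              // false
        System.out.println(isSafe("VARCHAR(20)", "VARCHAR(10)")); // false
    }
}
```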
9. Configuration
9.1 Schema Override
source {
JDBC {
url = "..."
query = "SELECT * FROM users"
# Override inferred schema
schema {
fields {
id = "BIGINT"
name = "STRING"
age = "INT"
}
}
}
}
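The `fields` block maps column names to type strings, which the connector must turn into types before the job runs, so that a typo such as `BIGNIT` fails at submission instead of at the first row. A minimal sketch, assuming a hypothetical `ParsedType` record and only flat types (nested `array<...>` or `map<...>` strings are not handled):

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parsed form: base name plus optional numeric parameters, e.g. DECIMAL(10, 2)
record ParsedType(String name, List<Integer> params) {

    private static final Set<String> KNOWN = Set.of(
            "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL",
            "STRING", "CHAR", "VARCHAR", "BYTES", "DATE", "TIME", "TIMESTAMP", "BOOLEAN");

    // NAME, NAME(p) or NAME(p, s); parameter counts per type are not checked in this sketch
    private static final Pattern SYNTAX =
            Pattern.compile("\\s*([A-Za-z]+)\\s*(?:\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\))?\\s*");

    static ParsedType parse(String text) {
        Matcher m = SYNTAX.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed type: " + text);
        }
        String name = m.group(1).toUpperCase(Locale.ROOT);
        if (!KNOWN.contains(name)) {
            throw new IllegalArgumentException("Unknown type: " + text);
        }
        List<Integer> params = m.group(2) == null ? List.of()
                : m.group(3) == null ? List.of(Integer.parseInt(m.group(2)))
                : List.of(Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3)));
        return new ParsedType(name, params);
    }

    public static void main(String[] args) {
        System.out.println(parse("decimal(10, 2)")); // ParsedType[name=DECIMAL, params=[10, 2]]
    }
}
```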
9.2 Schema Evolution Control
sink {
JDBC {
url = "..."
# Schema evolution options
schema-evolution {
enabled = true
auto-create-table = true
auto-add-column = true
auto-drop-column = false # Dangerous: dropping a column also deletes its data in the target
}
}
}
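These switches act as a gate between incoming `SchemaChangeEvent`s (section 5) and the sink's `applySchemaChange`. A minimal sketch, assuming a hypothetical `EvolutionPolicy` record mirroring the options above and event kinds reduced to an enum. Whether a disallowed change should fail the job or merely be skipped is a design choice; this sketch fails loudly so that schemas never silently diverge:

```java
import java.util.Map;

enum ChangeKind { ADD_COLUMN, DROP_COLUMN, MODIFY_COLUMN }

// Hypothetical mirror of the schema-evolution block above (auto-create-table is left
// out here because it concerns table creation at startup, not individual change events)
record EvolutionPolicy(boolean enabled, boolean autoAddColumn, boolean autoDropColumn) {

    // Missing keys default to false, so evolution is opt-in
    static EvolutionPolicy fromConfig(Map<String, String> conf) {
        return new EvolutionPolicy(
                Boolean.parseBoolean(conf.getOrDefault("enabled", "false")),
                Boolean.parseBoolean(conf.getOrDefault("auto-add-column", "false")),
                Boolean.parseBoolean(conf.getOrDefault("auto-drop-column", "false")));
    }

    // Throws if the sink may not apply this kind of change automatically
    void check(ChangeKind kind) {
        boolean allowed = switch (kind) {
            case ADD_COLUMN -> autoAddColumn;
            case DROP_COLUMN -> autoDropColumn;
            case MODIFY_COLUMN -> false; // no option above covers it, so never automatic here
        };
        if (!enabled || !allowed) {
            throw new IllegalStateException("Schema change " + kind + " is not enabled for this sink");
        }
    }

    public static void main(String[] args) {
        EvolutionPolicy policy = fromConfig(Map.of("enabled", "true", "auto-add-column", "true"));
        policy.check(ChangeKind.ADD_COLUMN); // allowed, no exception
        try {
            policy.check(ChangeKind.DROP_COLUMN);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Schema change DROP_COLUMN is not enabled for this sink
        }
    }
}
```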