贡献 Transform 指南
本文描述了如何理解、开发和贡献一个 transform。
我们也提供了 transform e2e test 来验证 transform 的数据输入和输出。
概念
在 SeaTunnel 中你可以通过 connector 读写数据, 但如果你需要在读取数据后或者写入数据前处理数据, 你需要使用 transform。
使用 transform 可以简单修改数据行和字段, 例如拆分字段、修改字段的值或者删除字段。
类型转换
Transform 从上游(source 或者 transform)获取类型输入,然后给下游(sink 或者 transform)输出新的类型,这个过程就是类型转换。
案例 1:删除字段
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |
| A         | B         |
|-----------|-----------|
| STRING    | INT       |
案例 2:字段排序
| B         | C         | A         |
|-----------|-----------|-----------|
| INT       | BOOLEAN   | STRING    |
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |
案例 3:修改字段类型
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | STRING    | STRING    |
案例 4:添加新的字段
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |
| A         | B         | C         | D         |
|-----------|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   | DOUBLE    |
数据转换
转换类型后,Transform 会从上游(source 或者 transform)获取数据行, 使用新的数据类型编辑数据后输出到下游(sink 或者 transform)。这个过程叫数据转换。
翻译
Transform 已经从 execution engine 中解耦, 任何 transform 实现可以不需要修改和配置的适用所有引擎, 这就需要翻译层来做 transform 和 execution engine 的适配。
案例:翻译数据类型和数据
原始数据:
| A         | B         | C         |
|-----------|-----------|-----------|
| STRING    | INT       | BOOLEAN   |
类型转换:
| A                 | B                 | C                 |
|-------------------|-------------------|-------------------|
| ENGINE<STRING>    | ENGINE<INT>       | ENGINE<BOOLEAN>   |
数据转换:
| A                 | B                 | C                 |
|-------------------|-------------------|-------------------|
| ENGINE<"test">    | ENGINE<1>         |  ENGINE<false>    |
核心 APIs
SeaTunnelTransform
SeaTunnelTransform 提供了所有主要的 API, 你可以继承它实现任何转换。
- 从上游获取数据类型。
/**
 * Set the data type info of input data.
 *
 * @param inputDataType The data type info of upstream input.
 */
 void setTypeInfo(SeaTunnelDataType<T> inputDataType);
- 输出新的数据类型给下游。
/**
 * Get the data type of the records produced by this transform.
 *
 * @return Produced data type.
 */
SeaTunnelDataType<T> getProducedType();
- 修改输入数据并且输出新的数据到下游。
/**
 * Transform input data to {@link this#getProducedType()} types data.
 *
 * @param row the data need be transform.
 * @return transformed data.
 */
T map(T row);
SingleFieldOutputTransform
SingleFieldOutputTransform 抽象了一个单字段修改操作
- 定义输出字段
/**
 * Outputs new field
 *
 * @return
 */
protected abstract String getOutputFieldName();
- 定义输出字段类型
/**
 * Outputs new field datatype
 *
 * @return
 */
protected abstract SeaTunnelDataType getOutputFieldDataType();
- 定义输出字段值
/**
 * Outputs new field value
 * 
 * @param inputRow The inputRow of upstream input.
 * @return
 */
protected abstract Object getOutputFieldValue(SeaTunnelRowAccessor inputRow);
MultipleFieldOutputTransform
MultipleFieldOutputTransform 抽象了多字段修改操作
- 定义多个输出的字段
/**
 * Outputs new fields
 *
 * @return
 */
protected abstract String[] getOutputFieldNames();
- 定义输出字段的类型
/**
 * Outputs new fields datatype
 *
 * @return
 */
protected abstract SeaTunnelDataType[] getOutputFieldDataTypes();
- 定义输出字段的值
/**
 * Outputs new fields value
 *
 * @param inputRow The inputRow of upstream input.
 * @return
 */
protected abstract Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow);
AbstractSeaTunnelTransform
AbstractSeaTunnelTransform 抽象了数据类型和字段的修改操作
- 转换输入的行类型到新的行类型
/**
 * Outputs transformed row type.
 *
 * @param inputRowType upstream input row type
 * @return
 */
protected abstract SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType);
- 转换输入的行数据到新的行数据
/**
 * Outputs transformed row data.
 * 
 * @param inputRow upstream input row data
 * @return
 */
protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow);
开发一个 Transform
Transform 必须实现下面其中一个 API:
- SeaTunnelTransform
- AbstractSeaTunnelTransform
- SingleFieldOutputTransform
- MultipleFieldOutputTransform
将实现类放入模块 seatunnel-transforms-v2。
案例: 拷贝字段到一个新的字段
@AutoService(SeaTunnelTransform.class)
public class CopyFieldTransform extends SingleFieldOutputTransform {
    private String srcField;
    private int srcFieldIndex;
    private SeaTunnelDataType srcFieldDataType;
    private String destField;
    @Override
    public String getPluginName() {
        return "Copy";
    }
    @Override
    protected void setConfig(Config pluginConfig) {
        this.srcField = pluginConfig.getString("src_field");
        this.destField = pluginConfig.getString("dest_fields");
    }
    @Override
    protected void setInputRowType(SeaTunnelRowType inputRowType) {
        srcFieldIndex = inputRowType.indexOf(srcField);
        srcFieldDataType = inputRowType.getFieldType(srcFieldIndex);
    }
    @Override
    protected String getOutputFieldName() {
        return destField;
    }
    @Override
    protected SeaTunnelDataType getOutputFieldDataType() {
        return srcFieldDataType;
    }
    @Override
    protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
        return inputRow.getField(srcFieldIndex);
    }
}
- getPluginName方法用来定义 transform 的名字。
- @AutoService 注解用来自动生成 META-INF/services/org.apache.seatunnel.api.transform.SeaTunnelTransform文件
- setConfig方法用来注入用户配置。
Transform 测试工具
当你添加了一个新的插件, 推荐添加一个 e2e 测试用例来测试。
我们有 seatunnel-e2e/seatunnel-transforms-v2-e2e 来帮助你实现。
例如, 如果你想要添加一个 CopyFieldTransform 的测试用例, 你可以在 seatunnel-e2e/seatunnel-transforms-v2-e2e
模块中添加一个新的测试用例, 并且在用例中继承 TestSuiteBase 类。
public class TestCopyFieldTransformIT extends TestSuiteBase {
    @TestTemplate
    public void testCopyFieldTransform(TestContainer container) {
        Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
    }
}
一旦你的测试用例实现了 TestSuiteBase 接口, 并且添加 @TestTemplate 注解,它会在所有引擎运行作业,你只需要用你自己的 SeaTunnel 配置文件执行 executeJob 方法,
它会提交 SeaTunnel 作业。