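· 8 min read

Apache SeaTunnel Metadata Caching

In large-scale data integration, the throughput bottleneck is often not the data channel itself but the "metadata path": Connector/Jar loading at startup, state management and recovery at runtime, and Schema/partition queries against external systems (such as databases or Hive Metastore) during initialization. Once the number of jobs reaches the thousands or tens of thousands, these seemingly lightweight operations are amplified into cluster-level pressure.

Apache SeaTunnel Engine (Zeta) moves part of the high-frequency, reusable, and expensive metadata down into the engine for caching, and pairs it with distributed storage and automatic cleanup policies, so that very large numbers of synchronization jobs can run in parallel more stably.

Metadata flow in SeaTunnel's distributed architecture

Why Metadata Becomes a Bottleneck

Take the parallel startup of tens of thousands of small jobs as an example. The common metadata bottlenecks fall into three categories:

  • Class loading and dependency isolation: when every job creates its own ClassLoader, the same set of Connector dependencies is loaded over and over, quickly driving up JVM Metaspace pressure.
  • State and recovery information: without tiered storage and automatic cleanup, checkpoints, task state, and historical job information put a double burden on memory and IO.
  • External catalog/Schema queries: frequent requests to the source database or Metastore during job initialization easily cause connection congestion and jitter in the metadata service.

The sections below break down SeaTunnel's metadata caching approach along three lines, together with configuration recommendations that can be applied directly.

Part 1: ClassLoader Caching to Reduce Metaspace Pressure

When a large number of jobs reuse the same Source/Sink Connectors, continuously creating and destroying class loaders causes noticeable Metaspace churn and can even trigger an overflow. SeaTunnel Engine provides classloader-cache-mode to reuse ClassLoaders across jobs, reducing the overhead of repeated loading and frequent reclamation.

Enable it in seatunnel.yaml (the option is on by default; re-enable it if you previously turned it off manually):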

seatunnel:
  engine:
    classloader-cache-mode: true

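When to Use It

  • The number of jobs is large, jobs start frequently, and the set of Connector types is relatively limited (high reuse rate).
  • JVM Metaspace grows frequently, or memory alerts related to class loading appear.

Caveats

  • If the cluster runs for a long time and the Connector types are highly diverse, the cache increases the resident footprint of class metadata. Watch the Metaspace curve in your monitoring before deciding whether to enable the cache or to restructure the jobs.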
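The reuse idea behind classloader-cache-mode can be pictured with a small sketch. This is not SeaTunnel's implementation: the class name ClassLoaderCache, the method names, and the choice to key the cache on the sorted list of connector jar URLs are all assumptions made for illustration.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical helper: one shared ClassLoader per distinct set of connector jars. */
final class ClassLoaderCache {
    private final Map<String, ClassLoader> cache = new ConcurrentHashMap<>();

    /** Returns the cached loader for this jar set, creating it only on first use. */
    ClassLoader getOrCreate(List<String> jarUrls) {
        // Sort so that the same jars listed in a different order share one key.
        String key = jarUrls.stream().sorted().collect(Collectors.joining(";"));
        return cache.computeIfAbsent(
                key,
                k -> new URLClassLoader(toUrls(jarUrls), ClassLoaderCache.class.getClassLoader()));
    }

    /** Number of distinct loaders currently held by the cache. */
    int size() {
        return cache.size();
    }

    private static URL[] toUrls(List<String> specs) {
        URL[] urls = new URL[specs.size()];
        for (int i = 0; i < urls.length; i++) {
            try {
                urls[i] = URI.create(specs.get(i)).toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Bad jar URL: " + specs.get(i), e);
            }
        }
        return urls;
    }
}
```

Two jobs that use the same connector jars receive the same loader, so the classes are loaded into Metaspace once. The flip side matches the caveat above: every distinct jar set adds a loader that stays resident for as long as the cache holds it.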

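Part 2: Distributed State and Persistence for Recoverability and Operability

SeaTunnel Engine's fault-tolerance semantics are based on the Chandy–Lamport checkpoint idea. To balance performance and reliability, the engine uses Hazelcast distributed data structures (such as IMap) internally to hold part of the runtime information, and relies on external storage (shared/distributed storage) to persist the data needed for failure recovery.

There are usually three groups of settings to care about:

1) Checkpoint trigger parameters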

seatunnel:
  engine:
    checkpoint:
      interval: 300000
      timeout: 10000

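Note: if checkpoint.interval/checkpoint.timeout is set in the env block of the job configuration file, the job-level setting takes precedence.

2) IMap backup and persistence (recommended for production clusters)

When the cluster has more than one node, configure at least backup-count to reduce the risk of losing in-memory state to a single node failure. For scenarios that require automatic recovery after a full cluster stop and restart, you can additionally configure external persistence for IMap.

See the documentation for details:

  • /docs/seatunnel-engine/deployment
  • /docs/seatunnel-engine/checkpoint-storage

3) Automatic cleanup of historical job information

SeaTunnel stores the state, counters, error logs, and other information of finished jobs in IMap. The more jobs there are, the faster this accumulates. Configure history-job-expire-minutes as needed so that expired information is evicted automatically and memory does not keep growing (the default is 1440 minutes, that is, 1 day).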

seatunnel:
  engine:
    history-job-expire-minutes: 1440

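Part 3: Catalog/Schema Metadata Caching to Reduce Pressure on Sources

When many jobs start in parallel, metadata requests to external systems (table structure, partition information, constraints, and so on) can easily turn into an "invisible storm". SeaTunnel introduces caching and reuse on the Connector/Catalog side, moving high-frequency lookups toward the engine wherever possible to cut repeated network round trips and server-side parsing overhead.

  • JDBC: the initialization phase reads table structure, column types, primary keys, and similar information for validation and split planning. Under highly concurrent startup, avoid having every job pull the full metadata of the same table again (for example, by starting jobs in batches or pre-warming through the orchestration layer).
  • Hive: Hive Metastore is usually a shared and relatively sensitive service. Reuse Catalog instances and already loaded Database/Table/Partition information where possible, and watch Metastore QPS and response time when synchronizing tables with very many partitions.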
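The "fetch once, reuse many times" idea can be sketched as follows. This is an illustration only, not SeaTunnel's Catalog API: SchemaCache, its loader function, and the call counter are hypothetical names introduced here.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache: the column list of each table is fetched from the source at most once. */
final class SchemaCache {
    private final Map<String, List<String>> columnsByTable = new ConcurrentHashMap<>();
    private final Function<String, List<String>> loader; // stands in for the real metadata query
    private final AtomicInteger remoteCalls = new AtomicInteger();

    SchemaCache(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    /** Returns the cached columns, calling the loader only on the first request per table. */
    List<String> columns(String table) {
        return columnsByTable.computeIfAbsent(
                table,
                t -> {
                    remoteCalls.incrementAndGet();
                    return loader.apply(t);
                });
    }

    /** How many times the external system was actually queried. */
    int remoteCalls() {
        return remoteCalls.get();
    }
}
```

If a thousand jobs ask for the schema of the same table through one such cache, the source database or Metastore sees a single request. A real cache would also need an expiry or invalidation rule so that schema changes are eventually picked up; that part is left out of the sketch.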

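Flink's design centers on long-lived streaming jobs and complex operator state, while Spark leans toward batch processing and job-level Context management. For the goal of running tens of thousands of independent small jobs concurrently, SeaTunnel Engine's strategy puts more emphasis on sinking reusable startup and runtime metadata into the engine layer: fewer repeated loads, fewer repeated queries, and a controlled lifecycle for historical information, which together improve concurrent startup and stability.

Recommendations for Production

  • Enable adequate backups: for production clusters, use backup-count >= 1 and evaluate whether external IMap persistence is needed to support automatic recovery after a full stop and restart.
  • Limit Connector diversity: keep the spread of Connector combinations within a cluster under control so that classloader-cache-mode delivers the most benefit.
  • Watch the metadata metrics: besides JVM metrics, track checkpoint latency and failure rate, Hazelcast memory usage, IMap size and growth rate, and the accumulation rate of historical jobs.
  • Configure an expiry policy: set history-job-expire-minutes according to your troubleshooting and audit needs, so that observability does not end up exhausting memory.

Illustration of metadata-cache-related metrics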

· 10 min read

Introduction

With the development of big data technology, the demand for data integration and data stream processing is increasing. Apache SeaTunnel, as an open-source data integration framework, not only supports multiple data sources and targets but also provides flexible APIs to meet various complex business requirements.

This article will deeply analyze the Apache SeaTunnel API to help developers better understand its usage scenarios and implementation methods.

Understanding SeaTunnel from Interface Definitions

From the official diagram, we can see that SeaTunnel defines the following types:

  1. Source API: Used to define data input sources.
  2. Transform API: Used to process and transform data.
  3. Sink API: Used to define data output targets.

Three types of operators

image1.png

So I want to start by looking at Apache SeaTunnel's design philosophy from the interface definitions.

SeaTunnelSource

SeaTunnelSource is the interface definition for data reading. In this interface, it defines how to extract data from a data source.

public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends Serializable>  
extends Serializable,
PluginIdentifierInterface,
SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
/**
* Returns the type of current Source, whether it is [bounded batch data] or [unbounded stream data]
*/
Boundedness getBoundedness();

/**
* This method will be deprecated in the future
* Catalog will be used to represent data, allowing more metadata to describe the data
*/
default SeaTunnelDataType<T> getProducedType() {
return (SeaTunnelDataType) getProducedCatalogTables().get(0).getSeaTunnelRowType();
}

/**
* SeaTunnel currently supports multi-table reading, so this returns a list structure
* Each catalog contains metadata information about the table being read
*/
default List<CatalogTable> getProducedCatalogTables() {
throw new UnsupportedOperationException(
"getProducedCatalogTables method has not been implemented.");
}

/**
* Create Reader, which is the class that actually reads the data
*/
SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception;

/**
* These two methods are for creating/restoring SplitEnumerator
*/
SourceSplitEnumerator<SplitT, StateT> createEnumerator(
SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception;
SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(
SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState)
throws Exception;

/**
* These two methods are generally not modified, can be overridden if custom serialization is needed
*/
default Serializer<SplitT> getSplitSerializer() {
return new DefaultSerializer<>();
}
default Serializer<StateT> getEnumeratorStateSerializer() {
return new DefaultSerializer<>();
}

}

From this interface, we can see two main methods:

  • createReader
  • createEnumerator

The SourceSplitEnumerator created by the createEnumerator method is responsible for splitting the data extraction work into splits. The SourceReader created by the createReader method then reads the actual data for the splits it has been assigned.

First, let's look at the code for SourceSplitEnumerator.

SourceSplitEnumerator

public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends AutoCloseable, CheckpointListener {

void open();

void run() throws Exception;

@Override
void close() throws IOException;

void addSplitsBack(List<SplitT> splits, int subtaskId);

int currentUnassignedSplitSize();

void handleSplitRequest(int subtaskId);

void registerReader(int subtaskId);

StateT snapshotState(long checkpointId) throws Exception;

default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}

interface Context<SplitT extends SourceSplit> {

int currentParallelism();

Set<Integer> registeredReaders();

void assignSplit(int subtaskId, List<SplitT> splits);

default void assignSplit(int subtaskId, SplitT split) {
assignSplit(subtaskId, Collections.singletonList(split));
}

void signalNoMoreSplits(int subtask);

void sendEventToSourceReader(int subtaskId, SourceEvent event);

MetricsContext getMetricsContext();

EventListener getEventListener();
}

}

The SourceSplitEnumerator interface defines some methods and an inner class Context. First, let's look at its own methods. We can see there are 3 lifecycle-related methods: open(), run(), and close(). These methods require connectors to handle resource creation or cleanup based on their implementations.

  • registerReader(int subtaskId) method: reader actively registers with the split enumerator
  • handleSplitRequest(int subtaskId): reader actively requests from the split enumerator to get the extraction tasks it will execute.
    (However, looking at the code implementation, this approach is less common; most of the time, the enumerator actively pushes tasks to reader)
  • addSplitsBack(List<SplitT> splits, int subtaskId): When a reader encounters an exception, its running tasks need to be reassigned. At this point, these tasks need to be added back to the queue for reassignment to other nodes for fault tolerance.

In the Context interface definition, there are two key methods:

  • assignSplit(int subtaskId, List<SplitT> splits): split enumerator actively pushes tasks to a specific reader
  • signalNoMoreSplits(int subtaskId): split enumerator tells a specific reader that no more tasks will be assigned to it.
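How the enumerator-side methods fit together can be sketched without any SeaTunnel types. The class below is a simplified stand-in, not a real SourceSplitEnumerator: splits are plain strings, SplitAssigner and assignAll are hypothetical names, and round-robin is just one possible assignment policy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical enumerator core: keeps pending splits and hands them out to readers. */
final class SplitAssigner {
    private final Deque<String> pending = new ArrayDeque<>();

    SplitAssigner(List<String> splits) {
        pending.addAll(splits);
    }

    /** Plays the role of addSplitsBack: splits of a failed reader return to the queue. */
    void addSplitsBack(List<String> splits) {
        pending.addAll(splits);
    }

    /** Plays the role of assignSplit: drains the queue, giving splits to readers in turn. */
    Map<Integer, List<String>> assignAll(int readerCount) {
        Map<Integer, List<String>> plan = new HashMap<>();
        int next = 0;
        while (!pending.isEmpty()) {
            plan.computeIfAbsent(next, k -> new ArrayList<>()).add(pending.poll());
            next = (next + 1) % readerCount;
        }
        return plan;
    }

    /** Plays the role of currentUnassignedSplitSize. */
    int unassigned() {
        return pending.size();
    }
}
```

With five splits and two readers, reader 0 receives three splits and reader 1 receives two. If reader 1 then fails, passing its splits to addSplitsBack puts them back in the queue, and the next assignAll call hands them to whichever readers remain.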

SourceReader

public interface SourceReader<T, SplitT extends SourceSplit>
extends AutoCloseable, CheckpointListener {

void open() throws Exception;

@Override
void close() throws IOException;

void pollNext(Collector<T> output) throws Exception;

List<SplitT> snapshotState(long checkpointId) throws Exception;

void addSplits(List<SplitT> splits);

void handleNoMoreSplits();

default void handleSourceEvent(SourceEvent sourceEvent) {}

interface Context {

int getIndexOfSubtask();

Boundedness getBoundedness();

void signalNoMoreElement();

void sendSplitRequest();

void sendSourceEventToEnumerator(SourceEvent sourceEvent);

MetricsContext getMetricsContext();

EventListener getEventListener();
}

}

In the Reader interface definition, there is also an inner class Context.

Let's look at some of the main methods:

  • pollNext(Collector<T> output): The main method for data extraction. In this method, each connector will implement extracting data from its corresponding data source, converting it to SeaTunnel's internal data structure SeaTunnelRow, and then adding it to the Collector
  • addSplits(List<SplitT> splits): reader's handling after receiving tasks assigned by the split enumerator
  • snapshotState(long checkpointId): This method is called during checkpoint, requiring the reader to record some state for subsequent fault tolerance

Summary with a Diagram

image2.png

Here, a Split represents one unit of a divided read task. It can be a partition of a Hive table, a Kafka partition, or one slice of a JDBC query. The core idea is to divide data reading into multiple independent read tasks that can be assigned to different reader instances, thereby speeding up extraction.

For example: In batch processing, data can be split into N parts, allowing data extraction to be executed in parallel to achieve speed improvement.
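As a concrete picture of "split into N parts", the sketch below cuts a numeric key range into contiguous chunks, the way a JDBC-style source might split on a primary key. It is a generic illustration, not taken from any SeaTunnel connector; RangeSplitter is a hypothetical name, and the code assumes n >= 1 and min <= max.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical splitter for a numeric key range. */
final class RangeSplitter {
    /** Splits the inclusive range [min, max] into at most n contiguous {start, end} pairs. */
    static List<long[]> split(long min, long max, int n) {
        List<long[]> splits = new ArrayList<>();
        long total = max - min + 1;
        long step = (total + n - 1) / n; // ceiling division so no keys are left over
        for (long start = min; start <= max; start += step) {
            splits.add(new long[] {start, Math.min(start + step - 1, max)});
        }
        return splits;
    }
}
```

For the range 1 to 10 and n = 3, the step is 4 and the result is [1, 4], [5, 8], [9, 10]. Each pair can be turned into its own bounded query and read by a different reader in parallel.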

For stream processing, there are two approaches. One is to split the data into a finite number of unbounded streams (e.g., Kafka is split by partition, one partition per task, and each task is an unbounded data stream).

The other is to generate an endless sequence of bounded splits (again using Kafka as an example: each split extracts part of the data from a partition, and new split definitions keep being generated indefinitely).

As for how many Splits a data reading task will ultimately be divided into and how to implement the splitting, it's up to each connector's implementation. Each connector can decide based on the actual partitions being read or parameters.

Next, let's look at the code related to Transform!

SeaTunnelTransform

public interface SeaTunnelTransform<T>  
extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {

default void open() {}

default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}

// Get the data structure after Transform processing
CatalogTable getProducedCatalogTable();

// From this, we can see that Transform in SeaTunnel only supports map operations
// Operations like Join involving multiple data sources are not supported
T map(T row);

default void close() {}
}

In transform, there is one key method T map(T row), which performs map processing on an original data record to get a new data record. The structure of the new data is consistent with getProducedCatalogTable().

This code is based on version 2.3.6. The community is currently working on multi-table read/write functionality for Transform. Currently, only the map operator (one-to-one) is supported. I've seen the community also discussing whether to support the flatMap operator (one-to-many), which would allow data expansion operations during synchronization.

Now let's look at the Sink code!

SeaTunnelSink

public interface SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
extends Serializable,
PluginIdentifierInterface,
SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {

@Deprecated
// These two methods are also marked as deprecated, will use Catalog for representation in the future
default void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}
@Deprecated
default SeaTunnelDataType<IN> getConsumedType() {
throw new UnsupportedOperationException("getConsumedType method is not supported");
}

/**
* Create/restore SinkWriter, Writer is the class that actually performs data writing
*/
SinkWriter<IN, CommitInfoT, StateT> createWriter(SinkWriter.Context context) throws IOException;
default SinkWriter<IN, CommitInfoT, StateT> restoreWriter(
SinkWriter.Context context, List<StateT> states) throws IOException {
return createWriter(context);
}

default Optional<Serializer<StateT>> getWriterStateSerializer() {
return Optional.empty();
}

default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException {
return Optional.empty();
}

default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() {
return Optional.empty();
}

default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>>
createAggregatedCommitter() throws IOException {
return Optional.empty();
}

default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() {
return Optional.empty();
}

}

In Sink, there are several key methods:

  • createWriter(SinkWriter.Context context): Create Writer instance. Similar to Source, the actual data writing is performed by the Writer.

  • createCommitter(): Optional. When two-phase commit is needed, create a SinkCommitter to complete the two-phase commit. This approach is no longer recommended; it's recommended to use createAggregatedCommitter() for two-phase commit.

  • createAggregatedCommitter(): Optional. Similar to SinkCommitter, used for two-phase commit during the commit phase.

    The difference is that SinkAggregatedCommitter runs as a single instance, not multiple instances, concentrating all commit tasks in one place for execution. So if a connector needs two-phase commit, it's recommended to use createAggregatedCommitter() to create it.

SinkWriter

public interface SinkWriter<T, CommitInfoT, StateT> {  

void write(T element) throws IOException;

default void applySchemaChange(SchemaChangeEvent event) throws IOException {}

Optional<CommitInfoT> prepareCommit() throws IOException;

default List<StateT> snapshotState(long checkpointId) throws IOException {
return Collections.emptyList();
}

void abortPrepare();

void close() throws IOException;

interface Context extends Serializable {

int getIndexOfSubtask();

default int getNumberOfParallelSubtasks() {
return 1;
}

MetricsContext getMetricsContext();

EventListener getEventListener();
}
}

We can see that the structure of SinkWriter is somewhat similar to SourceReader. Let's look at some key methods:

  • write(T element): Implementation of writing to the target database when receiving upstream data;

  • applySchemaChange(SchemaChangeEvent event): How downstream handles when upstream data table structure changes, such as adding/removing fields, modifying field names. But this depends on the specific implementation;

  • prepareCommit(): When two-phase commit is needed, generate information for this commit, which will be handed over to SinkCommitter/SinkAggregatedCommitter for two-phase commit. This method is called during checkpoint, meaning information is committed to the target connector only during each checkpoint;

  • snapshotState(): During checkpoint, store some state of the writer for subsequent fault tolerance;
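The division of labor between prepareCommit() on the writers and a single aggregated committer can be sketched with plain Java types. This is a simplified model under stated assumptions, not the SeaTunnel API: rows are strings, the commit info is simply the buffered batch, and TwoPhaseSketch, BufferingWriter, and SingleCommitter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class TwoPhaseSketch {
    /** Phase 1 lives in each writer: buffer rows, then hand over a batch at checkpoint time. */
    static final class BufferingWriter {
        private final List<String> buffer = new ArrayList<>();

        void write(String row) {
            buffer.add(row);
        }

        /** Returns the rows written since the last checkpoint, or empty if there are none. */
        Optional<List<String>> prepareCommit() {
            if (buffer.isEmpty()) {
                return Optional.empty();
            }
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return Optional.of(batch);
        }
    }

    /** Phase 2 lives in one committer: it makes the batches of all writers visible together. */
    static final class SingleCommitter {
        final List<String> committed = new ArrayList<>();

        void commit(List<List<String>> batches) {
            batches.forEach(committed::addAll);
        }
    }
}
```

Nothing becomes visible in committed until commit is called, which mirrors the point above that data reaches the target only at checkpoints. A real sink would also need abort and retry handling, which the sketch leaves out.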

Summary with a Diagram

image3.png

When data needs to be read from a data source, SourceSplitEnumerator first splits the tasks, then SourceReader executes the split data reading tasks. During reading, raw data needs to be converted to SeaTunnel's internal SeaTunnelRow, then passed downstream to Transform for data transformation. After transformation, it's handed to SinkWriter to write the SeaTunnelRow data to the corresponding connector. If two-phase commit is needed during writing, additional SinkCommitter related classes need to be implemented.

Collector

In the previous sections, we've described the functionality of source, transform, and sink.

But how is data passed between source, transform, and sink? If a source has multiple downstream components, how are messages distributed to all downstream components? This is where the Collector interface comes in.

From the data flow direction, only source to downstream or transform to downstream passes data, so we can see there are two Collector interface definitions, located in the source and transform packages respectively.

First, let's look at the Collector definition in source:

public interface Collector<T> {  

void collect(T record);

default void markSchemaChangeBeforeCheckpoint() {}

default void collect(SchemaChangeEvent event) {}

default void markSchemaChangeAfterCheckpoint() {}

Object getCheckpointLock();

default boolean isEmptyThisPollNext() {
return false;
}

default void resetEmptyThisPollNext() {}
}

The Collector definition in transform is relatively simple:

public interface Collector<T> {     
void collect(T record);
void close();
}

Collector decouples multiple operators. Each operator only needs to focus on how to process data, without worrying about who to send the result data to.
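The decoupling can be shown with a minimal fan-out collector. It is an illustration only and not the engine's implementation; FanOutCollector and addDownstream are hypothetical names, and downstream operators are modeled as plain Consumer instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical collector that forwards every record to all registered downstream consumers. */
final class FanOutCollector<T> {
    private final List<Consumer<T>> downstreams = new ArrayList<>();

    void addDownstream(Consumer<T> downstream) {
        downstreams.add(downstream);
    }

    /** The upstream operator calls only this method and never sees who receives the record. */
    void collect(T record) {
        for (Consumer<T> downstream : downstreams) {
            downstream.accept(record);
        }
    }
}
```

The source only ever calls collect. Whether one transform or several are wired behind the collector is decided when the job graph is built, not in the connector code.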

Updating the previous diagram (adding another Transform to show the multi-downstream scenario):

image4.png

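· 8 min read

Amazon Aurora DSQL is a distributed SQL database that Amazon Web Services launched in December 2024. It is designed for building applications with virtually unlimited scalability, high availability, and no infrastructure to manage, and it features high availability, a serverless architecture, strong compatibility, fault tolerance, and a high level of security.

Because Aurora DSQL's authentication is integrated with IAM, accessing an Aurora DSQL database requires a token generated from an IAM identity, and that token is valid for only 15 minutes by default. As a result, some mainstream data synchronization tools do not yet support migrating data from other databases to Aurora DSQL.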
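A short-lived token means a long-running sink has to regenerate its credential before the old one expires. The sketch below shows that pattern in isolation. It is not the connector's actual code and it does not call any AWS SDK: RefreshingToken is a hypothetical class, the generator function stands in for the IAM-based token call, and the clock is injected so the behavior can be checked without waiting.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical token holder: regenerates the token shortly before its lifetime ends. */
final class RefreshingToken {
    private final Supplier<String> generator; // stands in for the IAM-based token call
    private final LongSupplier clockMillis;   // injected clock, e.g. System::currentTimeMillis
    private final long lifetimeMillis;
    private final long marginMillis;
    private String token;
    private long issuedAt;

    RefreshingToken(
            Supplier<String> generator,
            LongSupplier clockMillis,
            long lifetimeMillis,
            long marginMillis) {
        this.generator = generator;
        this.clockMillis = clockMillis;
        this.lifetimeMillis = lifetimeMillis;
        this.marginMillis = marginMillis;
    }

    /** Returns a token that still has at least marginMillis of validity left. */
    synchronized String get() {
        long now = clockMillis.getAsLong();
        if (token == null || now - issuedAt >= lifetimeMillis - marginMillis) {
            token = generator.get();
            issuedAt = now;
        }
        return token;
    }
}
```

With a 15-minute lifetime and a 1-minute margin, the token is reused for the first 14 minutes and regenerated on the first request after that. A sink would call get() whenever it opens or re-establishes a connection.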

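Given this situation, the author developed a Sink Connector specifically for Aurora DSQL on top of the data synchronization tool Apache SeaTunnel, to meet the need of migrating data from other databases to Aurora DSQL.

Introduction to SeaTunnel

SeaTunnel is an easy-to-use, multimodal, ultra-high-performance distributed data integration platform. It focuses on data integration and data synchronization and is mainly designed to solve common problems in the data integration field.

SeaTunnel Features

  • Rich and extensible Connectors: SeaTunnel currently supports more than 190 Connectors, and the number keeps growing. Mainstream databases such as MySQL, Oracle, SQLServer, and PostgreSQL are already covered. The plugin-based design lets users easily develop their own Connector and integrate it into the SeaTunnel project.
  • Unified batch and streaming: Connectors built on the SeaTunnel Connector API are compatible with offline, real-time, full, and incremental synchronization scenarios, which greatly reduces the difficulty of managing data integration jobs.
  • Distributed snapshots: a distributed snapshot algorithm is supported to guarantee data consistency.
  • Multi-engine support: SeaTunnel uses the SeaTunnel engine (Zeta) for data synchronization by default. It also supports Flink or Spark as the execution engine for Connectors, to fit an enterprise's existing technology stack, and it supports multiple versions of Spark and Flink.
  • JDBC reuse and multi-table database log parsing: SeaTunnel supports multi-table or whole-database synchronization, which solves the problem of excessive JDBC connections. It also supports multi-table or whole-database log reading and parsing, which solves the problem of logs being read and parsed repeatedly in multi-table CDC synchronization.
  • High throughput and low latency: SeaTunnel supports parallel reads and writes and provides stable, reliable, high-throughput, low-latency data synchronization.
  • Comprehensive real-time monitoring: SeaTunnel exposes detailed monitoring information for every step of the synchronization process, so users can easily see the number of records, data size, QPS, and other figures read and written by a job.

SeaTunnel Workflow

Figure 1: SeaTunnel workflow

The SeaTunnel workflow is shown in the figure above. The user configures the job and chooses the execution engine to submit it to. The Source Connector reads the source data in parallel and sends it to a downstream Transform or directly to the Sink, and the Sink writes the data to the destination.

Building SeaTunnel from Source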

git clone https://github.com/apache/seatunnel.git
cd seatunnel
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
cp seatunnel-dist/target/apache-seatunnel-${version}-bin.tar.gz /The-Path-You-Want-To-Copy
cd /The-Path-You-Want-To-Copy
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

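After a successful build from source, all Connector plugins and some required dependencies (for example, the MySQL driver) are included in the binary package. You can use the Connector plugins directly without installing them separately.

Configuration Example: Synchronizing MySQL Data to Aurora DSQL with SeaTunnel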

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 6000
checkpoint.timeout = 1200000
}
source {
MySQL-CDC {
username = "user name"
password = "password"
table-names = ["db.table1"]
url = "jdbc:mysql://dbhost:3306/db?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&connectTimeout=120000&socketTimeout=120000&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
table-names-config = [
{
table = "db.table1"
primaryKeys = ["id"]
}
]
}
}
transform {

}
sink {
Jdbc {
url="jdbc:postgresql://<dsql_endpoint>:5432/postgres"
dialect="dsql"
driver = "org.postgresql.Driver"
username = "admin"
access_key_id = "ACCESSKEYIDEXAMPLE"
secret_access_key = "SECRETACCESSKEYEXAMPLE"
region = "us-east-1"
database = "postgres"
generate_sink_sql = true
primary_keys = ["id"]
max_retries="3"
batch_size =1000
}
}

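Running the Data Synchronization Job

Save the configuration above as mysql-to-dsql.conf (remember to replace the example values with your real parameters), place it in the config directory of apache-seatunnel-${version}, and run the following commands: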

cd "apache-seatunnel-${version}"
./bin/seatunnel.sh --config ./config/mysql-to-dsql.conf -m local

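Figure 2: Data synchronization log output

After the command runs successfully, you can follow the job through the newly generated logs. If an error occurs, the exception message helps you locate it, for example a database connection timeout or a missing table. In the normal case, the data is written to the target Aurora DSQL successfully, as shown in the figure above.

Summary

Aurora DSQL is a highly secure, easily scalable distributed database with no server infrastructure to manage. Because its authentication is tied to IAM identities, suitable tools for synchronizing data into Aurora DSQL are currently lacking, especially for real-time synchronization. SeaTunnel is an excellent data integration and synchronization tool that already supports many data sources, and it can also be extended flexibly for custom synchronization needs such as full synchronization and incremental real-time synchronization. Building on this flexibility, the author developed a Sink Connector specifically for Aurora DSQL to meet the need for synchronizing data into Aurora DSQL.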

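About the Author

Tan Zhiqiang (谭志强) is a migration solutions architect at Amazon Web Services, mainly responsible for cloud adoption and cross-cloud migration for enterprise customers. He has more than ten years of IT professional services experience and has worked as a programmer, project manager, technical consultant, and solutions architect.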

· 9 min read

This article provides a detailed walkthrough of how to achieve full data synchronization from MySQL to PostgreSQL using Apache SeaTunnel 2.3.9. We cover the complete end-to-end process — from environment setup to production validation. Let’s dive into the MySQL-to-PostgreSQL synchronization scenario.

Version Requirements:

  • MySQL: MySQL 8.3
  • PostgreSQL: PostgreSQL 13.2
  • Apache SeaTunnel: Apache-SeaTunnel-2.3.9

Preliminaries

Verify Version Information

Run the following SQL command to check the version:

-- Check version information 
select version();

Enable Master-Slave Replication

-- View replication-related variables
show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');

For MySQL CDC data synchronization, SeaTunnel needs to read the MySQL binlog and act as a slave node in the MySQL cluster.

Note: In MySQL 8.0+, binlog is enabled by default, but replication mode must be enabled manually.

-- Enable master-slave replication (execute in sequence)
-- SET GLOBAL gtid_mode=OFF;
-- SET GLOBAL enforce_gtid_consistency=OFF;
SET GLOBAL gtid_mode=OFF_PERMISSIVE;
SET GLOBAL gtid_mode=ON_PERMISSIVE;
SET GLOBAL enforce_gtid_consistency=ON;
SET GLOBAL gtid_mode=ON;

Grant Necessary User Permissions

A user must have REPLICATION SLAVE and REPLICATION CLIENT privileges:

-- Grant privileges to the user
CREATE USER 'test'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test';
FLUSH PRIVILEGES;

SeaTunnel Cluster Setup

Cluster Logging

By default, SeaTunnel logs output to a single file. For production, it’s preferable to have separate log files per job. Update the logging configuration in log4j2.properties:

############################ log output to file #############################
# rootLogger.appenderRef.file.ref = fileAppender
# Change log output to use independent log files for each job
rootLogger.appenderRef.file.ref = routingAppender
############################ log output to file #############################

Client Configuration

For production clusters, it is recommended to install SeaTunnel under the /opt directory and point the SEATUNNEL_HOME environment variable accordingly.

If multiple versions exist, create a symbolic link to align with the server deployment directory:

# Create a symlink
ln -s /opt/apache-seatunnel-2.3.9 /opt/seatunnel
# Set environment variable
export SEATUNNEL_HOME=/opt/seatunnel

Environment Variables Configuration

For Linux servers, add the following lines to /etc/profile.d/seatunnel.sh:

echo 'export SEATUNNEL_HOME=/opt/seatunnel' >> /etc/profile.d/seatunnel.sh
echo 'export PATH=$SEATUNNEL_HOME/bin:$PATH' >> /etc/profile.d/seatunnel.sh
source /etc/profile.d/seatunnel.sh

Job Configuration

Note: The configuration below does not cover all options but illustrates common production settings.

env {
job.mode = "STREAMING"
job.name = "DEMO"
parallelism = 3
checkpoint.interval = 30000 # 30 seconds
checkpoint.timeout = 30000 # 30 seconds

job.retry.times = 3
job.retry.interval.seconds = 3 # 3 seconds
}

The first step is setting up the env block. This job runs in streaming mode, so job.mode must be set to STREAMING.

Task Naming and Management

Configuring a task name is crucial for identifying and managing jobs in a production environment. Naming conventions based on database or table names can help with monitoring and administration.

Parallelism Settings

Here, we set the parallelism to 3, but this value can be adjusted based on the cluster size and database performance.

Checkpoint Configuration

  • Checkpoint Frequency: Set to 30 seconds. If higher precision is required, this can be reduced to 10 seconds or less.
  • Checkpoint Timeout: If a checkpoint takes too long, the job is considered failed. Set to 30 seconds.
  • Automatic Retry: Configured to 3 retries, with a retry interval of 3 seconds (adjustable based on system requirements).

source {
MySQL-CDC {
base-url = "jdbc:mysql://192.168.8.101:3306/test?serverTimezone=Asia/Shanghai"
username = "test"
password = "123456"

database-names = ["test"]
# table-names = ["test.test_001","test.test_002"]
table-pattern = "test\\.test_.*" # The first dot is a literal character, requiring escaping; the second dot represents any single character.
table-names-config = [
{"table":"test.test_002","primaryKeys":["id"]}
]

startup.mode = "initial" # First sync all historical data, then incremental updates
snapshot.split.size = "8096"
snapshot.fetch.size = "1024"
server-id = "6500-8500"
connect.timeout.ms = 30000
connect.max-retries = 3
connection.pool.size = 20

exactly_once = false # In analytical scenarios, disabling exactly-once consistency allows some duplicates/losses for better performance.
schema-changes.enabled = true # Enable schema evolution to avoid frequent modifications; supports add, rename, drop operations.
}
}

Key MySQL CDC Configurations

  1. Time Zone Configuration: It’s recommended to specify the MySQL connection time zone to prevent discrepancies when extracting datetime or timestamp data.
  2. User Credentials:
     • The account must have replication privileges so that it can read the binlog.
     • The account should be able to query all tables under the designated databases.

Database & Table Selection

Typically, each database is assigned to a separate task. Here, we specify only the test database.
Two methods can be used:

  1. Direct table name selection
  2. Regular expression-based table matching (recommended for large datasets or entire database synchronization).

Important:
When using regular expressions, both the database name and table name must be included. The . character that separates them must be escaped (\\.).

For example, to match tables prefixed with test_, we use:

test\\.test_.*

  • The first dot (.) is the literal separator between database name and table name, so it must be escaped (\\.).
  • The second dot (.) is the regex wildcard for any single character; followed by *, it matches the rest of the table name.
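The effect of the escaping can be checked with Java's own regex engine. The sketch below assumes the pattern must match the whole qualified name (database.table); that full-match behavior is an assumption about how the connector applies table-pattern, and TablePatternDemo is a hypothetical class name.

```java
import java.util.regex.Pattern;

/** Hypothetical demo of the escaped table pattern. */
final class TablePatternDemo {
    // The backslash is doubled in Java source, just as it is in the job config string.
    static final Pattern ESCAPED = Pattern.compile("test\\.test_.*");
    // Without escaping, the first dot would match any character, not just the separator.
    static final Pattern UNESCAPED = Pattern.compile("test.test_.*");

    static boolean matches(Pattern pattern, String qualifiedTableName) {
        return pattern.matcher(qualifiedTableName).matches();
    }
}
```

With the escaped pattern, test.test_001 matches while test.orders and testXtest_001 do not. The unescaped pattern also accepts testXtest_001, which is exactly the kind of accidental match the escaping prevents.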

Additionally, for tables without primary keys, logical primary keys can be specified manually to facilitate data synchronization.

Startup Mode

The default startup mode is initial, which means:

  1. Full historical data sync first
  2. Incremental updates afterward

Sharding & Fetching

  • The default values for shard size and batch fetch size work well.
  • If the server has higher performance, these values can be increased.

Server ID

  • MySQL requires unique server IDs for replication nodes.
  • Apache SeaTunnel must masquerade as a MySQL replica.
  • If not configured, a default value is used, but manual specification is recommended to avoid conflicts.
  • The server ID range must be greater than the parallelism level, or errors may occur.
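A quick pre-flight check for the last rule can be sketched as follows. It simply encodes the rule as stated above (the range must be larger than the parallelism) and is not part of SeaTunnel; ServerIdRange and its method names are hypothetical, and the input is assumed to be either a single ID or a "start-end" range.

```java
/** Hypothetical validator for the server-id setting. */
final class ServerIdRange {
    /** Number of IDs in an inclusive range such as "6500-8500", or 1 for a single ID. */
    static int count(String serverId) {
        String[] parts = serverId.trim().split("-");
        int start = Integer.parseInt(parts[0].trim());
        int end = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : start;
        return end - start + 1;
    }

    /** Applies the rule from the text: the range must be larger than the parallelism. */
    static boolean isLargeEnough(String serverId, int parallelism) {
        return count(serverId) > parallelism;
    }
}
```

For the configuration shown earlier, "6500-8500" contains 2001 IDs, which comfortably exceeds a parallelism of 3. A range such as "6500-6502" with the same parallelism would be rejected by this check.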

Timeouts & Retries

  • Connection Timeout: For large datasets, increase this value accordingly.
  • Auto-Retry Interval: If handling a high volume of tables, consider extending retry intervals.

Exactly-Once Consistency

For CDC-based data synchronization, exactly-once consistency is often not required in analytical scenarios.

  • Disabling it can significantly boost performance.
  • However, if strict consistency is required, it can be enabled at the cost of reduced performance.

Schema Evolution

It’s highly recommended to enable schema evolution, which:

  • Allows automatic table modifications (e.g., adding/removing fields)
  • Reduces the need for manual job updates when the schema changes

However, downstream tasks may fail if they rely on a field that was modified.
Supported schema changes:
✔️ ADD COLUMN
✔️ DROP COLUMN
✔️ RENAME COLUMN
✔️ MODIFY COLUMN

Note: Schema evolution does not support CREATE TABLE or DROP TABLE.

Configuring the Sink (PostgreSQL)

The sink configuration inserts data into PostgreSQL.

sink {
jdbc {
url = "jdbc:postgresql://192.168.8.101:5432/test"
driver = "org.postgresql.Driver"
user = "postgres"
password = "123456"

generate_sink_sql = true
database = "test"
table = "${database_name}.${table_name}"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
# enable_upsert = false
}
}

Key Considerations:

JDBC Connection

  • Specify PostgreSQL driver, user, and password.

Auto SQL Generation

  • Enabling generate_sink_sql lets SeaTunnel automatically create tables and generate INSERT, DELETE, and UPDATE statements.

Schema Handling

  • PostgreSQL uses Database → Schema → Table, while MySQL has only Database → Table.
  • Ensure the schema is correctly mapped to avoid data mismatches.

User Permissions

  • The PostgreSQL user must have table creation permissions if using auto-schema generation.

For more details, refer to the official documentation:
🔗SeaTunnel MySQL-CDC Connector Docs

Using Placeholders in Sink Configuration

Apache SeaTunnel supports placeholders, which dynamically adjust table names based on the source data.

For example:

table = "${database_name}.${table_name}"

  • Ensures each table syncs correctly without manual specification.
  • Supports concatenation and dynamic formatting.

Schema Save Mode and Data Append Strategy

The schema_save_mode parameter plays a crucial role in database-wide synchronization. It simplifies the process by automatically creating tables in the target database, eliminating the need for manual table creation steps.

Another key setting is data_save_mode = "APPEND_DATA", which is particularly useful when the target database already contains previously synchronized data. It prevents the accidental deletion of existing records, making it a safer choice for most scenarios. If your use case requires a different approach, you can change this setting according to the official documentation.

Enable Upsert for Performance Optimization

Another important parameter is enable_upsert. If you can guarantee that the source data contains no duplicate records, disabling upsert (enable_upsert = false) can significantly improve synchronization performance, because the system no longer needs to check for existing records before inserting new ones.

However, if there is a possibility of duplicate records in the source data, it is strongly recommended to keep upsert enabled (enable_upsert = true). This ensures that records are inserted or updated based on their primary key, preventing duplication issues.

For detailed parameter explanations and further customization options, please refer to the official Apache SeaTunnel documentation.

Task Submission and Monitoring

Once your configuration file is ready, submit the job using the SeaTunnel command-line tool:

./bin/seatunnel.sh --config /path/to/config.conf --async

Key Parameters:

  • --config: Specifies the path to your configuration file.
  • --async: Submits the job asynchronously, allowing the command line to exit while the job continues in the background.

After submission, you can monitor the job via SeaTunnel’s cluster UI. In version 2.3.9, SeaTunnel provides a visual interface where you can view job logs, execution status, and data throughput details.

Data Synchronization Demonstration

For this demonstration, we created two tables (test_001 and test_002) and inserted sample data into MySQL. Using SeaTunnel's synchronization tasks, the data was successfully synchronized to PostgreSQL. The demonstration included insertions, deletions, updates, and even table schema modifications, all of which were reflected in real time on PostgreSQL.

Key Points:

  • Schema Synchronization:
    SeaTunnel supports automatic table schema synchronization. When the source MySQL table structure changes, the target PostgreSQL table automatically updates.
  • Data Consistency:
    SeaTunnel ensures data consistency by accurately synchronizing all insert, delete, and update operations to the target database.

About SeaTunnel

Apache SeaTunnel focuses on data integration and synchronization, addressing common challenges such as:

  • Diverse Data Sources:
    Supporting hundreds of data sources, even as new ones emerge.
  • Complex Sync Scenarios:
    Including full, incremental, CDC, real-time, and whole-database synchronizations.
  • High Resource Demands:
    Traditional tools often require extensive computing or JDBC resources for real-time sync of many small tables.
  • Monitoring and Quality:
    Sync processes can suffer from data loss or duplication, and effective monitoring is essential.
  • Complex Technology Stacks:
    Multiple sync programs may be needed for different systems.
  • Management Challenges:
    Offline and real-time sync are often developed and managed separately, increasing complexity.

· 5 min read

SeaTunnel version 2.3.1 was released recently. This is a high-profile release with many important feature updates and optimizations. In terms of user experience, the new version improves the stability of SeaTunnel Zeta and of CI/CD; in terms of connectors, it adds 7+ new connectors, fixes bugs in commonly used existing connectors, and improves security. The community also refactored multiple underlying base classes and added an important feature, AI Compatible. With the optimized API, users can use ChatGPT 4.0 to quickly build the SaaS Connector they need.

Major Feature update

01 SeaTunnel Zeta

SeaTunnel Zeta, the first version of the data integration engine, was introduced in the SeaTunnel 2.3.0 release and has received feedback from numerous community users. In SeaTunnel version 2.3.1, we have fixed all the bugs reported by users, optimized the use of memory and threads, and greatly improved the stability of Zeta.

In version 2.3.1, the community also added several new Zeta features, including a dedicated JVM parameter configuration file, client output of job monitoring information, Rest API for Zeta cluster information and job information, etc.

At the checkpoint level, version 2.3.1 Zeta supports using OSS as checkpoint storage. It also supports savepoint running jobs and resuming jobs from savepoints.

In addition, version 2.3.1 also adds a set of Rest APIs for Zeta, which can be used to obtain the list of jobs running on Zeta, the status information of jobs, and the monitoring indicators of Zeta cluster nodes. For specific usage methods, please refer to https://seatunnel.apache.org/docs/seatunnel-engine/rest-api/

02 AI Compatible

In SeaTunnel 2.3.1, the HTTP interface and related APIs are reconstructed, and the SaaS Connector-related API and Connector construction process are simplified according to the existing xGPT level capabilities so that ChatGPT 4.0 can directly generate SaaS Connectors and quickly generate various SaaS Connector interfaces. Under normal circumstances, the results obtained by this method are 95% similar to the code written by open-source contributors (see appendix).

Of course, because ChatGPT 4.0's knowledge only extends to October 2021, up-to-date vectorized documentation has to be supplied for it to adapt to the latest SaaS interfaces. Even so, the refactored API and code framework let users generate Connectors more quickly and contribute them to the open-source community, making SeaTunnel's interfaces more powerful.

Connector

01 7+ new connectors

While fixing the bugs of known connectors and optimizing the connectors, the community has added 7 new connectors including SAP HANA, Persistiq, TDEngine, SelectDB Cloud, Hbase, FieldMapper Transform, and SimpleSQL Transform.

02 Reimplement SQL Transform

Since the previous SQL Transform connector was defined based on Flink SQL and Spark SQL, SQL Transform cannot adapt to the execution of multiple engines, so we removed the SQL Transform function in version 2.3.0. In version 2.3.1, we reimplemented SQL Transform. SQL Transform is an API that does not depend on a task-specific execution engine and can perfectly run on three different engines: Flink/Spark/Zeta. Special thanks to contributor Ma Chengyuan (GitHub ID: rewerma) for leading and contributing this important Feature.

For the functions already supported by SQL Transform, please refer to https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions

03 New SQL Server CDC

At the CDC connector level, the community has newly added a SQL Server CDC connector, and made a lot of optimizations to MySQL CDC, improving the stability of MySQL CDC.

04 Added CDC connector to output debezium-json format function

In addition, version 2.3.1 also added the ability for the CDC connector to output data in debezium-json format. Users can use MySQL CDC to read the binlog and write the data to Kafka in debezium-json format. They can then create new synchronization tasks that read the debezium-json data from Kafka and synchronize it to the target data source, or write other programs that read the debezium-json data from Kafka directly to compute metrics.

Safety

Before version 2.3.1, users needed to configure the database username, password, and other information in plain text in the config file, which could cause security problems. In version 2.3.1, we added a configuration file encryption function, so users can put the encrypted database username, password, and other information in the config file. When the job runs, SeaTunnel decrypts the content of the config file using the default encryption and decryption algorithm. The encryption function also provides an SPI, through which users can customize both the list of parameters to encrypt and decrypt and the algorithm itself according to their own needs.

For how to use this function, please refer to https://seatunnel.apache.org/docs/2.3.1/connector-v2/Config-Encryption-Decryption

Third-party engine support

SeaTunnel version 2.3.1 supports Spark version 3.3, as well as Flink 1.14.6, Flink 1.15, Flink 1.16, and other versions, basically covering the mainstream versions of Spark and Flink.

Client

The new version introduces an SPI for job configuration. Previously, only HOCON and JSON configuration files were supported. Now the SPI is open to users, who can customize the format of job configuration files to meet the integration requirements of different business systems.

Optimization

SeaTunnel 2.3.1 includes many important optimizations, spanning core components, connector components, CI/CD, Zeta (ST-Engine), and E2E components. They add new functions, improve existing ones, and optimize the test and deployment processes. Notable changes include adding parallelism and column projection interfaces to the Core API, introducing a MySQL-CDC source factory in Connector-V2, supporting exactly-once semantics for JDBC source connectors, and improving the CI/CD process and E2E stability. In Zeta (ST-Engine), logic for restarting the job when all nodes are down was added, and the timeout for writing data is now configurable.

For a detailed list, see the Release Note [Improve] section.

Document

In addition, the new version also has a series of updates to the documentation, including adding transform v2 documentation and some hints, as well as improving the documentation of various connectors.

See the Release Note [Docs] section for details.

Document address: https://seatunnel.apache.org/versions/

Release Note

https://github.com/apache/incubator-seatunnel/blob/2.3.1/release-note.md

Acknowledgement to the contributors

contributors

· 3 min read

SeaTunnel Zeta has been officially released with the joint efforts of the community. After comparing the performance of SeaTunnel with DataX and Airbyte, we also compared the performance of SeaTunnel with the popular data synchronization tool AWS GLUE.

The results showed that SeaTunnel batch syncs MySQL data to MySQL 420% faster than GLUE.

To ensure the accuracy of the test, we ran it in a single test environment: under the same resource conditions, we had SeaTunnel and AWS GLUE synchronize data from MySQL to MySQL in batches and compared the time each tool required.

[Figure 1]

We created a table in MySQL containing 31 fields, with the primary key selected as an incrementing ID, and all other fields generated randomly, without setting any indexes. The table creation statement is as follows:

create table test.type_source_table
(
    id                   int auto_increment primary key,
    f_binary             binary(64)         null,
    f_blob               blob               null,
    f_long_varbinary     mediumblob         null,
    f_longblob           longblob           null,
    f_tinyblob           tinyblob           null,
    f_varbinary          varbinary(100)     null,
    f_smallint           smallint           null,
    f_smallint_unsigned  smallint unsigned  null,
    f_mediumint          mediumint          null,
    f_mediumint_unsigned mediumint unsigned null,
    f_int                int                null,
    f_int_unsigned       int unsigned       null,
    f_integer            int                null,
    f_integer_unsigned   int unsigned       null,
    f_bigint             bigint             null,
    f_bigint_unsigned    bigint unsigned    null,
    f_numeric            decimal            null,
    f_decimal            decimal            null,
    f_float              float              null,
    f_double             double             null,
    f_double_precision   double             null,
    f_longtext           longtext           null,
    f_mediumtext         mediumtext         null,
    f_text               text               null,
    f_tinytext           tinytext           null,
    f_varchar            varchar(100)       null,
    f_date               date               null,
    f_datetime           datetime           null,
    f_time               time               null,
    f_timestamp          timestamp          null
);

SeaTunnel Task Configuration

In SeaTunnel, we split the data according to the ID field and process it in multiple sub-tasks. Here is the configuration file for SeaTunnel:

env {
  job.mode = "BATCH"
  checkpoint.interval = 300000
}
source {
  Jdbc {
    url = "jdbc:mysql://XXX:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "password"
    connection_check_timeout_sec = 100
    query = "select id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_time, f_timestamp from test"
    partition_column = "id"
    partition_num = 40
    parallelism = 2
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://XXX:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "password"
    query = "insert into test_1 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
  }
}
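The splitting by partition_column and partition_num in the configuration above can be pictured as cutting the ID range into contiguous sub-ranges that sub-tasks read independently. The following is a minimal sketch of that idea only. It assumes the minimum and maximum ID are already known, and `PartitionSketch.split` is a hypothetical helper; how the JDBC source actually computes its split boundaries is not described in this article.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
    // Splits the inclusive range [min, max] into at most `num` contiguous
    // inclusive ranges whose sizes differ by at most one.
    static List<long[]> split(long min, long max, int num) {
        List<long[]> ranges = new ArrayList<>();
        long total = max - min + 1;
        long base = total / num;   // minimum size of each range
        long extra = total % num;  // the first `extra` ranges get one more id
        long start = min;
        for (int i = 0; i < num && start <= max; i++) {
            long size = base + (i < extra ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 100 ids split into 40 ranges, mirroring partition_num = 40.
        List<long[]> ranges = split(1, 100, 40);
        System.out.println(ranges.size());                            // 40
        System.out.println(ranges.get(0)[0] + ".." + ranges.get(0)[1]); // 1..3
    }
}
```

Each range would then translate into a bounded query on the partition column, so that the configured parallelism can work through the ranges concurrently.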

Under fixed JVM memory of 4G and parallelism of 2, SeaTunnel completed the synchronization in 1965 seconds. Based on this conclusion, we tested the speed of GLUE under the same memory and concurrency settings.

GLUE Task Configuration

We created a MySQL-to-MySQL job as follows:

[Figure 2]

Configure the connection between the source and the target:

[Figure 3]

Job configuration:

[Figure 4]

[Figure 5]

Adjust the memory in the job parameters configuration:

[Figure 6]

--conf spark.yarn.executor.memory=4g

Under this configuration, GLUE took 8191 seconds to complete the synchronization.
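As a quick check of the two timings reported for this configuration (1965 seconds for SeaTunnel, 8191 seconds for GLUE), the ratio can be computed directly. `SpeedupSketch` is just an illustrative name.

```java
import java.util.Locale;

class SpeedupSketch {
    // How many times longer the slower run took than the faster one.
    static double ratio(double slowerSeconds, double fasterSeconds) {
        return slowerSeconds / fasterSeconds;
    }

    public static void main(String[] args) {
        double r = ratio(8191, 1965);
        System.out.println(String.format(Locale.ROOT, "%.2fx", r)); // 4.17x
    }
}
```

For this 4G, parallelism 2 run, GLUE needed roughly 4.2 times as long as SeaTunnel, which is in line with the headline figure.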

Conclusion

After comparing the best configurations, we conducted a more in-depth comparison for different memory sizes. The following chart shows the comparison results obtained through repeated testing under the same environment.

[Figure 7]

The unit is seconds.

[Figure 8]

Note: This comparison is based on SeaTunnel commit ID f57b897. You are welcome to download it and run the test yourself!

· 6 min read

Written by Wang Hailin, Apache SeaTunnel PPMC

Preface

Currently, SeaTunnel supports database change data capture (CDC https://github.com/apache/incubator-seatunnel/issues/3175), to transfer data changes to downstream systems in real time. SeaTunnel categorizes the captured data changes into the following 4 types:

  • INSERT: Data insertion
  • UPDATE_BEFORE: Historical value before data change
  • UPDATE_AFTER: New value after data change
  • DELETE: Data deletion

To handle the above data change operations, the Sink Connector needs to support writing behavior. This article will introduce how the ClickHouse Sink Connector supports writing these CDC types of data changes.

For CDC scenarios, the primary key is a necessary condition, so first, it needs to support the general requirements of INSERT, UPDATE, DELETE, etc. based on the primary key and ensure that the writing order is consistent with the CDC event order. In addition, considering the complexity of the data source in practice, it also needs to support UPSERT writing. Finally, according to the characteristics of ClickHouse itself, corresponding optimizations need to be made, such as UPDATE and DELETE being heavyweight operations in ClickHouse, which should be optimized based on the corresponding table engine's characteristics.

Overall design

The current ClickHouse Sink Connector is based on the JDBC Driver implementation, and a group of JDBC executors can be designed to encapsulate the processing of different types of data, making it convenient to switch or combine implementations based on actual scenarios and encapsulate implementation details.

JdbcBatchStatementExecutor is the top-level interface of the executor.

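public interface JdbcBatchStatementExecutor extends AutoCloseable {

    // Create the statements this executor needs on the given connection.
    void prepareStatements(Connection connection) throws SQLException;

    // Add one row to the current batch.
    void addToBatch(SeaTunnelRow record) throws SQLException;

    // Submit the accumulated batch to the database.
    void executeBatch() throws SQLException;

    // Release the statements created by prepareStatements().
    void closeStatements() throws SQLException;

    @Override
    default void close() throws SQLException {
        closeStatements();
    }
}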

JdbcBatchStatementExecutor has the following implementation classes:

SimpleBatchStatementExecutor // implements simple SQL batch execution logic
InsertOrUpdateBatchStatementExecutor // implements INSERT and UPDATE, and also supports UPSERT mode
ReduceBufferedBatchStatementExecutor // buffers rows in memory; when flushing to the database, dispatches each data change type (INSERT, UPDATE, DELETE) to the corresponding executor

Handling of cases where the primary key is not specified

Currently, in CDC processing, the primary key is a necessary condition. If no primary key column is specified in the Sink Connector configuration, the connector writes in append-only mode and calls SimpleBatchStatementExecutor directly.

CDC data process

We divide the execution logic of data processing as follows: each data change type enters its corresponding Executor and is finally transformed into the matching SQL statement for execution, with JDBC batching used throughout.

                  CDC Event
                  /       \
                 /         \
                /           \
               /             \
    DELETE Executor     INSERT OR UPDATE Executor
                             /         \
                            /           \
                           /             \
                          /               \
               INSERT Executor       UPDATE Executor

Maintaining the Order of CDC Data

CDC events are ordered, and writing must be processed in the order in which the events occur, otherwise data inconsistencies may occur.

In the previous logic, data of different types were distributed to their respective Executors and Jdbc Batch was used for batch submission to improve write performance, but categorizing batching can result in the order of submissions not being consistent with the CDC event order.

We can add an execution barrier marker: when the data row being processed is of the same type as the previous row, it can be added to the same batch; if not, the previous batch is first flushed to the database. This ensures that the write order is strictly consistent with the CDC event order.
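Stripped of JDBC, the barrier rule amounts to grouping consecutive events of the same type and flushing whenever the type changes. The sketch below illustrates only that rule; `OrderedBatchSketch` and `toBatches` are hypothetical names, and event types are plain strings rather than SeaTunnel row kinds.

```java
import java.util.ArrayList;
import java.util.List;

class OrderedBatchSketch {
    // Groups consecutive events of the same type into one batch.
    // A change of type acts as the barrier that closes the previous batch.
    static List<List<String>> toBatches(List<String> eventTypes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String type : eventTypes) {
            if (!current.isEmpty() && !current.get(0).equals(type)) {
                batches.add(current);        // flush the previous batch first
                current = new ArrayList<>();
            }
            current.add(type);
        }
        if (!current.isEmpty()) {
            batches.add(current);            // flush whatever is left
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(toBatches(List.of("INSERT", "INSERT", "UPDATE", "INSERT")));
        // [[INSERT, INSERT], [UPDATE], [INSERT]]
    }
}
```

Concatenating the batches in order reproduces the original event sequence, which is exactly the property the barrier is meant to protect.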

Example for InsertOrUpdateBatchStatementExecutor

public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    // Excerpt: fields such as insertStatement, updateStatement, preChangeFlag and submitted are omitted.

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        boolean currentChangeFlag = hasInsert(record);
        if (currentChangeFlag) {
            // Switching from UPDATE to INSERT: flush pending updates first.
            if (preChangeFlag != null && !preChangeFlag) {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, insertStatement);
            insertStatement.addBatch();
        } else {
            // Switching from INSERT to UPDATE: flush pending inserts first.
            if (preChangeFlag != null && preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, updateStatement);
            updateStatement.addBatch();
        }
        preChangeFlag = currentChangeFlag;
        submitted = false;
    }

    @Override
    public void executeBatch() throws SQLException {
        // Flush whichever statement holds the last open batch.
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            } else {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
        }
        submitted = true;
    }
}

Of course, this significantly slows down batch processing, so we use ReduceBufferedBatchStatementExecutor to add an in-memory buffer layer and dispatch the buffered changes to the database only when a batch is submitted.

Example for ReduceBufferedBatchStatementExecutor

public class ReduceBufferedBatchStatementExecutor implements JdbcBatchStatementExecutor {
    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> buffer = new LinkedHashMap<>();

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        buffer.put(record, ...);
    }

    @Override
    public void executeBatch() throws SQLException {
        Boolean preChangeFlag = null;
        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = buffer.entrySet();
        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : entrySet) {
            Boolean currentChangeFlag = entry.getValue().getKey();
            if (currentChangeFlag) {
                if (preChangeFlag != null && !preChangeFlag) {
                    deleteExecutor.executeBatch();
                }
                insertOrUpdateExecutor.addToBatch(entry.getValue().getValue());
            } else {
                if (preChangeFlag != null && preChangeFlag) {
                    insertOrUpdateExecutor.executeBatch();
                }
                deleteExecutor.addToBatch(entry.getKey());
            }
            preChangeFlag = currentChangeFlag;
        }

        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertOrUpdateExecutor.executeBatch();
            } else {
                deleteExecutor.executeBatch();
            }
        }
        buffer.clear();
    }
}
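The benefit of buffering in a map is that several changes to the same key collapse into a single pending write before anything is sent to the database. The sketch below shows that effect in isolation. It is an assumption-laden illustration, not the connector's code: `ReduceBufferSketch` is a hypothetical class, the key is a plain integer primary key, and each change is stored as a `kind:payload` string.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ReduceBufferSketch {
    // Latest change per primary key, stored as "kind:payload".
    private final Map<Integer, String> buffer = new LinkedHashMap<>();

    // A later change to the same key overwrites the earlier one,
    // so only the final state of each key is flushed.
    void add(int primaryKey, String kind, String payload) {
        buffer.put(primaryKey, kind + ":" + payload);
    }

    int pendingWrites() {
        return buffer.size();
    }

    Map<Integer, String> snapshot() {
        return new LinkedHashMap<>(buffer);
    }

    public static void main(String[] args) {
        ReduceBufferSketch sketch = new ReduceBufferSketch();
        sketch.add(1, "INSERT", "a");
        sketch.add(2, "INSERT", "b");
        sketch.add(1, "UPDATE_AFTER", "c");
        sketch.add(2, "DELETE", "");
        System.out.println(sketch.pendingWrites()); // 2 writes instead of 4
        System.out.println(sketch.snapshot());      // {1=UPDATE_AFTER:c, 2=DELETE:}
    }
}
```

Four incoming events reduce to two pending writes, which is why the buffered executor can afford the barrier-style flushing on submit.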

Implementing a General UPSERT Write

In InsertOrUpdateBatchStatementExecutor, you can turn on UPSERT through configuration. When processing INSERT or UPDATE data types, the executor first queries by primary key to see whether the row already exists, and then decides whether to write it with an INSERT or an UPDATE statement.

Note: This configuration is optional and slows down writes. Enable it only when a special scenario requires it.

Example for InsertOrUpdateBatchStatementExecutor

public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        boolean currentChangeFlag = hasInsert(record);
        ...
    }

    private boolean hasInsert(SeaTunnelRow record) throws SQLException {
        if (upsertMode()) {
            return !exist(keyExtractor.apply(record));
        }
        switch (record.getRowKind()) {
            case INSERT:
                return true;
            case UPDATE_AFTER:
                return false;
            default:
                throw new UnsupportedOperationException();
        }
    }

    private boolean exist(SeaTunnelRow pk) throws SQLException {
        keyRowConverter.toExternal(pk, existStatement);
        try (ResultSet resultSet = existStatement.executeQuery()) {
            return resultSet.next();
        }
    }
}

Optimizing UPSERT for ReplacingMergeTree Engine

The ReplacingMergeTree table engine can be configured with an ORDER BY key, and rows written with INSERT INTO replace existing records that have the same ORDER BY key. We can use this behavior to implement UPSERT.

When the user writes to the ReplacingMergeTree table engine and the table's ORDER BY field is the same as the primary key field configured in the Sink Connector, both INSERT/UPDATE_AFTER data types are processed as INSERT to implement UPSERT.

Optimizing Updates for the MergeTree Engine

DELETE and UPDATE are heavyweight operations in ClickHouse, but there is an experimental lightweight deletion (https://clickhouse.com/docs/en/sql-reference/statements/delete) for MergeTree engine, which performs better than the heavyweight deletion. We allow the user to configure the lightweight deletion.

When the user writes to the MergeTree table engine and enables the lightweight deletion, we treat both DELETE/UPDATE_BEFORE data types as lightweight deletions, and treat both INSERT/UPDATE_AFTER data types as INSERTs, avoiding the UPDATE operation and using the lightweight deletion.
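The mapping described above for a MergeTree table with lightweight deletion enabled can be written down as a small lookup. This is a sketch of the rule as stated in the text, not the connector's implementation: `ClickHouseWriteModeSketch`, its local `RowKind` enum, and the returned labels are all hypothetical.

```java
class ClickHouseWriteModeSketch {
    // Local stand-in for the four CDC change types listed in the preface.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // MergeTree with lightweight deletion enabled:
    // old values are removed with a lightweight delete, new values are inserted,
    // so no heavyweight UPDATE is ever issued.
    static String mergeTreeLightweight(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "INSERT";
            case DELETE:
            case UPDATE_BEFORE:
                return "LIGHTWEIGHT_DELETE";
            default:
                throw new IllegalArgumentException("Unknown row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + mergeTreeLightweight(kind));
        }
    }
}
```

An update therefore arrives as an UPDATE_BEFORE/UPDATE_AFTER pair and is executed as a lightweight delete followed by an insert.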

Contributions that improve these functions are welcome. If you have any questions, please raise an issue on SeaTunnel GitHub (https://www.github.com/apache/incubator-seatunnel), and we will reply as soon as possible.

· 12 min read

In the recently released official version SeaTunnel 2.3.0, SeaTunnel Zeta, the engine the community has been developing in-house for more than a year, is officially released. It will be the default engine of SeaTunnel in the future, providing users with high-throughput, low-latency, reliable, and consistent synchronization jobs.

Why does SeaTunnel develop its synchronization engine? What is the positioning of the SeaTunnel Engine? How is it different from traditional computing engines? What is the design idea? What is unique about the architectural design? These questions will be answered in this article.

  • Why develop our engine
  • SeaTunnel Engine Positioning
  • Design ideas
  • Architecture design
  • Unique advantages and features
  • Current basic functions and features
  • Future optimization plan

01 Why develop our engine

It was a year ago that the SeaTunnel community first publicly stated that it would develop its own engine. The team decided to do so because SeaTunnel's connectors could run only on Flink or Spark, and Flink and Spark, as computing engines, have many problems that are hard to solve when used for data integration and synchronization.

Refer to: Why do we self-develop the big data synchronization engine SeaTunnel Zeta? https://github.com/apache/incubator-seatunnel/issues/1954

02 Design ideas

The general idea of engine design is as follows:

  1. Simple and easy to use. The new engine minimizes its dependence on third-party services and can provide cluster management, snapshot storage, and cluster HA without relying on big data components such as Zookeeper and HDFS. This is very useful for users who do not have a big data platform or do not want to depend on one for data synchronization.
  2. More resource-saving. At the CPU level, Zeta Engine internally uses Dynamic Thread Sharing. In real-time synchronization scenarios where the number of tables is large but each table holds little data, Zeta Engine runs the synchronization tasks in shared threads, which avoids unnecessary thread creation and saves system resources. On the read and write side, Zeta Engine is designed to minimize the number of JDBC connections. In CDC scenarios, Zeta Engine reuses log reading and parsing resources as much as possible.
  3. More stable. In this version, Zeta Engine uses the Pipeline as the minimum granularity of Checkpoint and fault tolerance for data synchronization tasks. The failure of a task only affects the tasks that have upstream and downstream relationships with it, which avoids, as far as possible, a single task failure causing the entire Job to fail or roll back. In addition, for scenarios where the source data has a limited retention time, Zeta Engine supports enabling a data cache that automatically caches the data read from the source; downstream tasks then read the cached data and write it to the target. In this scenario, even if the target fails and data cannot be written, reading from the source is not affected, which prevents source data from being deleted on expiry before it has been synchronized.
  4. Faster. Zeta Engine's execution plan optimizer reduces the network transmission of data where possible, which cuts the overall performance loss caused by data serialization and deserialization and completes data synchronization faster. It also supports speed limiting, so that sync jobs can run at a reasonable speed.
  5. Data synchronization support for all scenarios. SeaTunnel aims to support full and incremental synchronization in offline batch mode, as well as real-time synchronization and CDC.

03 Architecture design

SeaTunnel Engine is mainly composed of a set of APIs for data synchronization processing and a core computing engine. Here we mainly introduce the architecture design of the SeaTunnel Engine core engine.

SeaTunnel Engine consists of three main services: CoordinatorService, TaskExecutionService, and SlotService.

Coordinator Service

CoordinatorService is the Master service of the cluster, which provides the generation process of each job from LogicalDag to ExecutionDag, and then to PhysicalDag, and finally creates the JobMaster of the job for scheduling execution and status monitoring of the job. CoordinatorService is mainly composed of 4 large functional modules:

  1. JobMaster is responsible for the generation process from LogicalDag to ExecutionDag to PhysicalDag of a single job, and is scheduled to run by PipelineBaseScheduler.
  2. CheckpointCoordinator, responsible for the Checkpoint process control of the job.
  3. ResourceManager is responsible for the application and management of job resources. It currently supports Standalone mode and will support On Yarn and On K8s in the future.
  4. Metrics Service, responsible for the statistics and summary of job monitoring information.

TaskExecutionService

TaskExecutionService is the Worker service of the cluster, which provides the real runtime environment of each Task in the job. TaskExecutionService uses Dynamic Thread Sharing technology to reduce CPU usage.

SlotService

SlotService runs on each node of the cluster and is mainly responsible for the division, application, and recycling of resources on the node.

04 Unique advantages and features

Autonomous cluster

SeaTunnel Engine implements an autonomous (decentralized) cluster. To achieve cluster autonomy and job fault tolerance without relying on third-party service components (such as Zookeeper), SeaTunnel Engine uses Hazelcast as the underlying dependency. Hazelcast provides a distributed in-memory data grid, allowing users to operate on a distributed collection as if it were a normal local Java collection. SeaTunnel saves job status information in the Hazelcast in-memory grid. When the Master node switches, job state can be recovered from the data in the grid. We have also implemented persistence for the Hazelcast in-memory grid data: job status information is persisted to storage (a JDBC-compatible database, HDFS, or cloud storage) in the form of a WAL. This way, even if the entire cluster goes down and restarts, the runtime information of the job can be restored.

Data cache

SeaTunnel Engine is different from the traditional Spark/Flink computing engine, it is an engine specially used for data synchronization. The SeaTunnel engine naturally supports data cache. When multiple synchronous jobs in the cluster share a data source, the SeaTunnel engine will automatically enable the data cache. The source of a job will read the data and write it into the cache, and all other jobs will no longer read data from the data source but are automatically optimized to read data from the Cache. The advantage of this is that it can reduce the reading pressure of the data source and reduce the impact of data synchronization on the data source.

Speed control

SeaTunnel Engine supports the speed limit during data synchronization, which is very useful when reading data sources with high concurrency. A reasonable speed limit can not only ensure that the data is synchronized on time, but also minimize the pressure on the data source.

Shared connection pool to reduce database pressure

At present, the underlying operating tools and data synchronization tools provided by computing engines such as Spark/Flink cannot solve the problem that each table needs its own JDBC connection when an entire database is synchronized. Database connections are a limited resource for the database. Too many of them put great pressure on the database and make its read and write latency less stable, which is a very serious incident for business databases. To solve this problem, SeaTunnel Engine uses a shared connection pool so that multiple tables can share JDBC connections, thereby reducing the number of database connections in use.

Breakpoint resume (incremental/full volume)

SeaTunnel Engine supports resuming from a breakpoint in offline synchronization. When the amount of data is large, a data synchronization job often runs for tens of minutes or several hours. If the job fails midway and has to be rerun from scratch, that time is wasted. SeaTunnel Engine continuously saves state (checkpoints) during offline synchronization. When a job fails and is rerun, it continues from the last checkpoint, which effectively reduces the data delay that hardware problems such as node downtime may cause.
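The resume behavior can be illustrated with a toy job that copies a list of rows and records its progress every few rows. Everything here is a simplifying assumption rather than the engine's mechanism: `CheckpointResumeSketch` is a hypothetical class, the checkpoint is a single source offset, and writes made after the last checkpoint are assumed to be rolled back on restart.

```java
import java.util.ArrayList;
import java.util.List;

class CheckpointResumeSketch {
    private int checkpoint = 0;  // next source offset as of the last checkpoint
    private final List<Integer> target = new ArrayList<>();

    // Runs the job from the last checkpoint. `interval` is the number of rows
    // between checkpoints; `crashAt` is the source offset at which this run
    // dies (use -1 for a run that never fails).
    void run(List<Integer> source, int interval, int crashAt) {
        // Assumption: writes made after the last checkpoint are rolled back on restart.
        while (target.size() > checkpoint) {
            target.remove(target.size() - 1);
        }
        for (int i = checkpoint; i < source.size(); i++) {
            if (i == crashAt) {
                return;              // simulated crash, checkpoint stays as it was
            }
            target.add(source.get(i));
            if ((i + 1) % interval == 0) {
                checkpoint = i + 1;  // persist progress
            }
        }
        checkpoint = source.size();  // final checkpoint when the job completes
    }

    int lastCheckpoint() {
        return checkpoint;
    }

    List<Integer> target() {
        return target;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        CheckpointResumeSketch job = new CheckpointResumeSketch();
        job.run(source, 4, 6);                    // crashes before row 6
        System.out.println(job.lastCheckpoint()); // 4
        job.run(source, 4, -1);                   // rerun resumes at offset 4
        System.out.println(job.target());         // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The rerun repeats only the rows after the last checkpoint instead of starting again from row 0, which is where the time saving comes from.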

The Schema evolution route

Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that changes over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adjust the schema to include one or more new columns.

This capability is required in real-time data warehouse scenarios. Currently, the Flink and Spark engines do not support this feature.

Fine-grained fault-tolerant design

Flink is designed for fault tolerance and rollback at the level of the entire job: if a task fails, the entire job is rolled back and restarted. The design of SeaTunnel Engine takes into account that, in many data synchronization scenarios, the failure of a task should only require fault tolerance for the tasks that have upstream and downstream relationships with it. Based on this principle, SeaTunnel Engine first generates a logical DAG from the user's job configuration file, then optimizes the logical DAG, and finally generates pipelines (a pipeline is a connected subgraph of the job DAG), so that jobs are scheduled, executed, and made fault tolerant at pipeline granularity.

A typical usage scenario is:

Use the CDC connector to read data from MySQL's binlog and write it to another MySQL. With the Flink or Spark engine, once the target MySQL cannot be written to, the CDC task reading the binlog is terminated as well. If MySQL has a log expiration time configured, the source MySQL's log may already have been purged by the time the target MySQL's problem is fixed, which leads to data loss and other problems.

SeaTunnel Engine automatically optimizes this synchronization task by adding a Cache between the source and the target, and then splits the job into two Pipelines: pipeline#1 reads data from the CDC source and writes it to the SeaTunnel Cache, and pipeline#2 reads data from the SeaTunnel Cache and writes it to the target MySQL. If the target MySQL has a problem and cannot be written to, pipeline#2 of this synchronization job is terminated while pipeline#1 keeps running normally. This design fundamentally solves the above problems and is more in line with the processing logic of a data synchronization engine.
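The decoupling between the two pipelines can be modelled with a queue standing in for the cache. This is a deliberately simplified, single-threaded sketch: `CachedPipelineSketch` and its methods are hypothetical names, and target availability is passed in as a flag instead of being detected.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CachedPipelineSketch {
    private final Deque<String> cache = new ArrayDeque<>();
    private int written = 0;

    // pipeline#1: read a change event from the source and append it to the cache.
    void readFromSource(String event) {
        cache.addLast(event);
    }

    // pipeline#2: drain the cache into the target.
    // If the target is unavailable, nothing is consumed and nothing is lost.
    void writeToTarget(boolean targetAvailable) {
        while (targetAvailable && !cache.isEmpty()) {
            cache.pollFirst();
            written++;
        }
    }

    int cached() {
        return cache.size();
    }

    int written() {
        return written;
    }

    public static void main(String[] args) {
        CachedPipelineSketch job = new CachedPipelineSketch();
        job.readFromSource("e1");
        job.readFromSource("e2");
        job.writeToTarget(false);         // target is down, pipeline#2 makes no progress
        job.readFromSource("e3");         // pipeline#1 keeps reading the source
        System.out.println(job.cached()); // 3
        job.writeToTarget(true);          // target is back, the backlog is written
        System.out.println(job.written()); // 3
    }
}
```

Because reading never depends on the target being writable, the source log is consumed before it can expire, and the backlog is applied once the target recovers.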

Dynamically share threads to reduce resource usage

SeaTunnel Engine's Task design uses thread sharing. Unlike Flink/Spark, SeaTunnel Engine does not simply give each Task its own thread. Instead it uses Dynamic Thread Sharing, a dynamic sensing mechanism that decides whether a Task should share a thread with other Tasks or have a thread to itself.

Compared with single-threaded serial computing, multi-threaded parallel computing performs better. However, if every Task runs on its own thread, then when many tables are synchronized and the number of Tasks is large, a very large number of threads is started on the Worker node. With a fixed number of CPU cores, more threads is not always better: when there are too many threads, the CPU spends a lot of time on thread context switching, which hurts computing performance.

Flink/Spark usually limit the maximum number of tasks running on each node, which avoids starting too many threads. To run more tasks on one node, SeaTunnel Engine uses thread sharing: tasks with a small amount of data share threads, and tasks with a large amount of data get exclusive threads. This makes it possible for SeaTunnel Engine to run hundreds or thousands of table synchronization tasks on one node and to synchronize more tables with fewer resources.
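A rough sketch of the decision "share a thread or monopolize one" is given below. The criterion (average processing time per call), the threshold, and the group size are all assumptions made for illustration; the text does not describe how the engine actually measures tasks.

```python
def plan_threads(avg_call_ms, threshold_ms=10.0, shared_group_size=4):
    """Return (dedicated, shared_groups) for a mapping task -> avg call time.

    Tasks at or above threshold_ms get their own thread; lighter tasks are
    packed into groups that share one thread each. Both knobs are hypothetical.
    """
    dedicated, light = [], []
    for task, cost in sorted(avg_call_ms.items()):
        (dedicated if cost >= threshold_ms else light).append(task)
    shared_groups = [light[i:i + shared_group_size]
                     for i in range(0, len(light), shared_group_size)]
    return dedicated, shared_groups

# Hypothetical measurements for seven table synchronization tasks.
measurements = {"orders": 42.0, "events": 65.0, "users": 0.8,
                "audit_log": 1.5, "regions": 0.2, "tags": 0.3, "units": 0.1}
dedicated, shared_groups = plan_threads(measurements)
thread_count = len(dedicated) + len(shared_groups)
```

In this example seven tasks run on four threads: two exclusive threads for the heavy tables and two shared threads for the five light ones.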

05 Basic functions and features

2.3.0 is the first official version of SeaTunnel Engine, which implements some basic functions. For the detailed design, please refer to: https://github.com/apache/incubator-seatunnel/issues/2272

[ Cluster Management ]

  • Support stand-alone operation
  • Support cluster operation
  • Autonomous (decentralized) cluster: there is no need to specify a Master node for the SeaTunnel Engine cluster. SeaTunnel Engine elects the Master node by itself at runtime and automatically elects a new one if the Master node goes down.
  • Automatic discovery of cluster nodes: nodes with the same cluster_name automatically form a cluster.

[ Core function ]

  • Supports running jobs in Local mode. The cluster is automatically destroyed after the job runs.
  • It supports running jobs in Cluster mode (single machine or cluster) and submitting jobs to the SeaTunnel Engine service through SeaTunnel Client. After the job is completed, the service continues to run and waits for the next job submission.
  • Support offline batch synchronization.
  • Support real-time synchronization.
  • Unified batch and stream processing: all SeaTunnel V2 connectors can run in SeaTunnel Engine.
  • Supports a distributed snapshot algorithm which, together with the two-phase commit of SeaTunnel V2 connectors, ensures exactly-once data delivery.
  • Supports job scheduling at the Pipeline level, so that a job can start even when resources are limited.
  • Supports job fault tolerance at the Pipeline level. The failure of a Task only affects the Pipeline it is in, and only the Tasks in that Pipeline need to be rolled back.
  • Supports dynamic thread sharing to achieve real-time synchronization of a large number of small data sets.

06 Future optimization plan

  • Support Cache mode, and first support Kafka as Cache
  • Support JobHistory, support the persistence of JobHistory.
  • Support indicator (Reader Rows, QPS, Reader Bytes) monitoring and indicator query
  • Support dynamic modification of the execution plan.
  • Support CDC.
  • Support whole database synchronization
  • Support multi-table synchronization
  • Support for Schema Evolution

· 12 min read

Apache IoTDB (Internet of Things Database) is a software system that integrates the collection, storage, management, and analysis of time series data of the Internet of Things, which can meet the needs of massive data storage, high-speed data reading, and complex data analysis in the field of Industrial Internet of Things. Currently, SeaTunnel already supports IoTDB Connector, realizing the connection of data synchronization scenarios in the IoT field.

At the SeaTunnel community online meeting in October this year, SeaTunnel Committer Wang Hailin introduced the implementation process of SeaTunnel’s access to IoTDB, allowing users to have a deeper understanding of the operation method and principle of IoTDB data synchronization.

The topic I’m sharing today is using SeaTunnel to play around with data synchronization in IoTDB.

This session is divided into six parts. First, we go over the basic concepts of SeaTunnel. On that basis, we focus on the functional features of the IoTDB Connector, then analyze how its data read and write functions are implemented. After that, we show some typical usage scenarios and cases so you can see how to use the IoTDB Connector in production environments. The last part covers the community’s next steps for the IoTDB Connector and guidance on how to contribute.

Introduction to SeaTunnel basic concepts

This is the basic architecture of SeaTunnel, an engine built for data synchronization, with a set of abstract APIs for reading data from and writing to a variety of data sources.

The left-hand side briefly shows the Source side. We abstract the Source’s API, Type, and State: the API reads from the data source, the Type unifies the data types of the various data sources into the abstract types defined by SeaTunnel, and the State records the read position so that it can be retained and restored during reading.

This is an abstraction for Source, and we have done a similar abstraction for Sink, i.e. how data is written, and how the data type matches the real data source type, and how the state is restored and retained.

Based on these APIs, we will have a translation layer to translate these APIs to the corresponding execution engine. SeaTunnel currently supports three execution engines, Spark, Flink, and our own execution engine, SeaTunnel Engine, which will be released soon.

This is roughly what SeaTunnel does. SeaTunnel relies on Sources and Sinks to read and write data for data synchronization; we call them Connectors. A Connector consists of a Source and a Sink.

In the diagram above we see the different data sources. The Source is responsible for reading data from a data source and transforming it into the abstraction layer, made up of SeaTunnelRow and the SeaTunnel Type system. The Sink is responsible for pulling data from the abstraction layer, converting it into the concrete format of the target data store, and writing it there.

The combination of Source + Abstraction Layer + Sink enables the synchronization of data between multiple heterogeneous data sources.

I’ll use a simple example below to illustrate how SeaTunnel’s Source and Sink work.

Through the configuration file we can specify any combination of Sources and Sinks. The commands in the toolkit provided by SeaTunnel take this configuration file as input and, when executed, move the data accordingly.

This is the Connector ecosystem currently supported by SeaTunnel, including data sources such as JDBC, HDFS, Hive, Pulsar, and message queues.

The list in the picture is not exhaustive of the Connectors supported by SeaTunnel. Under the GitHub SeaTunnel project, you can see the Plugins directory, where supported Connector plugins are constantly being added and where you can see the latest access in real-time.

IoTDB Connector Features

Below is information about access to the IoTDB Connector.

First, we introduce the functional features of the IoTDB Connector integrated with SeaTunnel and what exactly it supports, for your reference.

Source Features

Firstly, there are the typical usage scenarios supported by Source, such as bulk reading of devices, field projection, data type mapping, parallel reading, etc.

As you can see above, the IoTDB Source supports all features except exactly-once and stream mode. For batch reads, IoTDB has a SQL syntax similar to group by device, which allows you to read data from multiple devices in a single batch. For basic projection, IoTDB SQL returns the time column by default for any query, and a query grouped by device also returns the device column; we support projecting both onto SeaTunnel columns by default.

The only data type not supported is Vector; all others are supported.

For parallel reads, IoTDB data is timestamped, so we split on timestamp ranges to read in parallel.

State recovery builds on the same splits: since the time range to read is divided into different splits, recovery can be based on the split position information.

Sink functional features

The diagram above shows the features already supported by SeaTunnel. Regarding metadata extraction, we support the extraction of metadata such as measurement, device, etc. from SeaTunnelRow and the extraction or use of current processing time from SeaTunnelRow. Batch commits and exception retries are also supported.

IoTDB data reading analysis

Next, we analyze the implementation and support for data reading.

Data type mapping

The first is data type mapping: data read from IoTDB has to be converted from IoTDB data types to SeaTunnel data types. The BOOLEAN, INT32, INT64, etc. listed here all have corresponding SeaTunnel data types. INT32 can be mapped according to the type requested on the SeaTunnel side, that is, to INT, or to TINYINT or SMALLINT when the range of values is small.

The Vector type is not currently supported.

This is the corresponding example code showing how the mapping is done where the type conversion is done.
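As a rough illustration of this kind of mapping (not the connector's actual code), the sketch below maps IoTDB type names to SeaTunnel type names. Only BOOLEAN, INT32, INT64, and the narrowing of INT32 come from the text; the FLOAT, DOUBLE, and TEXT rows and the function name are assumptions.

```python
# IoTDB type name -> default SeaTunnel type name.
# The FLOAT/DOUBLE/TEXT rows are assumed for illustration.
IOTDB_TO_SEATUNNEL = {
    "BOOLEAN": "BOOLEAN",
    "INT32": "INT",
    "INT64": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "TEXT": "STRING",
}

# INT32 may be read as a narrower type when the user asks for one.
NARROW_INT_TARGETS = {"TINYINT", "SMALLINT", "INT"}

def to_seatunnel_type(iotdb_type, requested=None):
    """Pick the SeaTunnel type for an IoTDB column.

    `requested` is the type declared on the SeaTunnel side, if any.
    """
    if iotdb_type not in IOTDB_TO_SEATUNNEL:
        # Covers VECTOR, which is not supported.
        raise ValueError(f"unsupported IoTDB type: {iotdb_type}")
    if iotdb_type == "INT32" and requested in NARROW_INT_TARGETS:
        return requested
    return IOTDB_TO_SEATUNNEL[iotdb_type]
```

For example, `to_seatunnel_type("INT32", "SMALLINT")` keeps the narrower type requested by the user, while `to_seatunnel_type("INT32")` falls back to INT.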

Field projection

The other is field projection when reading. We can map the Time field automatically when reading IoTDB data, and we can choose the SeaTunnel type it is mapped to, such as TIMESTAMP or BIGINT.

The SQL lets you extract only the columns you need, and on the SeaTunnel side you can specify the name, type, etc. of each column after it is mapped to SeaTunnel via fields. The final result of the data read on SeaTunnel is shown in the figure above.

As we have just seen, the SQL does not select the time column, yet the result contains it. This is because we support projection of the time column, which can be projected into different data types so that users can convert it according to their needs. The diagram above shows the implementation logic.

Batch read Device

This is a common requirement, as we are likely to synchronize data in large batches with the same data structure.

SeaTunnel supports the align-by-device syntax so that device columns can also be projected onto the SeaTunnelRow.

Assuming there is a table in IoTDB, this syntax turns the device into a data column that we project onto SeaTunnel. After configuring the device name column and specifying the data type, we end up reading the data on SeaTunnel in the format shown above, containing the Time, device column, and the actual data value. This makes it possible to read data from the same device in bulk.

Parallel reading

The other is parallel reading.

  • Split: We partition the table by the Time column. When reading in parallel, each parallel thread/process should read a specific range of the data. Based on three configured parameters, the original SQL is divided into splits, each carrying its own query condition, and each split becomes the SQL that is actually executed.

  • Allocate splits to readers: Once the splits are generated, an allocation logic distributes them to the parallel readers.

This logic assigns a split to a reader based on the split ID. The distribution may be fairly random, or more uniform if the split IDs hash evenly; it depends on the Connector.
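The two steps above can be sketched as follows. The parameter names (`lower_bound`, `upper_bound`, `num_splits`), the example SQL, and the modulo-based assignment are assumptions for illustration; the real connector may generate and assign splits differently.

```python
def build_splits(base_sql, lower_bound, upper_bound, num_splits):
    """Cut [lower_bound, upper_bound) into num_splits time ranges.

    Assumes base_sql has no WHERE clause of its own.
    """
    span = upper_bound - lower_bound
    splits = []
    for i in range(num_splits):
        start = lower_bound + span * i // num_splits
        end = lower_bound + span * (i + 1) // num_splits
        sql = f"{base_sql} WHERE time >= {start} AND time < {end}"
        splits.append({"id": i, "sql": sql})
    return splits

def assign_splits(splits, num_readers):
    """Assign each split to a reader by split id modulo reader count."""
    assignment = {reader: [] for reader in range(num_readers)}
    for split in splits:
        assignment[split["id"] % num_readers].append(split["id"])
    return assignment

splits = build_splits("SELECT temperature FROM root.demo.d1", 0, 1000, 4)
assignment = assign_splits(splits, 2)
```

Each split carries a half-open time range, so adjacent splits never read the same timestamp twice.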

The result achieved is shown in the picture.

Status recovery

There is also state recovery involved when reading because if the task is large, the reading will take longer, and if there is an error or exception in the middle, you have to consider how to recover the state from the point where the error occurred, and then read it again afterward.

SeaTunnel’s state recovery is mainly through the reader storing the unread Split information into the state, and then the engine will periodically take a snapshot of the state when reading so that we can restore the last snapshot when we recover and continue reading afterward.
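A minimal sketch of this mechanism is shown below: the reader keeps the unread splits as its state, a snapshot copies that state, and recovery rebuilds a reader from the last snapshot. The class and method names are hypothetical and do not mirror the SeaTunnel API.

```python
class SplitReader:
    """Reader whose checkpointed state is the list of unread splits."""

    def __init__(self, pending_splits):
        self.pending = list(pending_splits)

    def read_next(self):
        # Reading a split removes it from the unread state.
        return self.pending.pop(0)

    def snapshot_state(self):
        # Called periodically by the engine; returns a copy of the state.
        return list(self.pending)

    @classmethod
    def restore(cls, state):
        return cls(state)

reader = SplitReader(["split-0", "split-1", "split-2", "split-3"])
reader.read_next()
checkpoint = reader.snapshot_state()  # taken after split-0 is finished
reader.read_next()                    # split-1 is read, then the task fails
recovered = SplitReader.restore(checkpoint)
replayed = [recovered.read_next() for _ in range(len(checkpoint))]
```

Note that split-1 is read again after recovery because it finished after the last snapshot, which is consistent with the Source not offering exactly-once.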

IoTDB Connector Data Write Analysis

Next, we analyze how data is written.

Data type mapping

Data writing also involves data type mapping, but in the opposite direction: it maps SeaTunnel data types to IoTDB data types. Since INT32 is the smallest integer type in IoTDB, TINYINT and SMALLINT are widened to INT32 when written. All other data types can be converted one-to-one; the ARRAY and VECTOR data types are not yet supported.

The above diagram shows the corresponding code; for the implementation logic, see the specific mapping.
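The write direction can be sketched in the same spirit. Only the widening of TINYINT and SMALLINT to INT32 and the unsupported ARRAY/VECTOR types come from the text; the remaining rows and the function name are assumptions.

```python
# SeaTunnel type name -> IoTDB type name (illustrative table).
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN": "BOOLEAN",
    "TINYINT": "INT32",   # widened: IoTDB has no narrower integer type
    "SMALLINT": "INT32",  # widened
    "INT": "INT32",
    "BIGINT": "INT64",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "STRING": "TEXT",
}

def to_iotdb_type(seatunnel_type):
    """Return the IoTDB type for a SeaTunnel type, rejecting ARRAY/VECTOR."""
    if seatunnel_type not in SEATUNNEL_TO_IOTDB:
        raise ValueError(f"unsupported SeaTunnel type: {seatunnel_type}")
    return SEATUNNEL_TO_IOTDB[seatunnel_type]
```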

Dynamic injection of metadata

SeaTunnel supports the dynamic injection of metadata.

When heterogeneous data sources are written to IoTDB, the device, measurement, and time are extracted from each row of data, either from columns of the serialized SeaTunnelRow or from a fixed value given in the configuration. If no time column is specified, the current system time is used as the time. The storage group can also be configured, in which case it is automatically prepended to the device as a prefix.

For example, suppose a row read in SeaTunnel has the structure shown above. It can be configured to synchronize to IoTDB, and the result obtained is as follows.

The temperature and humidity columns we need were extracted, and ts and the device name were extracted as the time and device metadata for IoTDB.
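The extraction described above can be sketched with a plain dictionary standing in for a SeaTunnelRow. The function, its parameter names, and the sample row are hypothetical; they only mirror the behavior described in the text (device and time extraction, system-time fallback, storage group prefix).

```python
import time

def to_iotdb_record(row, device_key, timestamp_key=None,
                    measurement_keys=None, storage_group=None, now_ms=None):
    """Turn one row (a dict) into device / timestamp / measurements."""
    device = row[device_key]
    if storage_group:
        # The storage group is prepended to the device as a prefix.
        device = f"{storage_group}.{device}"
    if timestamp_key is not None:
        timestamp = row[timestamp_key]
    else:
        # No time column configured: fall back to the current system time.
        timestamp = now_ms if now_ms is not None else int(time.time() * 1000)
    skipped = {device_key, timestamp_key}
    keys = measurement_keys or [k for k in row if k not in skipped]
    return {"device": device,
            "timestamp": timestamp,
            "measurements": {k: row[k] for k in keys}}

row = {"ts": 1664035200001, "device_name": "dev1",
       "temperature": 36.1, "humidity": 100, "note": "ignored"}
record = to_iotdb_record(row, "device_name", "ts",
                         ["temperature", "humidity"], "root.test_group")
```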

Batch commits and exception retries

In addition, the Sink needs to handle batching and retries when writing. For batching, we can configure the batch size and the batch commit interval; because data is buffered in memory, a separate thread can be enabled for timed commits.

For retries, SeaTunnel supports configuring the number of retries, the wait interval, and the maximum wait interval, and retrying ends immediately if a non-recoverable error is encountered.
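The sketch below combines both behaviors under stated assumptions: a batch is committed when it reaches a size limit or when the commit interval has elapsed, and each commit is retried with capped exponential backoff. All class, function, and parameter names are hypothetical, and for simplicity the interval is checked on each `add` call rather than by a separate timer thread.

```python
import time

class NonRecoverableError(Exception):
    """Raised by the writer for errors that retrying cannot fix."""

def write_with_retry(write, batch, max_retries=3, backoff_ms=100,
                     max_backoff_ms=1000, sleep=time.sleep):
    attempt = 0
    while True:
        try:
            return write(batch)
        except NonRecoverableError:
            raise  # give up immediately
        except Exception:
            if attempt >= max_retries:
                raise
            # Exponential backoff, capped at max_backoff_ms.
            sleep(min(backoff_ms * 2 ** attempt, max_backoff_ms) / 1000)
            attempt += 1

class BatchingSink:
    def __init__(self, write, batch_size=1024, batch_interval_ms=1000,
                 clock=time.monotonic, **retry_options):
        self.write = write
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_ms / 1000
        self.clock = clock
        self.retry_options = retry_options
        self.buffer = []
        self.last_flush = clock()

    def add(self, record):
        self.buffer.append(record)
        interval_elapsed = (self.clock() - self.last_flush
                            >= self.batch_interval_s)
        if len(self.buffer) >= self.batch_size or interval_elapsed:
            self.flush()

    def flush(self):
        if self.buffer:
            write_with_retry(self.write, list(self.buffer),
                             **self.retry_options)
            self.buffer.clear()
        self.last_flush = self.clock()

# Demo: the first write attempt fails once, then succeeds on retry.
committed, calls = [], {"n": 0}

def flaky_write(batch):
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient failure")
    committed.append(batch)

sink = BatchingSink(flaky_write, batch_size=2, clock=lambda: 0.0,
                    sleep=lambda seconds: None)
for record in ["r1", "r2", "r3"]:
    sink.add(record)
sink.flush()  # commit the remaining partial batch
```

The clock and sleep functions are injected so the demo runs instantly; a real sink would use the defaults.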

IoTDB Connector Usage Examples

After the previous analysis of reading and writing data, let’s look at three typical examples of usage scenarios.

Exporting data from IoTDB

The first scenario is exporting data from IoTDB. The example given here reads data from IoTDB and writes it to the Console.

  • Read in parallel, output to Console
    • Parallelism: 2
    • Number of batches: 24
    • Time frame: 2022-09-25 ~ 2022-09-26

Let’s assume that we have a data table in IoTDB and we want to export the data to the Console. The whole configuration is shown above and needs to map the columns of data we want to export and the time range to check.
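To get a feel for the numbers in this example, the short calculation below assumes that "Number of batches: 24" means the time frame is cut into 24 splits, that the time frame covers exactly the 24 hours from 2022-09-25 00:00 to 2022-09-26 00:00, and that splits are handed to the two readers round-robin. These are interpretations, not details confirmed by the original configuration.

```python
from datetime import datetime, timedelta

start = datetime(2022, 9, 25)
end = datetime(2022, 9, 26)
num_splits, parallelism = 24, 2

# Each split covers an equal slice of the time frame.
step = (end - start) / num_splits
ranges = [(start + step * i, start + step * (i + 1))
          for i in range(num_splits)]

# Round-robin assignment of split indexes to readers.
per_reader = {reader: [i for i in range(num_splits)
                       if i % parallelism == reader]
              for reader in range(parallelism)}
```

Under these assumptions each split covers one hour of data and each of the two readers processes twelve splits.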

This is the simplest example, but in practice, the Sink side may be more complex, so you will need to refer to the documentation of the corresponding data source for the appropriate configuration.

Importing data to IoTDB

  • Read database, batch write to IoTDB
    • Batch writing: one commit every 1024 entries or every 1000 ms
    • Extract metadata: device, timestamp, measurement
    • Specify the storage group: root.test_group

Another typical usage scenario is to import data from other data sources into IoTDB. Suppose we have an external database table with columns such as ts, temperature, and humidity, and we import it into IoTDB, keeping the temperature and humidity columns and leaving out the rest. The whole configuration is shown in the diagram above for reference.

On the Sink side, you mainly specify the key of the device column, that is, which column the device is extracted from, which column the time is extracted from, which columns to write to IoTDB, and so on.

As you can see, we can also configure the storage group, which is the IoTDB storage group that the data is written under.

Synchronizing data between IoTDB

The third scenario is synchronizing data from one IoTDB to another with bulk writes. Suppose a table in IoTDB needs to be synchronized to another IoTDB, and after synchronization both the storage group and the names of the measurement columns have changed. You can then use projection to rewrite the measurement names and use SQL to rewrite the storage group.

How to get involved in contribution

Finally, a few words about the next steps for the IoTDB Connector and how you can get involved in improving the Connector and contributing new features that are needed.

Next steps for the IoTDB Connector

  • Support for reading and writing vector data types
  • Support for tsfile reads and writes
  • Support for writing tsfile and reloading to IoTDB

· 10 min read

Apache SeaTunnel Committer | Zongwen Li

Introduction to Apache SeaTunnel

Apache SeaTunnel is a very easy-to-use ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data.

Apache SeaTunnel will try its best to solve the problems that may be encountered in the process of mass data synchronization, such as data loss and duplication, task accumulation and delay, low throughput, etc.

Milestones of SeaTunnel

SeaTunnel, formerly known as Waterdrop, was open-sourced on GitHub in 2017.

In October 2021, the Waterdrop community joined the Apache incubator and changed its name to SeaTunnel.

SeaTunnel Growth

When SeaTunnel entered the Apache incubator, the SeaTunnel community ushered in rapid growth.

As of now, the SeaTunnel community has a total of 151 contributors, 4314 Stars, and 804 forks.

Pain points of Existing engines

There are many pain points faced by the existing computing engines in the field of data integration, and we will talk about this first. The pain points usually lie in three directions:

  • The fault tolerance ability of the engine;
  • Difficulty in configuration, operation, and maintenance of engine jobs;
  • The resource usage of the engine.

Fault tolerance

Global Failover

For distributed streaming processing systems, high throughput and low latency are often the most important requirements. At the same time, fault tolerance is also very important in distributed systems. For scenarios that require high correctness, the implementation of exactly once is often very important.

In a distributed streaming processing system, since the computing power, network, load, etc. of each node are different, the state of each node cannot be directly merged to obtain a true global state. To obtain consistent results, the distributed processing system needs to be resilient to node failure, that is, it can recover to consistent results when it fails.

Although it is claimed in their official blog that Spark’s Structured Streaming uses the Chandy-Lamport algorithm for Failover processing, it does not disclose more details.

Flink implemented Checkpoint as a fault-tolerant mechanism based on the above algorithm and published related papers: Lightweight Asynchronous Snapshots for Distributed Dataflows

In the current industrial implementation, when a job fails, all nodes of the job DAG need to failover, and the whole process will last for a long time, which will cause a lot of upstream data to accumulate.

Loss of Data

The previous problem causes a long recovery time, though the business may be able to accept a certain degree of data delay.

In a worse case, a single sink node cannot be recovered for a long time while the source data is retained only for a limited time, as with MySQL and Oracle log data, which leads to data loss.

Configuration is cumbersome

Single table Configuration

The previous examples involve a small number of tables, but in real business development we usually need to synchronize thousands of tables, which may also be sharded across databases and tables.

The status quo is that every table has to be configured individually. Synchronizing a large number of tables costs users a lot of time, is prone to problems such as field mapping errors, and is difficult to maintain.

Not supporting Schema Evolution

In addition, according to a Fivetran research report, 60% of companies’ schemas change every month, and 30% change every week.

However, none of the existing engines supports Schema Evolution. After changing the Schema each time, the user needs to reconfigure the entire link, which makes the maintenance of the job very cumbersome.

The high volume of resource usage

Too many database links

If our Source or Sink is of JDBC type, since existing engines use one or more links (connections) per table, synchronizing many tables occupies many link resources, which puts a heavy burden on the database server.

Operator pressure is uncontrollable

In the existing engine, a buffer and other control operators are used to control the pressure, that is, the back pressure mechanism; since the back pressure is transmitted level by level, there will be pressure delay, and at the same time, the processing of data will not be smooth enough, increasing the GC time, fault-tolerant completion time, etc.

Another case is that neither the source nor the sink has reached the maximum pressure, but the user still needs to control the synchronization rate to prevent too much impact on the source database or the target database, which cannot be controlled through the back pressure mechanism.

Architecture goals of Apache SeaTunnel Engine

To solve these severe issues faced by existing computing engines, we developed our own engine dedicated to big data integration.

First, let’s go through the goals this engine aims to achieve.

Pipeline Failover

In data integration, a single job may synchronize hundreds of tables, and the failure of one node or one table would lead to the failure of all tables, which is too costly.

We expect that unrelated Job Tasks will not affect each other during fault tolerance, so we call a vertex collection with upstream and downstream relationships a Pipeline, and a Job can consist of one or more pipelines.

Regional Failover

Now, if there is an exception in a pipeline, we still need to fail over all the vertices in the pipeline; but can we restore only some of them? For example, if the Source fails, the Sink does not need to restart. In the case of a single Source and multiple Sinks, if a single Sink fails, only the failed Sink and the Source are restored; that is, only the failed node and its upstream nodes are restored.
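The rule "restore only the failed node and its upstream nodes" amounts to walking the DAG backwards from the failed vertex. The sketch below uses a hypothetical edge list and function name to show which vertices would be restored.

```python
from collections import defaultdict

def vertices_to_restore(edges, failed):
    """Return the failed vertex plus every vertex upstream of it."""
    upstream = defaultdict(set)
    for src, dst in edges:
        upstream[dst].add(src)

    to_restore, stack = {failed}, [failed]
    while stack:
        for parent in upstream[stack.pop()]:
            if parent not in to_restore:
                to_restore.add(parent)
                stack.append(parent)
    return to_restore

# One Source feeding two Sinks.
edges = [("source", "sink_1"), ("source", "sink_2")]
after_sink_failure = vertices_to_restore(edges, "sink_1")
after_source_failure = vertices_to_restore(edges, "source")
```

When `sink_1` fails, `sink_2` is left alone; when the Source fails, neither Sink is restarted.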

Obviously, stateless vertices do not need to be restarted, and since SeaTunnel is a data integration framework, we do not have aggregation-state vertices such as Agg and Count, so we only need to consider the Sink:

  • Sink supports neither idempotence nor 2PC: restarting and not restarting result in the same data duplication, so the case can only be handled without restarting the Sink;
  • Sink supports idempotence but not 2PC: because writes are idempotent, it does not matter if the Source reads inconsistent data each time, and the Sink does not need to be restarted;
  • Sink supports 2PC:
    • If the Source supports data consistency and abort is not executed, already-processed data is automatically ignored through the channel data ID; the catch is that the transaction session may time out;
    • If the Source does not support data consistency, execute abort on the Sink to discard the last batch of data, which has the same effect as restarting but avoids initialization work such as re-establishing links;
    • That is, the simplest implementation is to execute abort.

We use the pipeline as the minimum granularity for fault-tolerance management and use the Chandy-Lamport algorithm to implement fault tolerance for distributed jobs.

Data Cache

For sink failure, when data cannot be written, a possible solution is to run two jobs at the same time.

One job reads the database logs using the CDC source connector and then writes the data to Kafka using the Kafka Sink connector. Another job reads data from Kafka using the Kafka source connector and writes data to the destination using the destination sink connector.

This solution requires users to have a deep understanding of the underlying technology, and running two jobs increases the difficulty of operation and maintenance. Because every job needs its own JobMaster, it also requires more resources.

Ideally, the user only knows that they will be reading data from the source and writing data to the sink, and at the same time, during this process, the data can be cached in case the sink fails. The sync engine needs to automatically add caching operations to the execution plan and ensure that the source still works in the event of a sink failure. In this process, the engine needs to ensure that the data written to the cache and read from the cache are transactional, to ensure data consistency.

Sharding & Multi-table Sync

For synchronizing a large number of tables, we expect a single Source to support reading multiple tables with different structures, and then use side stream outputs so that each table's stream looks the same as a single-table stream.

The advantage of this is that it can reduce the link occupation of the data source and improve the utilization rate of thread resources.

At the same time, in SeaTunnel Engine these multiple tables are treated as one pipeline, which makes fault tolerance coarser-grained; this is a trade-off, and the user can choose how many tables go through one pipeline.
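The idea of one Source reading several tables and fanning rows out to per-table side streams can be sketched as below. The tagged-tuple row format and the function name are assumptions; the point is only that a single reader (and a single connection) produces one stream per table.

```python
from collections import defaultdict

def route_by_table(rows):
    """Fan out (table_id, row) pairs from one source into per-table streams."""
    streams = defaultdict(list)
    for table_id, row in rows:
        streams[table_id].append(row)
    return dict(streams)

# Rows from two tables with different structures, read by one source.
mixed_rows = [
    ("db.users", {"id": 1, "name": "a"}),
    ("db.orders", {"order_id": 10, "amount": 9.5}),
    ("db.users", {"id": 2, "name": "b"}),
]
side_outputs = route_by_table(mixed_rows)
```

Downstream, each entry of `side_outputs` can be handled exactly like the stream of a single-table job.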

Schema Evolution

Schema Evolution is a feature that allows users to easily change the current schema of a table to accommodate changing data over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adjust the schema to include one or more new columns.

This feature is required for real-time data warehouse scenarios. Currently, the Flink and Spark engines do not support this feature.

In SeaTunnel Engine, we will use the Chandy-Lamport algorithm to send DDL events, make them flow in the DAG graph and change the structure of each operator, and then synchronize them to the Sink.
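The key property of sending DDL as an event inside the data stream is ordering: every row before the event uses the old schema, and every row after it uses the new one. The sketch below illustrates this with a hypothetical `AddColumn` event and a toy sink; it is not the engine's implementation.

```python
from dataclasses import dataclass

@dataclass
class AddColumn:
    """DDL event that travels through the DAG in line with the data."""
    name: str

class SchemaAwareSink:
    def __init__(self, columns):
        self.columns = list(columns)
        self.written = []

    def process(self, element):
        if isinstance(element, AddColumn):
            # The schema changes for every row that arrives after the event.
            self.columns.append(element.name)
        else:
            self.written.append(tuple(element.get(c) for c in self.columns))

stream = [
    {"id": 1, "name": "a"},
    AddColumn("email"),
    {"id": 2, "name": "b", "email": "b@example.com"},
]
sink = SchemaAwareSink(["id", "name"])
for element in stream:
    sink.process(element)
```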

Shared Resource

The Multi-table feature can reduce the use of some Source and Sink link resources. At the same time, we have implemented Dynamic Thread Resource Sharing in SeaTunnel Engine, reducing the resource usage of the engine on the server.

Speed Control

As for the problems that cannot be solved by the back pressure mechanism, we will optimize the Buffer and Checkpoint mechanism:

  • First, we try to let the Buffer control the amount of data that passes in a given period;
  • Second, through the Checkpoint mechanism, the engine can lock the buffer after the Checkpoint reaches the maximum parallelism and an interval has elapsed, blocking writes of Source data. This applies pressure proactively and avoids issues such as back pressure being delayed or never reaching the Source.

The above is the design goal of SeaTunnel Engine, which we hope will help you better solve the problems that trouble you in data integration. In the future, we will continue to improve the SeaTunnel experience so that more people are willing to use it.
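As an illustration of the first speed-control point, a buffer that limits the amount of data per period, here is a minimal fixed-window limiter. The class name, the fixed-window policy, and the injected clock are assumptions made for the sketch; the text does not specify how the engine's Buffer enforces the limit.

```python
class PeriodBuffer:
    """Admit at most max_rows rows per period_s seconds (fixed window)."""

    def __init__(self, max_rows, period_s, clock):
        self.max_rows = max_rows
        self.period_s = period_s
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def try_put(self, row, out):
        now = self.clock()
        if now - self.window_start >= self.period_s:
            # A new period begins: reset the counter.
            self.window_start, self.count = now, 0
        if self.count >= self.max_rows:
            return False  # the Source has to wait for the next period
        out.append(row)
        self.count += 1
        return True

fake_time = [0.0]  # injected clock so the example is deterministic
buffer = PeriodBuffer(max_rows=2, period_s=1.0, clock=lambda: fake_time[0])
out = []
results = [buffer.try_put(i, out) for i in range(3)]  # third row is refused
fake_time[0] = 1.0                                    # next period starts
results.append(buffer.try_put(3, out))
```

Unlike back pressure, this limit applies even when neither the source nor the sink is saturated, which is the case the user wants to control.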

The future of Apache SeaTunnel

As an Apache incubator project, the Apache SeaTunnel community is developing rapidly. In the following community planning, we will focus on four directions:

Support more data integration scenarios (Apache SeaTunnel Engine)

The engine is used to solve pain points that existing engines cannot solve, such as synchronization of an entire database, synchronization of table structure changes, and the coarse granularity of task failure.

If you are interested in the engine, follow this umbrella issue: https://github.com/apache/incubator-seatunnel/issues/2272

Expand and improve the Connector & Catalog ecosystem

Support more Connectors & Catalogs, such as TiDB, Doris, and Stripe; improve the usability and performance of existing connectors; and support CDC connectors for real-time incremental synchronization scenarios.

If you are interested in connectors, follow this umbrella issue: https://github.com/apache/incubator-seatunnel/issues/1946

Support for more versions of the engines

Such as Spark 3.x, Flink 1.14.x, etc.

If you are interested in supporting Spark 3.3, follow this PR: https://github.com/apache/incubator-seatunnel/pull/2574

Easier to use (Apache SeaTunnel Web)

Provide a web interface that makes operations simpler and more efficient in the form of DAG/SQL; display Catalogs, Connectors, Jobs, etc. more intuitively; and integrate with scheduling platforms to make task management easier.

If you are interested in the Web project, see our Web sub-project: https://github.com/apache/incubator-seatunnel-web