
6 posts tagged with "Meetup"


· 12 min read

SeaTunnel 2.3.0 has just been released as an official version. With it comes the official release of SeaTunnel Zeta, the synchronization engine the community has been developing in-house for more than a year. Zeta will become SeaTunnel's default engine. It gives users high-throughput, low-latency synchronization jobs with reliable consistency guarantees.

Why did SeaTunnel develop its own synchronization engine? How is SeaTunnel Engine positioned? How does it differ from traditional computing engines? What are its design ideas, and what is unique about its architecture? This article answers these questions.

  • Why develop our own engine
  • SeaTunnel Engine Positioning
  • Design ideas
  • Architecture design
  • Unique advantages and features
  • Current basic functions and features
  • Future optimization plan

01 Why develop our own engine

A year ago, the SeaTunnel community publicly stated for the first time that it would develop its own engine. The team made this decision for two reasons. First, SeaTunnel's connectors could run only on Flink or Spark. Second, Flink and Spark are general-purpose computing engines, and they have many problems in data integration and synchronization that are hard to solve.

Refer to: Why do we self-develop the big data synchronization engine SeaTunnel Zeta? https://github.com/apache/incubator-seatunnel/issues/1954

02 Design ideas

The general idea of engine design is as follows:

  1. Simple and easy to use. The new engine minimizes dependence on third-party services. It provides cluster management, snapshot storage, and cluster HA without relying on big data components such as Zookeeper and HDFS. This is very useful for users who do not have a big data platform, or who do not want to depend on one for data synchronization.
  2. More resource-saving. At the CPU level, Zeta Engine internally uses Dynamic Thread Sharing. In real-time synchronization scenarios with many tables but little data per table, Zeta Engine runs these synchronization tasks in shared threads. This avoids unnecessary thread creation and saves system resources. On the read and write sides, Zeta Engine is designed to minimize the number of JDBC connections. In CDC scenarios, Zeta Engine reuses log reading and parsing resources as much as possible.
  3. More stable. In this version, Zeta Engine uses the Pipeline as the minimum granularity for checkpoints and fault tolerance of data synchronization tasks. A task failure only affects the tasks that have upstream or downstream relationships with it, so as far as possible a single failure does not make the entire job fail or roll back. For scenarios where the source data has a limited retention time, Zeta Engine can also enable a data cache. The cache automatically stores data read from the source, and downstream tasks read the cached data and write it to the target. Even if the target fails and data cannot be written, reading from the source continues normally, so source data is not deleted on expiration before it has been synchronized.
  4. Faster. Zeta Engine's execution plan optimizer reduces unnecessary network transmission of data. This lowers the overall performance loss caused by serialization and deserialization and makes data synchronization faster. It also supports speed limiting, so that sync jobs run at a reasonable speed.
  5. Data synchronization for all scenarios. SeaTunnel aims to support full and incremental synchronization in offline batch mode, as well as real-time synchronization and CDC.

03 Architecture design

SeaTunnel Engine mainly consists of a set of APIs for data synchronization processing and a core computing engine. This section focuses on the architecture of the core engine.

SeaTunnel Engine consists of three main services: CoordinatorService, TaskExecutionService, and SlotService.

Coordinator Service

CoordinatorService is the Master service of the cluster. It drives each job's transformation from LogicalDag to ExecutionDag and then to PhysicalDag. Finally, it creates the job's JobMaster, which schedules the job and monitors its status. CoordinatorService consists of four main functional modules:

  1. JobMaster, responsible for generating a single job's LogicalDag, ExecutionDag, and PhysicalDag, and for running the job through the PipelineBaseScheduler.
  2. CheckpointCoordinator, responsible for controlling the job's checkpoint process.
  3. ResourceManager, responsible for requesting and managing job resources. It currently supports Standalone mode and will support On Yarn and On K8s in the future.
  4. Metrics Service, responsible for collecting and summarizing job monitoring information.

TaskExecutionService

TaskExecutionService is the Worker service of the cluster and provides the actual runtime environment for each Task in a job. It uses Dynamic Thread Sharing to reduce CPU usage.

SlotService

SlotService runs on each node of the cluster and is mainly responsible for the division, application, and recycling of resources on the node.
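The three responsibilities named above (division, application, recycling) can be illustrated with a minimal sketch. The class and method names are hypothetical and do not reflect SeaTunnel's real SlotService API. The sketch assumes a node's capacity is split into a fixed number of equal slots.

```python
class SlotService:
    """Hypothetical per-node slot manager (not SeaTunnel's real class)."""

    def __init__(self, node_id, total_slots):
        self.node_id = node_id
        # Division: the node's capacity is split into equal, numbered slots.
        self._owners = {i: None for i in range(total_slots)}

    def request(self, job_id, count):
        """Application: grant `count` free slots to job_id, or None if too few are free."""
        free = [i for i, owner in self._owners.items() if owner is None]
        if len(free) < count:
            return None
        granted = free[:count]
        for i in granted:
            self._owners[i] = job_id
        return granted

    def release(self, job_id):
        """Recycling: free every slot held by job_id and return how many were freed."""
        held = [i for i, owner in self._owners.items() if owner == job_id]
        for i in held:
            self._owners[i] = None
        return len(held)

    def free_slots(self):
        return sum(owner is None for owner in self._owners.values())
```

A request that cannot be fully satisfied returns None instead of a partial grant. This keeps the sketch simple; a real scheduler might queue the request or accept fewer slots.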

04 Unique advantages and features

Autonomous cluster

SeaTunnel Engine implements an autonomous, decentralized cluster. It uses Hazelcast as its underlying dependency, so it achieves cluster autonomy and job fault tolerance without third-party service components such as Zookeeper. Hazelcast provides a distributed in-memory data grid that lets users work with a distributed collection as if it were an ordinary local Java collection. SeaTunnel stores job state in the Hazelcast in-memory grid, so when the Master node changes, the new Master can recover job state from the grid. We have also implemented persistence for the grid data: job state is written to storage (a JDBC database, HDFS, or cloud storage) as a write-ahead log (WAL). As a result, even if the entire cluster goes down and restarts, the job's runtime information can be restored.
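The WAL idea in the previous paragraph can be shown in miniature. In this sketch a plain dict stands in for the Hazelcast in-memory grid, and a Python list of JSON lines stands in for the durable store (JDBC, HDFS, or cloud storage). Both stand-ins and the `JobStateStore` name are hypothetical; this is not SeaTunnel's persistence code.

```python
import json


class JobStateStore:
    """Sketch: in-memory job state backed by an append-only write-ahead log."""

    def __init__(self, wal):
        self.wal = wal      # durable storage stand-in: a list of JSON lines
        self.state = {}     # in-memory grid stand-in: job_id -> status

    def update(self, job_id, status):
        # Write-ahead: persist the change to the log before applying it in memory.
        self.wal.append(json.dumps({"job": job_id, "status": status}))
        self.state[job_id] = status

    @classmethod
    def recover(cls, wal):
        """Rebuild state after a full-cluster restart by replaying the log in order."""
        store = cls(wal)
        for line in wal:
            entry = json.loads(line)
            store.state[entry["job"]] = entry["status"]
        return store
```

Replaying the log in order means the last recorded status for each job wins.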

Data cache

Unlike traditional computing engines such as Spark and Flink, SeaTunnel Engine is built specifically for data synchronization, and it supports data caching natively. When several synchronization jobs in the cluster share a data source, SeaTunnel Engine enables the data cache automatically. The source of one job reads the data and writes it into the cache. The other jobs are automatically optimized to read from the cache instead of the data source. This reduces read pressure on the data source and limits the impact of data synchronization on it.
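The cache-sharing idea can be sketched as follows: the data source is read once, and each job keeps its own offset into the cache. The class is hypothetical. For brevity it keeps every record forever; a real cache would need eviction or external storage.

```python
class SharedSourceCache:
    """Sketch: one real source read feeds every job that shares the source."""

    def __init__(self, source):
        self._source = iter(source)
        self._cache = []
        self._offsets = {}
        self.source_reads = 0   # how many records were pulled from the real source

    def register(self, job_id):
        self._offsets[job_id] = 0

    def poll(self, job_id):
        """Return the next record for job_id, or None when the source is exhausted."""
        pos = self._offsets[job_id]
        if pos == len(self._cache):
            # Only the first job that needs a record triggers a real source read.
            record = next(self._source, None)
            if record is None:
                return None
            self._cache.append(record)
            self.source_reads += 1
        self._offsets[job_id] = pos + 1
        return self._cache[pos]
```

Two jobs draining the same three-record source cause exactly three source reads, not six.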

Speed control

SeaTunnel Engine supports speed limits during data synchronization, which is very useful when reading data sources under high concurrency. A reasonable limit ensures that data is synchronized on time while keeping the pressure on the data source as low as possible.
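The text does not say how SeaTunnel implements its speed limit. A common technique for this kind of control is a token bucket, sketched below as an assumption rather than as SeaTunnel's mechanism. The clock is injectable so the behaviour can be tested without real waiting.

```python
import time


class TokenBucket:
    """Sketch of a token-bucket limiter capping records per second."""

    def __init__(self, rate_per_sec, burst, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = burst
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n=1):
        """Take n tokens if available; return False (caller should wait) otherwise."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond the burst capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

A reader would call `try_acquire()` before emitting each record and back off briefly whenever it returns False.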

Shared connection pool to reduce database pressure

Today, the operators and data synchronization tools provided by computing engines such as Spark and Flink need one JDBC connection per table when synchronizing an entire database. Database connections are a database resource. Too many of them put heavy pressure on the database and make read and write latency less stable, which can become a serious incident for a business database. To solve this problem, SeaTunnel Engine uses a shared connection pool, so multiple tables can share JDBC connections and fewer database connections are used.
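A bounded pool shared by all table tasks caps the number of open connections regardless of how many tables are synchronized. Here is a minimal sketch, where `connect` is a placeholder for whatever driver call opens a connection (hypothetical, not a SeaTunnel API):

```python
import queue
from contextlib import contextmanager


class SharedConnectionPool:
    """Sketch: a fixed number of connections shared by all table tasks."""

    def __init__(self, connect, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(connect())

    @contextmanager
    def connection(self, timeout=None):
        # Blocks while every connection is busy instead of opening a new one.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)   # hand it back for the next table
```

With a pool of 4, synchronizing 100 tables still opens only 4 connections.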

Resume from breakpoint (incremental/full)

SeaTunnel Engine can resume interrupted offline synchronization jobs. When the data volume is large, a synchronization job often runs for tens of minutes or several hours, and rerunning it from scratch after a mid-run failure wastes that time. SeaTunnel Engine therefore saves state (checkpoints) continuously during offline synchronization. When a failed job is rerun, it continues from the last checkpoint. This effectively prevents the data delays that hardware problems such as node downtime would otherwise cause.
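The resume behaviour can be sketched with a persisted read offset. In the sketch, output is committed together with each checkpoint, so records written after the last checkpoint are discarded on failure and re-read on rerun. The function, the `interval` parameter, and the `fail_at` crash simulation are hypothetical.

```python
def run_batch_job(records, committed, checkpoint, interval=3, fail_at=None):
    """Sketch of checkpoint-based resume for an offline job.

    committed:  sink output that survived (durably written) so far
    checkpoint: dict persisted across runs; 'offset' = next record to read
    fail_at:    hypothetical offset at which a node crash is simulated
    """
    offset = checkpoint.get("offset", 0)
    pending = []
    while offset < len(records):
        if offset == fail_at:
            # Uncommitted writes in `pending` are lost with the crash.
            raise RuntimeError(f"node down at offset {offset}")
        pending.append(records[offset])
        offset += 1
        if offset % interval == 0 or offset == len(records):
            committed.extend(pending)   # commit output and offset together
            pending.clear()
            checkpoint["offset"] = offset
```

Rerunning after a crash continues from the saved offset, and the committed output contains every record exactly once.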

Schema evolution route

Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that changes over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adjust the schema to include one or more new columns.

This capability is required in real-time data warehouse scenarios. Currently, the Flink and Spark engines do not support this feature.
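The most common case described above, adding new columns, can be sketched as follows. The schema is modelled as an ordered list of column names. This additive-only model and both helper names are hypothetical simplifications, not how SeaTunnel represents schemas.

```python
def evolve_schema(schema, row):
    """Append columns present in `row` but missing from `schema`; return (schema, added)."""
    added = [col for col in row if col not in schema]
    return schema + added, added


def align(row, schema):
    """Project a row dict onto the schema; columns the row lacks become None."""
    return tuple(row.get(col) for col in schema)
```

Rows written before the change simply get None in the new column.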

Fine-grained fault-tolerant design

Flink handles fault tolerance and rollback at the level of the entire job: if a task fails, the whole job is rolled back and restarted. SeaTunnel Engine's design reflects the fact that, in many data synchronization cases, a task failure only requires fault tolerance for the tasks that have upstream or downstream relationships with it. On this principle, SeaTunnel Engine first generates a logical DAG from the user's job configuration file and then optimizes it. Finally, it generates pipelines (connected subgraphs of the job DAG), which are the granularity for scheduling, execution, and fault tolerance.
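Since a pipeline is a connected subgraph of the job DAG, finding pipelines amounts to finding connected components, with edge direction ignored. A minimal sketch follows; the function name and the vertex/edge representation are assumptions, not SeaTunnel code.

```python
from collections import defaultdict


def split_into_pipelines(vertices, edges):
    """Group DAG vertices into pipelines: connected subgraphs, ignoring edge direction."""
    neighbors = defaultdict(set)
    for up, down in edges:
        neighbors[up].add(down)
        neighbors[down].add(up)
    seen, pipelines = set(), []
    for start in vertices:
        if start in seen:
            continue
        component, stack = [], [start]
        seen.add(start)
        while stack:
            v = stack.pop()
            component.append(v)
            for n in neighbors[v]:
                if n not in seen:
                    seen.add(n)
                    stack.append(n)
        pipelines.append(component)
    return pipelines
```

A job with two independent source-to-sink chains yields two pipelines, so a failure in one chain does not touch the other.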

A typical usage scenario is:

The CDC connector reads data from one MySQL instance's binlog and writes it to another MySQL instance. On the Flink or Spark engine, once the target MySQL cannot be written to, the CDC task reading the binlog is terminated too. If the source MySQL has a binlog expiration time, the source binlog may already be purged by the time the target problem is fixed. This leads to data loss and other problems.

SeaTunnel Engine automatically optimizes this job by inserting a cache between the source and the target. It then splits the job into two pipelines:

  • pipeline#1 reads data from the CDC source and writes it to the SeaTunnel Cache.
  • pipeline#2 reads data from the SeaTunnel Cache and writes it to the target MySQL.

If the target MySQL cannot be written to, only pipeline#2 is terminated, and pipeline#1 keeps running normally. This design fundamentally solves the problem above and fits the processing logic of a data synchronization engine better.
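The decoupling can be simulated step by step. In this hypothetical sketch an unbounded in-memory deque stands in for the SeaTunnel Cache, whereas a real cache would be bounded or persisted. pipeline#1 appends every binlog event, and pipeline#2 drains the cache only while the target is writable.

```python
from collections import deque


def run_cdc_with_cache(binlog, target_up):
    """Simulate pipeline#1 (binlog -> cache) and pipeline#2 (cache -> target).

    target_up: one boolean per step, whether the target MySQL is writable.
    Returns (events still cached, events written to the target).
    """
    cache, written = deque(), []
    for event, up in zip(binlog, target_up):
        cache.append(event)          # pipeline#1 keeps reading regardless
        if up:                       # pipeline#2 runs only while the target is up
            while cache:
                written.append(cache.popleft())
    return list(cache), written
```

While the target is down, events accumulate in the cache instead of being lost to binlog expiry. They are written once the target recovers.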

Dynamically share threads to reduce resource usage

SeaTunnel Engine's Task design uses thread sharing. Unlike Flink and Spark, SeaTunnel Engine does not simply give each Task its own thread. Instead, a dynamic sensing mechanism, Dynamic Thread Sharing, decides whether a Task should share a thread with other Tasks or have a thread to itself.

Multi-threaded parallel computing outperforms single-threaded serial computing. However, if every Task runs in its own thread, synchronizing many tables means many Tasks, and therefore very many threads on each Worker node. With a fixed number of CPU cores, more threads are not always better. When there are too many, the CPU spends much of its time on context switching, which hurts computing performance.

Flink and Spark usually cap the number of tasks running on each node to avoid starting too many threads. To run more tasks on one node, SeaTunnel Engine uses thread sharing instead: tasks with little data share threads, while tasks with a lot of data get exclusive threads. This lets SeaTunnel Engine run hundreds or thousands of table synchronization tasks on one node and synchronize more tables with fewer resources.
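The text does not describe how SeaTunnel Engine senses a task's load. This sketch assumes each task has an estimated record rate. Tasks at or above a threshold get their own thread, and the rest are packed into shared threads by first-fit. The rate metric, the threshold, and the packing rule are all hypothetical.

```python
def assign_threads(task_rates, exclusive_threshold, shared_capacity):
    """Sketch of the shared-vs-exclusive thread decision.

    task_rates:          task name -> estimated records/sec (hypothetical metric)
    exclusive_threshold: tasks at or above this rate get a dedicated thread
    shared_capacity:     max combined rate packed into one shared thread
    Returns a list of threads, each a list of task names.
    """
    exclusive, shared = [], []   # shared entries: [combined_rate, [task names]]
    for name, rate in sorted(task_rates.items(), key=lambda kv: -kv[1]):
        if rate >= exclusive_threshold:
            exclusive.append([name])
            continue
        # First-fit: place the small task in the first shared thread with room.
        for slot in shared:
            if slot[0] + rate <= shared_capacity:
                slot[0] += rate
                slot[1].append(name)
                break
        else:
            shared.append([rate, [name]])
    return exclusive + [tasks for _, tasks in shared]
```

One busy table plus a hundred quiet ones fit into three threads here rather than 101.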

05 Basic functions and features

2.3.0 is the first official version of SeaTunnel Engine, which implements some basic functions. For the detailed design, please refer to: https://github.com/apache/incubator-seatunnel/issues/2272

[ Cluster Management ]

  • Support stand-alone operation
  • Support cluster operation
  • Autonomous, decentralized cluster: there is no need to designate a Master node. SeaTunnel Engine elects a Master by itself at runtime and automatically elects a new one if the current Master goes down.
  • Automatic discovery of cluster nodes: nodes with the same cluster_name automatically form a cluster.

[ Core function ]

  • Supports running jobs in Local mode. The cluster is automatically destroyed after the job finishes.
  • Supports running jobs in Cluster mode (single machine or cluster). Jobs are submitted to the SeaTunnel Engine service through the SeaTunnel Client. After a job completes, the service keeps running and waits for the next submission.
  • Supports offline batch synchronization.
  • Supports real-time synchronization.
  • Unified batch and stream processing: all SeaTunnel V2 connectors can run in SeaTunnel Engine.
  • Supports a distributed snapshot algorithm that works with SeaTunnel V2 connectors to provide two-phase commit and guarantee exactly-once delivery.
  • Supports scheduling jobs at the Pipeline level, so that a job can start even when resources are limited.
  • Supports job fault tolerance at the Pipeline level. A Task failure only affects its own Pipeline, and only the Tasks in that Pipeline need to be rolled back.
  • Supports dynamic thread sharing for real-time synchronization of a large number of small datasets.

06 Future optimization plan

  • Support Cache mode, starting with Kafka as the cache
  • Support JobHistory and its persistence
  • Support metric monitoring (Reader Rows, QPS, Reader Bytes) and metric queries
  • Support dynamic modification of the execution plan
  • Support CDC
  • Support whole-database synchronization
  • Support multi-table synchronization
  • Support Schema Evolution

· 12 min read

Apache IoTDB (Internet of Things Database) is a software system that integrates the collection, storage, management, and analysis of IoT time series data. It meets the needs of the Industrial Internet of Things for massive data storage, high-speed data reading, and complex data analysis. SeaTunnel already provides an IoTDB Connector, which enables data synchronization scenarios in the IoT field.

At the SeaTunnel community online meeting in October this year, SeaTunnel Committer Wang Hailin explained how SeaTunnel integrates with IoTDB. The talk gives users a deeper understanding of how to use IoTDB data synchronization and how it works.

The topic I’m sharing today is using SeaTunnel to play around with data synchronization in IoTDB.

This session is divided into 6 subsections. First, we will cover the basic concepts of SeaTunnel. On that basis, we will focus on the functional features of the IoTDB Connector and then analyze its data read and write functions and their implementation. After that, we will show some typical usage scenarios and cases, so you can see how to use the IoTDB Connector in production environments. The last point is the community's next steps for the IoTDB Connector and guidance on how to get involved in contributing.

Introduction to SeaTunnel basic concepts

This is the basic architecture of SeaTunnel, an engine built for data synchronization, with a set of abstract APIs for reading data from and writing to a variety of data sources.

The left-hand side briefly lists the Source side. For example, we abstract the Source's API, Type, and State. The API reads the data source. The Type unifies the data types of the various data sources into the abstract types defined there. The State handles recovery and retention of state, such as the read position, during reading.

This is an abstraction for Source, and we have done a similar abstraction for Sink, i.e. how data is written, and how the data type matches the real data source type, and how the state is restored and retained.

Based on these APIs, we will have a translation layer to translate these APIs to the corresponding execution engine. SeaTunnel currently supports three execution engines, Spark, Flink, and our own execution engine, SeaTunnel Engine, which will be released soon.

This is roughly how SeaTunnel works. It relies on Source and Sink to read and write data for synchronization, and we call the pair a Connector: a Connector consists of a Source and a Sink.

The diagram above shows the different data sources. The Source reads data from each data source and converts it into the abstraction layer formed by SeaTunnelRow and Type. The Sink pulls data from the abstraction layer and writes it to the concrete data store in that store's own format.

The combination of Source + Abstraction Layer + Sink enables the synchronization of data between multiple heterogeneous data sources.

I’ll use a simple example below to illustrate how SeaTunnel’s Source and Sink work.

We specify the combination of Sources and Sinks in a configuration file. The commands in the SeaTunnel toolkit take this configuration file as input and, when executed, carry out the data transfer.

This is the Connector ecosystem SeaTunnel currently supports, including data sources such as JDBC, HDFS, Hive, Pulsar, and message queues.

The list in the picture does not cover every Connector SeaTunnel supports. The Plugins directory of the SeaTunnel project on GitHub is where new Connector plugins are constantly being added, and it shows the latest supported connectors in real time.

IoTDB Connector Features

Below is information about access to the IoTDB Connector.

First, for your reference, let's introduce the functional features of the IoTDB Connector integrated with SeaTunnel and exactly what it supports.

Source Features

Firstly, there are the typical usage scenarios supported by Source, such as bulk reading of devices, field projection, data type mapping, parallel reading, etc.

As you can see above, the IoTDB Source supports all of these features except exactly-once and stream mode. For batch reads, IoTDB has an SQL syntax similar to group by device (align by device), which lets you read data from multiple devices in a single batch. For basic data type projection, IoTDB SQL returns the time column by default when querying any measurement, and align by device adds the device column. We also support projecting these onto SeaTunnel columns by default.

The only data type not supported is Vector; all others are supported.

For parallel reads, IoTDB data is timestamped, so we use timestamp ranges to read in parallel.

For state recovery, we have divided the time range into different splits, so state can be recovered from the split position information.

Sink functional features

The diagram above shows the Sink features SeaTunnel already supports. For metadata extraction, we support extracting metadata such as measurement and device from SeaTunnelRow. The time can either be extracted from SeaTunnelRow or taken from the current processing time. Batch commits and exception retries are also supported.

IoTDB data reading analysis

Next, we analyze the implementation and support for data reading.

Data type mapping

The first is data type mapping: data read from IoTDB has to be converted to SeaTunnel data types. BOOLEAN, INT32, INT64, and the other types listed here all have corresponding SeaTunnel types. INT32 is mapped according to the type declared on the SeaTunnel side. When the value range is small, it can be mapped to TINYINT or SMALLINT as well as to INT.

The Vector type is not currently supported.

This is the corresponding example code, showing where the type conversion is done.
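The slide's code is not reproduced in this text. As a stand-in, here is a minimal sketch of the read-side mapping just described. The FLOAT, DOUBLE, and TEXT entries (TEXT to STRING) are assumptions based on IoTDB's standard data types rather than on the slide, and the function name is hypothetical.

```python
# IoTDB data type -> SeaTunnel type name (sketch; FLOAT/DOUBLE/TEXT rows are assumed).
IOTDB_TO_SEATUNNEL = {
    "BOOLEAN": "BOOLEAN",
    "INT32": "INT",
    "INT64": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "TEXT": "STRING",
}
# INT32 values may be read into a narrower type when the user declares one.
INT32_TARGETS = {"TINYINT", "SMALLINT", "INT"}


def map_read_type(iotdb_type, declared=None):
    """Return the SeaTunnel type for an IoTDB column, honouring a declared INT32 target."""
    if iotdb_type == "VECTOR":
        raise NotImplementedError("VECTOR is not supported yet")
    if iotdb_type == "INT32" and declared in INT32_TARGETS:
        return declared
    return IOTDB_TO_SEATUNNEL[iotdb_type]
```

Narrowing INT32 is only safe when the user knows the values fit the smaller type, which is why the sketch applies it only on request.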

Field projection

The second aspect is field projection when reading. The Time field is mapped automatically when IoTDB data is read, and you can choose which SeaTunnel type it maps to, such as TIMESTAMP or BIGINT.

Column extraction in SQL lets you read only the columns you need. On the SeaTunnel side, you use fields to specify each column's name, type, and so on after it is mapped to SeaTunnel. The final result of the data read on SeaTunnel is shown in the figure above.

The SQL we just saw has no time column, yet the result contains one. This is because we support projecting the time column, and it can be projected into different data types as the user needs. The diagram above shows the implementation logic.
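The time-column projection can be sketched as a small conversion. The sketch assumes IoTDB's default millisecond timestamp precision and interprets TIMESTAMP in UTC. Both choices, and the function name, are assumptions for illustration only.

```python
from datetime import datetime, timezone


def project_time(ts_millis, target_type):
    """Project IoTDB's epoch-millisecond time column onto a SeaTunnel type."""
    if target_type == "BIGINT":
        return ts_millis                  # keep the raw epoch milliseconds
    if target_type == "TIMESTAMP":
        return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
    raise ValueError(f"unsupported time projection: {target_type}")
```

For example, 1664064000000 becomes 2022-09-25 00:00 UTC as a TIMESTAMP, or stays 1664064000000 as a BIGINT.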

Batch read Device

This is a common requirement, because we often need to synchronize large batches of devices that share the same data structure.

SeaTunnel supports the align by device syntax, so device columns can also be projected onto the SeaTunnelRow.

Suppose there is a table in IoTDB. With this syntax, we project the device column onto SeaTunnel as ordinary data. After configuring the device name column and specifying its data type, the data read on SeaTunnel has the format shown above: the Time column, the device column, and the actual data values. This makes it possible to read data from many devices in bulk.

Parallel reading

The other is a parallel read.

  • Split: we divide the table into ranges on the Time column, so that parallel threads or processes each read a specific range of data. Three parameters are configured, and the original SQL is divided into splits, each with extra query conditions. These form the actual read SQL.

  • Allocate splits to readers: once splitting is done, allocation logic distributes the splits to the parallel readers.

This logic assigns splits to readers based on the split ID. Depending on the Connector, the distribution may be fairly random, or more uniform if the split IDs are well hashed.

The result achieved is shown in the picture.
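Both steps can be sketched together. The parameter names `lower`, `upper`, and `num_partitions` are hypothetical stand-ins for the three configuration parameters mentioned above. The sketch also assumes the original SQL has no WHERE or ALIGN BY DEVICE clause of its own, since the range condition is simply appended. Allocation uses `split_id % parallelism`, one simple ID-based rule of the kind described.

```python
def build_splits(sql, lower, upper, num_partitions):
    """Divide [lower, upper) on the time column into query splits."""
    step = -(-(upper - lower) // num_partitions)   # ceiling division
    splits = []
    for split_id, start in enumerate(range(lower, upper, step)):
        end = min(start + step, upper)
        splits.append((split_id, f"{sql} WHERE time >= {start} AND time < {end}"))
    return splits


def assign(splits, parallelism):
    """Hand split i to reader i % parallelism."""
    readers = {r: [] for r in range(parallelism)}
    for split_id, query in splits:
        readers[split_id % parallelism].append(query)
    return readers
```

With the export example's settings later in this talk (parallelism 2, 24 batches, one day), each split covers one hour, and each of the two readers receives 12 splits.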

Status recovery

State recovery also matters when reading. A large task takes a long time to read, and if an error or exception occurs midway, you need to recover state from the point of failure and continue reading from there.

In SeaTunnel, state recovery mainly works like this: the reader stores the information about unread Splits in its state, and the engine periodically snapshots that state while reading. On recovery, we restore the last snapshot and continue reading.
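The split-based state just described can be sketched with a reader whose entire state is its list of unfinished splits. The class is hypothetical. Note that a split that was in progress after the last snapshot is read again after restore, so duplicate-free output also depends on how the sink commits.

```python
class SplitReader:
    """Sketch: a reader whose state is the list of splits it has not finished."""

    def __init__(self, splits):
        self.pending = list(splits)
        self.output = []

    def read_one(self):
        split = self.pending.pop(0)
        self.output.append(f"rows of {split}")

    def snapshot_state(self):
        # The engine stores this periodically; unread splits are all that is needed.
        return list(self.pending)

    @classmethod
    def restore(cls, state):
        return cls(state)
```

After two splits are read and a snapshot is taken, a crash later loses nothing: the restored reader resumes from the remaining splits.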

IoTDB Connector Data Write Analysis

Next, let's analyze data writing.

Data type mapping

Data writing also involves data type mapping. In contrast to reading, here SeaTunnel data types are mapped to IoTDB data types. IoTDB has no integer type narrower than INT32, so TINYINT and SMALLINT are widened when written. All other data types convert one-to-one. The ARRAY and VECTOR data types are not yet supported.

The diagram above shows the corresponding code; see our concrete mapping for the implementation details.
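A minimal sketch of the write-side mapping, including the widening of TINYINT and SMALLINT to INT32. The FLOAT, DOUBLE, and STRING rows are assumptions based on IoTDB's standard types, and the function name is hypothetical.

```python
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN": "BOOLEAN",
    "TINYINT": "INT32",    # widened: IoTDB has no 8-bit integer type
    "SMALLINT": "INT32",   # widened: IoTDB has no 16-bit integer type
    "INT": "INT32",
    "BIGINT": "INT64",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "STRING": "TEXT",
}
UNSUPPORTED = {"ARRAY", "VECTOR"}


def map_write_type(seatunnel_type):
    """Return the IoTDB type a SeaTunnel column is written as."""
    if seatunnel_type in UNSUPPORTED:
        raise NotImplementedError(f"{seatunnel_type} is not supported yet")
    return SEATUNNEL_TO_IOTDB[seatunnel_type]
```

Widening never loses information, so TINYINT and SMALLINT values are stored unchanged in INT32 columns.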

Dynamic injection of metadata

SeaTunnel supports the dynamic injection of metadata.

When data from heterogeneous sources is written to IoTDB, the device, measurements, and time are extracted from each SeaTunnelRow, using the column values configured for each. If no time column is specified, the current system time is used. The storage group can also be configured so that it is automatically prepended to the device.

For example, suppose SeaTunnel reads rows in the format shown above. With this configuration, they can be synchronized to IoTDB with the following result.

The temperature and humidity columns we need are extracted as measurements, and ts and the device name are extracted as the IoTDB time and device.
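The extraction just described can be sketched as a function that turns one row into an IoTDB record. The parameter names `device_col`, `time_col`, `measurement_cols`, and `storage_group` are hypothetical and are not the connector's real option names. The row is modelled as a dict rather than a SeaTunnelRow.

```python
import time


def to_iotdb_record(row, device_col, measurement_cols, time_col=None,
                    storage_group=None, now=time.time):
    """Turn one row into (device, timestamp_ms, {measurement: value})."""
    device = str(row[device_col])
    if storage_group:
        device = f"{storage_group}.{device}"    # prepend the configured storage group
    if time_col is not None:
        ts = int(row[time_col])
    else:
        ts = int(now() * 1000)                  # fall back to the current processing time
    measurements = {col: row[col] for col in measurement_cols}
    return device, ts, measurements
```

Columns not listed as measurements, such as `note` in the test, are simply left out.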

Batch commits and exception retries

In addition, the Sink has to handle batching and retries when writing. For batching, you can configure the number of records and the time interval per batch commit. Because data is buffered in memory, a separate thread can be enabled to commit on a timer.

For retries, SeaTunnel lets you configure the maximum number of retries, the wait interval between retries, and the maximum wait interval. It also stops retrying early when it hits a non-recoverable error.
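A minimal sketch combining both behaviours. All names are hypothetical. A flush happens when the buffer reaches `batch_size` or when `interval_s` has passed, checked on each add; the separate timer thread mentioned above is left out for brevity. Failed flushes retry with exponential backoff capped at `max_backoff_s`. A `NonRecoverableError` stops retrying at once and leaves the buffer intact.

```python
import time


class NonRecoverableError(Exception):
    """Raised by the writer for errors that retrying cannot fix (hypothetical)."""


class BatchingWriter:
    def __init__(self, write_batch, batch_size, interval_s, max_retries,
                 backoff_s, max_backoff_s, clock=time.monotonic, sleep=time.sleep):
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.interval_s = interval_s
        self.max_retries = max_retries
        self.backoff_s = backoff_s
        self.max_backoff_s = max_backoff_s
        self.clock, self.sleep = clock, sleep
        self.buffer = []
        self.last_flush = clock()

    def add(self, row):
        self.buffer.append(row)
        if (len(self.buffer) >= self.batch_size
                or self.clock() - self.last_flush >= self.interval_s):
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        delay = self.backoff_s
        for attempt in range(self.max_retries + 1):
            try:
                self.write_batch(list(self.buffer))
                break
            except NonRecoverableError:
                raise                   # retrying would not help
            except Exception:
                if attempt == self.max_retries:
                    raise               # retries exhausted
                self.sleep(delay)
                delay = min(delay * 2, self.max_backoff_s)
        self.buffer.clear()
        self.last_flush = self.clock()
```

Injecting `clock` and `sleep` keeps the sketch testable. Production code would use the real defaults.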

IoTDB Connector Usage Examples

After the previous analysis of reading and writing data, let’s look at three typical examples of usage scenarios.

Exporting data from IoTDB

The first scenario is exporting data from IoTDB. The example here reads data from IoTDB and outputs it to the Console.

  • Read in parallel, output to Console
    • Parallelism: 2
    • Number of batches: 24
    • Time range: 2022-09-25 ~ 2022-09-26

Let's assume we have a data table in IoTDB and want to export its data to the Console. The whole configuration is shown above. It maps the data columns we want to export and specifies the time range to query.

This is the simplest example, but in practice, the Sink side may be more complex, so you will need to refer to the documentation of the corresponding data source for the appropriate configuration.

Importing data to IoTDB

  • Read database, batch write to IoTDB
    • Batch writing: one commit every 1024 entries or every 1000 ms
    • Extracting metadata: device, timestamp, measurement
    • Specify the storage group: root.test_group

Another typical scenario is importing data from other data sources into IoTDB. Suppose I have an external database table with columns such as ts, temperature, and humidity. We import it into IoTDB and keep only the temperature and humidity columns, leaving the rest out. The whole configuration is shown in the diagram above for reference.

On the Sink side, you mainly specify the keys: which column the device is extracted from, which column the time is extracted from, which columns are written to IoTDB, and so on.

As you can see, we can also configure the storage group, that is, the IoTDB storage group the data is written to.

Synchronizing data between IoTDB

The third scenario is synchronizing data in bulk from one IoTDB to another. Suppose a table in IoTDB needs to be synchronized to another IoTDB, and after synchronization both the storage group and the measurement names of the data columns change. You can use projection to rewrite the measurement names and use SQL to rewrite the storage group.

How to get involved in contribution

Finally, a few words about the next steps for the IoTDB Connector and how you can get involved in improving the Connector and contributing new features that are needed.

Next steps for the IoTDB Connector

  • Support for reading and writing vector data types
  • Support for tsfile reads and writes
  • Support for writing tsfile and reloading to IoTDB

· 10 min read

Apache SeaTunnel Committer | Zongwen Li

Introduction to Apache SeaTunnel

Apache SeaTunnel is a very easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive data.

Apache SeaTunnel will try its best to solve the problems that may be encountered in the process of mass data synchronization, such as data loss and duplication, task accumulation and delay, low throughput, etc.

Milestones of SeaTunnel

SeaTunnel, formerly known as Waterdrop, was open-sourced on GitHub in 2017.

In October 2021, the Waterdrop community joined the Apache incubator and changed its name to SeaTunnel.

SeaTunnel Growth

After SeaTunnel entered the Apache incubator, the community grew rapidly.

As of now, the SeaTunnel community has a total of 151 contributors, 4314 Stars, and 804 forks.

Pain points of existing engines

Existing computing engines face many pain points in data integration, so let's discuss these first. They usually fall into three areas:

  • The fault tolerance ability of the engine;
  • Difficulty in configuration, operation, and maintenance of engine jobs;
  • The resource usage of the engine.

Fault tolerance

Global Failover

For distributed stream processing systems, high throughput and low latency are often the most important requirements. Fault tolerance also matters a great deal in distributed systems, and for scenarios that require high correctness, exactly-once semantics is often essential.

In a distributed streaming processing system, since the computing power, network, load, etc. of each node are different, the state of each node cannot be directly merged to obtain a true global state. To obtain consistent results, the distributed processing system needs to be resilient to node failure, that is, it can recover to consistent results when it fails.

Spark's official blog claims that Structured Streaming uses the Chandy-Lamport algorithm for failover, but it does not disclose further details.

Flink implemented Checkpoint as its fault-tolerance mechanism based on this algorithm and published a related paper: Lightweight Asynchronous Snapshots for Distributed Dataflows.

In current industrial implementations, when a job fails, all nodes of the job DAG must fail over. The whole process takes a long time and causes a lot of upstream data to accumulate.

Loss of Data

The previous problem leads to long recovery times, though a business service may tolerate a certain amount of data delay.

In a worse case, a single sink node cannot be recovered for a long time while the source data has a limited retention time (for example, MySQL and Oracle log data). This leads to data loss.

Configuration is cumbersome

Single-table configuration

The previous examples involve only a small number of tables. In real business development, however, we usually need to synchronize thousands of tables, which may also be split across sharded databases and tables.

Currently, each table must be configured individually. Synchronizing a large number of tables takes users a lot of time and is prone to problems such as field mapping errors, which makes it difficult to maintain.

Not supporting Schema Evolution

Besides, according to a Fivetran research report, 60% of companies' schemas change every month, and 30% change every week.

However, none of the existing engines supports Schema Evolution. After changing the Schema each time, the user needs to reconfigure the entire link, which makes the maintenance of the job very cumbersome.

High resource usage

Too many database connections

If the Source or Sink is of JDBC type, the existing engines use one or more connections per table. When many tables need to be synchronized, many connections are occupied, which places a heavy burden on the database server.

Operator pressure is uncontrollable

Existing engines use buffers and other control operators to manage pressure, i.e. the back pressure mechanism. Because back pressure is propagated level by level, it arrives with a delay; at the same time, data processing becomes less smooth, which increases GC time, fault-tolerance completion time, and so on.

Another case is that neither the source nor the sink has reached its maximum load, but the user still needs to limit the synchronization rate to avoid putting too much pressure on the source or target database. The back pressure mechanism cannot handle this.

Architecture goals of Apache SeaTunnel Engine

To solve these severe issues faced by computing engines, we developed our own engine, specialized for big data integration.

First, let's go through the goals this engine aims to achieve.

Pipeline Failover

In data integration, a single job may synchronize hundreds of tables, and if the failure of one node or one table causes all tables to fail, the cost is too high.

We expect unrelated Job Tasks not to affect each other during fault tolerance, so we call a set of vertices connected by upstream and downstream relationships a Pipeline; a Job can consist of one or more Pipelines.
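To make the Pipeline idea concrete, here is a minimal Java sketch, assuming a job DAG is given as vertex names plus a downstream adjacency map (a hypothetical representation, not SeaTunnel Engine's internal plan classes). It treats a pipeline as a weakly connected component: vertices linked by any upstream/downstream path land in the same pipeline, so a failure in one pipeline has no path to the others.

```java
import java.util.*;

// Hypothetical sketch: group the vertices of a job DAG into pipelines.
// Vertices connected by any upstream/downstream path share a pipeline.
final class PipelineSplitter {
    static List<Set<String>> splitIntoPipelines(Collection<String> vertices,
                                                Map<String, List<String>> downstream) {
        // Edge direction does not matter for grouping, so build an undirected view.
        Map<String, Set<String>> neighbors = new HashMap<>();
        for (String v : vertices) neighbors.put(v, new HashSet<>());
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            for (String to : e.getValue()) {
                neighbors.get(e.getKey()).add(to);
                neighbors.get(to).add(e.getKey());
            }
        }
        List<Set<String>> pipelines = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        for (String start : vertices) {
            if (!visited.add(start)) continue; // already part of an earlier pipeline
            Set<String> pipeline = new TreeSet<>();
            Deque<String> stack = new ArrayDeque<>();
            stack.push(start);
            while (!stack.isEmpty()) {
                String v = stack.pop();
                pipeline.add(v);
                for (String n : neighbors.get(v)) {
                    if (visited.add(n)) stack.push(n);
                }
            }
            pipelines.add(pipeline);
        }
        return pipelines;
    }
}
```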

Regional Failover

Currently, if an exception occurs in a pipeline, we still need to fail over all the vertices in the pipeline; but can we restore only some of them? For example, if the Source fails, the Sink does not need to restart. In the case of a single Source and multiple Sinks, if one Sink fails, only that Sink and its Source are restored; that is, only the failed node and its upstream nodes are restored.

Stateless vertices clearly do not need to be restarted, and since SeaTunnel is a data integration framework without aggregation-state vertices such as Agg and Count, we only need to consider the Sink:
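Regional failover, as described above, restarts the failed vertex together with the vertices upstream of it. The sketch below shows that traversal using the same kind of hypothetical adjacency map; `RegionalFailover` and `verticesToRestart` are illustrative names, not part of SeaTunnel.

```java
import java.util.*;

// Hypothetical sketch of regional failover: the failed vertex plus everything
// upstream of it is restarted. Sibling Sinks of a shared Source are left alone.
final class RegionalFailover {
    static Set<String> verticesToRestart(String failed, Map<String, List<String>> downstream) {
        // Invert the edges so we can walk from a vertex to its upstream vertices.
        Map<String, List<String>> upstream = new HashMap<>();
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            for (String to : e.getValue()) {
                upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Set<String> restart = new TreeSet<>();
        Deque<String> stack = new ArrayDeque<>();
        stack.push(failed);
        while (!stack.isEmpty()) {
            String v = stack.pop();
            if (!restart.add(v)) continue; // already scheduled for restart
            for (String u : upstream.getOrDefault(v, Collections.<String>emptyList())) {
                stack.push(u);
            }
        }
        return restart;
    }
}
```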

  • The Sink supports neither idempotence nor 2PC: restarting it or not results in the same data duplication, which a restart cannot fix, so the Sink is not restarted;
  • The Sink supports idempotence but not 2PC: because writes are idempotent, it does not matter whether the Source reads data inconsistently each time, so the Sink does not need to be restarted;
  • The Sink supports 2PC:
    • If the Source supports data consistency and abort is not executed, already-processed old data is automatically ignored via the channel data ID, but the transaction session may time out;
    • If the Source does not support data consistency, executing abort on the Sink discards the last batch of data, which has the same effect as a restart but does not require initialization operations such as re-establishing connections;
  • In short, the simplest implementation is to execute abort.

We use the pipeline as the minimum granularity for fault-tolerant management, and use the Chandy-Lamport algorithm to implement fault tolerance for distributed jobs.
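The Sink cases above form a small decision table. The following sketch encodes it; the three flags and the `SinkRecovery` values are hypothetical names chosen for illustration, and `preferAbort` models the "simplest implementation" choice of always aborting when 2PC is available.

```java
// Hypothetical encoding of the Sink failover cases listed above (not SeaTunnel API).
enum SinkRecovery { NO_RESTART, SKIP_PROCESSED_BY_CHANNEL_ID, ABORT_TRANSACTION }

final class SinkFailoverPolicy {
    static SinkRecovery decide(boolean supportsTwoPhaseCommit,
                               boolean sourceIsConsistent,
                               boolean preferAbort) {
        // Without 2PC (idempotent or not), restarting the Sink changes nothing.
        if (!supportsTwoPhaseCommit) return SinkRecovery.NO_RESTART;
        // With 2PC, aborting the open transaction is simplest and works for any Source.
        if (preferAbort || !sourceIsConsistent) return SinkRecovery.ABORT_TRANSACTION;
        // Consistent Source without abort: replayed records are dropped by channel
        // data ID, at the risk of the transaction session timing out.
        return SinkRecovery.SKIP_PROCESSED_BY_CHANNEL_ID;
    }
}
```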

Data Cache

When a sink fails and data cannot be written, one possible solution is to run two jobs at the same time.

One job reads the database logs using the CDC source connector and then writes the data to Kafka using the Kafka Sink connector. Another job reads data from Kafka using the Kafka source connector and writes data to the destination using the destination sink connector.

This solution requires users to have a deep understanding of the underlying technology, and maintaining two jobs increases the difficulty of operation and maintenance. Because every job needs its own JobMaster, it also requires more resources.

Ideally, the user only knows that data is read from the source and written to the sink, while the data can be cached during this process in case the sink fails. The sync engine needs to automatically add caching operations to the execution plan and ensure that the source keeps working when the sink fails. In this process, the engine must ensure that writes to and reads from the cache are transactional, to guarantee data consistency.
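One way to picture the transactional cache is a buffer whose writes only become readable after a checkpoint commits them, and whose read position can be rolled back to the last checkpoint if the Sink fails. This is an in-memory sketch under those assumptions (`TransactionalCache` and its methods are hypothetical); a real cache would need durable storage and would have to trim records that have already been consumed.

```java
import java.util.*;

// Hypothetical in-memory stand-in for a transactional cache between Source and Sink.
final class TransactionalCache<T> {
    private final List<T> staged = new ArrayList<>();    // written since the last checkpoint
    private final List<T> committed = new ArrayList<>(); // visible to the Sink side
    private int readPos = 0;          // next committed record the Sink will read
    private int committedReadPos = 0; // read position saved at the last checkpoint

    // Source side: records are staged until the next checkpoint.
    void write(T record) { staged.add(record); }

    // Sink side: only committed records can be read.
    Optional<T> read() {
        return readPos < committed.size() ? Optional.of(committed.get(readPos++)) : Optional.empty();
    }

    // On checkpoint: publish staged writes and remember how far the Sink has read.
    void checkpoint() {
        committed.addAll(staged);
        staged.clear();
        committedReadPos = readPos;
    }

    // Source failed before the checkpoint: drop its uncommitted writes.
    void rollbackWriter() { staged.clear(); }

    // Sink failed: re-read everything consumed since the last checkpoint.
    void rollbackReader() { readPos = committedReadPos; }
}
```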

Sharding & Multi-table Sync

For synchronizing a large number of tables, we expect a single Source to support reading multiple tables with different schemas, and then use side outputs so that each table's stream stays consistent with a single-table stream.

The advantage is that it reduces the number of connections to the data source and improves the utilization of thread resources.

At the same time, SeaTunnel Engine treats these multiple tables as one pipeline, which makes the granularity of fault tolerance coarser. This is a trade-off, so users can choose how many tables a pipeline contains.
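The multi-table idea can be sketched as a single reader emitting rows tagged with their source table, which are then routed to per-table outputs, similar to side outputs. `TaggedRow` and `TableRouter` are hypothetical names; the point is that one connection and one thread can serve many tables while each table still gets its own stream.

```java
import java.util.*;
import java.util.function.Consumer;

// Hypothetical row tagged with the table it was read from.
final class TaggedRow {
    final String tableId;
    final Object[] fields;
    TaggedRow(String tableId, Object... fields) { this.tableId = tableId; this.fields = fields; }
}

// Routes one multi-table stream into per-table outputs, like side outputs.
final class TableRouter {
    private final Map<String, Consumer<TaggedRow>> outputs = new HashMap<>();

    void register(String tableId, Consumer<TaggedRow> output) { outputs.put(tableId, output); }

    void route(TaggedRow row) {
        Consumer<TaggedRow> out = outputs.get(row.tableId);
        if (out == null) throw new IllegalStateException("No output for table " + row.tableId);
        out.accept(row);
    }
}
```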

Schema Evolution

Schema Evolution is a feature that allows users to easily change the current schema of a table to accommodate changing data over time. Most commonly, it is used when performing an append or overwrite operation, to automatically adjust the schema to include one or more new columns.

This feature is required for real-time data warehouse scenarios. Currently, the Flink and Spark engines do not support this feature.

In SeaTunnel Engine, we will use the Chandy-Lamport algorithm to send DDL events, letting them flow through the DAG and update the structure of each operator before they are synchronized to the Sink.
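A rough illustration of DDL events flowing through the DAG: the schema change travels in-band with the records, like a checkpoint barrier, so every operator updates its own schema at exactly the point in the stream where the change happened. All classes here are hypothetical, and only an "add column" change is modeled.

```java
import java.util.*;

// Hypothetical sketch: schema changes flow in-band with records through a chain of operators.
final class InBandSchemaChange {
    interface Element {}

    static final class Row implements Element {
        final List<Object> values;
        Row(Object... values) { this.values = Arrays.asList(values); }
    }

    static final class AddColumn implements Element {
        final String column;
        AddColumn(String column) { this.column = column; }
    }

    // One operator in the DAG; it keeps its own copy of the schema.
    static final class Operator {
        final List<String> schema;
        Operator(List<String> initial) { this.schema = new ArrayList<>(initial); }

        // Processes elements in order and returns what it forwards downstream.
        List<Element> process(List<Element> input) {
            List<Element> out = new ArrayList<>();
            for (Element e : input) {
                if (e instanceof AddColumn) {
                    schema.add(((AddColumn) e).column); // update local schema before later rows
                } else if (((Row) e).values.size() != schema.size()) {
                    throw new IllegalStateException("Row does not match schema " + schema);
                }
                out.add(e); // forward rows and the schema event alike
            }
            return out;
        }
    }
}
```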

Shared Resource

The Multi-table feature reduces the number of Source and Sink connections used. At the same time, we have implemented Dynamic Thread Resource Sharing in SeaTunnel Engine, reducing the engine's resource usage on the server.
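Dynamic Thread Sharing is only named here, not specified, so the following is a generic sketch of the idea rather than SeaTunnel's implementation: many small synchronization tasks take turns on a shared thread instead of each holding a dedicated one. `SyncTask` and `SharedThreadRunner` are hypothetical.

```java
import java.util.*;

// Hypothetical unit of work for one small table's sync task.
interface SyncTask {
    // Does a small slice of work; returns true if the task still has work left.
    boolean runSlice();
}

// Runs many tasks round-robin on the calling thread instead of one thread per task.
final class SharedThreadRunner {
    // Returns the number of slices executed, which is handy for observing fairness.
    static int runAll(List<SyncTask> tasks) {
        Deque<SyncTask> queue = new ArrayDeque<>(tasks);
        int slices = 0;
        while (!queue.isEmpty()) {
            SyncTask task = queue.poll();
            slices++;
            if (task.runSlice()) queue.add(task); // still busy: requeue at the back
        }
        return slices;
    }
}
```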

Speed Control

As for the problems that cannot be solved by the back pressure mechanism, we will optimize the Buffer and Checkpoint mechanism:

  • First, we allow the Buffer to control the amount of data within a period of time;
  • Second, through the Checkpoint mechanism, once the number of concurrent checkpoints has reached its maximum and the checkpoint interval has elapsed, the engine can lock the buffer and prohibit the Source from writing data. This applies pressure proactively and avoids issues such as back-pressure delay or back pressure failing to reach the Source.

The above are the design goals of SeaTunnel Engine, which we hope will help you better solve the problems that bother you in data integration. In the future, we will continue to optimize the experience of using SeaTunnel so that more people are willing to use it.
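The speed-control goal above, limiting the sync rate even when neither side is saturated, is commonly implemented with a token bucket. The sketch below uses that standard technique, which is not necessarily how SeaTunnel Engine implements it; `TokenBucket` is a hypothetical class, and the clock is passed in explicitly so the behavior is deterministic and easy to test.

```java
// Hypothetical token-bucket rate limiter for rows emitted by a Source.
final class TokenBucket {
    private final double capacity;      // maximum burst: one second's worth of rows
    private final double tokensPerNano; // refill rate
    private double tokens;
    private long lastNanos;

    TokenBucket(long rowsPerSecond, long startNanos) {
        this.capacity = rowsPerSecond;
        this.tokensPerNano = rowsPerSecond / 1_000_000_000.0;
        this.tokens = rowsPerSecond; // start with a full bucket
        this.lastNanos = startNanos;
    }

    // Returns true if one row may be emitted at nowNanos; otherwise the caller waits and retries.
    boolean tryAcquire(long nowNanos) {
        tokens = Math.min(capacity, tokens + (nowNanos - lastNanos) * tokensPerNano);
        lastNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```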

The future of Apache SeaTunnel

As an Apache incubator project, the Apache SeaTunnel community is developing rapidly. In the following community planning, we will focus on four directions:

Support more data integration scenarios (Apache SeaTunnel Engine): solve pain points that existing engines cannot, such as whole-database synchronization, synchronization of table schema changes, and the coarse granularity of task failures.

Those interested in the engine can follow this umbrella issue: https://github.com/apache/incubator-seatunnel/issues/2272

Expand and improve the Connector & Catalog ecosystem: support more Connectors and Catalogs, such as TiDB, Doris, and Stripe; improve the usability and performance of existing connectors; and support CDC connectors for real-time incremental synchronization scenarios.

Those interested in connectors can follow this umbrella issue: https://github.com/apache/incubator-seatunnel/issues/1946

Support more engine versions, such as Spark 3.x and Flink 1.14.x.

Those interested in Spark 3.3 support can follow this PR: https://github.com/apache/incubator-seatunnel/pull/2574

Easier to use (Apache SeaTunnel Web): provide a web interface that makes operations more efficient through DAG/SQL, displays Catalogs, Connectors, Jobs, etc. more simply and intuitively, and connects to scheduling platforms to make task management easier.

Those interested in the Web interface can follow our Web sub-project: https://github.com/apache/incubator-seatunnel-web

· 19 min read

Bo Bi, data engineer at Mafengwo

During the joint Apache SeaTunnel & IoTDB Meetup on October 15, Bo Bi, a data engineer at Mafengwo, a leading Chinese travel-social e-commerce platform, introduced the basic principles of SeaTunnel and related thoughts on enterprise practice, discussed the pain points and optimization ideas in typical scenarios of Mafengwo’s big data development and scheduling platform, and shared his experience contributing to the community. We hope this helps you understand both SeaTunnel and the paths and skills for taking part in community building.

Introduction to the technical principle of SeaTunnel

SeaTunnel is a distributed, high-performance data integration platform for synchronizing and transforming large volumes of data, both offline and in real time.

The diagram above shows the workflow of SeaTunnel, which in simple terms consists of three parts: input, transformation, and output; more complex data processing is just a combination of several of these actions.

In a synchronization scenario, such as importing data from Kafka into Elasticsearch, Kafka is the Source of the process and Elasticsearch is the Sink.

If, during the import, the field columns do not match the external columns to be written and column or type conversions are required, or if you need to join multiple data sources and then widen the data, expand fields, etc., you need to add a Transform to the process, corresponding to the middle part of the picture.

This shows that the core of SeaTunnel is the definition of the Source, Transform, and Sink processes.

In Source, we define the data sources to read; in Sink, we define where the data pipeline finally writes to external storage; and in between, we can transform the data using either SQL or custom functions.

SeaTunnel Connector API Version V1 Architecture Breakdown

A mature component framework needs a distinctive API design pattern that makes the framework extensible.

The SeaTunnel architecture consists of three main parts:

  1. The SeaTunnel base API.

  2. The implementation of the SeaTunnel base API.

  3. SeaTunnel’s plug-in system.

SeaTunnel Basic API

The above diagram shows the interface definitions: the Plugin interface in SeaTunnel abstracts each data processing action into a Plugin.

The five parts of the diagram below, BaseSource, BaseTransform, BaseSink, RuntimeEnv, and Execution, all inherit from the Plugin interface.

As process-definition plug-ins, Source is responsible for reading data, Transform for transforming it, and Sink for writing it, while RuntimeEnv sets up the base environment variables.

The overall SeaTunnel base API is shown below.

Execution, the data flow builder that builds the entire data flow from the first three, is also part of the base API.
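The V1 base API described above can be approximated with a few generic interfaces. In this hypothetical sketch, `E` stands in for the runtime environment (SparkEnvironment in the Spark implementation) and `D` for the data carrier (a Spark Dataset); the method names are illustrative rather than copied from SeaTunnel, and `SimpleBatchExecution` plays the role of the Execution data flow builder.

```java
import java.util.*;

// Hypothetical, simplified mirror of the V1 base API (not the real SeaTunnel interfaces).
interface Plugin<E> {
    default void prepare(E env) {} // e.g. read configuration, check parameters
}

interface BaseSource<E, D> extends Plugin<E> {
    D getData(E env);
}

interface BaseTransform<E, D> extends Plugin<E> {
    D process(D data, E env);
}

interface BaseSink<E, D> extends Plugin<E> {
    void output(D data, E env);
}

// Execution: the data flow builder that wires Sources, Transforms, and Sinks together.
interface Execution<E, D> {
    void start(List<BaseSource<E, D>> sources,
               List<BaseTransform<E, D>> transforms,
               List<BaseSink<E, D>> sinks);
}

final class SimpleBatchExecution<E, D> implements Execution<E, D> {
    private final E env;
    SimpleBatchExecution(E env) { this.env = env; }

    @Override
    public void start(List<BaseSource<E, D>> sources,
                      List<BaseTransform<E, D>> transforms,
                      List<BaseSink<E, D>> sinks) {
        for (BaseSource<E, D> s : sources) s.prepare(env);
        for (BaseTransform<E, D> t : transforms) t.prepare(env);
        for (BaseSink<E, D> k : sinks) k.prepare(env);
        // Each source's data flows through every transform in order, then to every sink.
        for (BaseSource<E, D> source : sources) {
            D data = source.getData(env);
            for (BaseTransform<E, D> t : transforms) data = t.process(data, env);
            for (BaseSink<E, D> sink : sinks) sink.output(data, env);
        }
    }
}
```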

SeaTunnel Base API Implementation

Based on the previous basic APIs, SeaTunnel has been implemented in separate packages for different computing engines, currently the Spark API abstraction and the Flink API abstraction, which logically completes the process of building the data pipeline.

Due to space constraints, we will focus on Spark batch processing. The Spark implementation wraps the base API above: BaseSparkSource implements BaseSource, BaseSparkTransform implements BaseTransform, and BaseSparkSink implements BaseSink.

The method definitions use Spark’s Dataset as the data carrier, and all data processing, including reading, processing, and output, is based on the Dataset.

SparkEnvironment encapsulates Spark’s SparkSession in an Env, making it easy for individual plugins to use.

The Spark batch process ends with SparkBatchExecution (the data stream builder), the core code used to build our data stream Pipeline; the most basic data stream is shown on the left of the diagram below.

Each process component is defined by the user’s configuration of Source, Transform, and Sink. More complex data flow logic, such as multi-source Join and multi-pipeline processing, can also be implemented, and all of it is built through Execution.

SeaTunnel Connector V1 API Architecture Summary

SeaTunnel’s API consists of three main parts.

The first part is the SeaTunnel base API, which provides the basic abstract interfaces such as Source, Sink, Transform, and Plugin.

The second part wraps and implements the interfaces provided by the SeaTunnel base API (Transform, Sink, Source, Runtime, and Execution) on the Flink and Spark engines respectively, i.e. the Spark engine API abstraction layer and the Flink engine API abstraction layer.

Both Flink and Spark support stream and batch processing, so the Flink and Spark abstraction APIs provide separate stream and batch variants: FlinkStream and FlinkBatch under the Flink abstraction API, and SparkBatch and SparkStreaming under the Spark abstraction API.

The third part is the plug-in system: on top of the Spark and Flink API abstractions, SeaTunnel implements a rich set of connectors and processing plug-ins, and developers can also extend the different engine API abstractions to implement their own plugins.

SeaTunnel Implementation Principle

Currently, SeaTunnel can run jobs in several ways: Flink, Spark, and FlinkSQL. Due to space limitations, we will introduce the execution principle of the Spark method.

First, the entry point is the shell command start-seatunnel-spark.sh, which internally calls the SparkStarter class. SparkStarter parses the parameters passed by the shell script and parses the Config file to determine which Connectors it defines, such as Fake and Console. It then finds each Connector’s path in the Connector plugin directory and appends it to the spark-submit launch command with --jars, so that the Plugin jar packages are passed to the Spark cluster as dependencies.

For Connector plugins, all Spark Connectors are packaged in the plugin directory of the distribution (this directory is managed centrally).

After spark-submit is executed, the task is submitted to the Spark cluster, and the Main class of the Spark job’s Driver builds the data flow Pipeline through the data flow builder Execution, combining Source, Sink, and Transform so that the whole chain is connected.
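The launch step above, where connector plugins found in the config are passed to spark-submit via `--jars`, could look roughly like this. The plugin directory layout, the `<name>.jar` naming, the main class, and the `--config` application argument are assumptions for illustration; `--class` and `--jars` are standard spark-submit options, and `SparkSubmitCommandBuilder` is a hypothetical name.

```java
import java.nio.file.Path;
import java.util.*;

// Hypothetical sketch of how a starter could assemble the spark-submit command.
final class SparkSubmitCommandBuilder {
    static List<String> build(Path pluginDir, List<String> connectors,
                              String mainClass, String appJar, String configFile) {
        // Resolve one jar per connector named in the job config (assumed naming: <name>.jar).
        List<String> jars = new ArrayList<>();
        for (String connector : connectors) {
            jars.add(pluginDir.resolve(connector.toLowerCase(Locale.ROOT) + ".jar").toString());
        }
        List<String> cmd = new ArrayList<>(Arrays.asList("spark-submit", "--class", mainClass));
        if (!jars.isEmpty()) {
            cmd.add("--jars");               // ship plugin jars to the cluster as dependencies
            cmd.add(String.join(",", jars)); // spark-submit expects a comma-separated list
        }
        cmd.add(appJar);     // application jar containing the driver main class
        cmd.add("--config"); // hypothetical application argument pointing at the job config
        cmd.add(configFile);
        return cmd;
    }
}
```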

SeaTunnel Connector V2 API Architecture

In the latest community release, SeaTunnel 2.2.0-beta, the refactoring of the Connector API, now known as the SeaTunnel V2 API, has been completed!

Why do we need to refactor?

Because the Connector is currently strongly coupled to the engine, i.e. to the Flink and Spark APIs, upgrading the Flink or Spark engine forces the Connector to be adjusted as well, possibly with changes to parameters or interfaces.

As a result, a new Connector may need multiple implementations for different engines, with inconsistent parameters. The community therefore designed and implemented the V2 version of the API to address these pain points.

SeaTunnel V2 API Architecture


1. Table API

· DataType: defines SeaTunnel’s data structure SeaTunnelRow, which is used to isolate connectors from the engine.

· Catalog: used to obtain the Table Schema, Options, etc.

· Catalog Storage: used to store user-defined Table Schemas etc. for unstructured data sources such as Kafka.

· Table SPI: mainly used to expose the Source and Sink interfaces as an SPI.

2. Source & Sink API

Defines the Connector’s core programming interface for implementing a Connector.

3. Engine API

· Translation: the translation layer, which translates the Source and Sink APIs implemented by the Connector into runnable APIs inside the engine.

· Execution: the execution logic, used to define how Source, Transform, Sink, and other operations execute within the engine.
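The Translation layer is essentially an adapter: a connector written once against an engine-neutral API is wrapped into whatever source interface the engine expects, converting the row type along the way. Both interfaces and the `EngineNeutralRow` class below are hypothetical stand-ins, not SeaTunnel's or any engine's real API.

```java
import java.util.function.Consumer;

// Hypothetical engine-neutral row, loosely in the spirit of SeaTunnelRow.
final class EngineNeutralRow {
    private final Object[] fields;
    EngineNeutralRow(Object... fields) { this.fields = fields; }
    Object getField(int i) { return fields[i]; }
    int arity() { return fields.length; }
}

// Connector side: written once against the engine-neutral API.
interface ConnectorSource {
    void pollNext(Consumer<EngineNeutralRow> output);
}

// Engine side: a made-up engine that consumes String[] records.
interface EngineSourceFunction {
    void run(Consumer<String[]> ctx);
}

// Translation layer: adapts the connector to the engine and converts the data structure.
final class SourceTranslator {
    static EngineSourceFunction translate(ConnectorSource source) {
        return ctx -> source.pollNext(row -> {
            String[] record = new String[row.arity()];
            for (int i = 0; i < record.length; i++) record[i] = String.valueOf(row.getField(i));
            ctx.accept(record);
        });
    }
}
```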

The Source & Sink API is the basis for the implementation of the connector and is very important for developers.

The design of the v2 Source & Sink API is highlighted below

SeaTunnel Connector V2 Source API

The current version of SeaTunnel’s API design draws on some of Flink’s design concepts, and the more core classes of the Source API are shown below.

The core Source API interaction flow is shown above. In the case of concurrent reads, the enumerator SourceSplitEnumerator is required to split the task and send the SourceSplit down to the SourceReader, which receives the split and uses it to read the external data source.
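The enumerator/reader split described above can be illustrated with key ranges. This is a simplified, hypothetical sketch (`RangeSplit`, `RangeEnumerator`, and `RangeReader` are not SeaTunnel's SourceSplitEnumerator/SourceReader interfaces): the enumerator cuts the key space into splits and hands them out round-robin, and each reader reads only its own splits.

```java
import java.util.*;

// Hypothetical split: a key range [start, end).
final class RangeSplit {
    final int start, end;
    RangeSplit(int start, int end) { this.start = start; this.end = end; }
}

final class RangeEnumerator {
    // Cuts [0, total) into splits of at most splitSize keys and assigns them
    // to readers round-robin; returns reader index -> assigned splits.
    static Map<Integer, List<RangeSplit>> assign(int total, int splitSize, int readers) {
        Map<Integer, List<RangeSplit>> assignment = new TreeMap<>();
        for (int r = 0; r < readers; r++) assignment.put(r, new ArrayList<>());
        int next = 0;
        for (int start = 0; start < total; start += splitSize) {
            assignment.get(next).add(new RangeSplit(start, Math.min(total, start + splitSize)));
            next = (next + 1) % readers;
        }
        return assignment;
    }
}

final class RangeReader {
    // "Reads" each assigned split; here it simply emits the keys in the range.
    static List<Integer> read(List<RangeSplit> splits) {
        List<Integer> keys = new ArrayList<>();
        for (RangeSplit s : splits) {
            for (int k = s.start; k < s.end; k++) keys.add(k);
        }
        return keys;
    }
}
```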

To support resuming from breakpoints and exactly-once (EOS) semantics, state must be saved and restored. For example, each Reader’s split consumption state is saved through the Checkpoint mechanism and restored after a failure, so that reading resumes from where it failed.
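Saving and restoring split consumption state might look like the minimal sketch below, assuming each split is a list of records read by offset. `CheckpointedReader`, `snapshotState`, and `restoreState` are hypothetical names; records read after the last snapshot are simply read again after recovery.

```java
import java.util.*;

// Hypothetical reader that checkpoints how far it has read in each split.
final class CheckpointedReader {
    private final Map<String, Integer> offsets = new TreeMap<>(); // split id -> next offset
    private final Map<String, List<String>> data;                 // stand-in for the external source

    CheckpointedReader(Map<String, List<String>> data) {
        this.data = data;
        for (String split : data.keySet()) offsets.put(split, 0);
    }

    // Reads up to max records from a split and advances its offset.
    List<String> poll(String split, int max) {
        List<String> all = data.get(split);
        int from = offsets.get(split);
        int to = Math.min(all.size(), from + max);
        offsets.put(split, to);
        return new ArrayList<>(all.subList(from, to));
    }

    // At checkpoint time, a copy of the offsets is the reader's state.
    Map<String, Integer> snapshotState() { return new TreeMap<>(offsets); }

    // After a failure, a new reader restores the offsets and resumes from the checkpoint.
    void restoreState(Map<String, Integer> state) {
        offsets.clear();
        offsets.putAll(state);
    }
}
```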

SeaTunnel Connector V2 Sink API

The overall Sink API interaction flow is shown in the diagram below. The SeaTunnel sink is currently designed to support distributed transactions based on a two-phase commit.

First, the SinkWriter continuously writes data to the external data source; then, when the engine performs a checkpoint, it triggers the first-phase commit.

The SinkWriter needs to do a prepare commit, which is the first phase of the commit.

The engine then checks whether every Writer’s first phase succeeded. If they all succeeded, the engine passes each Subtask’s CommitInfo to the Committer’s commit method, which performs the actual commit of the transaction against the database. This is the second phase of the commit.
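The two-phase flow above can be sketched in a few classes. The names echo the text (SinkWriter, CommitInfo, Committer) but these are simplified, hypothetical stand-ins rather than SeaTunnel's actual Sink API, and the external database is modeled as a list.

```java
import java.util.*;

// Hypothetical two-phase commit sketch; not SeaTunnel's real Sink interfaces.
final class CommitInfo {
    final int subtask;
    final List<String> records;
    CommitInfo(int subtask, List<String> records) { this.subtask = subtask; this.records = records; }
}

final class SinkWriter {
    private final int subtask;
    private List<String> pending = new ArrayList<>(); // written but not yet committed
    SinkWriter(int subtask) { this.subtask = subtask; }

    void write(String record) { pending.add(record); }

    // Phase 1, triggered by a checkpoint: hand over what is needed to commit later.
    CommitInfo prepareCommit() {
        CommitInfo info = new CommitInfo(subtask, pending);
        pending = new ArrayList<>();
        return info;
    }
}

final class Committer {
    final List<String> externalStore = new ArrayList<>(); // stand-in for the target database

    // Phase 2: the engine calls this only after every writer's phase 1 succeeded.
    void commit(List<CommitInfo> infos) {
        for (CommitInfo info : infos) externalStore.addAll(info.records);
    }
}

final class CheckpointCoordinator {
    // Runs phase 1 on all writers; commits only if every writer prepared successfully.
    // Prepared data from other writers is just dropped here; a real sink would abort it.
    static boolean checkpoint(List<SinkWriter> writers, Committer committer) {
        List<CommitInfo> infos = new ArrayList<>();
        try {
            for (SinkWriter w : writers) infos.add(w.prepareCommit());
        } catch (RuntimeException e) {
            return false;
        }
        committer.commit(infos);
        return true;
    }
}
```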

For the Kafka sink connector implementation, the first phase is a pre-commit performed by calling KafkaProducerSender.prepareCommit().

The second phase is performed via Producer.commitTransaction().

flush() sends the records still buffered in the producer to the broker and blocks until those requests complete.

Finally, one point is worth noting:

Both SinkCommitter and SinkAggregatedCommitter can perform the second-phase commit in place of the Committer in the diagram. The difference is that a SinkCommitter can only commit a single Subtask’s CommitInfo, so the commit may succeed for some Subtasks and fail for others, and this cannot be handled globally.

SinkAggregatedCommitter runs with a parallelism of one and aggregates the CommitInfo of all Subtasks, so it can perform the second-phase commit as a whole: either everything succeeds or everything fails, avoiding inconsistent state caused by a partial failure in the second phase.

It is therefore recommended to use SinkAggregatedCommitter in preference.
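The difference between the two committers can be shown with a simulated commit that fails for one subtask. In this hypothetical sketch, `commitSucceeds` stands in for whether committing a subtask's CommitInfo would succeed, and all-or-nothing behavior is modeled by checking every CommitInfo before applying any of them; a real aggregated committer would rely on the target system's transactions instead.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical comparison of per-subtask commits vs one aggregated commit.
final class CommitStrategies {
    // Per-subtask committer: each CommitInfo is applied on its own, so a failure
    // leaves the store holding only part of the checkpoint's data.
    static void commitPerSubtask(List<List<String>> infos, Predicate<List<String>> commitSucceeds,
                                 List<String> store) {
        for (List<String> info : infos) {
            if (commitSucceeds.test(info)) store.addAll(info);
        }
    }

    // Aggregated committer (parallelism 1): sees every CommitInfo and applies
    // them only if all of them can be committed.
    static boolean commitAggregated(List<List<String>> infos, Predicate<List<String>> commitSucceeds,
                                    List<String> store) {
        for (List<String> info : infos) {
            if (!commitSucceeds.test(info)) return false; // abort the whole checkpoint
        }
        for (List<String> info : infos) store.addAll(info);
        return true;
    }
}
```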

Comparison of SeaTunnel V1 and V2 API processing flows

Looking at the changes between V1 and V2 from a data processing perspective is more intuitive. Taking Spark batch processing as an example:

SeaTunnel V1: the entire data processing flow is based on the Spark Dataset API, and the Connector and the compute engine are strongly coupled.

SeaTunnel V2: thanks to the engine translation layer, the Connector API, and SeaTunnelRow, data that a Connector reads into SeaTunnel’s internal data structure is translated by the translation layer into a runnable Spark API and a Spark Dataset that the engine recognizes during data transformation.

When data is written out, the Spark API and Spark Dataset are translated back through the translation layer into the SeaTunnel connector API and SeaTunnel’s internal data structure, which the connector can consume.

Overall, the addition of a translation layer at the API and compute engine layers decouples the Connector API from the engine, and the Connector implementation no longer depends on the compute engine, making the extension and implementation more flexible.

In terms of community planning, the V2 API will be the main focus of development, and more features will be supported in V2, while V1 will be stabilized and no longer maintained.

Practice and reflections on our off-line development scheduling platform


Mafengwo’s Big Data Development Platform focuses on providing one-stop big data development and scheduling services, helping businesses solve complex problems such as data development management, task scheduling, and task monitoring in offline scenarios.

The offline development and scheduling platform connects upward and downward. Upward, it provides open APIs and a UI to connect with various data application platforms and businesses; downward, it drives various compute and storage engines and runs tasks in order according to task dependencies and scheduling times.

Platform Capabilities

· Data development

Task configuration, quality testing, release to production

· Data synchronization

Data access, data processing, data distribution

· Scheduling capabilities

Supports timed scheduling and triggered scheduling

· Operations and Maintenance Centre

Job diagnosis, task O&M, instance O&M

· Management

Library and table management, permission management, API management, script management

In summary, the core capabilities of the offline development and scheduling platform are openness, versatility, and one-stop service. Through standardized processes, it manages the entire task development cycle and provides a one-stop service experience.

The architecture of the platform

Mafengwo’s Big Data Development and Scheduling Platform consists of four main modules: the task component layer, the scheduling layer, the service layer, and the monitoring layer.

The service layer is mainly responsible for job lifecycle management (e.g. job creation, testing, release, and taking jobs offline), building and generating Airflow DAG Python files, task lineage dependency management, permission management, and APIs (providing data readiness and task execution status queries).

The scheduling layer is based on Airflow and is responsible for the scheduling of all offline tasks.

The task component layer enables users to develop data through supported components, including tools such as SparkSQL, HiveSQL, MR, and StarRocks import, which interface directly with underlying storage systems such as HDFS and MySQL.

The monitoring layer is responsible for all aspects of monitoring and alerting on scheduling resources, computing resources, task execution, etc.

Open Data Sync Capability Scenarios

Challenges with open capabilities: we need to support multiple business scenarios and meet flexible data pipeline requirements (i.e. extend support to more task components such as hive2clickhouse and clickhouse2mysql).

Extending task components based on Airflow: extensions are costly to maintain, and we need to reduce costs and increase efficiency. Airflow offers a limited set of providers that fit our usage requirements poorly, and Airflow is a Python technology stack while our team mainly works with Java, so the difference in technology stacks brings higher iteration costs.

Self-developed task components: high platform integration cost, long development cycles, and high configuration cost for task components. (Whether we research or implement task components ourselves, the service layer adapts each component’s parameters differently, with no uniform way of configuring parameters.)

We wanted to investigate a data integration tool that, firstly, supported a rich set of components, provided out-of-the-box capabilities, was easy to extend, and offered a uniform configuration of parameters and a uniform way of using them to facilitate platform integration and maintenance.

Selection of data integration tools

To address the pain points mentioned above, we actively explored solutions and conducted a selection analysis of several mainstream data integration products in the industry. As you can see from the comparison above, DataX and SeaTunnel both offer good scalability and high stability, support rich connector plugins, provide scripted, uniformly configurable usage, and have active communities.

However, DataX is limited by not being distributed, so it is not well suited to massive data scenarios.

In contrast, SeaTunnel provides distributed execution, distributed transactions, and scalable data processing, and offers a unified technical solution for data synchronization scenarios.

Beyond the advantages, features, and applicable scenarios described above, more importantly, our offline big data computing resources are managed uniformly by YARN, and we also want future extended tasks to run on YARN, so we ultimately chose SeaTunnel for our usage scenarios.

Further performance testing of SeaTunnel and the development of an open data scheduling platform to integrate SeaTunnel may be carried out at a later stage, and its use will be rolled out gradually.

Outbound scenario: Hive data sync to StarRocks

To briefly introduce the background, the Big Data platform has now completed the unification of the OLAP engine layer, using the StarRocks engine to replace the previous Kylin engine as the main query engine in OLAP scenarios.

During data processing, once data has been modeled in the data warehouse, the upper-layer models need to be imported into the OLAP engine for query acceleration, so many tasks push data from Hive to StarRocks every day, loading it into StarRocks tables through a task based on a wrapper around the StarRocks Broker Load import method.

The current pain points are twofold.

· Long data synchronization chains: the Hive2StarRocks processing chain requires at least two tasks, which is relatively redundant.

· Outbound efficiency: many Hive models are themselves produced by Spark SQL, so the Spark Dataset in memory could be pushed directly to StarRocks without first being written to disk, shortening the model’s output time.

StarRocks also supports Spark Load, a Spark-based bulk import method, but our ETL is more complex and needs to support data conversion, multi-table Joins, data aggregation, etc., so Spark Load cannot meet our needs for now.

We know from the SeaTunnel community that there are plans to support the StarRocks Sink Connector, and we are working on that part as well, so we will continue to communicate with the community to build it together later.

How to get involved in community building

SeaTunnel Community Contribution

As mentioned earlier, the community has completed the refactoring of the V1 to V2 API and needs to implement more connector plug-ins based on the V2 version of the connector API, which I was lucky enough to contribute to.

I am currently responsible for big data infrastructure work, which involves many mainstream big data components, so when the community raised connector issues, I was very interested in them.

As our platform is also evaluating SeaTunnel, learning it while contributing PRs to the community is a great way to get to know SeaTunnel.

At first, I took on a less difficult PR to implement the WeChat sink connector, but I ran into many problems while contributing, such as poor coding style and not considering extensibility for the rich output formats supported. Although the process was not so smooth, it was very exciting and rewarding when the PR was merged.

As I became more familiar with the process, I became much more efficient at submitting PRs and confident enough to attempt difficult issues.

How to get involved in community contributions quickly

  • Good first issue (for example, #3018 and #2828)

If you are a first-time community contributor, it is advisable to start with Good first issues, as they are relatively simple and newcomer-friendly.

Through a Good first issue, you can get familiar with the whole process of contributing to an open source community on GitHub: first fork the project, then commit your changes, and finally submit a pull request and wait for the community to review it. Reviewers will leave comments with suggestions for improvement until your PR is merged, at which point you will have completed a full contribution.

  • Subscribe to the community mailing list: once you are familiar with the PR contribution process, you can subscribe to community emails to keep up to date with what is happening in the community, such as which features are currently being worked on and what is planned for future iterations. If you are interested in a feature, you can contribute to it according to your own situation!
  • Familiarity with git: the main git commands used in development are git clone, git pull, git rebase, and git merge. The community development specification recommends git rebase, which, unlike git merge, does not generate additional merge commits.
  • Familiarity with the GitHub collaboration process: open source projects are developed collaboratively by many people, and the core of collaboration on GitHub is the fork. For example, the Apache SeaTunnel project, which lives under the apache organization, is first forked to our own GitHub space.

Then implement the changes, open a pull request, and associate it with the issue. If the changes take a long time and the target branch accumulates many new commits in the meantime, we need to pull and merge, or rebase.

  • Source code compilation: being familiar with compiling from source is important, as a successful local build shows that the code added to the project compiles and serves as a preliminary check before submitting a PR. Source compilation is generally slow and can be sped up by using mvn -T for multi-threaded parallel compilation.
  • Compilation checks: pre-compilation checks, including the License header, code checkstyle, and documentation checkstyle, run during the Maven build, and if they fail, CI will not pass. It is therefore recommended to use plug-in tools in IntelliJ IDEA to improve efficiency: for example, a Checkstyle plug-in can check the code specification automatically, and License headers can be added through code templates in IDEA. The community has previously shared how to set these up!
  • Add full E2E tests

Add full E2E tests and ensure they pass before submitting the pull request.

Finally, I hope more people will join the SeaTunnel community, where you can not only experience the open-source spirit and culture of Apache but also learn the management process of Apache projects and good code design ideas.

We hope that by working together and growing together, we can build SeaTunnel into a top-notch data integration platform.

· 4 min read

After days of community development, the preliminary development of SeaTunnel's new Connector API is complete. The next step is to adapt connectors to this new API. To help developers, this article provides a guide to developing a connector with the new API.

Preliminary Setup

  • Environment configuration: JDK8 and Scala2.11 are recommended.

  • As before, clone the latest code locally with git and import it into the IDE (project address: https://github.com/apache/incubator-seatunnel). Then switch to the api-draft branch, which is currently used to develop the new version of the API and the corresponding connectors. The project structure is as follows:

    Project Structure

Prerequisites

  • At present, to distinguish the different kinds of connectors, we place:

    • Connectors that support Flink/Spark under the seatunnel-connectors/seatunnel-connectors-flink(spark) module.
    • Connectors for the new API under the seatunnel-connectors/seatunnel-connectors-seatunnel module.

    As the figure above shows, the Fake, Console, and Kafka connectors have been implemented, and a ClickHouse connector is in progress.

  • At present, the only data type we support is SeaTunnelRow. Both the data produced by a Source and the data consumed by a Sink must therefore be SeaTunnelRow.
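All data flowing between a Source and a Sink is a SeaTunnelRow. A connector's job therefore reduces to converting external records into rows, and rows back into external records. Below is a minimal sketch of that idea. `Row` is a hypothetical, simplified stand-in for SeaTunnelRow, and the real class's fields and methods may differ. The map-based input record and the comma-separated output are also assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: RowContract and Row are hypothetical, not SeaTunnel's classes.
class RowContract {

    // Hypothetical stand-in for SeaTunnelRow: an ordered array of field values.
    static final class Row {
        private final Object[] fields;

        Row(Object[] fields) {
            this.fields = fields.clone(); // defensive copy
        }

        Object getField(int index) {
            return fields[index];
        }

        int arity() {
            return fields.length;
        }
    }

    // Source side: map an external record into a Row using a field order
    // that source and sink agree on; absent keys become null.
    static Row toRow(Map<String, Object> record, List<String> fieldNames) {
        Object[] values = new Object[fieldNames.size()];
        for (int i = 0; i < fieldNames.size(); i++) {
            values[i] = record.get(fieldNames.get(i));
        }
        return new Row(values);
    }

    // Sink side: render a Row into an external format, here a comma-separated line.
    static String toLine(Row row) {
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < row.arity(); i++) {
            parts.add(String.valueOf(row.getField(i)));
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("id", "name", "age");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "alice");
        record.put("id", 1);
        record.put("age", 30);
        System.out.println(toLine(toRow(record, schema))); // prints 1,alice,30
    }
}
```

Fixing the field order once, on the row, is what lets any source be paired with any sink without either knowing the other's native format.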

Connector Development

Taking the Fake connector as an example, let's walk through how to implement a new connector:

  • Create a corresponding module under seatunnel-connectors-seatunnel, at the same level as the other new connectors.

  • Modify seatunnel-connectors-seatunnel/pom.xml to add the new module to the modules section. Then modify seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml to add the seatunnel-api dependency and set the correct parent reference. The result looks like this:

    Style

  • The next step is to create the corresponding package and classes: create FakeSource, which must inherit from SeaTunnel's Source.

    • Note: SeaTunnel's Source uses a unified stream-batch design. Whether the current Source is a stream or a batch source is determined by getBoundedness.

    This means you can make a Source a stream or a batch source through dynamic configuration (refer to the default method). The configuration the user defines in the configuration file can be obtained through the prepare method, which allows the Source to be customized.

    Then create FakeSourceReader, FakeSourceSplitEnumerator, and FakeSourceSplit, each inheriting from the corresponding abstraction (which can be found by looking at the corresponding classes). Once we implement the required methods of these classes, our SeaTunnel Source connector is essentially complete.

  • Next, follow the existing examples to write the corresponding code. The most important class is FakeSourceReader, which defines how we obtain data from outside; it is the most critical part of a Source connector. Each time a record is produced, we need to put it into the collector, as shown:

    Source

  • After the code is written, we need to configure plugin-mapping.properties, located under seatunnel-connectors/modules. Adding seatunnel.source.FakeSource = seatunnel-connector-fake means that when SeaTunnel looks for a Source named FakeSource, it can find the jar package of the corresponding project. This allows the connector to be used in a normal configuration file.

  • For a detailed description of writing Sources and Sinks and of the SeaTunnel API, see seatunnel-connectors/seatunnel-connectors-seatunnel/README.zh.md.
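The plugin-mapping.properties entry described above is an ordinary key/value mapping from a plugin's type and name to the artifact that contains it. The following sketch illustrates one way such a lookup can work, using java.util.Properties. The `resolve` helper, the `seatunnel.<type>.<name>` key pattern generalized from the single example entry, and the not-found handling are assumptions. This is not SeaTunnel's actual plugin-discovery code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Optional;
import java.util.Properties;

// Sketch only: PluginMappingLookup is hypothetical, not SeaTunnel's plugin loader.
class PluginMappingLookup {

    // Parse plugin-mapping content (standard .properties syntax, "key = value").
    static Properties load(String content) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(content));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; surface it if it does.
            throw new IllegalStateException(e);
        }
        return mapping;
    }

    // Resolve e.g. ("source", "FakeSource") to the artifact registered for it.
    // Properties.load keeps trailing spaces in values, so trim them.
    static Optional<String> resolve(Properties mapping, String pluginType, String pluginName) {
        String key = "seatunnel." + pluginType + "." + pluginName;
        return Optional.ofNullable(mapping.getProperty(key)).map(String::trim);
    }

    public static void main(String[] args) {
        Properties mapping = load("seatunnel.source.FakeSource = seatunnel-connector-fake\n");
        // prints seatunnel-connector-fake
        System.out.println(resolve(mapping, "source", "FakeSource").orElse("<not found>"));
    }
}
```

Returning an Optional keeps a missing registration explicit. In practice, a forgotten mapping entry is a common reason a new connector "cannot be found" when referenced from a configuration file.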

Connector Testing

  • For testing, use the seatunnel-flink(spark)-new-connector-example module in seatunnel-examples to test the connector against the different engines, making sure it behaves as consistently as possible across them. If you find any discrepancies, note them in the documentation. To run a test, modify the configuration file under resources to add our connector, add the dependency to seatunnel-flink(spark)-new-connector-example/pom.xml, and then run SeaTunnelApiExample.
  • Stream processing is the default mode. To switch to batch mode, set job.mode=BATCH in the env section of the configuration file.
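Here is a minimal, self-contained sketch that makes the stream/batch switch and the collector pattern concrete. Every type in it (Boundedness, Collector, FakeSource, FakeReader) is a hypothetical simplification named after the concepts in this guide. None of it is the real SeaTunnel API, whose interfaces have more methods and different signatures. Reading job.mode from a plain map in prepare, with streaming as the default, is likewise an assumption modeled on the behavior described above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch only: every type here is a hypothetical simplification, not SeaTunnel's API.
class FakeSourceSketch {

    enum Boundedness { BOUNDED, UNBOUNDED }

    // The reader hands each generated record to a collector.
    interface Collector<T> {
        void collect(T record);
    }

    static final class FakeSource {
        private Boundedness boundedness = Boundedness.UNBOUNDED; // streaming by default

        // Stand-in for prepare(): read user configuration to customize the source.
        void prepare(Map<String, String> config) {
            String mode = config.getOrDefault("job.mode", "STREAMING");
            boundedness = "BATCH".equalsIgnoreCase(mode) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
        }

        Boundedness getBoundedness() {
            return boundedness;
        }
    }

    static final class FakeReader {
        private final Boundedness boundedness;
        private final int limit; // only enforced when bounded
        private int emitted = 0;

        FakeReader(Boundedness boundedness, int limit) {
            this.boundedness = boundedness;
            this.limit = limit;
        }

        // Emit one fake row per call; return false once a bounded source has finished.
        boolean pollNext(Collector<Object[]> output) {
            if (boundedness == Boundedness.BOUNDED && emitted >= limit) {
                return false;
            }
            output.collect(new Object[] {emitted, "name_" + emitted});
            emitted++;
            return true;
        }
    }

    // Drain the reader in batch mode; in streaming mode stop after maxPolls
    // (a real unbounded source would keep running).
    static List<Object[]> run(Map<String, String> config, int limit, int maxPolls) {
        FakeSource source = new FakeSource();
        source.prepare(config);
        FakeReader reader = new FakeReader(source.getBoundedness(), limit);
        List<Object[]> sink = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            if (!reader.pollNext(sink::add)) {
                break;
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        int batchRows = run(Collections.singletonMap("job.mode", "BATCH"), 5, 100).size();
        int streamRows = run(Collections.<String, String>emptyMap(), 5, 20).size();
        System.out.println(batchRows + " " + streamRows); // prints 5 20
    }
}
```

The same reader code serves both modes. Only the boundedness decided at prepare time changes whether it ever reports completion, which mirrors the unified stream-batch design described earlier.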

Submit PR

Once our connector is ready, we can submit a PR on GitHub. After it has been reviewed by other contributors, our connector will become part of SeaTunnel!

· 5 min read

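Only data resources that can be managed, invoked, computed, and monetized can become assets. The interconnection of information systems has created enormous demand for integrating multi-source, multi-dimensional data, which places strict requirements on data processing and integration tools.

In the era of intelligence, amid trends such as "smart cities", "smart governance", and "intelligent products", most enterprises face the same challenges: pushing data efficiently, improving platform quality, and keeping data secure. Data can deliver its greatest value only when the right data integration tools and platforms are chosen.

Apache SeaTunnel (Incubating) is a next-generation, high-performance, distributed framework for massive data integration. It is committed to making data synchronization simpler and more efficient and to accelerating the adoption of distributed data processing in production.

At the Apache SeaTunnel (Incubating) Meetup on April 16, 2022, the Apache SeaTunnel (Incubating) community will invite experienced users of Apache SeaTunnel (Incubating) to share best practices for deploying it in intelligent production environments. In addition, contributors will walk through the Apache SeaTunnel (Incubating) source code live, giving you a more comprehensive and in-depth understanding of the project.

Perhaps you are a beginner interested in Apache SeaTunnel (Incubating). Perhaps you have run into complex, thorny deployment problems in day-to-day production. Either way, come along, talk directly with our speakers, and get the answers you need.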

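01 Registration

Apache SeaTunnel (Incubating) Meetup | Registration for the April online livestream is now open. Book your spot now!

Time: 2022-04-16, 14:00-17:00

Format: Online livestream

Click the link or scan the QR code to register (free):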

(QR code: scan to register)

(QR code: scan to join the livestream group)

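02 Event Highlights

  • Detailed industry case studies
  • Analysis of key features
  • Lessons learned by leading enterprises
  • A practical guide to the open-source community
  • Face-to-face Q&A with industry technical experts
  • Plenty of surprise gifts

03 Agenda

On the day, engineers from Kidswant (孩子王) and OPPO will share first-hand, cutting-edge practical experience from their companies. A senior engineer from WhaleOps (白鲸开源) will also give a "hardcore" explanation of important feature updates in Apache SeaTunnel (Incubating). The event is packed with practical content.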


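Yuan Hongjun (袁洪军), Big Data Expert and OLAP Platform Architect, Kidswant

Years of experience managing big data platform R&D, with extensive research experience in data assets, lineage graphs, data governance, OLAP, and related fields

Time: 14:00-14:40

Topic: Applying Apache SeaTunnel (Incubating) at Kidswant

Abstract: How can data be pushed efficiently? How can platform quality be improved? How can data security be ensured? And what modifications has Kidswant made to Apache SeaTunnel (Incubating)?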


Fan Jia (范佳), Senior Engineer at WhaleOps, Apache SeaTunnel Contributor

Time: 14:40-15:20

Topic: A ClickHouse Bulk Load implementation based on Apache SeaTunnel (Incubating)

Abstract: Implementing bulk-load data synchronization for ClickHouse by extending an Apache SeaTunnel (Incubating) connector.


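Wang Zichao (王子超), Senior Backend Engineer, OPPO

Time: 15:50-16:30

Topic: How Apache SeaTunnel (Incubating) is transforming the sample center of OPPO's intelligent recommendation system

Abstract: An introduction to how OPPO's machine-learning sample pipeline for intelligent recommendation has evolved, and the role Apache SeaTunnel (Incubating) plays in it.

In addition to the talks, there will be several prize draws during the event, giving you a chance to win exclusive custom Apache SeaTunnel (Incubating) merchandise. Stay tuned!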