
· 13 min read

SeaTunnel Connector Access Plan

During the recent live event of the SeaTunnel Connector Access Plan, Beluga open source engineer Wang Hailin shared the “SeaTunnel Connector Access Plan and Development Guide to Avoiding Pitfalls,” teaching everyone how to develop a connector from scratch, covering the whole process from preparation to testing and the final PR.

Speaker

Wang Hailin

Wang Hailin is an open-source enthusiast, a SkyWalking Committer, and a DolphinScheduler and SeaTunnel contributor. His current work focuses on performance monitoring, data processing, and more. He likes to study related technical implementations and participate in community exchanges and contributions.

This presentation is divided into 5 parts:

  1. About the connector access incentive program
  2. Preparation before claiming/developing connector
  3. Small things in development
  4. Considerations for writing E2E Tests
  5. Preparations to submit a PR

1. About the Connector Access Incentive Plan

Firstly, let me introduce the SeaTunnel Connector Access Incentive Program and the steps to develop a connector from start to finish (even for novices), covering the whole process of preparation, development, testing, and the final PR.

The SeaTunnel community released a new connector API not long ago, which supports running on various engines, including Flink, Spark, and more, eliminating the need to develop the same connector repeatedly for each engine as the old API required.

Now that the new API is released, the old connectors need to be migrated to it, and new connectors should be developed against it.

In order to motivate the community to actively participate in the SeaTunnel connector access work and help build SeaTunnel into a more efficient data integration platform, the SeaTunnel community initiated this activity, sponsored by Beluga Open Source.

The activity offers three difficulty levels for connector access tasks: simple, medium, and hard, so the barrier to entry is low.

The activity issue list shows which tasks are waiting to be claimed, segmented by difficulty and priority, so you can pick a task you are comfortable with and start contributing at the right difficulty level.

The SeaTunnel ecosystem can only become more complete and advanced with the help of your contributions, so you are welcome to participate actively.

In order to express our gratitude, our event has set up a mechanism where points can be exchanged for physical prizes. The more points you get, the more prizes you can win!

Presently, we’ve seen many community members participate in the event and submit their connectors. It’s not too late to join, as there is still a significant period of time before the event ends, and depending on the difficulty of a task, the deadline may be relaxed or extended.

2. Preparations Before Claiming/Developing Connectors

So, how do you get involved with this amazing activity?

By first getting to know the basics of a connector.

01. What is a connector?

A connector is composed of a Source and a Sink (Source + Sink).

In the above figure, the connectors connect to various data sources at the upper and lower layers. The source is responsible for reading data from external data sources, while the sink is responsible for writing data to external targets.

There is also an abstraction layer between the source and the sink.

Through this abstraction layer, the data types of various data sources can be uniformly converted into the SeaTunnelRow data format. This allows users to arbitrarily combine sources and sinks, realizing the integration of heterogeneous data sources and data synchronization between multiple data sources.
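
As a rough illustration of the idea (a sketch assuming the seatunnel-api dependency; the ExternalRecord type is a made-up stand-in for whatever your data source returns), a source converts each external record into a SeaTunnelRow before handing it to the engine:

```java
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Hypothetical external record type, standing in for whatever the source returns.
class ExternalRecord {
    final long id;
    final String name;
    ExternalRecord(long id, String name) { this.id = id; this.name = name; }
}

public class RowConversionExample {
    // Convert one external record into the engine-neutral SeaTunnelRow format;
    // the field order must match the schema the source declares.
    static SeaTunnelRow toSeaTunnelRow(ExternalRecord r) {
        return new SeaTunnelRow(new Object[] {r.id, r.name});
    }

    public static void main(String[] args) {
        SeaTunnelRow row = toSeaTunnelRow(new ExternalRecord(1L, "seatunnel"));
        System.out.println(row.getField(1)); // prints "seatunnel"
    }
}
```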

02. How to claim a connector

After understanding the basic concepts, the next step is to claim the connector.

GitHub link: https://github.com/apache/incubator-seatunnel/issues/1946

You can use the above GitHub link to see our connector access plan; additions are welcome at any time.

First, find a connector that has not been claimed. To avoid conflicts, search the entire issue to check whether anyone has already submitted a PR for it.

After claiming a connector, we suggest that you create an issue for the corresponding feature, report the problems you encounter during development there, and discuss the design of your solution.

If you encounter problems and need help, you can describe them in the issue, and the community can pick them up and discuss them together to help solve the problem. The discussion also becomes a record of the feature’s implementation process, which makes it easy to refer to during future maintenance and modification.

03. Compile the project

After claiming the connector, it’s time to prepare the development environment.

First, fork the SeaTunnel project, clone it to your local development environment, and compile it.

Here’s the compilation reference documentation: https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/setup.md

Run the test cases in the documentation after the compilation succeeds. You might encounter some problems when compiling for the first time, such as the following compilation errors:

The solution to the above exceptions:

rm -rf {your_maven_dir}/repository/org/apache/seatunnel
./mvnw clean
Then recompile.

The success of project compilation means that the development environment is ready. Next, let’s take a look at the project code structure and API interface structure of the connector.

Engineering Code structure

After the project is compiled, there are three parts related to the connector. The first part is the code implementation and dependency management of the new connector module.

  • seatunnel-connectors-v2 stores the connector submodule
  • seatunnel-connectors-v2-dist manages connectors-v2 maven dependencies

The second part is the example. When testing locally, you can build a corresponding case on the example to test the connector.

  • seatunnel-flink-connector-v2-example example running on Flink
  • seatunnel-spark-connector-v2-example example running on Spark

The third part is the E2E-testcase: adding targeted test cases on the respective running engines of Spark or Flink, and verifying the functional logic of the connector through automated testing.

  • seatunnel-flink-connector-v2-e2e testcase running on Flink
  • seatunnel-spark-connector-v2-e2e testcase running on Spark

04. Code structure (interfaces, base classes)

The public interfaces and base classes used in development are fully described in our README, including, for example, the usage scenarios of each API.

Here’s the link: https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/README.en.md

05. See how other people develop connectors

After going through the above steps, don’t rush to start coding. Instead, first check out how others do it.

We strongly recommend you check out the connector novice development tutorial shared on the community's official account:

  • [SeaTunnel Connector Minimalist Development Process]
  • [New API Connector Development Analysis]
  • [The way of decoupling Apache SeaTunnel (Incubating) and the computing engine — what we’ve done to reconstruct the API]

In addition, you can refer to the merged Connector code to see the scope of changes, the public interfaces and dependencies used, and the test cases.

3. Small Issues/Tasks During Development

Next, you have to officially enter the connector development process. What problems may be encountered during the development process?

The connector is divided into the source and sink ends — you can choose either one or both.

The first thing to pay attention to when developing a source is to determine its reading mode: streaming, batch, or both?

Use the Source#getBoundedness interface to mark the modes supported by the source.

For example, Kafka naturally supports streaming reading, but it can also support batch mode reading by obtaining lastOffset in the source.
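
A minimal sketch of this, assuming the Boundedness enum from seatunnel-api and a hypothetical `streaming` flag derived from the user’s job config:

```java
import org.apache.seatunnel.api.source.Boundedness;

// Sketch: the source reports its mode via getBoundedness(). A Kafka-style
// source can support both: UNBOUNDED for streaming, and BOUNDED for a batch
// read that stops at the last offset captured when the job starts. The
// `streaming` flag is a hypothetical field populated from the job config.
public class BoundednessExample {
    private final boolean streaming;

    public BoundednessExample(boolean streaming) {
        this.streaming = streaming;
    }

    public Boundedness getBoundedness() {
        return streaming ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
    }
}
```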

Another question to be aware of: does the source require concurrent reads? If it is single concurrency, after the source is started, a reader will be created to read the data from the data source.

If you want to achieve multi-concurrency, you need to implement an enumerator interface through which data blocks are allocated to readers, and the readers each read their allocated data blocks.

For example, the Kafka source shards by partition, and the JDBC source shards by range queries on a field. Note that with concurrent reading, the stability of the data-block distribution rules must be ensured.

This is because, in actual operation, each parallel instance of the connector currently runs its own enumerator, so the allocation rules must be stable and deterministic: every enumerator must compute the same assignment so that each reader gets its intended shards.
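
One common way to keep allocation stable (a sketch, not the project’s prescribed rule) is to derive the owning reader purely from the split id, so every enumerator instance computes the same assignment:

```java
public class SplitOwnerExample {
    // Deterministic split-to-reader assignment: the owner depends only on the
    // split id and the parallelism, never on arrival order or timing.
    static int getSplitOwner(String splitId, int parallelism) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        System.out.println(getSplitOwner("topic-0", 2)); // always the same reader
    }
}
```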

Thirdly, does the source need to support resumable transfer/state restoration?

If you want to support this, you need to implement the following (a reader-side sketch follows the list):

  • Source#restoreEnumerator: restore state
  • Enumerator#snapshotState: storage shard allocation
  • Reader#snapshotState: stores the read position
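
A rough reader-side sketch of the idea (the offset bookkeeping and the Long-offset state type are illustrative assumptions, not the exact SeaTunnel signatures):

```java
import java.util.Collections;
import java.util.List;

// snapshotState() captures the current read position at each checkpoint;
// restoreState() puts it back after a restart, so reading resumes in place.
public class OffsetTrackingReader {
    private long currentOffset; // advanced as records are read

    // called by the engine at checkpoint time
    public List<Long> snapshotState(long checkpointId) {
        return Collections.singletonList(currentOffset);
    }

    // called when the job restarts from a checkpoint
    public void restoreState(List<Long> state) {
        this.currentOffset = state.get(0);
    }
}
```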

For a common sink implementation, use Sink#createWriter to create writers that write the data, matching the concurrency of the source.

If you need to support failure recovery, you need to implement:

  • Sink#restoreWriter: restore state
  • Writer#snapshotState: snapshot state

If you want to support two-phase commit, you need to implement the following interfaces (a skeletal sketch follows the list):

  • Sink#createCommitter
  • Writer#prepareCommit: pre-commit
  • Committer#commit: phase-2 commit
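
A skeletal sketch of how these pieces fit together; the CommitInfo type and the transaction-id handling are placeholders, not the actual SeaTunnel classes:

```java
import java.util.Optional;

// Placeholder commit-info type; for JDBC-style sinks this is typically a
// transaction id that phase 2 needs in order to finish the commit.
class CommitInfo {
    final String transactionId;
    CommitInfo(String transactionId) { this.transactionId = transactionId; }
}

class TwoPhaseWriter {
    // phase 1: flush buffered data and hand back what the engine needs to commit
    Optional<CommitInfo> prepareCommit() {
        // flushBufferedRows(); // hypothetical helper
        return Optional.of(new CommitInfo("txn-42"));
    }
}

class TwoPhaseCommitter {
    // phase 2: the engine calls this only after every writer's phase 1 succeeded
    void commit(CommitInfo info) {
        // e.g. COMMIT the prepared transaction identified by info.transactionId
    }

    void abort(CommitInfo info) {
        // e.g. ROLL BACK the prepared transaction
    }
}
```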

Next, let’s look at some general problems. First contributions in particular often run into code-style issues, since every developer’s environment has a different style. It is therefore recommended that you import tools/checkstyle/checkStyle.xml from the project during development and use a unified coding format.

Whether it is a source or a sink, it will involve defining the data format. The community is pushing for a unified data format definition.

To define a Schema, please refer to PR: https://github.com/apache/incubator-seatunnel/pull/2392
To define a Format, please refer to PR: https://github.com/apache/incubator-seatunnel/pull/2435

If you feel that compilation is slow, you can temporarily comment out the modules of the old connector version to speed up development and debugging.

04. How to seek help

When you encounter problems during development and need help, you can:

  • Describe the problem in your issue and mention active contributors
  • Discuss it on the mailing list and Slack
  • Communicate through the WeChat group (if you have not joined, follow the SeaTunnel official account to join the group, or add the assistant’s WeChat: seatunnel1)
  • The community may have a liaison group for a given third-party component, which lets you do more with less

4. Notes on Writing E2E Tests

E2E testing is very important. It is often called the gatekeeper of connector quality.

This is because, if the connector you wrote is not tested, it is difficult for the community to judge from static code review alone whether the implementation is correct.

Therefore, E2E testing is not only functional verification but also a process of checking data logic, which can reduce the pressure on the community to review code and ensure basic functional correctness.

In E2E testing, these are some of the problems that may be encountered:

01. E2E Failed — Test Case Network Address Conflict

Because the E2E network deployment structure has the following characteristics:

  • External components that the Spark, Flink, and e2e test cases depend on (for example, MySQL) use the container’s networkAliases (host) as the access address
  • The Spark-side and Flink-side e2e test cases may run in parallel on the same host
  • External components that the e2e test cases depend on need to map their ports to the host so the test cases can access them

Therefore, E2E has to pay attention to:

  • The host ports mapped by these external components must not be the same in the Spark-side and Flink-side test cases
  • The e2e test case accesses external components via localhost and the mapped port
  • The e2e job configuration file accesses external components via networkAliases (host) and the in-container port

Here’s the E2E Testcase reference PR: https://github.com/apache/incubator-seatunnel/pull/2429

02. E2E failure — Spark jar package conflict

Spark uses a parent-first classloader by default, which may conflict with packages referenced by the connector. To address this, the userClassPathFirst classloader can be configured in the connector’s environment configuration.

However, the current packaging structure of SeaTunnel prevents userClassPathFirst from working properly, so we created https://github.com/apache/incubator-seatunnel/pull/2474 to track the problem. Everyone is welcome to contribute solutions.

Currently, this can only be resolved by replacing the conflicting packages in the Spark jars directory as described in the documentation.

03. E2E failure — Connector jar package conflict

The E2E project depends on both the old and new versions of the Connector, which causes conflicts.

PR https://github.com/apache/incubator-seatunnel/pull/2414 has resolved this issue.

Version conflicts between Connector-v2 modules:

  • Mainly occurs during E2E, because the E2E project depends on all Connectors
  • We may plan to provide a separate test project for each Connector (or version) in the future

04. Insufficient E2E — Sink Logic Verification

The FakeSource of Connector-v2 can currently only generate random data with a few fixed columns, and community members are optimizing it to make it better (https://github.com/apache/incubator-seatunnel/pull/2406). For now, we can work around this by generating data with specified content through a Transform SQL.

05. Insufficient E2E — Source validation data

The Assert Sink can configure column-level rules but cannot do row-level value checking. For this problem, you can temporarily use another connector sink to write to external storage and verify the data by querying it.

06. E2E stability improvement

In many cases, when E2E starts, Thread.sleep is used to wait for resource initialization. But sleeping too short causes initialization failures, while sleeping too long wastes time.

In addition, due to instability in resources, the network, and other factors, a test might pass now but fail later. To avoid this problem, Thread.sleep can be replaced with Awaitility.
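
For example (a sketch inside a test method; `container.isRunning()` stands in for whatever readiness check your test actually needs, such as a Testcontainers container or a port probe):

```java
import static org.awaitility.Awaitility.await;

import java.time.Duration;

// Poll for readiness with an upper bound instead of sleeping a fixed time.
await().atMost(Duration.ofSeconds(60))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> container.isRunning());
```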

07. A method to speed up E2E

At present, I see that most people run E2E tests separately for both source and sink. If you want to speed up the PR process, it is recommended that you combine both the sink and source into one E2E testcase for verification, and run the testcase only once.

5. Checks Before Submitting a PR

After completing the previous steps, please make sure you do some checks before submitting PR — including the following aspects:

Completely recompile the project:

  • Codestyle validation, dependency validation
  • A previous successful compilation does not mean it still compiles successfully now

Running E2E locally succeeds:

  • Both Flink and Spark are verified

Supplement or update the documentation, and review everything again before submitting:

  • Check for places not covered by tests
  • Check places that have been reviewed before and need to be checked again
  • Review all files, not just code

The above operations and steps can greatly save CI resources, speed up PR merging, and reduce the cost of community reviews.

· 10 min read

At the Apache SeaTunnel&Apache Doris Joint Meetup held on July 24, Liu Li — senior engineer of WhaleOps and contributor to Apache SeaTunnel — mentioned an easy way to develop a connector in SeaTunnel quickly.

We’ll divide it into four key parts:

● The definition of a Connector

● How to access data sources and targets

● Code to demonstrate how to implement a Connector

● Sources and targets that are currently supported

Definition of a Connector

The Connector consists of Source and Sink and is a concrete implementation of accessing data sources.

Source: The Source is responsible for reading data from sources such as MySQLSource, DorisSource, HDFSSource, TXTSource, and more.

Sink: The Sink is responsible for writing read data to the target, including MySQLSink, ClickHouseSink, HudiSink, and more. Data transfer, and more specifically, data synchronization is completed through the cooperation between the Source and Sink.

Of course, different sources and sinks can cooperate with each other.

For example, you can use MySQL Source, and Doris Sink to synchronize data from MySQL to Doris, or even read data from MySQL Source and write to HDFS Sink.

How to access data sources and targets

How to access Source

Firstly, let’s take a look at how to access the Source; that is, how to implement a source and the core interfaces that need to be implemented.

The simplest Source is a single-concurrency Source that does not support state storage or other advanced functions. What interfaces should such a simple source implement?

Firstly, we need to use getBoundedness in the Source to identify whether the Source supports real-time or offline, or both.

createReader creates a Reader, whose job is the concrete implementation of data reading. A single-concurrency source is really simple: we only need to implement one method, pollNext, through which the read data is emitted.
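
A minimal sketch of that single method (assuming the Collector and SeaTunnelRow types from seatunnel-api; fetchOneRecord is a hypothetical stand-in for your data source’s client call):

```java
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Single-concurrency case: pollNext hands each record to the engine
// through the Collector.
public class SimpleReader {
    public void pollNext(Collector<SeaTunnelRow> output) {
        Object[] fields = fetchOneRecord();
        if (fields != null) {
            output.collect(new SeaTunnelRow(fields));
        }
    }

    private Object[] fetchOneRecord() {
        return new Object[] {1L, "demo"}; // placeholder data
    }
}
```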

If concurrent reading is required, what additional interfaces should we implement?

For concurrent reading, we’ll introduce a new member, called the Enumerator.

We implement createEnumerator in Source, and the main function of this member is to create an Enumerator to split the task into segments and then send it to the Reader.

For example, a task can be divided into 4 splits.

With a parallelism of two, there are two Readers: two of the four splits are sent to Reader1, and the other two are sent to Reader2.

If the parallelism is higher, say four, then four Readers are created, and the four splits are read concurrently for improved efficiency.

The Enumerator has a corresponding interface called addSplitsBack, which sends splits to the corresponding Reader; through this method, the ID of the target Reader can be specified.

Similarly, the Reader has an interface called addSplits, which receives the splits sent by the Enumerator for data reading.

In a nutshell, for concurrent reading we need an Enumerator to split the task and send the splits to the Readers, and each Reader receives its splits and uses them for reading.
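
A simplified sketch of this hand-off (the real SourceSplitEnumerator and SourceReader interfaces carry more methods and a context object; the types below are stand-ins to show the flow only):

```java
import java.util.Collections;
import java.util.List;

// The enumerator owns the full split list and pushes each split to its
// reader; the reader receives splits via addSplits and reads them.
class MySplit {
    final int id;
    MySplit(int id) { this.id = id; }
}

interface ReaderHandle {
    void addSplits(List<MySplit> splits);
}

class SimpleEnumerator {
    // assign each split to a reader using a stable rule, e.g. id % parallelism
    void assign(List<MySplit> splits, List<ReaderHandle> readers) {
        for (MySplit split : splits) {
            ReaderHandle owner = readers.get(split.id % readers.size());
            owner.addSplits(Collections.singletonList(split));
        }
    }
}
```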

In addition, if we need to support resuming and exactly-once semantics, what additional interfaces should we implement?

If the goal is to resume the transfer from a breakpoint, we must save the state and restore it. For this, we need to implement a restoreEnumerator in Source.

The restoreEnumerator method restores an Enumerator from the saved state, including its splits.

Correspondingly, we need to implement a snapshotState in this enumerator, which is used to save the state of the current Enumerator and perform failure recovery during checkpoints.

At the same time, the Reader will also have a snapshotState method to save the split state of the Reader.

In the event of a failed restart, the Enumerator can be restored from the saved state. After the splits are restored, reading continues from the point of failure.

Exactly-once semantics actually requires the source to support data replay, as Kafka, Pulsar, and others do, and the sink to support two-phase commit; exactly-once can be achieved with the cooperation of these two sides.

How to access Sink

Now, let’s take a look at how to connect to the Sink. What interfaces does the Sink need to implement?

Truth be told, the Sink is relatively simple: when state storage and two-phase commit are not needed, even a concurrent Sink is easy.

To elaborate, the Sink does not distinguish between stream synchronization and batch synchronization as the Sink — and the entire SeaTunnel API system — supports Unified Stream and Batch Processing.

Firstly, we need to implement createWriter. A Writer is used for data writing.

You need to implement a write method in the Writer, through which data is written to the target library.

As shown in the figure above, if two concurrencies are set, the engine will call the createWriter method twice in order to generate two Writers. The engine will feed data to these two writers, which will write the data to the target through the write method.

For a more advanced setup, suppose we need to support two-phase commit and state storage.

Here, what additional interfaces should we implement?

First, let’s introduce a new member, the Committer, whose main role is for the second-stage commit.

Since the Sink keeps state, the Writer must be restorable from that state; hence restoreWriter should be implemented.

Also, since we have introduced a new member, the Committer, we should also implement a createCommitter in the sink. We can then use this method to create a Committer for the second-stage commit or rollback.

In this case, what additional interfaces does Writer need to implement?

Since it is a two-phase commit, the first-phase commit is done in the Writer by implementing the prepareCommit method.

In addition, state storage and failure recovery are also supported, meaning we need snapshotState to take snapshots at checkpoints, saving the state for failure-recovery scenarios.

The Committer is the core here. It is mainly used for rollback and commit operations in the second phase.

In the corresponding process, we need to write data to the database. The engine triggers the first-phase commit at checkpoint time, and the Writer performs its prepareCommit.

At the same time, the Writer returns commitInfo to the engine, and the engine judges whether the first-phase commits of all Writers succeeded.

If they are indeed successful, the engine will use the commit method to actually commit.

For MySQL, the first-phase commit just saves a transaction ID and hands it to the engine; the engine then decides whether that transaction is committed or rolled back.
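
A sketch of that coordination step (WriterResult and the committer interface are simplified stand-ins, not the engine’s actual types):

```java
import java.util.List;

// Each writer's phase-1 result carries a transaction id; phase 2 commits
// only if every writer prepared successfully, otherwise it rolls back.
class WriterResult {
    final String txnId;
    final boolean prepared;
    WriterResult(String txnId, boolean prepared) {
        this.txnId = txnId;
        this.prepared = prepared;
    }
}

interface TxnCommitter {
    void commit(String txnId); // phase 2: really commit the prepared txn
    void abort(String txnId);  // roll the prepared txn back
}

class CheckpointCoordinationSketch {
    static void onCheckpoint(List<WriterResult> results, TxnCommitter committer) {
        boolean allPrepared = results.stream().allMatch(r -> r.prepared);
        for (WriterResult r : results) {
            if (allPrepared) {
                committer.commit(r.txnId);
            } else {
                committer.abort(r.txnId);
            }
        }
    }
}
```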

How to implement the Connector

We’ve taken a look at Source and Sink; let’s now look at how to access the data source and implement your own Connector.

Firstly, we need to build a development environment for the Connector.

The necessary environment

  1. Java 1.8/11, Maven, IntelliJ IDEA

  2. Windows users need to additionally download gitbash (https://gitforwindows.org/)

  3. Once you have these, you can download the SeaTunnel source code by cloning the Git repository.

  4. Download the SeaTunnel source code:
     1. git clone https://github.com/apache/incubator-seatunnel.git
     2. cd incubator-seatunnel

SeaTunnel Engineering Structure

We then open it again through the IDE, and see the directory structure as shown in the figure:

The directory is divided into several parts:

  1. connector-v2

The concrete implementations of the new Connectors (connector-v2) are placed in this module.

  2. connector-v2-dist

The translation layer for the new Connectors, which translates them into the concrete implementations of the corresponding engines, such as Spark, Flink, and ST-Engine, instead of implementing each connector under every engine. ST-Engine is the “important, big project” the community is striving to implement, and it is worth the wait.

  3. examples

This package provides a single-machine local operation method, which is convenient for debugging while implementing the Connector.

  4. e2e

The e2e package is for e2e testing of the Connector.

Next, let’s check out how a Connector can be created (based on the new Connector). Here is the step-by-step process:

  1. Create a new module in the seatunnel-connectors-v2 directory and name it this way: connector-{connector name}.

  2. The pom file can refer to the pom file of an existing connector; also add the new child module to the parent pom.

  3. Create two new packages corresponding to the packages of Source and Sink, respectively:

a. org.apache.seatunnel.connectors.seatunnel.{connector name}.source

b. org.apache.seatunnel.connectors.seatunnel.{connector name}.sink

Take this mysocket example shown in the figure:

Then implement the connector. During implementation, you can use the example module for local debugging if needed; this module provides local running environments for Flink and Spark.

As you can see in the image, there are numerous examples under the “Example” module — including seatunnel-flink-connector-v2-example.

So how do you use them?

Let’s take an example. The debugging steps on Flink are as follows (these actions are under the seatunnel-flink-connector-v2-example module):

  1. Add connector dependencies in pom.xml

  2. Add the task configuration file under resources/examples

  3. Configure the file in the SeaTunnelApiExample main method

  4. Run the main method

Code Demo

This code demonstration is based on DingTalk.

Here’s a reference (19:35–37:10):

https://weixin.qq.com/sph/A1ri7B

New Connectors supported at this stage

The connectors completed as of July 14 are counted below. You are more than welcome to try them out, and raise issues in our community if you find bugs.

The connectors listed below have already been claimed and developed:

Also, we have connectors on the roadmap, i.e. the connectors we want to support in the near future. To foster the process, the SeaTunnel community initiated the SeaTunnel Connector Access Incentive Plan, and you are more than welcome to contribute to the project.

SeaTunnel Connector Access Incentive Plan: https://github.com/apache/incubator-seatunnel/issues/1946

You can claim tasks that haven’t been marked in the comment area and take a prize home! Here is part of the list of connectors that need to be accessed as soon as possible. In fact, the implementations of connectors like Feishu, DingTalk, and Facebook Messenger are quite simple, as these connectors do not need to carry a large amount of data (just a simple Source and Sink). This is in sharp contrast to Hive and other databases that need to consider transaction consistency or concurrency issues.

We welcome everyone to make contributions and join our Apache SeaTunnel family!

About SeaTunnel

SeaTunnel (formerly Waterdrop) is an easy-to-use, ultra-high-performance distributed data integration platform that supports the real-time synchronization of massive amounts of data and can synchronize hundreds of billions of data per day stably and efficiently.

Why do we need SeaTunnel?

SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.

  • Data loss and duplication
  • Task buildup and latency
  • Low throughput
  • Long application-to-production cycle time
  • Lack of application status monitoring

SeaTunnel Usage Scenarios

  • Massive data synchronization
  • Massive data integration
  • ETL of large volumes of data
  • Massive data aggregation
  • Multi-source data processing

Features of SeaTunnel

  • Rich components
  • High scalability
  • Easy to use
  • Mature and stable

· 2 min read

As SeaTunnel gets popular around the world, it is attracting more and more contributors from overseas to join the open-source cause. Among them, Namgung Chan, a big data platform engineer at Kakao enterprise corp., recently contributed the Neo4j Sink Connector to SeaTunnel. We had a talk with him to learn why SeaTunnel is attractive to him and how he thinks SeaTunnel can gain popularity in the South Korean market.

Personal Profile

Namgung Chan, South Korea, Big Data Platform Engineer at Kakao enterprise corp.

Blog (written in Korean): https://getchan.github.io/
GitHub: https://github.com/getChan
LinkedIn: https://www.linkedin.com/in/namgung-chan-6a06441b6/

Contributions to the community

He wrote the Neo4j Sink Connector code for the new SeaTunnel Connector API.

How did he get to know SeaTunnel?

It was the first time Namgung Chan engaged in open source. He wanted to learn technical skills by contributing and, at the same time, experience the open-source culture.

For him, an open-source project written in Java, made for data engineering, and with many “help wanted” or “good first issue” issues was quite suitable. He then found SeaTunnel on the Apache Software Foundation project webpage.

The first impression of SeaTunnel Community

Though it was his first open-source experience, he felt it was comfortable and interesting to be in the community. He also felt very welcome, because there are many issues tagged “good first issue” and “volunteer wanted,” and code reviews get a quick response.

Gaining knowledge of Neo4j along the way, he grew much more confident in open-source contribution.

Research and comparison

Before knowing about SeaTunnel, Namgung Chan used Spring Cloud Data Flow for data integration. After experiencing SeaTunnel, he thinks the latter is more lightweight than SCDF, because in SCDF every source, processor, and sink component is an individual application, while in SeaTunnel they are not.

Though he hasn’t used SeaTunnel in his working environment yet, Namgung Chan said he would gladly use it when the need arises, especially for data integration across various data stores.

Expectations for SeaTunnel

The most exciting new features or optimizations for Namgung Chan are:

  • Data integration for various data storage
  • Strict data validation
  • Monitoring extension
  • Low computing-resource usage
  • Exactly-once data processing

In the future, Namgung Chan plans to keep contributing, from light issues to heavy ones, and we hope he will have a good time here!

· 3 min read

More than a month after the release of Apache SeaTunnel(Incubating) 2.1.2, we have been collecting user and developer feedback to bring you version 2.1.3. The new version introduces the Assert Sink connector, which the community urgently needs, and two Transforms, NullRate and Nulltf. Some usability problems in the previous version have also been fixed, improving stability and efficiency.

This article will introduce the details of the update of Apache SeaTunnel(Incubating) version 2.1.3.

Major feature updates

Introduces Assert Sink connector

The Assert Sink connector is introduced in SeaTunnel version 2.1.3 to verify data correctness. Special thanks to Lhyundeadsoul for his contribution.

Add two Transforms

In addition, version 2.1.3 also adds two Transforms, NullRate and Nulltf, which are used to detect data quality and to convert null values in the data into default values. These two Transforms can effectively improve the availability of data and reduce the frequency of abnormal situations. Special thanks to wsyhj and Interest1-wyt for their contributions.

At present, SeaTunnel has supported 9 types of Transforms including Common Options, Json, NullRate, Nulltf, Replace, Split, SQL, UDF, and UUID, and the community is welcome to contribute more Transform types.

For details of Transform, please refer to the official documentation: https://seatunnel.apache.org/docs/2.1.3/category/transform

ClickhouseFile connector supports Rsync data transfer method now

At the same time, SeaTunnel version 2.1.3 brings Rsync data transfer support to the ClickhouseFile connector; users can now choose between the SCP and Rsync data transfer modes. Thanks to Emor-nj for contributing this feature.

Specific feature updates:

Optimization

  • Refactored Spark TiDB-related parameter information
  • Refactor the code to remove redundant code warning information
  • Optimize connector jar package loading logic
  • Add Plugin Discovery module
  • Add documentation for some modules
  • Upgrade common-collection from version 4 to 4.4
  • Upgrade common-codec version to 1.13

Bug Fix

In addition, in response to feedback from users of version 2.1.2, we also fixed some usability issues, such as being unable to use the same component as both Source and Sink, and further improved stability.

  • Fixed the problem of Hudi Source loading twice
  • Fix the problem that the field TwoPhaseCommit is not recognized after Doris 0.15
  • Fixed abnormal data output when accessing Hive using Spark JDBC
  • Fix JDBC data loss when partition_column (partition mode) is set
  • Fix KafkaTableStream schema JSON parsing error
  • Fix Shell script getting APP_DIR path error
  • Updated Flink RunMode enumeration to get correct help messages for run modes
  • Fix the same source and sink registered connector cache error
  • Fix command-line parameter -t (--check) conflicting with the Flink deployment target parameter
  • Fix Jackson type conversion error problem
  • Fix the problem of failure to run scripts in paths other than SeaTunnel_Home

Acknowledgment

Thanks to all the contributors (GitHub IDs, in no particular order); it is your efforts that fuel the launch of this version, and we look forward to more contributions to the Apache SeaTunnel(Incubating) community!

leo65535, CalvinKirs, mans2singh, ashulin, wanghuan2054, lhyundeadsoul, tobezhou33, Hisoka-X, ic4y, wsyhj, Emor-nj, gleiyu, smallhibiscus, Bingz2, kezhenxu94, youyangkou, immustard, Interest1-wyt, superzhang0929, gaaraG, runwenjun

· 4 min read

After days of community development, the preliminary development of the new SeaTunnel Connector API is complete. The next step is to adapt connectors to this new API. To help developers use it, this article provides a guide to developing a connector against the new API.

Preliminary Setup

  • Environment configuration: JDK8 and Scala2.11 are recommended.

  • As before, we need to download the latest code locally through git and import it into the IDE. Project address: https://github.com/apache/incubator-seatunnel . Then switch the branch to api-draft, since this branch is currently used to develop the new version of the API and the corresponding Connectors. The project structure is as follows:

    Project Structure

Prerequisites

  • At present, in order to distinguish different Connectors, we put the connectors that support

    • Flink/Spark under the seatunnel-connectors/seatunnel-connectors-flink(spark) module.
    • New version of the Connector is placed under the seatunnel-connectors/seatunnel-connectors-seatunnel module.

    As we can see from the above figure, we have implemented the Fake, Console, and Kafka Connectors, and the Clickhouse Connector is also being implemented.

  • At present, the data type we support is SeaTunnelRow, so no matter the type of data generated by the Source or the type of data consumed by the Sink, it should be SeaTunnelRow.

Development of Connector

Taking Fake Connector as an example, let's introduce how to implement a new Connector:

  • Create a corresponding module with a path under seatunnel-connectors-seatunnel, which is at the same level as other new connectors.

  • Modify the seatunnel-connectors-seatunnel/pom.xml file to add the new module to modules, then modify seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml to add the seatunnel-api dependency and set the correct parent reference. The result looks as follows:

    Style

  • The next step is to create the corresponding package and related classes: create FakeSource, which needs to inherit SeaTunnelSource.

    • Note: the Source of SeaTunnel adopts a unified stream-batch design; the Source determines whether the current Source is a stream or a batch through the getBoundedness attribute.

    So you can specify a Source as a stream or batch by dynamic configuration (refer to the default method). The configuration defined by the user in the configuration file can be obtained through the prepare method to realize the customized configuration.

    Then create FakeSourceReader, FakeSourceSplitEnumerator, and FakeSourceSplit to inherit the corresponding abstract classes (which can be found in the corresponding classes). Once we implement the corresponding methods of these classes, our SeaTunnel Source Connector is basically complete.

  • Next, just follow the existing example to write the corresponding code. The most important part is the FakeSourceReader, which defines how we obtain data from the outside and is the most critical part of the Source Connector. Every time a piece of data is generated, we need to place it in the collector, as shown:

    Source
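
    A sketch of what that core loop can look like (the field layout and the throttling are illustrative assumptions, not the actual FakeSource code):

```java
import java.util.Random;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Fabricate one row per call and place it in the collector.
public class FakeDataGenerator {
    private final Random random = new Random();

    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
        SeaTunnelRow row = new SeaTunnelRow(
                new Object[] {random.nextInt(100), "name-" + random.nextInt(100)});
        output.collect(row);
        Thread.sleep(1000L); // throttle the fake stream a little
    }
}
```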

  • After the code development is complete, we need to configure the plugin-mapping.properties file located under seatunnel-connectors/modules. Adding seatunnel.source.FakeSource = seatunnel-connector-fake means that SeaTunnel can find the jar package of the project by looking for a Source named FakeSource. This allows the Connector to be used in a normal configuration file.

  • For a detailed description of writing Source and Sink and the SeaTunnel API, please refer to the introduction at seatunnel-connectors/seatunnel-connectors-seatunnel/README.zh.md.

Connector Testing

  • For testing, we can find the seatunnel-flink(spark)-new-connector-example module in seatunnel-examples and test against the different engines to verify that the Connector behaves as consistently as possible. If you find any discrepancies, you can note them in the document. Modify the configuration file under resources, add our Connector to the configuration, introduce the dependency in seatunnel-flink(spark)-new-connector-example/pom.xml, and then execute SeaTunnelApiExample to test.
  • The default is stream processing mode; the execution mode is switched to batch by setting job.mode=BATCH in the env section of the configuration file.

Submit PR

When our Connector is ready, we can submit a PR to GitHub. After review by other community members, our contributed Connector will become part of SeaTunnel!

· 12 min read

Translator | Critina

In the May joint Meetup between Apache SeaTunnel and Apache InLong, Li Zongwen, a senior engineer at WhaleOps, shared his experience finding and refactoring the four major problems of Apache SeaTunnel (Incubating): its connectors have to be implemented many times, its parameters are inconsistent, it does not support multiple versions of an engine, and upgrading the engine is difficult. To solve these problems, Li Zongwen aimed to decouple Apache SeaTunnel (Incubating) from the computing engines and refactor the Source and Sink APIs to improve the development experience.

This speech mainly consists of five parts. The first part is about the background and motivation of the Apache SeaTunnel (Incubating) refactoring. The second part introduces the targets of the refactoring. The third part discusses the overall design for the refactoring. The last two parts are about the Source API design and the Sink API design.

01 Background and motivation for refactoring

Those of you who have used Apache SeaTunnel (Incubating) as users or developers will know that Apache SeaTunnel (Incubating) is now fully coupled with the engines: it is entirely based on Spark or Flink, and so are the configuration file parameters. From the perspective of contributors and users, this creates some problems.

In the view of contributors, repeatedly implementing the same connector is meaningless, and inconsistent engine versions make it hard for potential contributors to contribute to the community.

At present, many companies use the Lambda architecture: Spark for offline jobs and Flink for real-time jobs. In the view of users, Spark may have a SeaTunnel Connector that Flink lacks, and the parameters of the two engines’ Connectors for the same storage engine are not unified, which results in high costs and deviates from the original intention of being easy to use. Some users also question why Flink 1.14 is still not supported: with the current SeaTunnel architecture, we would have to discard earlier versions in order to support Flink 1.14, which would bring great trouble to users of earlier versions.

As a result, it is difficult for us to upgrade the engines or support more versions.

In addition, Spark and Flink both adopt the Checkpoint fault-tolerant mechanism implemented by Chandy-Lamport algorithm and internally unify DataSet and DataStream. On this premise, we believe decoupling is feasible.

02 Apache SeaTunnel (Incubating) decouples from the computing engines

Therefore, in order to solve the problems raised above, we set the following goals.

  1. The Connector is implemented only once. To solve the problems of non-unified parameters and repeated Connector implementations, we want a unified Source and Sink API;

  2. Multiple versions of Spark and Flink engines are supported. A translation layer above the Source and Sink API is added to support multiple versions of Spark and Flink engines.

  3. The logic for parallel sharding in the Source and commits in the Sink should be clarified. We must provide a good API to support Connector development.

  4. Full-database synchronization in real-time scenarios should be supported. This is a derivative requirement that many users have mentioned for CDC support. I once participated in the Flink CDC community, and many users pointed out that in the CDC scenario, using Flink CDC directly means one link per table; synchronizing a whole database with thousands of tables means thousands of links, which is unacceptable for both the database and the DBA. The simplest workaround was to introduce Canal, Debezium, or similar components to pull incremental data into Kafka or another MQ for intermediate storage, and then use Flink SQL for synchronization. This contradicted Flink CDC’s original idea of reducing links, but since Flink CDC is only a Connector and cannot deal with the whole link, the proposal did not fit that community. Taking the chance of this refactoring, we submitted the proposal to the SeaTunnel community.

  5. Automatic discovery and storage of meta information should be realized. Storage engines such as Kafka keep no record of the data structure, so today users must define the structured data types for each topic before reading it, one topic at a time, which is a poor experience. We hope that once the configuration is completed, no redundant work is needed.

Some people may wonder why we don’t use Apache Beam directly. That is because Beam sources are divided into BOUNDED and UNBOUNDED sources, which means it needs to be implemented twice. Moreover, some features of Source and Sink are not supported, which will be mentioned later.

03 Apache SeaTunnel(Incubating) overall design for refactoring

The Apache SeaTunnel(Incubating) API architecture is described in the picture above.

The Source & Sink API is one of the core APIs of data integration; it defines the logic for parallel sharding of the Source and committing of the Sink, which is needed to implement a Connector.

The Engine API includes the translation and the execution layers. The translation layer is used to translate the Source and Sink API of SeaTunnel into connectors that can run inside the engine.

The execution defines the execution logic of Source, Transform, Sink and other operations in the engine.

The Table SPI is mainly used to expose the interface of Source and Sink in SPI mode, and to specify mandatory and optional parameters of Connector etc.

The DataType includes SeaTunnel data structure used to isolate engines and declare Table schema.

The Catalog is used to obtain Table schemas, Options, and so on. The Catalog Storage is used to store Table schemas defined for unstructured engines such as Kafka.

The execution flow we currently assume can be seen in the picture above.

  1. Obtain task parameters from configuration files or UI.

  2. Obtain the Table Schema, Option and other information by analyzing the parameters from Catalog.

  3. Pull up the Connector of SeaTunnel in SPI mode and inject Table information.

  4. Translate the Connector from SeaTunnel into the Connector within the engine.

  5. Execute the operation logic of the engine. The multi-table distribution in the picture only exists in the synchronization of the whole database of CDC, while other connectors are single tables and do not need the distribution logic.

It can be seen that the hardest part of the plan is to translate Source and Sink into an internal Source and Sink in the engine.

Many users today use Apache SeaTunnel (Incubating) not only as a data integration tool but also as a data storage tool, and use a lot of Spark and Flink SQLs. We want to preserve that SQL capability for users to upgrade seamlessly.

According to our research, the figure above shows the ideal execution logic of Source and Sink. Since SeaTunnel was incubated from Waterdrop, the terms in the figure lean towards Spark.

Ideally, the Source and Sink coordinators can be run on the Driver, and the Source Reader and Sink Writer will run on the Worker. In terms of the Source Coordinator, we expect it to support several features.

The first capability is that data slices can be dynamically added to the Reader.

The second is that coordination of the Readers can be supported. The Source Reader reads data and sends it to the engine, and finally to the Sink Writer for writing. Meanwhile, the Writer can support two-phase transaction commit, and the Sink coordinator supports the aggregated-commit requirements of Connectors such as Iceberg.

04 Source API

After research, we found the following features that are required by Source.

  1. A unified offline and real-time API, so that a Source is implemented only once and supports both offline and real-time jobs;

  2. Support for parallel reading, for example, Kafka generating a reader for each partition and executing them in parallel.

  3. Support for dynamically adding slices. For example, Kafka can subscribe to topics by a regular expression; when a new matching topic must be added because of business volume, the Source API allows the slice to be added to the running job dynamically.

  4. Support for coordinating readers, which is currently only needed by the CDC Connector. CDC currently builds on Netflix’s DBLog parallel algorithm, which requires coordination between readers during full and incremental synchronization.

  5. Support for a single reader processing multiple tables, i.e., allowing the whole-database synchronization in real-time scenarios mentioned above.

Based on the above requirements, we have created the basic API as shown in the figure above. The code has been submitted to the api-draft branch of the Apache SeaTunnel (Incubating) repository. If you’re interested, you can view the code in detail.

Flink and Spark unify the API of DataSet and DataStream, and they can support the first two features. Then, for the remaining three features, how do we

  • Support dynamic slice-adding?
  • Support the work of coordinating reader?
  • Support a single reader to process multiple tables?

Let's review the design with questions.

We found that, except for CDC, connectors do not need a coordinator. For those that do not, we have a Source that supports parallel execution and engine translation.

As shown in the figure above, there is a split enumerator on the left, which enumerates the splits the source needs and what they contain. After enumerating splits in real time, each split is distributed to a SourceReader, the module that actually reads data. The Boundedness marker differentiates offline from real-time operation, and a Connector can mark whether a split has a stop offset; for example, Kafka can support both real-time and offline operation. The degree of parallelism can be set on the ParallelSource in the engine to support parallel reading.

As shown in the figure above, in a scenario where a coordinator is required, Events are transmitted between the Reader and the Enumerator, and the Enumerator coordinates based on the Events the Readers send. The Coordinated Source must be kept at single parallelism at the engine level to ensure data consistency. Of course, this does not make good use of the engine’s memory management mechanism, but trade-offs are necessary.

For the last question, how can we support a single reader processing multiple tables? This involves the Table API layer. Once all the required tables have been read from the Catalog, some of the tables may belong to one job and be read over one link, while others may need to be separated, depending on how the Source is implemented. Since this is a special requirement, we want to make it easier for developers: in the Table API layer we will provide a SupportMultipleTable interface to declare that a Source supports reading multiple tables. The Source is then implemented with the corresponding deserializers for those tables. As for separating the resulting multi-table data, Flink will adopt the Side Output mechanism, while Spark is going to use the Filter or Partition mechanism.

05 Sink API

At present, there are not many features required by Sink, but three major requirements stand out according to our research.

The first is about idempotent writing, which requires no code and depends on whether the storage engine can support it.

The second is about distributed transactions. The mainstream method is two-phase commitments, such as Kafka etc.

The third is about aggregated commit. For storage engines like Iceberg and Hudi, we hope to avoid the problems caused by many small files, so we expect to aggregate the files and commit them as a whole.

Based on these three requirements, we built three APIs: SinkWriter, SinkCommitter, and SinkAggregatedCommitter. The SinkWriter performs the writing, which may or may not be idempotent. The SinkCommitter supports two-phase commit. The SinkAggregatedCommitter supports aggregated commit.
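
A sketch of the aggregated-commit idea (FileCommitInfo and the method names are simplified stand-ins for the real connector types):

```java
import java.util.List;
import java.util.stream.Collectors;

// Writers each emit a small commit message (here, a data file path); the
// single aggregated committer merges them and publishes once, so storages
// like Iceberg or Hudi see one big commit instead of many small files.
class FileCommitInfo {
    final String dataFilePath;
    FileCommitInfo(String dataFilePath) { this.dataFilePath = dataFilePath; }
}

class AggregatedCommitterSketch {
    // combine per-writer messages into one aggregated message
    List<String> combine(List<FileCommitInfo> commitInfos) {
        return commitInfos.stream()
                .map(info -> info.dataFilePath)
                .collect(Collectors.toList());
    }

    // publish all files in a single atomic operation on the storage,
    // e.g. add them to a table snapshot in one commit
    void commit(List<String> aggregatedFiles) {
        // storage-specific commit goes here
    }
}
```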

Ideally, the AggregatedCommitter runs on the Driver, singly or in parallel, while the Writers and Committers run on the Workers with multiple parallel instances, each doing its own pre-commit work and then sending aggregated messages to the AggregatedCommitter.

Recent versions of Spark and Flink both support running the AggregatedCommitter on the Driver (JobManager) and running the Writer and Committer on the Workers (TaskManagers).

However, lower versions of Flink cannot run the AggregatedCommitter in the JobManager, so we are also working on translation adaptation. The Writer and Committer act as upstream operators, wrapped by Flink’s ProcessFunction, supporting concurrent pre-commit and write, and implementing two-phase commit based on Flink’s Checkpoint mechanism; this is also how many Flink connectors implement 2PC today. The ProcessFunction sends pre-commit messages downstream to the AggregatedCommitter, which can be wrapped in an operator such as SinkFunction or ProcessFunction. Of course, we must ensure that the AggregatedCommitter starts with only a single parallel instance, otherwise the logic of aggregated commit would break.

Thank you for watching. If you’re interested in the specific implementations mentioned in my speech, you can refer to the Apache SeaTunnel (Incubating) community and check out the API-Draft branch code. Thank you again.

· 8 min read

Author | Fan Jia, Apache SeaTunnel(Incubating) Contributor Editor | Test Engineer Feng Xiulan

For importing billions of records in batches, the traditional JDBC approach does not perform as well as it should in some massive data synchronization scenarios. To write data faster, Apache SeaTunnel (Incubating) has just released version 2.1.1, providing support for a ClickhouseFile connector that implements bulk-load data writing.

Bulk load means synchronizing large amounts of data to the target DB. SeaTunnel currently supports data synchronization to ClickHouse.

At the Apache SeaTunnel (Incubating) April Meetup, Apache SeaTunnel (Incubating) contributor Fan Jia shared the topic of "ClickHouse bulk load implementation based on SeaTunnel", explaining in detail the implementation principle and process of ClickHouseFile for efficient processing of large amounts of data.

Thanks to the test engineer Feng Xiulan for the article arrangement!

This presentation contains seven parts.

  • State of ClickHouse Sink
  • Scenarios that ClickHouse Sink isn't good at
  • Introduction to the ClickHouseFile plugin
  • ClickHouseFile core technologies
  • Analysis of ClickHouseFile plugin implementation
  • Comparison of plug-in capabilities
  • Post-optimization directions

Fan Jia, Apache SeaTunnel (Incubating) contributor, Senior Engineer at WhaleOps.

01 Status of ClickHouse Sink

At present, the process of synchronizing data from SeaTunnel to ClickHouse is as follows: as long as the data source is supported by SeaTunnel, the data can be extracted, converted (or not), and written directly to the ClickHouse sink connector, and then written to the ClickHouse server via JDBC.

However, there are some problems with writing to the ClickHouse server via traditional JDBC.

Firstly, the tool used now is the driver provided by ClickHouse, which works over HTTP, and HTTP is not very efficient in certain scenarios. Secondly, there is the huge volume of data: with duplicate data or large one-off writes, the traditional method generates the corresponding INSERT statements and sends them over HTTP to the ClickHouse server, where they are parsed and executed item by item or in batches, which does not allow data compression.

Finally, there is the problem we often encounter, i.e. too much data may lead to an OOM on the SeaTunnel side or a server-side hang due to too much data being written to the server-side too often.

So we thought, is there a faster way to send than HTTP? If data pre-processing or data compression could be done on the SeaTunnel side, then the network bandwidth pressure would be reduced and the transmission rate would be increased.

02 Scenarios that ClickHouse Sink isn't good at

  1. If the HTTP transfer protocol is used, HTTP may not be able to keep up when the volume of data is too large and requests are sent in micro-batches.
  2. Too many INSERT requests may put too much pressure on the server. The bandwidth can handle a large number of requests, but the server side cannot always carry them. An online server not only handles data inserts; more importantly, its query capacity is used by other business teams. If the server cluster goes down because of too much inserted data, the loss outweighs the gain.

03 ClickHouseFile core technologies

In response to these scenarios that ClickHouse is not good at, we wondered: is there a way to compress the data right on the Spark side, avoid increasing the resource load on the server when writing, and still write large amounts of data quickly? So we developed the ClickHouseFile plugin to solve the problem.

The key technology of the ClickHouseFile plugin is clickhouse-local. ClickHouse-local mode allows users to perform fast processing of local files without having to deploy and configure a ClickHouse server. ClickHouse-local uses the same core as ClickHouse Server, so it supports most features as well as the same formats and table engines.

These two features mean that users can work directly with local files without processing them on the ClickHouse server side. Because the format is identical, the data generated by the operations we perform on the remote or SeaTunnel side is seamlessly compatible with the server side and can be written using clickhouse-local. ClickHouse-local is the core technology behind ClickHouseFile; it is what makes the ClickHouse file connector possible.

The core usage of clickhouse-local is as follows (a sketch is shown after the line-by-line description).

Line 1: pass the data to the test_table table of the clickhouse-local program via a Linux pipeline.

Lines 2–5: create a result_table for receiving the data.

Line 6: pass the data from test_table to result_table.

Line 7: define the disk path for data processing.
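For reference, an invocation matching that structure might look roughly like the sketch below; table names, the structure, and the path are illustrative, not the exact command from the slide:

```bash
# Pipe data into clickhouse-local's input table (test_table), create a
# MergeTree table (result_table), move the data across, and keep all
# processing files under an explicit disk path.
cat data.tsv | clickhouse-local --multiquery \
  --input-format TSV \
  --table test_table \
  --structure "id UInt64, name String" \
  --query "
    CREATE TABLE result_table (
        id UInt64,
        name String
    ) ENGINE = MergeTree() ORDER BY id;
    INSERT INTO result_table SELECT * FROM test_table;
  " \
  --path "/tmp/clickhouse-local/"
```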

By calling the clickhouse-local component, Apache SeaTunnel (Incubating) generates the data files and compresses the data. By communicating with the server, the generated data is sent directly to the different ClickHouse nodes, where the data files are made available for queries.

Comparison of the original and current implementations.

Originally, the data, including the INSERT statements, was sent by Spark to the server; the server did the SQL parsing, generated and compressed the table data files, and created the corresponding indexes. With clickhouse-local, the data file generation, file compression, and index creation are done by SeaTunnel, and the final output is a file or folder for the server side; once it is synchronized, the server can query the data without any additional operations.

04 Core technical points

The above process makes data synchronization more efficient, thanks to three optimizations we have made to it.

Firstly, data is normally passed to clickhouse-local via the pipeline, which imposes limitations in terms of length and memory. For this reason, we write the data received by the ClickHouse connector, i.e. the sink side, to a temporary file via MMAP technology, and then clickhouse-local reads the data from the temporary file to generate our target local file. This achieves incremental data reading and solves the OOM problem.

Secondly, it supports sharding. If only one file or folder is generated for a cluster, the file is distributed to only one node, which greatly reduces query performance. Therefore, we added sharding support: users can set the sharding key in the configuration file, and the algorithm divides the data into multiple local files and writes them to different cluster nodes, significantly improving read performance.

The third key optimization is file transfer. Currently, SeaTunnel supports two file transfer methods: one is SCP, which is secure, versatile, and requires no additional configuration; the other is RSYNC, which is fast and efficient and supports resuming from breakpoints, but requires additional configuration. Users can choose whichever suits their needs.
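As a rough illustration, a ClickhouseFile sink configuration covering the sharding key and the transfer method might look like the sketch below; option names follow my reading of the SeaTunnel docs of the time and the values are illustrative, so treat this as a sketch rather than an authoritative reference:

```
sink {
  ClickhouseFile {
    host = "ck-node1:8123"
    database = "default"
    table = "example_table"
    username = "default"
    password = ""
    # path to the clickhouse-local binary on the SeaTunnel node
    clickhouse_local_path = "/usr/bin/clickhouse-local"
    # key used to split the generated files across cluster nodes
    sharding_key = "user_id"
    # file transfer method: "scp" (no extra setup) or "rsync" (resumable)
    copy_method = "scp"
  }
}
```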

05 Plugin implementation analysis

In summary, the general implementation process of ClickHouseFile is as follows.

1. Cache the data on the ClickHouse sink side.
2. Call clickhouse-local to generate the files.
3. Send the data to the ClickHouse server.
4. Execute the ATTACH command.

With the above four steps, the generated data reaches a queryable state.
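Step 4 is what makes the transferred files visible to ClickHouse. For illustration, ATTACH statements take the following form (the table, part, and partition names here are made up):

```sql
-- attach a data part previously placed in the table's detached/ directory
ALTER TABLE example_table ATTACH PART 'all_1_1_0';

-- or attach a whole partition at once
ALTER TABLE example_table ATTACH PARTITION '2022-04-16';
```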

06 Comparison of plug-in capabilities

In terms of data transfer, the ClickHouse (JDBC) sink has the advantage that no additional configuration is required and it is highly versatile, while ClickHouseFile is more suitable for massive amounts of data but is more complex to configure and currently supports fewer engines.

In terms of environmental complexity, ClickHouse is more suitable for complex environments and can be run directly without additional configuration.

In terms of versatility, ClickHouse, as a JDBC driver officially supported by SeaTunnel, basically supports data writing on all engines, while ClickHouseFile supports relatively few engines.

In terms of server pressure, ClickHouseFile's advantage shows with massive data transfers, as it does not put much pressure on the server.

However, the two are not in competition, and the choice should be based on the usage scenario.

07 Follow-up plans

Although SeaTunnel currently supports the ClickHouseFile plugin, there are still many areas that need to be optimized, mainly including:

  • Rsync support.
  • Exactly-Once support.
  • Zero Copy support for transferring data files.
  • More Engine support.

Anyone interested in the above issues is welcome to contribute to the follow-up plans, or tell me your ideas!

· 10 min read

At the Apache SeaTunnel (Incubating) Meetup in April, Yuan Hongjun, a big data expert and OLAP platform architect at Kidswant, shared the topic “SeaTunnel Application and Refactoring at Kidswant”.

The presentation contains five parts.

  • Background of the introduction of Apache SeaTunnel (Incubating) by Kidswant
  • A comparison of mainstream tools for big data processing
  • The implementation of Apache SeaTunnel (Incubating)
  • Common problems in Apache SeaTunnel (Incubating) refactoring
  • Predictions on the future development of Kidswant

Yuan Hongjun, Big data expert, OLAP platform architect of Kidswant. He has many years of experience in big data platform development and management, and has rich research experience in data assets, data lineage mapping, data governance, OLAP, and other fields.

01 Background

At present, Kidswant’s OLAP platform consists of seven parts: metadata layer, task layer, storage layer, SQL layer, scheduling layer, service layer, and monitoring layer. This sharing focuses on offline tasks in the task layer.

In fact, Kidswant had a complete internal collection and push system, but due to some historical legacy issues, the company’s existing platform could not quickly support getting the OLAP platform online, so the company had to abandon its own platform and develop a new system instead. At that time, three options were in front of the OLAP team.

1. Re-develop the existing collection and push system.

2. Build a fully self-developed system.

3. Participate in open-source projects.

02 Big data processing mainstream tools comparison

These three options have their own pros and cons. Re-developing on top of the collection and push system would let us reuse previous experience and avoid stepping into the same pits again. The disadvantage is that it requires a large amount of code and time and a longer research period, and since the existing code is poorly abstracted, with lots of customized functions bound to the business, re-development would be difficult.

With complete self-development, the process is autonomous and controllable, and engines such as Spark could be adapted to fit our own architecture, but the disadvantage is that we might run into unknown problems.

For the last option, using an open-source framework, the advantage is that the code is better abstracted, and performance and stability are guaranteed after verification by other major companies. Therefore, Kidswant mainly studied three open-source data synchronization tools, DataX, Sqoop, and SeaTunnel, in the early stages of the OLAP data synchronization refactoring.

From the diagram, we can see that Sqoop’s main function is data synchronization for RDBs, and its implementation is based on MapReduce. Sqoop has rich parameters and command lines for performing various operations. Its advantage is that it fits the Hadoop ecosystem and already supports most conversions from RDBs to Hive, with a complete set of commands and APIs.
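As a hedged example of that command-line style, a typical RDB-to-Hive import with Sqoop looks roughly like this (the connection details, table names, and paths are illustrative):

```bash
# import a MySQL table into Hive with four parallel map tasks
sqoop import \
  --connect jdbc:mysql://db-host:3306/shop \
  --username etl \
  --password-file /user/etl/.db_password \
  --table orders \
  --hive-import \
  --hive-table ods.orders \
  --num-mappers 4
```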

The disadvantages are that Sqoop only supports RDB data synchronization, has some limitations on data files, and has no concept of data cleansing yet.

DataX mainly aims at synchronizing data from any source through configurable files plus multi-threading. It runs three main processes: Reader, Framework, and Writer, where the Framework mainly handles communication and buffering.

The advantage of DataX is that it uses plug-in-based development, has its own flow control and data control, and is active in the community, with DataX’s official website offering data pushes from many different sources. The disadvantage, however, is that it is memory-based, which may limit the amount of data it can handle.

Apache SeaTunnel (Incubating) also does data synchronization from any source, implementing the process in three steps (source, transform, and sink) based on configuration files and running on Spark or Flink.

The advantage is that the current 2.1.0 version has a very large number of plug-ins and supported sources, and the plug-in-based design also makes it very easy to extend and to embrace Spark and Flink, all with a distributed architecture. The only downside of Apache SeaTunnel (Incubating) is probably the current lack of API-based invocation and the need to manage the UI interface by yourself.

In summary, although Sqoop is distributed, it only supports data synchronization between RDBs and Hive or HBase, and its poor scalability makes re-development inconvenient. DataX is scalable and stable overall, but because it is a standalone tool, it cannot be deployed in a distributed cluster, and its data extraction capability depends strongly on machine performance. SeaTunnel is similar to DataX and makes up for DataX's lack of distribution; it also supports real-time streaming, and its community is highly active for such a new product. We chose SeaTunnel based on a number of factors, such as whether it supported distributed deployment and whether it needed to be deployed on a separate machine.

03 Implementation

On the Apache SeaTunnel (Incubating) website, we can see that the basic process of Apache SeaTunnel (Incubating) consists of three parts: source, transform, and sink. According to the guidelines on the website, Apache SeaTunnel (Incubating) requires a configuration script to start, but after some research, we found that the final execution of Apache SeaTunnel (Incubating) is based on an application submitted via spark-submit that relies on the config file.
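For context, launching a SeaTunnel v2 job on Spark at that time looked roughly like this (the master, deploy mode, and config path are illustrative):

```bash
# start-seatunnel-spark.sh wraps spark-submit and passes the job config file
./bin/start-seatunnel-spark.sh \
  --master yarn \
  --deploy-mode client \
  --config ./config/spark.batch.conf
```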

This initialization, although simple, has the problem of relying on the config file, which is generated and then cleared after each run. Although the file can be dynamically generated in the scheduling script, this raises two questions: 1) whether frequent disk operations make sense; and 2) whether there is a more efficient way to support Apache SeaTunnel (Incubating).

With these considerations in mind, we added a Unified Configuration Template Platform module to the final design solution. Scheduling is done by initiating a commit command, and Apache SeaTunnel (Incubating) itself pulls the configuration information from the unified configuration template platform, then loads and initializes the parameters.

The diagram above shows the business process of Kidswant’s OLAP, which is divided into three sections. The overall flow moves data from Hive (stored as Parquet), through the Parquet tables, to the KYLIN and ClickHouse sources.

This is the page where we construct the model, generated mainly through drag and drop, with some transactional operations between the tables and micro-processing for Apache SeaTunnel (Incubating) on the right.

So we end up submitting the commands as above. The first part marked in red, [-conf customconfig/jars], means that the user can either have it handled by the unified configuration template platform or specify it separately when modeling. The last part marked in red, [421 $start_time $end_time $taskType], is the Unicode, a unique encoding.

Below, on the left, are the 38 commands submitted by our final dispatch script. Below, on the right, is a modification made for Apache SeaTunnel (Incubating), where you can see a tool class called WaterdropContext. It first determines whether the Unicode exists and then uses Unicode_code to fetch the configuration information for the different templates, avoiding the need to manipulate the config file.

In the end, reportMeta is used to report some information after the task is completed, which is also done in Apache SeaTunnel (Incubating).

In the finalized config file above, it is worth noting that Kidswant made some changes to the transforms. The first is desensitization for mobile phone numbers, ID numbers, and the like: if the user specifies fields, desensitization is applied field by field; if not, all fields are scanned and then desensitized and encrypted according to pattern matching.

Second, transform also supports custom processing, as mentioned above when talking about OLAP modeling. With the addition of HideStr, the first ten characters of a string can be retained and all the remaining characters encrypted, providing some security for the data.
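The exact option names of these in-house transforms were not shown, so the following is only a hypothetical sketch of what such a transform block could look like in a SeaTunnel config; none of the option names below are confirmed by the talk:

```
transform {
  # hypothetical desensitization plugin: if "fields" is omitted, all fields
  # are scanned and masked by pattern matching
  Desensitize {
    fields = ["phone", "id_card"]
  }
  # hypothetical HideStr options: keep the first ten characters,
  # encrypt the rest
  HideStr {
    source_field = "address"
    keep_length = 10
  }
}
```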

Then, on the sink side, we added pre_sql to support task idempotency. This is mainly used for operations such as data deletion or partition deletion, because in production a task is rarely run only once, and the design has to account for data deviation and correctness when reruns or backfills occur.
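A minimal sketch of what such a sink might look like, assuming a pre_sql option that runs before the write (the host, table, and partition values are illustrative):

```
sink {
  Clickhouse {
    host = "ck-host:8123"
    database = "ods"
    table = "ods_orders"
    # drop the target partition before writing so that reruns
    # and backfills stay idempotent
    pre_sql = "ALTER TABLE ods.ods_orders DROP PARTITION '2022-04-16'"
  }
}
```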

On the right side of the diagram, on the ClickHouse sink side, we added an is_senseless_mode, which forms a read/write senseless mode: users do not perceive the swap when querying or backfilling, because it uses ClickHouse partition conversion, i.e. the MOVE PARTITION TO TABLE command.
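The underlying ClickHouse statement for that partition swap has the following form (the table and partition names are illustrative):

```sql
-- load into a temporary table first, then atomically move the partition
-- into the serving table so queries never observe half-written data
ALTER TABLE ods_orders_tmp MOVE PARTITION '2022-04-16' TO TABLE ods_orders;
```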

A special note here is the KYLIN sink side. KYLIN is a very special sink with its own data-entry logic and its own monitoring page, so our adaptation for KYLIN is simply a call to its API and constant polling of the job state; because of this, the resources for KYLIN are limited in the Unified Configuration Template platform.

04 Common problems in the Apache SeaTunnel (Incubating) refactoring

01 OOM & Too Many Parts

The problem usually arises during the Hive-to-Hive process. Even with automatic resource allocation, there are cases where the data volume suddenly gets bigger, for example after running several promotional events. Such problems can currently only be avoided by manually and dynamically tuning the parameters and adjusting the data synchronization batch time. In the future, we may try to control the data volume to achieve fine-grained control.

02 Field and type inconsistency issues

When a model runs, the user may make changes to the upstream tables or fields the task depends on, and these changes may lead to task failure if they go unnoticed. The current solution is to rely on data lineage plus snapshots for advance awareness to avoid errors.

03 Custom data sources & custom separators

If the finance department requires customized separators or jar packages, the user can now specify additional jar information as well as separator information in the unified configuration template platform.

04 Data skewing issues

This may be because users set their parallelism imperfectly. We haven’t finished dealing with this issue yet, but we may add post-processing to the Source module to break up the data and eliminate the skew.

05 KYLIN global dictionary lock problem

As the business grows, one cube cannot meet the needs of the users, so multiple cubes become necessary. If the same fields are used across multiple cubes, the KYLIN global dictionary lock problem arises. The current solution is to stagger the scheduling times of the tasks; if that is not possible, we can apply distributed lock control, where the KYLIN sink side has to acquire the lock before running.

05 An outlook on the future of Kidswant

  1. Multi-source data synchronization, perhaps with processing for RDB sources
  2. A real-time, Flink-based implementation
  3. Taking over the existing collection and scheduling platform (mainly to solve the problem of splitting databases and tables)
  4. Data quality verification, such as null values, the vacancy rate of the whole dataset, primary time judgment, etc.

That’s all I have to share. I hope we can communicate more with the community in the future and make progress together. Thanks!

· 4 min read


As we know, only data resources that are manageable, callable, computable, and monetizable can be deposited as assets. The interconnection of information systems has created a huge demand for multi-source and multidimensional data integration, which imposes strict requirements on data processing and integration tools.

In the era of intelligence, under the trends of “smart city”, “smart governance”, and “intelligent products”, enterprises are mostly faced with the challenge of how to achieve efficient data push, improve platform quality, and ensure data security. Only by choosing the right data integration tools and platforms can data play a key role.

As a next-generation high-performance, distributed, and massive data integration framework, Apache SeaTunnel is committed to making data synchronization simpler and more efficient and accelerating the implementation of distributed data processing capabilities in the production environment.

At the Apache SeaTunnel Meetup (April 16, 2022), the community will invite experienced Apache SeaTunnel users to share the best practices of the project in intelligent production environments. In addition, contributors will analyze the source code of Apache SeaTunnel, guiding you to a comprehensive and in-depth understanding of this powerful data integration tool.

Whether you are a beginner who is interested in Apache SeaTunnel or a user who encounters complex and difficult deployment problems in daily practice, you can come here and communicate with our instructors to get the answers you want.

01 Sign up

Registration for the Apache SeaTunnel Meetup | April online live event has started; hurry up and sign up!

Time: 2022–4–16 14:00–17:00

Format: live online

Click the link to register (free): https://www.slidestalk.com/m/780

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-10u1eujlc-g4E~ppbinD0oKpGeoo_dAw

02 Highlights

  • Detailed case study
  • Feature Analysis
  • Tips to avoid stepping into the pit from enterprises
  • Open-source community growth strategy
  • Face-to-face Q&A with industry technical experts
  • Surprise gifts

03 Event Agenda

On the day of the event, big data engineers from Kidswant and OPPO will share front-line practical experience, and senior engineers from WhaleOps will give a hard-core explanation of the important function updates of Apache SeaTunnel.


Yuan Hongjun, Kidswant Big Data Expert, OLAP Platform Architect

Years of experience in R&D and management of big data platforms, with rich research experience in data assets, data lineage, data governance, OLAP, and other fields

Time: 14:00–14:40

Topic: Application Practice of Apache SeaTunnel in Kidswant

Speech outline: How to push data efficiently? How to improve the quality of the platform? How to ensure data security? What changes did Kidswant make to Apache SeaTunnel?


Fan Jia, WhaleOps Senior Engineer

Time: 14:40–15:20

Topic: Clickhouse Bulk Load Implementation Based on Apache SeaTunnel

Speech outline: How to implement the bulk load data synchronization function of Clickhouse by extending the Connector of Apache SeaTunnel?


Wang Zichao, OPPO Senior Backend Engineer

Time: 15:50–16:30

Topic: The technological innovation of OPPO’s intelligent recommendation sample center based on Apache SeaTunnel

Speech outline: An introduction to the evolution of OPPO’s intelligent recommendation machine learning sample processing, and the role Apache SeaTunnel plays in it.

In addition to the wonderful speeches, a number of lucky draw sessions will also be held at the meetup. Anyone participating in the lucky draw will have the opportunity to win exquisite customized gifts from Apache SeaTunnel, so stay tuned~

About SeaTunnel

SeaTunnel (formerly Waterdrop) is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can synchronize hundreds of billions of data per day in a stable and efficient manner.

Why do we need SeaTunnel?

SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.

  • Data loss and duplication
  • Task buildup and latency
  • Low throughput
  • Long application-to-production cycle time
  • Lack of application status monitoring

SeaTunnel Usage Scenarios

  • Massive data synchronization
  • Massive data integration
  • ETL of large volumes of data
  • Massive data aggregation
  • Multi-source data processing

Features of SeaTunnel

  • Rich components
  • High scalability
  • Easy to use
  • Mature and stable

How to get started with SeaTunnel quickly?

Want to experience SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.

https://seatunnel.apache.org/docs/2.1.0/developement/setup

How can I contribute?

We invite all partners who are interested in making local open-source global to join the SeaTunnel contributors family and foster open-source together!

Submit an issue:

https://github.com/apache/incubator-seatunnel/issues

Contribute code to:

https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list:

dev-subscribe@seatunnel.apache.org

Development Mailing List:

dev@seatunnel.apache.org

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-123jmewxe-RjB_DW3M3gV~xL91pZ0oVQ

Follow Twitter:

https://twitter.com/ASFSeaTunnel

Come and join us!

· 6 min read

On December 9, 2021, Apache SeaTunnel(Incubating) entered the Apache Incubator, and after nearly four months of endeavor by the community contributors, we passed the first Apache release review in one go and released version 2.1.0 on March 18, 2022. This means that version 2.1.0 is an official release that is safe for corporate and individual users to use, as voted by the Apache SeaTunnel(Incubating) community and the Apache Incubator.

Note: a software license is a legal instrument governing the use or redistribution of software. A typical software license grants the licensee, typically an end user, permission to use one or more copies of the software in ways that would otherwise potentially constitute copyright infringement of the software owner's exclusive rights under copyright law. Effectively, a software license is a contract between the software developer and the user that guarantees the user will not be sued as long as they stay within the scope of the license.

Before and after entering the incubator, we spent a lot of time sorting through the external dependencies of the entire project to ensure compliance. It is important to note that the choice of license for open-source software does not by itself mean that the project is compliant, while the stringent release review process of the ASF ensures license compliance and the legal distribution of the software to the greatest extent possible.

Release Note

We bring the following key features to this release:

  1. The kernel of the microkernel plug-in architecture has been optimized overall and is now mainly written in Java. Many improvements have also been made to command-line parameter parsing, plug-in loading, and more. At the same time, users (or contributors) can choose their own language for developing plug-in extensions, which greatly lowers the threshold for plug-in development.
  2. Overall support for Flink, while users are free to choose the underlying engine. This version also brings a large number of Flink plug-ins, and everyone is welcome to contribute more.
  3. Local development fast-start environment support (example), allowing contributors or users to start quickly and smoothly without changing any code, which facilitates rapid local development and debugging. This is certainly exciting news for contributors or users who need to customize their plugins. In fact, a large number of contributors used this approach to quickly test plugins in our pre-release testing.
  4. Docker container installation is provided, so users can deploy and install Apache SeaTunnel(Incubating) via Docker extremely quickly, and we will iterate around Docker & K8s in the future; any interesting proposal on this is welcome.

Specific release notes:

[Features]

  • Use JCommander to do command line parameter parsing, making developers focus on the logic itself.
  • Flink is upgraded from 1.9 to 1.13.5, keeping compatibility with older versions and preparing for subsequent CDC.
  • Support for Doris, Hudi, Phoenix, Druid, and other Connector plugins, and you can find complete plugin support here plugins-supported-by-seatunnel.
  • Extremely fast local development startup environment support, achieved by using the example module without modifying any code, which is convenient for local debugging.
  • Support for installing and trying out Apache SeaTunnel(Incubating) via Docker containers.
  • SQL component supports SET statements and configuration variables.
  • Config module refactoring to facilitate understanding for the contributors while ensuring code compliance (License) of the project.
  • Project structure realigned to fit the new Roadmap.
  • CI&CD support, code quality automation control (more plans will be carried out to support CI&CD development).

Acknowledgments

Thanks to the following contributors who participated in this version release (GitHub IDs, in no particular order).

Al-assad, BenJFan, CalvinKirs, JNSimba, JiangTChen, Rianico, TyrantLucifer, Yves-yuan, ZhangchengHu0923, agendazhang, an-shi-chi-fan, asdf2014, bigdataf, chaozwn, choucmei, dailidong, dongzl, felix-thinkingdata, fengyuceNv, garyelephant, kalencaya, kezhenxu94, legendtkl, leo65535, liujinhui1994, mans2singh, marklightning, mosence, nielifeng, ououtt, ruanwenjun, simon824, totalo, wntp, wolfboys, wuchunfu, xbkaishui, xtr1993, yx91490, zhangbutao, zhaomin1423, zhongjiajie, zhuangchong, zixi0825.

Also sincere gratitude to our Mentors: Zhenxu Ke, Willem Jiang, William Guo, LiDong Dai, Ted Liu, Kevin, JB for their help!

Planning for the next few releases:

  • CDC support.
  • Support for the monitoring system.
  • UI system support.
  • More connectors and efficient sink support; for example, ClickHouse support will be available in the next release. The follow-up features will be decided by community consensus, and we sincerely invite more participation in community construction.

We need your attention and contributions:)

Community Status

Recent Development

Since entering the Apache incubator, the contributor group has grown from 13 to 55 and continues to grow, with the average weekly community commits remaining at 20+.

Three contributors from different companies (Lei Xie, HuaJie Wang, Chunfu Wu) have been invited to become Committers on account of their contributions to the community.

We held two Meetups, where instructors from Bilibili, OPPO, Vipshop, and other companies shared their large-scale production practices based on SeaTunnel in their companies (we will hold one meetup monthly in the future, and welcome SeaTunnel users or contributors to come and share their stories about SeaTunnel).

Users of Apache SeaTunnel(Incubating)

Note: Only registered users are included.

Registered users of Apache SeaTunnel(Incubating) are shown below. If you are also using Apache SeaTunnel(Incubating), welcome to register on Who is using SeaTunnel!

PPMC's Word

LiFeng Nie, PPMC of Apache SeaTunnel(Incubating), commented on the first Apache version release.

From the first day of entering the Apache Incubator, we have been working hard to learn the Apache Way and various Apache policies. Although the first release took a lot of time (mainly for compliance), we think it was well worth it, and that is one of the reasons we chose to enter Apache. We need to give our users peace of mind, and Apache is certainly the best choice, with its demanding license control that allows users to avoid compliance issues as much as possible and ensures that the software circulates reasonably and legally. In addition, its practice of the Apache Way, such as its public service mission, pragmatism, community over code, openness and consensus decision-making, and meritocracy, can drive the Apache SeaTunnel(Incubating) community to become more open, transparent, and diverse.