跳到主要内容

· 阅读需 3 分钟

More than a month after the release of Apache SeaTunnel(Incubating) 2.1.2, we have been collecting user and developer feedback to bring you version 2.1.3. The new version introduces the Assert Sink connector, which is an inurgent need in the community, and two Transforms, NullRate and Nulltf. Some usability problems in the previous version have also been fixed, improving stability and efficiency.

This article will introduce the details of the update of Apache SeaTunnel(Incubating) version 2.1.3.

Major feature updates

Introduces Assert Sink connector

Assert Sink connector is introduced in SeaTunnel version 2.1.3to verify data correctness. Special thanks to Lhyundeadsoul for his contribution.

Add two Transforms

In addition, the 2.1.3 version also adds two Transforms, NullRate and Nulltf, which are used to detect data quality and convert null values ​​in the data to generate default values. These two Transforms can effectively improve the availability of data and reduce the frequency of abnormal situations. Special thanks to wsyhj and Interest1-wyt for their contributions.

At present, SeaTunnel has supported 9 types of Transforms including Common Options, Json, NullRate, Nulltf, Replace, Split, SQL, UDF, and UUID, and the community is welcome to contribute more Transform types.

For details of Transform, please refer to the official documentation: https://seatunnel.apache.org/docs/2.1.3/category/transform

ClickhouseFile connector supports Rsync data transfer method now

At the same time, SeaTunnel 2.1.3 version brings Rsync data transfer mode support to ClickhouseFile connector, users can now choose SCP and Rsync data transfer modes. Thanks to Emor-nj for contributing to this feature.

Specific feature updates:

Optimization

  • Refactored Spark TiDB-related parameter information
  • Refactor the code to remove redundant code warning information
  • Optimize connector jar package loading logic
  • Add Plugin Discovery module
  • Add documentation for some modules
  • Upgrade common-collection from version 4 to 4.4
  • Upgrade common-codec version to 1.13

Bug Fix

In addition, in response to the feedback from users of version 2.1.2, we also fixed some usability issues, such as the inability to use the same components of Source and Sink, and further improved the stability.

  • Fixed the problem of Hudi Source loading twice
  • Fix the problem that the field TwoPhaseCommit is not recognized after Doris 0.15
  • Fixed abnormal data output when accessing Hive using Spark JDBC
  • Fix JDBC data loss when partition_column (partition mode) is set
  • Fix KafkaTableStream schema JSON parsing error
  • Fix Shell script getting APP_DIR path error
  • Updated Flink RunMode enumeration to get correct help messages for run modes
  • Fix the same source and sink registered connector cache error
  • Fix command line parameter -t( — check) conflict with Flink deployment target parameter
  • Fix Jackson type conversion error problem
  • Fix the problem of failure to run scripts in paths other than SeaTunnel_Home

Acknowledgment

Thanks to all the contributors (GitHub ID, in no particular order,), it is your efforts that fuel the launch of this version, and we look forward to more contributions to the Apache SeaTunnel(Incubating) community!

leo65535, CalvinKirs, mans2singh, ashulin, wanghuan2054, lhyundeadsoul, tobezhou33, Hisoka-X, ic4y, wsyhj, Emor-nj, gleiyu, smallhibiscus, Bingz2, kezhenxu94, youyangkou, immustard, Interest1-wyt, superzhang0929, gaaraG, runwenjun

· 阅读需 4 分钟

After days of community development, the preliminary development of the new Connector API of SeaTunnel is completed. The next step is to adapt this new connector. In order to aid the developers to use this connector, this article provides guide to develop a new API.

Priliminary Setup

  • Environment configuration: JDK8 and Scala2.11 are recommended.

  • As before, we need to download the latest code locally through git and import it into the IDE, project address: https://github.com/apache/incubator-seatunnel . At the same time, switch the branch to api-draft, and currently use this branch to develop the new version of the API and the corresponding Connector. The project structure is as follows:

    Project Structure

Prerequisites

  • At present, in order to distinguish different Connectors, we put the connectors that support

    • Flink/Spark under the seatunnel-connectors/seatunnel-connectors-flink(spark) module.
    • New version of the Connector is placed under the seatunnel-connectors/seatunnel-connectors-seatunnel module.

    As we can see from the above figure, we have implemented Fake, Console, Kafka Connector, and Clickhouse Connector is also being implemented.

  • At present, the data type we support is SeaTunnelRow, so no matter the type of data generated by the Source or the type of data consumed by the Sink, it should be SeaTunnelRow.

Development of Connector

Taking Fake Connector as an example, let's introduce how to implement a new Connector:

  • Create a corresponding module with a path under seatunnel-connectors-seatunnel, which is at the same level as other new connectors.

  • Modify the seatunnel-connectors-seatunnel/pom.xml file, add a new module to modules, modify seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/pom.xml, add seatunnel-api dependencies, and correct parent Quote. The resulting style is as follows:

    Style

  • The next step is to create the corresponding package and related classes, create FakeSource, and need to inherit SeaTunnel Source.

    • Note : The Source of SeaTunnel adopts the design of stream and batch integration. The Source of SeaTunnel determines whether current Source is a stream or batch through attribute getBoundedness.

    So you can specify a Source as a stream or batch by dynamic configuration (refer to the default method). The configuration defined by the user in the configuration file can be obtained through the prepare method to realize the customized configuration.

    Then create FakeSourceReader, FakeSource SplitEnumerator, and FakeSourceSplit to inherit the corresponding abstract classes (which can be found in the corresponding classes). As long as we implement the corresponding methods of these classes, then our SeaTunnel Source Connector is basically completed.

  • Next, just follow the existing example to write the corresponding code. The most important one is the FakeSource Reader, which defines how we obtain data from the outside, which is the most critical part of the Source Connector. Every time a piece of data is generated, we need to place it in the collector as shown:

    Source

  • After the code development is complete, we need to configure the configuration file plugin-mapping.properties located under seatunnel-connectors/modules. Adding a seatunnel .source.FakeSource = seatunnel-connector-fake means that SeaTunnel can find the jar package corresponding to the project by looking for a Source named FakeSource. This allows the Connector to be used in the normal configuration file.

  • For a detailed description of writing Source and Sink and SeaTunnel API, please refer to the introduction at seatunnel-connectors/seatunnel-connectors-seatunnel/ README.zh.md.

Connector Testing

  • For testing, we can find the seatunnel-flink(spark)-new-connector-example module in seatunnel-examples, and test it against different engines to ensure that the performance of the Connector is as consistent as possible. If you find any discrepancies, you can mark them in the document, modify the configuration file under resource, add our Connector to the configuration, and introduce seatunnel-flink(spark)-new-connector-example/pom.xml dependency, you can execute SeaTunnelApiExample to test.
  • The default is stream processing mode, and the execution mode is switched to batch mode by modifying job.mode=BATCH in the environment of the configuration file.

Submit PR

When our Connector is ready, we can submit PR to github. After reviewing by other partners, our contributed Connector will become part of SeaTunnel!

· 阅读需 16 分钟

Apache SeaTunnel (Incubating) 与 Apache Inlong (Incubating) 的5月联合Meetup中,第二位分享的嘉宾是来自白鲸开源的高级工程师李宗文。在使用Apache SeaTunnel (Incubating) 的过程中,他发现了 Apache SeaTunnel (Incubating) 存在的四大问题:Connector实现次数多、参数不统一、难以支持多个版本的引擎以及引擎升级难的问题。为了解决以上的难题,李宗文将目标放在将Apache SeaTunnel (Incubating)与计算引擎进行解耦,重构其中Source与Sink API,实现改良了开发体验。

本次演讲主要包含四个部分:

  1. Apache SeaTunnel (Incubating)重构的背景和动机
  2. Apache SeaTunnel (Incubating)重构的目标
  3. Apache SeaTunnel (Incubating)重构整体的设计
  4. Apache SeaTunnel (Incubating) Source API的设计
  5. Apache SeaTunnel (Incubating) Sink API的设计

李宗文

白鲸开源 高级工程师

Apache SeaTunnel(Incubating)

& Flink Contributor, Flink CDC & Debezium Contributor

01 重构的背景与动机

01 Apache SeaTunnel(Incubating)与引擎耦合

用过Apache SeaTunnel (Incubating) 的小伙伴或者开发者应该知道,目前Apache SeaTunnel (Incubating) 与引擎完全耦合,完全基于Spark、Flink开发,其中的配置文件参数都基于Flink、Spark引擎。从贡献者和用户的角度出发,我们能发现一些问题。

从贡献者的角度:反复实现Connector,没有收获感;潜在贡献者由于引擎版本不一致无法贡献社区;

从用户的角度:目前很多公司采用Lambda架构,离线作业使用Spark,实时作业使用Flink, 使用中就会发现SeaTunnel 的Connector可能Spark有,但是Flink没有,以及两个引擎对于同一存储引擎的Connector的参数也不统一,有较高的使用成本,脱离了SeaTunnel简单易用的初衷;还有用户提问说目前支不支持Flink的1.14版本,按照目前SeaTunnel的架构,想要支持Flink的1.14就必须抛弃之前的版本,因此这也会对之前版本的用户造成很大的问题。

因此,我们不管是做引擎升级或者支持更多的版本的用户都很困难。

另外Spark和Flink都采用了Chandy-lamport算法实现的Checkpoint容错机制,也在内部进行了DataSet与DataStream的统一,以此为前提我们认为解耦是可行的。

02 Apache SeaTunnel(Incubating)与引擎解耦

因此为了解决以上提出的问题,我们有了以下的目标:

  1. Connector只实现一次:针对参数不统一、Connector多次实现的问题,我们希望实现一个统一的Source 与Sink API;
  2. 支持多个版本的Spark与Flink引擎:在Source与Sink API上再加入翻译层去支持多个版本与Spark和Flink引擎,解耦后这个代价会小很多。
  3. 明确Source的分片并行逻辑和Sink的提交逻辑:我们必须提供一个良好的API去支持Connector开发;
  4. 支持实时场景下的数据库整库同步:这个是目前很多用户提到需要CDC支持衍生的需求。我之前参与过Flink CDC社区,当时有许多用户提出在CDC的场景中,如果直接使用Flink CDC的话会导致每一个表都持有一个链接,当遇到需要整库同步需求时,千张表就有千个链接,该情况无论是对于数据库还是DBA都是不能接受的,如果要解决这个问题,最简单的方式就是引入Canal、Debezium等组件,使用其拉取增量数据到Kafka等MQ做中间存储,再使用Flink SQL进行同步,这实际已经违背了Flink CDC最早减少链路的想法,但是Flink CDC的定位只是一个Connector,无法做全链路的需求,所以该proposal在Flink CDC社区中没有被提出,我们借着本次重构,将proposa提交到了SeaTunnel社区中。
  5. 支持元信息的自动发现与存储:这一部分用户应该有所体验,如Kafka这类存储引擎,没有记录数据结构的功能,但我们在读取数据时又必须是结构化的,导致每次读取一个topic之前,用户都必须定义topic的结构化数据类型,我们希望做到用户只需要完成一次配置,减少重复的操作。

可能也有同学有疑惑为什么我们不直接使用Apache Beam,Beam的Source分为BOUNDED与UNBOUNDED,也就是需要实现两遍,并且有些Source与Sink的特性也不支持,具体所需的特性在后面会提到;

03 Apache SeaTunnel(Incubating)重构整体的设计

Apache SeaTunnel(Incubating) API总体结构的设计如上图;

Source & Sink API:数据集成的核心API之一,明确Source的分片并行逻辑和Sink的提交逻辑,用于实现Connector;

Engine API

Translation: 翻译层,用于将SeaTunnel的Souce与Sink API翻译成引擎内部可以运行的Connector;

Execution:执行逻辑,用于定义Source、Transform、Sink等操作在引擎内部的执行逻辑;

Table API

Table SPI:主要用于以SPI的方式暴露Source与Sink接口,并明确Connector的必填与可选参数等;

DataType:SeaTunnel的数据结构,用于隔离引擎,声明Table Schema等;

Catalog:用于获取Table Scheme、Options等;

Catalog Storage: 用于存储用户定义Kafka等非结构化引擎的Table Scheme等;

从上图是我们现在设想的执行流程

  1. 从配置文件或UI等方式获取任务参数;
  2. 通过参数从Catalog中解析得到Table Schema、Option等信息;
  3. 以SPI方式拉起SeaTunnel的Connector,并注入Table信息等;
  4. 将SeaTunnel的Connector翻译为引擎内部的Connector;
  5. 执行引擎的作业逻辑,图中的多表分发目前只存在CDC整库同步场景下,其他Connector都是单表,不需要分发逻辑;

从以上可以看出,最难的部分是如何将Apache SeaTunnel(Incubating) 的Source和Sink翻译成引擎内部的Source和Sink。

当下许多用户不仅把Apache SeaTunnel (Incubating) 当做一个数据集成方向的工具,也当做数仓方向的工具,会使用很多Spark和Flink的SQL,我们目前希望能够保留这样的SQL能力,让用户实现无缝升级。

根据我们的调研,如上图,是对Source与Sink的理想执行逻辑,由于SeaTunnel以WaterDrop孵化,所以图上的术语偏向Spark;

理想情况下,在Driver上可以运行Source和Sink的协调器,然后Worker上运行Source的Reader和Sink的Writer。在Source协调器方面,我们希望它能支持几个能力。

一、是数据的分片逻辑,可以将分片动态添加到Reader中。

二、是可以支持Reader的协调。SourceReader用于读取数据,然后将数据发送到引擎中流转,最终流转到Source Writer中进行数据写入,同时Writer可以支持二阶段事务提交,并由Sink的协调器支持Iceberg等Connector的聚合提交需求;

04 Source API

通过我们的调研,发现Source所需要的以下特性:

  1. 统一离线和实时API:Source只实现一次,同时支持离线和实时;
  2. 能够支持并行读取:比如Kafka每一个分区都生成一个的读取器,并行的执行;
  3. 支持动态添加分片:比如Kafka定于一个topic正则,由于业务量的需求,需要新增一个topic,该Source API可以支持我们动态添加到作业中。
  4. 支持协调读取器的工作:这个目前只发现在CDC这种Connector需要支持。CDC目前都是基于Netfilx的DBlog并行算法去支持,该情况在全量同步和增量同步两个阶段的切换时需要协调读取器的工作。
  5. 支持单个读取器处理多张表:即由前面提到的支持实时场景下的数据库整库同步需求;

对应以上需求,我们做出了基础的API,如上图,目前代码以提交到Apache SeaTunnel(Incubating)的社区中api-draft分支,感兴趣的可以查看代码详细了解。

如何适配Spark和Flink引擎

Flink与Spark都在后面统一了DataSet与DataStream API,即能够支持前两个特性,那么对于剩下的3个特性:

  • 如何支持动态添加分片?
  • 如何支持协调读取器?
  • 如何支持单个读取器处理多张表?

带着问题,进入目前的设计。

我们发现除了CDC之外,其他Connector是不需要协调器的,针对不需要协调器的,我们会有一个支持并行的Source,并进行引擎翻译。

如上图中左边是一个分片的enumerator,可以列举source需要哪些分片,有哪些分片,实时进行分片的枚举,随后将每个分片分发到真正的数据读取模块SourceReader中。对于离线与实时作业的区分使用Boundedness标记,Connector可以在分片中标记是否有停止的Offset,如Kafka可以支持实时,同时也可以支持离线。ParallelSource可以在引擎设置任意并行度,以支持并行读取。

在需要协调器的场景,如上图,需要在Reader和Enumerator之间进行Event传输, Enumerator通过Reader发送的Event进行协调工作。Coordinated Source需要在引擎层面保证单并行度,以保证数据的一致性;当然这也不能良好的使用引擎的内存管理机制,但是取舍是必要的;

对于最后一个问题,我们如何支持单个读取器处理多张表。这会涉及到Table API层,通过Catalog读取到了所有需要的表后,有些表可能属于一个作业,可以通过一个链接去读取,有些可能需要分开,这个依赖于Source是怎么实现的。基于这是一个特殊需求,我们想要减少普通开发者的难度,在Table API这一层,我们会提供一个SupportMultipleTable接口,用于声明Source支持多表的读取。Source在实现时,要根据多张表实现对应的反序列化器。针对衍生的多表数据如何分离,Flink将采用Side Output机制,Spark预想使用Filter或Partition机制。

05 Sink API

目前Sink所需的特性并不是很多,经过调研目前发现有三个需求

  1. 幂等写入,这个不需要写代码,主要看存储引擎是否能支持。
  2. 分布式事务,主流是二阶段提交,如Kafka都是可以支持分布式事务的。
  3. 聚合提交,对于Iceberg、hoodie等存储引擎而言,我们不希望有小文件问题,于是期望将这些文件聚合成一个文件,再进行提交。

基于以上三个需求,我们有对应的三个API,分别是SinkWriter、SinkCommitter、SinkAggregated Committer。SinkWriter是作为基础写入,可能是幂等写入,也可能不是。SinkCommitter支持二阶段提交。SinkAggregatedCommitter支持聚合提交。

理想状态下,AggregatedCommitter单并行的在Driver中运行,Writer与Committer运行在Worker中,可能有多个并行度,每个并行度都有自己的预提交工作,然后把自己提交的信息发送给Aggregated Committer再进行聚合。

目前Spark和Flink的高版本都支持在Driver(Job Manager)运行AggregatedCommitter,worker(Job Manager)运行writer和Committer。

但是对于Flink低版本,无法支持AggregatedCommitter在JM中运行,我们也进行翻译适配的设计。Writer与Committer会作为前置的算子,使用Flink的ProcessFunction进行包裹,支持并发的预提交与写入工作,基于Flink的Checkpoint机制实现二阶段提交,这也是目前Flink的很多Connector的2PC实现方式。这个ProcessFunction会将预提交信息发送到下游的Aggregated Committer中,Aggregated Committer可以采用SinkFunction或Process Function等算子包裹,当然,我们需要保证AggregatedCommitter只会启动一个,即单并行度,否则聚合提交的逻辑就会出现问题。

感谢各位的观看,如果大家对具体实现感兴趣,可以去 Apache SeaTunnel (Incubating) 的社区查看api-draft分支代码,谢谢大家。

· 阅读需 10 分钟

作者 | Apache SeaTunnel(Incubating) Contributor 范佳

整理 | 测试工程师 冯秀兰

对于百亿级批数据的导入,传统的 JDBC 方式在一些海量数据同步场景下的表现并不尽如人意。为了提供更快的写入速度,Apache SeaTunnel(Incubating) 在刚刚发布的 2.1.1 版本中提供了 ClickhouseFile-Connector 的支持,以实现 Bulk load 数据写入。

Bulk load 指把海量数据同步到目标 DB 中,目前 SeaTunnel 已实现数据同步到 ClickHouse 中。

在 Apache SeaTunnel(Incubating) 4 月 Meetup 上,Apache SeaTunnel(Incubating) Contributor 范佳分享了《基于 SeaTunnel 的 ClickHouse bulk load 实现》,详细讲解了 ClickHouseFile 高效处理海量数据的具体实现原理和流程。

感谢本文整理志愿者 测试工程师 冯秀兰 对 Apache SeaTunnel(Incubating) 项目的支持!

本次演讲主要包含七个部分:

  • ClickHouse Sink 现状
  • ClickHouse Sink 弱场景
  • ClickHouseFile 插件介绍
  • ClickHouseFile 核心技术点
  • ClickHouseFile 插件的实现解析
  • 插件能力对比
  • 后期优化方向

​范 佳白鲸开源 高级工程师

01 ClickHouse Sink 现状

现阶段,SeaTunnel 把数据同步到 ClickHouse 的流程是:只要是 SeaTunnel 支持的数据源,都可以把数据抽取出来,抽取出来之后,经过转换(也可以不转换),直接把源数据写入 ClickHouse sink connector 中,再通过 JDBC 写入到 ClickHouse 的服务器中。

但是,通过传统的 JDBC 写入到 ClickHouse 服务器中会存在一些问题。

首先,现阶段使用的工具是 ClickHouse 提供的驱动,实现方式是通过 HTTP,然而 HTTP 在某些场景下,实现效率不高。其次是海量数据,如果有重复数据或者一次性写入大量数据,使用传统的方式是生成对应的插入语句,通过 HTTP 发送到 ClickHouse 服务器端,在服务器端来进行逐条或分批次解析、执行,无法实现数据压缩。

最后就是我们通常会遇到的问题,数据量过大可能导致 SeaTunnel 端 OOM,或者服务器端因为写入数据量过大,频率过高,导致服务器端挂掉。

于是我们思考,是否有比 HTTP 更快的发送方式?如果可以在 SeaTunnel 端做数据预处理或数据压缩,那么网络带宽压力会降低,传输速率也会提高。

02 ClickHouse Sink 的弱场景

如果使用 HTTP 传输协议,当数据量过大,批处理以微批的形式发送请求,HTTP 可能处理不过来;

太多的 insert 请求,服务器压力大。假设带宽可以承受大量的请求,但服务器端不一定能承载。线上的服务器不仅需要数据插入,更重要的是查询数据为其他业务团队使用。若因为插入数据过多导致服务器集群宕机,是得不偿失的。

03 ClickHouse File 核心技术点

针对这些 ClickHouse 的弱场景,我们想,有没有一种方式,既能在 Spark 端就能完成数据压缩,还可以在数据写入时不增加 Server 的资源负载,并且能快速写入海量数据?于是我们开发了 ClickHouseFile 插件来满足这些需求。

ClickHouseFile 插件的关键技术是 ClickHouse -local。ClickHouse-local 模式可以让用户能够对本地文件执行快速处理,而无需部署和配置 ClickHouse 服务器。ClickHouse-local 使用与 ClickHouse Server 相同的核心,因此它支持大多数功能以及相同的格式和表引擎。

因为有这 2 个特点,这意味着用户可以直接处理本地文件,而无需在 ClickHouse 服务器端做处理。由于是相同的格式,我们在远端或者 SeaTunnel 端进行的操作所产生的数据和服务器端是无缝兼容的,可以使用 ClickHouse local 来进行数据写入。ClickHouse local 是实现 ClickHouseFile 的核心技术点,因为有了这个插件,现阶段才能够实现 ClickHouse file 连接器。

ClickHouse local 核心使用方式:

第一行:将数据通过 Linux 管道传递给 ClickHouse-local 程序的 test_table 表。

第二至五行:创建一个 result_table 表用于接收数据。

第六行:将数据从 test_table 到 result_table 表。

第七行:定义数据处理的磁盘路径。

通过调用 Clickhouse-local 组件,实现在 Apache SeaTunnel(Incubating) 端完成数据文件的生成,以及数据压缩等一系列操作。再通过和 Server 进行通信,将生成的数据直接发送到 Clickhouse 的不同节点,再将数据文件提供给节点查询。

原阶段和现阶段实现方式对比:

原来是 Spark 把数据包括 insert 语句,发送给服务器端,服务器端做 SQL 的解析,表的数据文件生成、压缩,生成对应的文件、建立对应索引。若使用 ClickHouse local 技术,则由 SeaTunnel 端做数据文件的生成、文件压缩,以及索引的创建,最终产出就是给服务器端使用的文件或文件夹,同步给服务器后,服务器就只需对数据查询,不需要做额外的操作。

04 核心技术点

以上流程可以促使数据同步更加高效,得益于我们对其中的三点优化。

第一,数据实际上师从管道传输到 ClickHouseFile,在长度和内存上会有限制。为此,我们将 ClickHouse connector,也就是 sink 端收到的数据通过 MMAP 技术写入临时文件,再由 ClickHouse local 读取临时文件的数据,生成我们的目标 local file,以达到增量读取数据的效果,解决 OM 的问题。

第二,支持分片。因为如果在集群中使用,如果只生成一个文件或文件夹,实际上文件只分发到一个节点上,会大大降低查询的性能。因此,我们进行了分片支持,用户可以在配置文件夹中设置分片的 key,算法会将数据分为多个 log file,写入到不同的集群节点中,大幅提升读取性能。

第三个重要的优化是文件传输,目前 SeaTunnel 支持两种文件传输方式,一种是 SCP,其特点是安全、通用、无需额外配置;另一种是 RSYNC,其有点事快速高效,支持断点续传,但需要额外配置,用户可以根据需要选择适合自己的方式。

05 插件实现解析

概括而言,ClickHouseFile 的总体实现流程如下:

  • 缓存数据,缓存到 ClickHouse sink 端;
  • 调用本地的 ClickHouse-local 生成文件;
  • 将数据发送到 ClickHouse 服务端;
  • 执行 ATTACH 命令

通过以上四个步骤,生成的数据达到可查询的状态。

06 插件能力对比

从数据传输角度来说,ClickHouseFile 更适用于海量数据,优势在于不需要额外的配置,通用性强,而 ClickHouseFile 配置比较复杂,目前支持的 engine 较少;

就环境复杂度来说,ClickHouse 更适合环境复杂度高的情况,不需要额外配置就能直接运行;

在通用性上,ClickHouse 由于是 SeaTunnel 官方支持的 JDBC diver,基本上支持所有的 engine 的数据写入,ClickHouseFile 支持的 engine 相对较少;从服务器压力方面来说,ClickHouseFile 的优势在海量数据传输时就体现出来了,不会对服务器造成太大的压力。

但这二者并不是竞争关系,需要根据使用场景来选择。

07 后续计划

目前虽然 SeaTunnel 支持 ClickHouseFile 插件,但是还有很多地方需要优化,主要包括:

  • Rsync 支持;
  • Exactly-Once 支持;
  • 支持 Zero Copy 传输数据文件;
  • 更多 Engine 的支持

· 阅读需 14 分钟

在Apache SeaTunnel(Incubating) 4 月Meetup上,孩子王大数据专家、OLAP平台架构师 袁洪军 为我们带来了《Apache SeaTunnel (Incubating)在孩子王的应用实践》。

本次演讲主要包含五个部分:

  • 孩子王引入Apache SeaTunnel (Incubating)的背景介绍

  • 大数据处理主流工具对比分析

  • Apache SeaTunnel (Incubating)的落地实践

  • Apache SeaTunnel (Incubating)改造中的常见问题

  • 对孩子王未来发展方向的预测展望

袁洪军

孩子王 大数据专家、OLAP 平台架构师。多年大数据平台研发管理经验,在数据资产、血缘图谱、数据治理、OLAP 等领域有着丰富的研究经验。

01 背景介绍

目前孩子王的OLAP平台主要包含元数据层、任务层、存储层、SQL层、调度层、服务层以及监控层七部分,本次分享主要关注任务层中的离线任务。

其实孩子王内部有一套完整的采集推送系统,但由于一些历史遗留问题,公司现有的平台无法快速支持OLAP平台上线,因此当时公司只能选择放弃自身的平台,转而着手研发新的系统。

当时摆在OLAP面前的有三个选择:

1、给予采集推送系统做二次研发;

2、完全自研;

3、参与开源项目。

02 大数据处理主流工具对比分析

而这三项选择却各有优劣。若采基于采集推送做二次研发,其优点是有前人的经验,能够避免重复踩坑。但缺点是代码量大,研读时间、研读周期较长,而且抽象代码较少,与业务绑定的定制化功能较多,这也导致了其二开的难度较大。

若完全自研,其优点第一是开发过程自主可控,第二是可以通过Spark等一些引擎做贴合我们自身的架构,但缺点是可能会遭遇一些未知的问题。

最后如果使用开源框架,其优点一是抽象代码较多,二是经过其他大厂或公司的验证,框架在性能和稳定方面能够得到保障。因此孩子王在OLAP数据同步初期,我们主要研究了DATAX、Sqoop和SeaTunnel这三个开源数据同步工具。

从脑图我们可以看到,Sqoop的主要功能是针对RDB的数据同步,其实现方式是基于MAP/REDUCE。Sqoop拥有丰富的参数和命令行可以去执行各种操作。Sqoop的优点在于它首先贴合Hadoop生态,并已经支持大部分RDB到HIVE任意源的转换,拥有完整的命令集和API的分布式数据同步工具。

但其缺点是Sqoop只支持RDB的数据同步,并且对于数据文件有一定的限制,以及还没有数据清洗的概念。

DataX的主要功能是任意源的数据同步,通过配置化文件+多线程的方式实现,主要分为三个流程:Reader、Framework和Writer,其中Framework主要起到通信和留空的作用。

DataX的优点是它采用了插件式的开发,拥有自己的流控和数据管控,在社区活跃度上,DataX的官网上提供了许多不同源的数据推送。但DataX的缺点在于它基于内存,对数据量可能存在限制。

Apache SeaTunnel (Incubating)做的也是任意源的数据同步,实现流程分为source、transform和sink三步,基于配置文件、Spark或Flink实现。其优点是目前官网2.1.0有非常多的插件和源的推送,基于插件式的思想也使其非常容易扩展,拥抱Spark和Flink的同时也做到了分布式的架构。要说Apache SeaTunnel (Incubating)唯一的缺点可能是目前缺少IP的调用,UI界面需要自己做管控。

综上所述,Sqoop虽然是分布式,但是仅支持RDB和HIVE、Hbase之间的数据同步且扩展能力差,不利于二开。DataX扩展性好,整体性稳定,但由于是单机版,无法分布式集群部署,且数据抽取能力和机器性能有强依赖关系。而SeaTunnel和DataX类似并弥补了DataX非分布式的问题,对于实时流也做了很好的支持,虽然是新产品,但社区活跃度高。基于是否支持分布式、是否需要单独机器部署等诸多因素的考量,最后我们选择了SeaTunnel。

03 Apache SeaTunnel (Incubating)的落地实践

在Apache SeaTunnel (Incubating)的官网我们可以看到Apache SeaTunnel (Incubating)的基础流程包括source、transform和sink三部分。根据官网的指南,Apache SeaTunnel (Incubating)的启动需要配置脚本,但经过我们的研究发现,Apache SeaTunnel (Incubating)的最终执行是依赖config文件的spark-submit提交的一个Application应用。

这种初始化方式虽然简单,但存在必须依赖Config文件的问题,每次运行任务后都会生成再进行清除,虽然可以在调度脚本中动态生成,但也产生了两个问题。1、频繁的磁盘操作是否有意义;2、是否存在更为高效的方式支持Apache SeaTunnel (Incubating)的运行。

基于以上考量,在最终的设计方案中,我们增加了一个统一配置模板平台模块。调度时只需要发起一个提交命令,由Apache SeaTunnel (Incubating)自身去统一配置模板平台中拉取配置信息,再去装载和初始化参数。

上图展示的便是孩子王OLAP的业务流程,主要分为三块。数据从Parquet,即Hive,通过Parquet表的方式到KYLIN和CK source的整体流程。

这是我们建模的页面,主要通过拖拉拽的方式生成最终模型,每个表之间通过一些交易操作,右侧是针对Apache SeaTunnel (Incubating)的微处理。

因此我们最终提交的命令如上,其中标红的首先是【-conf customconfig/jars】,指用户可以再统一配置模板平台进行处理,或者建模时单独指定。最后标红的【421 $start_time $end_time $taskType】Unicode,属于唯一编码。

下方图左就是我们最终调度脚本提交的38个命令,下方图右是针对Apache SeaTunnel (Incubating)做的改造,可以看到一个较为特殊的名为WaterdropContext的工具类。可以首先判断Unicode是否存在,再通过Unicode_code来获取不同模板的配置信息,避免了config文件的操作。

在最后的reportMeta则是用于在任务执行完成后上报一些信息,这也会在Apache SeaTunnel (Incubating)中完成。

在最终完成的config文件如上,值得注意的是在transform方面,孩子王做了一些改造。首先是针对手机或者身份证号等做脱敏处理,如果用户指定字段,就按照字段做,如果不指定字段就扫描所有字段,然后根据模式匹配,进行脱敏加密。

第二transform还支持自定义处理,如上文说道OLAP建模的时候说到。加入了HideStr,可以保留一串字符的前十个字段,加密后方的所有字符,在数据安全上有所保障。

然后,在sink端,我们为了支持任务的幂等性,我们加入了pre_sql,这主要完成的任务是数据的删除,或分区的删除,因为任务在生产过程中不可能只运行一次,一旦出现重跑或补数等操作,就需要这一部分为数据的不同和正确性做考量。

在图右方的一个Clickhouse的Sink端,这里我们加入了一个is_senseless_mode,它组成了一个读写分离的无感模式,用户在查询和补数的时候不感知整体区域,而是用到CK的分区转换,即名为MOVE PARTITION TO TABLE的命令进行操作的。

此处特别说明KYLIN的Sink端,KYLIN是一个非常特殊的源,拥有自己一整套数据录入的逻辑,而且,他有自己的监控页面,因此我们给予KYLIN的改造只是简单地调用其API操作,在使用KYLIN时也只是简单的API调用和不断轮询的状态,所以KYLIN这块的资源在统一模板配置平台就被限制地很小。

04 Apache SeaTunnel (Incubating)改造中的常见问题

1、OOM&Too many Parts

问题通常会出现在Hive到Hive的过程中,即使我们通过了自动资源的分配,但也存在数据突然间变大的情况,比如在举办了多次活动之后。这样的问题其实只能通过手动动态地调参,调整数据同步批量时间来避免。未来我们可能尽力去完成对于数据量的掌握,做到精细的控制。

2、字段、类型不一致问题

模型上线后,任务依赖的上游表或者字段,用户都会做一些修改,这些修改若无法感知,可能导致任务的失败。目前解决方法是依托血缘+快照的方式进行提前感知来避免错误。

3、自定义数据源&自定义分隔符

如财务部门需要单独使用的分割符,或是jar信息,现在用户可以自己在统一配置模板平台指定加载额外jar信息以及分割符信息。

4、数据倾斜问题

这可能因为用户自己设置了并行度,但无法做到尽善尽美。这一块我们暂时还没有完成处理,后续的思路可能在Source模块中添加post处理,对数据进行打散,完成倾斜。

5、KYLIN全局字典锁问题

随着业务发展,一个cube无法满足用户使用,就能需要建立多个cube,如果多个cube之间用了相同的字段,就会遇到KYLIN全局字典锁的问题。目前解决的思路是把两个或多个任务之间的调度时间进行隔开,如果无法隔开,可以做一个分布式锁的控制。KYLIN的sink端必须要拿到锁才能运行。

05 对孩子王未来发展方向的预测展望

  • 多源数据同步,未来可能针对RDB源进行处理

  • 基于实时Flink的实现

  • 接管已有采集调度平台(主要解决分库分表的问题)

  • 数据质量校验,像一些空值、整个数据的空置率、主时间的判断等

我的分享就到这里,希望以后可以和社区多多交流,共同进步,谢谢!

· 阅读需 5 分钟

1

可管理,可调用,可计算,可变现的数据资源才能成为资产,信息系统的互联互通使得多源和多维度的数据集成需求巨大,这就对数据处理和集成的工具提出了严苛的要求。

智能化时代,在“智慧城市”、“智慧治理”、“产品智能化”等的趋势下,企业大多面临如何实现高效数据推送,提高平台质量,以及保障数据安全的挑战。选对数据集成工具和平台,数据才能发挥出做大的作用。

Apache SeaTunnel (Incubating) 作为下一代高性能、分布式、海量数据集成框架,致力于让数据同步更简单,更高效,加快分布式数据处理能力在生产环境落地。

在 Apache SeaTunnel(Incubating) Meetup(2022 年 4 月 16日),Apache SeaTunnel(Incubating) 社区将邀请了 Apache SeaTunnel(Incubating)的资深用户,分享 Apache SeaTunnel(Incubating)在智能化生产环境中落地的最佳实践。此外,还会有贡献者现场进行 Apache SeaTunnel(Incubating)的源码解析,让你对 Apache SeaTunnel(Incubating)有一个更加全面而深入的了解。

无论你是对 Apache SeaTunnel(Incubating)抱有兴趣的初学者,还是在日常的生产实践中遭遇了复杂棘手的部署问题,都可以来到这里,与我们的讲师近距离沟通,得到你想要的答案。

01 报 名 通 道

Apache SeaTunnel (Incubating) Meetup | 4 月线上直播报名通道已开启,赶快预约吧!

时间:2022-4-16 14:00-17:00

形式:线上直播

点击链接或扫码预约报名(免费):

2

扫码预约报名

3

扫码进直播群

02 活 动 亮 点

  • 行业案例详解
  • 特色功能分析
  • 一线企业踩坑心得
  • 开源社区实战攻略
  • 行业技术专家面对面 Q&A
  • 惊喜礼品送不停

03 活 动 议 程

活动当天,将有来自孩子王、oppo 的工程师现场分享来自厂商的一线前沿实践经验,还有来自白鲸开源的高级工程师对 Apache SeaTunnel(Incubating)的重要功能更新进行“硬核”讲解,干货满满。

4

袁洪军 孩子王 大数据专家、OLAP 平台架构师

多年大数据平台研发管理经验,在数据资产、血缘图谱、数据治理、OLAP 等领域有着丰富的研究经验

演讲时间:14:00-14:40

演讲题目:Apache SeaTunnel(Incubating) 在孩子王的应用实践

演讲概要: 如何实现高效数据推送?如何提高平台质量?如何保障数据安全?孩子王又对 Apache SeaTunnel(Incubating)做了哪些改造?

6

范佳 白鲸开源  高级工程师 Apache SeaTunnel Contributor

演讲时间: 14:40-15:20

演讲题目: 基于 Apache SeaTunnel(Incubating)的 Clickhouse Bulk Load 实现

演讲概要: 通过扩展 Apache SeaTunnel(Incubating)的 Connector实现 Clickhouse的 bulk load 数据同步功能。

7

王子超 oppo 高级后端工程师

演讲时间: 15:50-16:30

演讲题目: oppo智能推荐样本中心基于 Apache SeaTunnel(Incubating)的技术革新

演讲概要: 介绍 oppo 智能推荐机器学习样本流程的演进及 Apache  SeaTunnel(Incubating) 在其中发挥的作用。

除了精彩的演讲之外,现场还设置了多个抽奖环节,参与抽奖有机会获得 Apache SeaTunnel(Incubating) 精美定制礼品,敬请期待~

· 阅读需 6 分钟

On December 9, 2021, Apache SeaTunnel(Incubating) entered the Apache Incubator, and after nearly four months of endeavor by the community contributors, we passed the first Apache version control in one go and released it on March 18, 2022. This means that version 2.1.0 is an official release that is safe for corporate and individual users to use, which has been voted on by the Apache SeaTunnel(Incubating) community and the Apache Incubator.

Note:software license is a legal instrument governing the use or redistribution of software. A typical software license grants the licensee, typically an end-user, permission to use one or more copies of the software in ways where such a use would otherwise potentially constitute copyright infringement of the software owner's exclusive rights under copyright. Effectively, a software license is a contract between the software developer and the user that guarantees the user will not be sued within the scope of the license.

Before and after entering the incubator, we spent a lot of time sorting through the external dependencies of the entire project to ensure compliance. It is important to note that the choice of License for open source software does not necessarily mean that the project itself is compliant. While the stringent version control process of ASF ensures compliance and legal distribution of the software license maximumly.

Release Note

We bring the following key featuresto this release:

  1. The kernel of the microkernel plug-in architecture is overall optimized, which is mainly in Java. And a lot of improvements are made to command line parameter parsing, plug-in loading, etc. At the same time, the users (or contributors) can choose the language to develop plug-in extensions, which greatly reduces the development threshold of plug-ins.
  2. Overall support for Flink, while the users are free to choose the underlying engine. This version also brings a large number of Flink plug-ins and welcomes anyone to contribute more.
  3. Provide local development fast startup environment support (example), allow contributors or users quickly and smoothly start without changing any code to facilitate rapid local development debugging. This is certainly exciting news for contributors or users who need to customize their plugins. In fact, we've had a large number of contributors use this approach to quickly test the plugin in our pre-release testing.
  4. With Docker container installation provided, users can deploy and install Apache SeaTunnel(Incubating) via Docker extremely fast, and we will iterate around Docker & K8s in the future, any interesting proposal on this is welcomed.

Specific release notes:

[Features]

  • Use JCommander to do command line parameter parsing, making developers focus on the logic itself.
  • Flink is upgraded from 1.9 to 1.13.5, keeping compatibility with older versions and preparing for subsequent CDC.
  • Support for Doris, Hudi, Phoenix, Druid, and other Connector plugins, and you can find complete plugin support here plugins-supported-by-seatunnel.
  • Local development extremely fast starts environment support. It can be achieved by using the example module without modifying any code, which is convenient for local debugging.
  • Support for installing and trying out Apache SeaTunnel(Incubating) via Docker containers.
  • SQL component supports SET statements and configuration variables.
  • Config module refactoring to facilitate understanding for the contributors while ensuring code compliance (License) of the project.
  • Project structure realigned to fit the new Roadmap.
  • CI&CD support, code quality automation control (more plans will be carried out to support CI&CD development).

Acknowledgments

Thanks to the following contributors who participated in this version release (GitHub IDs, in no particular order).

Al-assad, BenJFan, CalvinKirs, JNSimba, JiangTChen, Rianico, TyrantLucifer, Yves-yuan, ZhangchengHu0923, agendazhang, an-shi-chi-fan, asdf2014, bigdataf, chaozwn, choucmei, dailidong, dongzl, felix-thinkingdata, fengyuceNv, garyelephant, kalencaya, kezhenxu94, legendtkl, leo65535, liujinhui1994, mans2singh, marklightning, mosence, nielifeng, ououtt, ruanwenjun, simon824, totalo, wntp, wolfboys, wuchunfu, xbkaishui, xtr1993, yx91490, zhangbutao, zhaomin1423, zhongjiajie, zhuangchong, zixi0825.

Also sincere gratitude to our Mentors: Zhenxu Ke, Willem Jiang, William Guo, LiDong Dai, Ted Liu, Kevin, JB for their help!

Planning for the next few releases:

  • CDC support.
  • Support for the monitoring system.
  • UI system support.
  • More Connector and efficient Sink support, such as ClickHouse support will be available in the next release soon. The follow-up Features are decided by the community consensus, and we sincerely appeal to more participation in the community construction.

We need your attention and contributions:)

Community Status

Recent Development

Since entering the Apache incubator, the contributor group has grown from 13 to 55 and continues to grow, with the average weekly community commits remaining at 20+.

Three contributors from different companies (Lei Xie, HuaJie Wang, Chunfu Wu) have been invited to become Committers on account of their contributions to the community.

We held two Meetups, where instructors from Bilibili, OPPO, Vipshop, and other companies shared their large-scale production practices based on SeaTunnel in their companies (we will hold one meetup monthly in the future, and welcome SeaTunnel users or contributors to come and share their stories about SeaTunnel).

Users of Apache SeaTunnel(Incubating)

Note: Only registered users are included.

Registered users of Apache SeaTunnel(Incubating) are shown below. If you are also using Apache SeaTunnel(Incubating), too, welcome to register on Who is using SeaTunne!

PPMC's Word

LiFeng Nie, PPMC of Apache SeaTunnel(Incubating), commented on the first Apache version release.

From the first day entering Apache Incubating, we have been working hard to learn the Apache Way and various Apache policies. Although the first release took a lot of time (mainly for compliance), we think it was well worth it, and that's one of the reasons we chose to enter Apache. We need to give our users peace of mind, and Apache is certainly the best choice, with its almost demanding license control that allows users to avoid compliance issues as much as possible and ensure that the software is circulating reasonably and legally. In addition, its practice of the Apache Way, such as public service mission, pragmatism, community over code, openness and consensus decision-making, and meritocracy, can drive the Apache SeaTunnel(Incubating) community to become more open, transparent, and diverse.

· 阅读需 8 分钟

ClickHouse 是面向 OLAP 的分布式列式 DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至 ClickHouse 这个优秀的数据仓库之中,当前日数据量达到了 300 亿。

之前介绍的有关数据处理入库的经验都是基于实时数据流,数据存储在 Kafka 中,我们使用 Java 或者 Golang 将数据从 Kafka 中读取、解析、清洗之后写入 ClickHouse 中,这样可以实现数据的快速接入。然而在很多同学的使用场景中,数据都不是实时的,可能需要将 HDFS 或者是 Hive 中的数据导入 ClickHouse。有的同学通过编写 Spark 程序来实现数据的导入,那么是否有更简单、高效的方法呢。

目前开源社区上有一款工具 Seatunnel,项目地址 https://github.com/apache/incubator-seatunnel,可以快速地将 HDFS 中的数据导入 ClickHouse。

HDFS To ClickHouse

假设我们的日志存储在 HDFS 中,我们需要将日志进行解析并筛选出我们关心的字段,将对应的字段写入 ClickHouse 的表中。

Log Sample

我们在 HDFS 中存储的日志格式如下, 是很常见的 Nginx 日志

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /Apache/Seatunnel HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

ClickHouse Schema

我们的 ClickHouse 建表语句如下,我们的表按日进行分区

CREATE TABLE cms.cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32,
pool String
) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384

Seatunnel with ClickHouse

接下来会给大家详细介绍,我们如何通过 Seatunnel 满足上述需求,将 HDFS 中的数据写入 ClickHouse 中。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Seatunnel 拥有着非常丰富的插件,支持从 Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,并将结果写入 ClickHouse、Elasticsearch 或者 Kafka 中。

Prerequisites

首先我们需要安装 Seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备 Spark 环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

cd /usr/local

wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip

unzip seatunnel-1.1.1.zip

cd seatunnel-1.1.1
vim config/seatunnel-env.sh

# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

seatunnel Pipeline

我们仅需要编写一个 seatunnel Pipeline 的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是 Spark、Input、filter 和 Output。

Spark

这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小。

spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从 HDFS 文件中读取 text 格式数据的配置案例。

input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
}
}

Filter

在 Filter 部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将 HTTPDATE 转化为 ClickHouse 支持的日期格式、对 Number 类型的字段进行类型转换以及通过 SQL 进行字段筛减等

filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
}

# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
}
}

Output

最后我们将处理好的结构化数据写入 ClickHouse

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

Running seatunnel

我们将上述四部分配置组合成为我们的配置文件 config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
}
}

filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
}

# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
}
}

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入 ClickHouse。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在这篇文章中,我们介绍了如何使用 Seatunnel 将 HDFS 中的 Nginx 日志文件导入 ClickHouse 中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。除了支持 HDFS 数据源之外,Seatunnel 同样支持将数据从 Kafka 中实时读取处理写入 ClickHouse 中。我们的下一篇文章将会介绍,如何将 Hive 中的数据快速导入 ClickHouse 中。

当然,Seatunnel 不仅仅是 ClickHouse 数据写入的工具,在 Elasticsearch 以及 Kafka等 数据源的写入上同样可以扮演相当重要的角色。

希望了解 Seatunnel 和 ClickHouse、Elasticsearch、Kafka 结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

-- Power by InterestingLab

· 阅读需 6 分钟

ClickHouse是面向OLAP的分布式列式DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至ClickHouse这个优秀的数据仓库之中,当前日数据量达到了300亿。

在之前的文章 如何快速地把HDFS中的数据导入ClickHouse 中我们提到过使用 Seatunnel https://github.com/apache/incubator-seatunnel 对HDFS中的数据经过很简单的操作就可以将数据写入ClickHouse。HDFS中的数据一般是非结构化的数据,那么针对存储在Hive中的结构化数据,我们应该怎么操作呢?

Hive to ClickHouse

假定我们的数据已经存储在Hive中,我们需要读取Hive表中的数据并筛选出我们关心的字段,或者对字段进行转换,最后将对应的字段写入ClickHouse的表中。

Hive Schema

我们在Hive中存储的数据表结构如下,存储的是很常见的Nginx日志

CREATE TABLE `nginx_msg_detail`(
`hostname` string,
`domain` string,
`remote_addr` string,
`request_time` float,
`datetime` string,
`url` string,
`status` int,
`data_size` int,
`referer` string,
`cookie_info` string,
`user_agent` string,
`minute` string)
PARTITIONED BY (
`date` string,
`hour` string)

ClickHouse Schema

我们的ClickHouse建表语句如下,我们的表按日进行分区

CREATE TABLE cms.cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384

Seatunnel with ClickHouse

接下来会给大家介绍,我们如何通过 Seatunnel 将Hive中的数据写入ClickHouse中。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Seatunnel 拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。

Seatunnel的环境准备以及安装步骤这里就不一一赘述了,具体安装步骤可以参考上一篇文章或者访问 Seatunnel Docs

Seatunnel Pipeline

我们仅需要编写一个Seatunnel Pipeline的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
// 这个配置必需填写
spark.sql.catalogImplementation = "hive"
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Hive文件中读取text格式数据的配置案例。

input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}

看,很简单的一个配置就可以从Hive中读取数据了。其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。

需要注意的是,必须保证hive的metastore是在服务状态。

在Cluster、Client、Local模式下运行时,必须把hive-site.xml文件置于提交任务节点的$HADOOP_CONF目录下

Filter

在Filter部分,这里我们配置一系列的转化,我们这里把不需要的minute和hour字段丢弃。当然我们也可以在读取Hive的时候通过pre_sql不读取这些字段

filter {
remove {
source_field = ["minute", "hour"]
}
}

Output

最后我们将处理好的结构化数据写入ClickHouse

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "nginx_log"
fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

Running Seatunnel

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
// 这个配置必需填写
spark.sql.catalogImplementation = "hive"
}
input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}
filter {
remove {
source_field = ["minute", "hour"]
}
}
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入ClickHouse。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在这篇文章中,我们介绍了如何使用 Seatunnel 将Hive中的数据导入ClickHouse中。仅仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码,十分简单。

希望了解 Seatunnel 与ClickHouse、Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

-- Power by InterestingLab

· 阅读需 8 分钟

说到数据写入 Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技 Seatunnel https://github.com/apache/incubator-seatunnel 一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

Kafka to Elasticsearch

和Logstash一样,Seatunnel同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用 Seatunnel 将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Seatunnel with Elasticsearch

接下来会给大家详细介绍,我们如何通过 Seatunnel 读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Seatunnel

Seatunnel 同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip
unzip seatunnel-1.1.1.zip
cd seatunnel-1.1.1

vim config/seatunnel-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

与Logstash一样,我们仅需要编写一个Seatunnel Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手 Seatunnel 配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
topics = "seatunnel-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_es_group"
consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
# 使用正则解析原始日志
# 最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
}
## 利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}

Running Seatunnel

我们将上述四部分配置组合成为我们的配置文件 config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}
input {
kafkaStream {
topics = "seatunnel-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_es_group"
consumer.rebalance.max.retries = 100
}
}
filter {
# 使用正则解析原始日志
# 最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
}
## 利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
"domain": "elasticsearch.cn",
"hostname": "localhost",
"status": "200",
"datetime": "2018-11-26T21:54:00.000+08:00",
"count": 26
}

Conclusion

在这篇文章中,我们介绍了如何通过 Seatunnel 将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用 Seatunnel 解决问题。

希望了解 Seatunnel 与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

  • 邮件列表 : dev@seatunnel.apache.org. 发送任意内容至 dev-subscribe@seatunnel.apache.org, 按照回复订阅邮件列表。
  • Slack: 发送 Request to join SeaTunnel slack 邮件到邮件列表 (dev@seatunnel.apache.org), 我们会邀请你加入(在此之前请确认已经注册Slack).
  • bilibili B站 视频