跳到主要内容

· 阅读需 8 分钟

在大规模数据集成场景中,吞吐瓶颈往往不在数据通道本身,而在“元数据路径”上:启动时的 Connector/Jar 加载、运行中的状态管理与恢复、以及初始化阶段对外部系统(如数据库、Hive Metastore)的 Schema/分区查询。任务量一旦上到千级、万级,这些“看似轻量”的动作会被放大成集群级别的压力。

Apache SeaTunnel Engine(Zeta)把一部分高频、可复用且昂贵的元数据下沉到引擎侧进行缓存,并配合分布式存储与自动清理策略,让海量同步任务可以更稳定地并行运行。

SeaTunnel 分布式架构下的元数据流转

为什么“元数据”会成为瓶颈

以“万级小作业”并行启动为例,常见的元数据瓶颈主要来自三类:

  • 类加载与依赖隔离:每个作业独立创建 ClassLoader,会反复加载同一批 Connector 依赖,快速抬升 JVM Metaspace 压力。
  • 状态与恢复信息:Checkpoint、任务状态、历史作业信息等若缺少分层存储与自动清理,会带来内存与 IO 的双重负担。
  • 外部目录/Schema 查询:作业初始化阶段对源端数据库或 Metastore 的频繁请求,容易造成连接拥塞与元数据服务抖动。

下面从三条主线拆解 SeaTunnel 的“元数据缓存”思路与可落地的配置建议。

一:ClassLoader 缓存,降低 Metaspace 压力

当大量作业复用相同的 Source/Sink Connector 时,持续创建和销毁类加载器会带来明显的 Metaspace 抖动,甚至触发溢出。SeaTunnel Engine 提供 classloader-cache-mode,用于复用作业之间的 ClassLoader,减少重复加载和频繁回收的开销。

seatunnel.yaml 中开启(该配置默认开启,若你曾手动关闭可重新启用):

seatunnel:
engine:
classloader-cache-mode: true

适用场景

  • 作业规模大、启动频繁,且 Connector 类型相对有限(复用率高)。
  • JVM Metaspace 频繁增长,或出现与类加载相关的内存告警。

注意点

  • 如果集群长期运行且 Connector 类型非常分散,缓存会增加常驻的类元数据占用;建议结合监控观察 Metaspace 曲线,再决定是否开启或调整作业结构。

二:分布式状态与持久化,保证可恢复与可运营

SeaTunnel Engine 的容错语义基于 Chandy–Lamport Checkpoint 思想。为了兼顾性能与可靠性,它在引擎内部使用 Hazelcast 的分布式数据结构(如 IMap)承载一部分运行态信息,并通过外部存储(共享/分布式存储)完成故障恢复所需的数据落盘。

你通常需要关心三组配置:

1) Checkpoint 触发参数

seatunnel:
engine:
checkpoint:
interval: 300000
timeout: 10000

说明:如果在作业配置文件 env 中配置了 checkpoint.interval/checkpoint.timeout,会优先以作业配置为准。

2) IMap 备份与持久化(建议用于生产集群)

当集群节点数大于 1 时,建议至少配置 backup-count,以降低单点故障导致的内存态信息丢失风险;对于需要“全停全启后自动恢复”的场景,可进一步配置 IMap 外部持久化。

相关细节可参考文档:

  • /docs/seatunnel-engine/deployment
  • /docs/seatunnel-engine/checkpoint-storage

3) 历史作业信息的自动清理

SeaTunnel 将已完成作业的状态、计数器、错误日志等信息存放在 IMap 中。作业越多,累积越快。建议按需配置 history-job-expire-minutes,让过期信息自动淘汰,避免内存长期膨胀(默认 1440 分钟,即 1 天)。

seatunnel:
engine:
history-job-expire-minutes: 1440

三:Catalog/Schema 元数据缓存,减少源端压力

大量作业并行启动时,对外部系统的元数据请求(表结构、分区信息、约束信息等)很容易成为“隐形风暴”。SeaTunnel 在 Connector/Catalog 侧引入缓存与复用思路,尽量把高频查询前置到引擎侧,减少重复的网络往返与服务端解析开销。

  • JDBC 场景:初始化阶段会读取表结构、字段类型、主键等信息,用于校验与分片规划。建议在高并发启动时避免每个作业对同一张表重复拉取全量元数据(可通过作业编排层做批次启动/预热)。
  • Hive 场景:Hive Metastore 往往是共享服务且相对敏感,建议尽量复用 Catalog 实例与已加载的 Database/Table/Partition 信息,并在大规模分区表同步中关注 Metastore 的 QPS 与响应时间。

Flink 的设计重心是长生命周期的流作业与复杂算子状态;Spark 更偏向批处理与作业级 Context 管理。在“万级独立小任务并发”这个目标下,SeaTunnel Engine 的策略更强调把可复用的启动与运行元数据沉到引擎层:减少重复加载、减少重复查询、并对历史信息进行可控的生命周期管理,从而提升并发启动与稳定性。

生产落地建议

  • 启用合理备份:生产集群建议 backup-count >= 1,并评估是否需要 IMap 外部持久化以支持全停全启自动恢复。
  • 收敛 Connector 类型:尽量在同一集群里控制 Connector 组合的离散程度,让 classloader-cache-mode 的收益最大化。
  • 关注“元数据指标”:除了 JVM 指标,建议关注 Checkpoint 延迟/失败率、Hazelcast 内存使用、IMap 大小与增长速率、历史作业累积速度等。
  • 配置过期策略:根据排障与审计需求设置 history-job-expire-minutes,避免“为了可观测性而撑爆内存”。

元数据缓存相关指标示意

· 阅读需 8 分钟

Amazon Aurora DSQL是亚马逊云科技于2024年12月推出的分布式SQL数据库,专为构建扩展性无限、高可用且免基础设施管理的应用程序设计,具有可用性高、无服务器模式架构、兼容性强、容错能力和安全级别高等特点。

由于Aurora DSQL的认证机制与IAM集成, 访问Aurora DSQL数据库需要通过IAM的身份来生成token 进行访问,而token 默认只有15分钟有效期,因此目前一些主流的数据同步工具暂不支持将其他数据库的数据迁移到Aurora DSQL。

基于这种情况,本文作者基于数据同步工具Apache SeaTunnel开发了一个专门针对Aurora DSQL的sink Connector,以满足从其他数据库迁移数据到Aurora DSQL需求。

SeaTunnel 介绍

SeaTunnel是一个非常易用、多模态、超高性能的分布式数据集成平台,专注于数据集成和数据同步,主要旨在解决数据集成领域的常见问题。

SeaTunnel 相关特性

  • 丰富且可扩展的Connector: 目前,SeaTunnel 支持超过 190 个Connector且数量还在增加,像主流数据库MySQL 、Oracle、SQLServer、PostgreSQL等都已经提供了Connector支持。插件式设计让用户可以轻松开发自己的Connector并将其集成到SeaTunnel项目中。
  • 批流集成:基于SeaTunnel Connector API开发的Connector完美兼容离线同步、实时同步、全量同步、增量同步等场景。 它们大大降低了管理数据集成任务的难度。
  • 分布式快照:支持分布式快照算法,保证数据一致性。
  • 多引擎支持:SeaTunnel默认使用SeaTunnel引擎(Zeta)进行数据同步。 SeaTunnel还支持使用Flink或Spark作为Connector的执行引擎,以适应企业现有的技术组件。 SeaTunnel 支持 Spark 和 Flink 的多个版本。
  • JDBC复用、数据库日志多表解析:SeaTunnel支持多表或全库同步,解决了过度JDBC连接的问题; 支持多表或全库日志读取解析,解决了CDC多表同步场景下需要处理日志重复读取解析的问题。
  • 高吞吐量、低延迟:SeaTunnel支持并行读写,提供稳定可靠、高吞吐量、低延迟的数据同步能力。
  • 完善的实时监控:SeaTunnel支持数据同步过程中每一步的详细监控信息,让用户轻松了解同步任务读写的数据数量、数据大小、QPS等信息。

SeaTunnel 工作流程

图一 Seatunnel工作流图

SeaTunnel的工作流程如上图所示,用户配置作业信息并选择提交作业的执行引擎。Source Connector负责并行读取源端数据并将数据发送到下游Transform或直接发送到Sink,Sink将数据写入目的地。

从源码构建SeaTunnel

git clone https://github.com/apache/seatunnel.git
cd seatunnel
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
cp seatunnel-dist/target/apache-seatunnel-${version}-bin.tar.gz /The-Path-You-Want-To-Copy
cd /The-Path-You-Want-To-Copy
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

从源码构建成功后,所有的Connector插件和一些必要的依赖(例如:mysql驱动)都包含在二进制包中。您可以直接使用Connector插件,而无需单独安装它们。

使用Seatunnel同步MySQL数据到Aurora DSQL 配置示例

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 6000
checkpoint.timeout = 1200000
}
source {
MySQL-CDC {
username = "user name"
password = "password"
table-names = ["db.table1"]
url = "jdbc:mysql://dbhost:3306/db?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&connectTimeout=120000&socketTimeout=120000&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
table-names-config = [
{
table = "db.table1"
primaryKeys = ["id"]
}
]
}
}
transform {

}
sink {
Jdbc {
url="jdbc:postgresql://<dsql_endpoint>:5432/postgres"
dialect="dsql"
driver = "org.postgresql.Driver"
username = "admin"
access_key_id = "ACCESSKEYIDEXAMPLE"
secret_access_key = "SECRETACCESSKEYEXAMPLE"
region = "us-east-1"
database = "postgres"
generate_sink_sql = true
primary_keys = ["id"]
max_retries="3"
batch_size =1000
}
}

运行数据同步任务

将上面的配置保存为mysql-to-dsql.conf 文件(请注意需要将示例中的值替换为真实的参数),存放在apache-seatunnel-${version} 的config 目录下,执行以下命令:

cd "apache-seatunnel-${version}"
./bin/seatunnel.sh --config ./config/mysql-to-dsql.conf -m local

图二 数据同步日志信息

命令执行成功后,您可以通过新产生的日志观察任务执行情况,如果出现错误,也可以根据异常信息进行定位,比如数据库连接超时、表不存在情况。而正常情况下,数据会成功写入目标 Aurora DSQL,如上图所示。

总结

Aurora DSQL是一款高度安全、易扩展、无服务器基础设施的分布式数据库,它的认证方式与IAM身份结合,因此目前缺少合适的工具可以将数据同步到Aurora DSQL中,尤其是在实时数据同步方面。SeaTunnel 是一款非常优秀数据集成和数据同步工具,目前支持多种数据源的数据同步,并且基于SeaTunnel 也可以非常灵活地实现自定义的数据同步需求,比如全量同步/增量实时同步。基于这种灵活性,本文作者开发了一种专门针对于Aurora DSQL 的Sink Connector, 以满足对于Aurora DSQL 数据同步需求。

参考文档

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

谭志强,亚马逊云科技迁移解决方案架构师,主要负责企业级客户的上云或跨云迁移工作,具有十几年 IT 专业服务经验,历任程序设计师、项目经理、技术顾问、解决方案架构师。

· 阅读需 16 分钟

Apache SeaTunnel (Incubating) 与 Apache Inlong (Incubating) 的5月联合Meetup中,第二位分享的嘉宾是来自白鲸开源的高级工程师李宗文。在使用Apache SeaTunnel (Incubating) 的过程中,他发现了 Apache SeaTunnel (Incubating) 存在的四大问题:Connector实现次数多、参数不统一、难以支持多个版本的引擎以及引擎升级难的问题。为了解决以上的难题,李宗文将目标放在将Apache SeaTunnel (Incubating)与计算引擎进行解耦,重构其中Source与Sink API,实现改良了开发体验。

本次演讲主要包含四个部分:

  1. Apache SeaTunnel (Incubating)重构的背景和动机
  2. Apache SeaTunnel (Incubating)重构的目标
  3. Apache SeaTunnel (Incubating)重构整体的设计
  4. Apache SeaTunnel (Incubating) Source API的设计
  5. Apache SeaTunnel (Incubating) Sink API的设计

李宗文

白鲸开源 高级工程师

Apache SeaTunnel(Incubating)

& Flink Contributor, Flink CDC & Debezium Contributor

01 重构的背景与动机

01 Apache SeaTunnel(Incubating)与引擎耦合

用过Apache SeaTunnel (Incubating) 的小伙伴或者开发者应该知道,目前Apache SeaTunnel (Incubating) 与引擎完全耦合,完全基于Spark、Flink开发,其中的配置文件参数都基于Flink、Spark引擎。从贡献者和用户的角度出发,我们能发现一些问题。

从贡献者的角度:反复实现Connector,没有收获感;潜在贡献者由于引擎版本不一致无法贡献社区;

从用户的角度:目前很多公司采用Lambda架构,离线作业使用Spark,实时作业使用Flink, 使用中就会发现SeaTunnel 的Connector可能Spark有,但是Flink没有,以及两个引擎对于同一存储引擎的Connector的参数也不统一,有较高的使用成本,脱离了SeaTunnel简单易用的初衷;还有用户提问说目前支不支持Flink的1.14版本,按照目前SeaTunnel的架构,想要支持Flink的1.14就必须抛弃之前的版本,因此这也会对之前版本的用户造成很大的问题。

因此,我们不管是做引擎升级或者支持更多的版本的用户都很困难。

另外Spark和Flink都采用了Chandy-lamport算法实现的Checkpoint容错机制,也在内部进行了DataSet与DataStream的统一,以此为前提我们认为解耦是可行的。

02 Apache SeaTunnel(Incubating)与引擎解耦

因此为了解决以上提出的问题,我们有了以下的目标:

  1. Connector只实现一次:针对参数不统一、Connector多次实现的问题,我们希望实现一个统一的Source 与Sink API;
  2. 支持多个版本的Spark与Flink引擎:在Source与Sink API上再加入翻译层去支持多个版本与Spark和Flink引擎,解耦后这个代价会小很多。
  3. 明确Source的分片并行逻辑和Sink的提交逻辑:我们必须提供一个良好的API去支持Connector开发;
  4. 支持实时场景下的数据库整库同步:这个是目前很多用户提到需要CDC支持衍生的需求。我之前参与过Flink CDC社区,当时有许多用户提出在CDC的场景中,如果直接使用Flink CDC的话会导致每一个表都持有一个链接,当遇到需要整库同步需求时,千张表就有千个链接,该情况无论是对于数据库还是DBA都是不能接受的,如果要解决这个问题,最简单的方式就是引入Canal、Debezium等组件,使用其拉取增量数据到Kafka等MQ做中间存储,再使用Flink SQL进行同步,这实际已经违背了Flink CDC最早减少链路的想法,但是Flink CDC的定位只是一个Connector,无法做全链路的需求,所以该proposal在Flink CDC社区中没有被提出,我们借着本次重构,将proposa提交到了SeaTunnel社区中。
  5. 支持元信息的自动发现与存储:这一部分用户应该有所体验,如Kafka这类存储引擎,没有记录数据结构的功能,但我们在读取数据时又必须是结构化的,导致每次读取一个topic之前,用户都必须定义topic的结构化数据类型,我们希望做到用户只需要完成一次配置,减少重复的操作。

可能也有同学有疑惑为什么我们不直接使用Apache Beam,Beam的Source分为BOUNDED与UNBOUNDED,也就是需要实现两遍,并且有些Source与Sink的特性也不支持,具体所需的特性在后面会提到;

03 Apache SeaTunnel(Incubating)重构整体的设计

Apache SeaTunnel(Incubating) API总体结构的设计如上图;

Source & Sink API:数据集成的核心API之一,明确Source的分片并行逻辑和Sink的提交逻辑,用于实现Connector;

Engine API

Translation: 翻译层,用于将SeaTunnel的Souce与Sink API翻译成引擎内部可以运行的Connector;

Execution:执行逻辑,用于定义Source、Transform、Sink等操作在引擎内部的执行逻辑;

Table API

Table SPI:主要用于以SPI的方式暴露Source与Sink接口,并明确Connector的必填与可选参数等;

DataType:SeaTunnel的数据结构,用于隔离引擎,声明Table Schema等;

Catalog:用于获取Table Scheme、Options等;

Catalog Storage: 用于存储用户定义Kafka等非结构化引擎的Table Scheme等;

从上图是我们现在设想的执行流程

  1. 从配置文件或UI等方式获取任务参数;
  2. 通过参数从Catalog中解析得到Table Schema、Option等信息;
  3. 以SPI方式拉起SeaTunnel的Connector,并注入Table信息等;
  4. 将SeaTunnel的Connector翻译为引擎内部的Connector;
  5. 执行引擎的作业逻辑,图中的多表分发目前只存在CDC整库同步场景下,其他Connector都是单表,不需要分发逻辑;

从以上可以看出,最难的部分是如何将Apache SeaTunnel(Incubating) 的Source和Sink翻译成引擎内部的Source和Sink。

当下许多用户不仅把Apache SeaTunnel (Incubating) 当做一个数据集成方向的工具,也当做数仓方向的工具,会使用很多Spark和Flink的SQL,我们目前希望能够保留这样的SQL能力,让用户实现无缝升级。

根据我们的调研,如上图,是对Source与Sink的理想执行逻辑,由于SeaTunnel以WaterDrop孵化,所以图上的术语偏向Spark;

理想情况下,在Driver上可以运行Source和Sink的协调器,然后Worker上运行Source的Reader和Sink的Writer。在Source协调器方面,我们希望它能支持几个能力。

一、是数据的分片逻辑,可以将分片动态添加到Reader中。

二、是可以支持Reader的协调。SourceReader用于读取数据,然后将数据发送到引擎中流转,最终流转到Source Writer中进行数据写入,同时Writer可以支持二阶段事务提交,并由Sink的协调器支持Iceberg等Connector的聚合提交需求;

04 Source API

通过我们的调研,发现Source所需要的以下特性:

  1. 统一离线和实时API:Source只实现一次,同时支持离线和实时;
  2. 能够支持并行读取:比如Kafka每一个分区都生成一个的读取器,并行的执行;
  3. 支持动态添加分片:比如Kafka定于一个topic正则,由于业务量的需求,需要新增一个topic,该Source API可以支持我们动态添加到作业中。
  4. 支持协调读取器的工作:这个目前只发现在CDC这种Connector需要支持。CDC目前都是基于Netfilx的DBlog并行算法去支持,该情况在全量同步和增量同步两个阶段的切换时需要协调读取器的工作。
  5. 支持单个读取器处理多张表:即由前面提到的支持实时场景下的数据库整库同步需求;

对应以上需求,我们做出了基础的API,如上图,目前代码以提交到Apache SeaTunnel(Incubating)的社区中api-draft分支,感兴趣的可以查看代码详细了解。

如何适配Spark和Flink引擎

Flink与Spark都在后面统一了DataSet与DataStream API,即能够支持前两个特性,那么对于剩下的3个特性:

  • 如何支持动态添加分片?
  • 如何支持协调读取器?
  • 如何支持单个读取器处理多张表?

带着问题,进入目前的设计。

我们发现除了CDC之外,其他Connector是不需要协调器的,针对不需要协调器的,我们会有一个支持并行的Source,并进行引擎翻译。

如上图中左边是一个分片的enumerator,可以列举source需要哪些分片,有哪些分片,实时进行分片的枚举,随后将每个分片分发到真正的数据读取模块SourceReader中。对于离线与实时作业的区分使用Boundedness标记,Connector可以在分片中标记是否有停止的Offset,如Kafka可以支持实时,同时也可以支持离线。ParallelSource可以在引擎设置任意并行度,以支持并行读取。

在需要协调器的场景,如上图,需要在Reader和Enumerator之间进行Event传输, Enumerator通过Reader发送的Event进行协调工作。Coordinated Source需要在引擎层面保证单并行度,以保证数据的一致性;当然这也不能良好的使用引擎的内存管理机制,但是取舍是必要的;

对于最后一个问题,我们如何支持单个读取器处理多张表。这会涉及到Table API层,通过Catalog读取到了所有需要的表后,有些表可能属于一个作业,可以通过一个链接去读取,有些可能需要分开,这个依赖于Source是怎么实现的。基于这是一个特殊需求,我们想要减少普通开发者的难度,在Table API这一层,我们会提供一个SupportMultipleTable接口,用于声明Source支持多表的读取。Source在实现时,要根据多张表实现对应的反序列化器。针对衍生的多表数据如何分离,Flink将采用Side Output机制,Spark预想使用Filter或Partition机制。

05 Sink API

目前Sink所需的特性并不是很多,经过调研目前发现有三个需求

  1. 幂等写入,这个不需要写代码,主要看存储引擎是否能支持。
  2. 分布式事务,主流是二阶段提交,如Kafka都是可以支持分布式事务的。
  3. 聚合提交,对于Iceberg、hoodie等存储引擎而言,我们不希望有小文件问题,于是期望将这些文件聚合成一个文件,再进行提交。

基于以上三个需求,我们有对应的三个API,分别是SinkWriter、SinkCommitter、SinkAggregated Committer。SinkWriter是作为基础写入,可能是幂等写入,也可能不是。SinkCommitter支持二阶段提交。SinkAggregatedCommitter支持聚合提交。

理想状态下,AggregatedCommitter单并行的在Driver中运行,Writer与Committer运行在Worker中,可能有多个并行度,每个并行度都有自己的预提交工作,然后把自己提交的信息发送给Aggregated Committer再进行聚合。

目前Spark和Flink的高版本都支持在Driver(Job Manager)运行AggregatedCommitter,worker(Job Manager)运行writer和Committer。

但是对于Flink低版本,无法支持AggregatedCommitter在JM中运行,我们也进行翻译适配的设计。Writer与Committer会作为前置的算子,使用Flink的ProcessFunction进行包裹,支持并发的预提交与写入工作,基于Flink的Checkpoint机制实现二阶段提交,这也是目前Flink的很多Connector的2PC实现方式。这个ProcessFunction会将预提交信息发送到下游的Aggregated Committer中,Aggregated Committer可以采用SinkFunction或Process Function等算子包裹,当然,我们需要保证AggregatedCommitter只会启动一个,即单并行度,否则聚合提交的逻辑就会出现问题。

感谢各位的观看,如果大家对具体实现感兴趣,可以去 Apache SeaTunnel (Incubating) 的社区查看api-draft分支代码,谢谢大家。

· 阅读需 10 分钟

作者 | Apache SeaTunnel(Incubating) Contributor 范佳

整理 | 测试工程师 冯秀兰

对于百亿级批数据的导入,传统的 JDBC 方式在一些海量数据同步场景下的表现并不尽如人意。为了提供更快的写入速度,Apache SeaTunnel(Incubating) 在刚刚发布的 2.1.1 版本中提供了 ClickhouseFile-Connector 的支持,以实现 Bulk load 数据写入。

Bulk load 指把海量数据同步到目标 DB 中,目前 SeaTunnel 已实现数据同步到 ClickHouse 中。

在 Apache SeaTunnel(Incubating) 4 月 Meetup 上,Apache SeaTunnel(Incubating) Contributor 范佳分享了《基于 SeaTunnel 的 ClickHouse bulk load 实现》,详细讲解了 ClickHouseFile 高效处理海量数据的具体实现原理和流程。

感谢本文整理志愿者 测试工程师 冯秀兰 对 Apache SeaTunnel(Incubating) 项目的支持!

本次演讲主要包含七个部分:

  • ClickHouse Sink 现状
  • ClickHouse Sink 弱场景
  • ClickHouseFile 插件介绍
  • ClickHouseFile 核心技术点
  • ClickHouseFile 插件的实现解析
  • 插件能力对比
  • 后期优化方向

​范 佳白鲸开源 高级工程师

01 ClickHouse Sink 现状

现阶段,SeaTunnel 把数据同步到 ClickHouse 的流程是:只要是 SeaTunnel 支持的数据源,都可以把数据抽取出来,抽取出来之后,经过转换(也可以不转换),直接把源数据写入 ClickHouse sink connector 中,再通过 JDBC 写入到 ClickHouse 的服务器中。

但是,通过传统的 JDBC 写入到 ClickHouse 服务器中会存在一些问题。

首先,现阶段使用的工具是 ClickHouse 提供的驱动,实现方式是通过 HTTP,然而 HTTP 在某些场景下,实现效率不高。其次是海量数据,如果有重复数据或者一次性写入大量数据,使用传统的方式是生成对应的插入语句,通过 HTTP 发送到 ClickHouse 服务器端,在服务器端来进行逐条或分批次解析、执行,无法实现数据压缩。

最后就是我们通常会遇到的问题,数据量过大可能导致 SeaTunnel 端 OOM,或者服务器端因为写入数据量过大,频率过高,导致服务器端挂掉。

于是我们思考,是否有比 HTTP 更快的发送方式?如果可以在 SeaTunnel 端做数据预处理或数据压缩,那么网络带宽压力会降低,传输速率也会提高。

02 ClickHouse Sink 的弱场景

如果使用 HTTP 传输协议,当数据量过大,批处理以微批的形式发送请求,HTTP 可能处理不过来;

太多的 insert 请求,服务器压力大。假设带宽可以承受大量的请求,但服务器端不一定能承载。线上的服务器不仅需要数据插入,更重要的是查询数据为其他业务团队使用。若因为插入数据过多导致服务器集群宕机,是得不偿失的。

03 ClickHouse File 核心技术点

针对这些 ClickHouse 的弱场景,我们想,有没有一种方式,既能在 Spark 端就能完成数据压缩,还可以在数据写入时不增加 Server 的资源负载,并且能快速写入海量数据?于是我们开发了 ClickHouseFile 插件来满足这些需求。

ClickHouseFile 插件的关键技术是 ClickHouse -local。ClickHouse-local 模式可以让用户能够对本地文件执行快速处理,而无需部署和配置 ClickHouse 服务器。ClickHouse-local 使用与 ClickHouse Server 相同的核心,因此它支持大多数功能以及相同的格式和表引擎。

因为有这 2 个特点,这意味着用户可以直接处理本地文件,而无需在 ClickHouse 服务器端做处理。由于是相同的格式,我们在远端或者 SeaTunnel 端进行的操作所产生的数据和服务器端是无缝兼容的,可以使用 ClickHouse local 来进行数据写入。ClickHouse local 是实现 ClickHouseFile 的核心技术点,因为有了这个插件,现阶段才能够实现 ClickHouse file 连接器。

ClickHouse local 核心使用方式:

第一行:将数据通过 Linux 管道传递给 ClickHouse-local 程序的 test_table 表。

第二至五行:创建一个 result_table 表用于接收数据。

第六行:将数据从 test_table 到 result_table 表。

第七行:定义数据处理的磁盘路径。

通过调用 Clickhouse-local 组件,实现在 Apache SeaTunnel(Incubating) 端完成数据文件的生成,以及数据压缩等一系列操作。再通过和 Server 进行通信,将生成的数据直接发送到 Clickhouse 的不同节点,再将数据文件提供给节点查询。

原阶段和现阶段实现方式对比:

原来是 Spark 把数据包括 insert 语句,发送给服务器端,服务器端做 SQL 的解析,表的数据文件生成、压缩,生成对应的文件、建立对应索引。若使用 ClickHouse local 技术,则由 SeaTunnel 端做数据文件的生成、文件压缩,以及索引的创建,最终产出就是给服务器端使用的文件或文件夹,同步给服务器后,服务器就只需对数据查询,不需要做额外的操作。

04 核心技术点

以上流程可以促使数据同步更加高效,得益于我们对其中的三点优化。

第一,数据实际上师从管道传输到 ClickHouseFile,在长度和内存上会有限制。为此,我们将 ClickHouse connector,也就是 sink 端收到的数据通过 MMAP 技术写入临时文件,再由 ClickHouse local 读取临时文件的数据,生成我们的目标 local file,以达到增量读取数据的效果,解决 OM 的问题。

第二,支持分片。因为如果在集群中使用,如果只生成一个文件或文件夹,实际上文件只分发到一个节点上,会大大降低查询的性能。因此,我们进行了分片支持,用户可以在配置文件夹中设置分片的 key,算法会将数据分为多个 log file,写入到不同的集群节点中,大幅提升读取性能。

第三个重要的优化是文件传输,目前 SeaTunnel 支持两种文件传输方式,一种是 SCP,其特点是安全、通用、无需额外配置;另一种是 RSYNC,其有点事快速高效,支持断点续传,但需要额外配置,用户可以根据需要选择适合自己的方式。

05 插件实现解析

概括而言,ClickHouseFile 的总体实现流程如下:

  • 缓存数据,缓存到 ClickHouse sink 端;
  • 调用本地的 ClickHouse-local 生成文件;
  • 将数据发送到 ClickHouse 服务端;
  • 执行 ATTACH 命令

通过以上四个步骤,生成的数据达到可查询的状态。

06 插件能力对比

从数据传输角度来说,ClickHouseFile 更适用于海量数据,优势在于不需要额外的配置,通用性强,而 ClickHouseFile 配置比较复杂,目前支持的 engine 较少;

就环境复杂度来说,ClickHouse 更适合环境复杂度高的情况,不需要额外配置就能直接运行;

在通用性上,ClickHouse 由于是 SeaTunnel 官方支持的 JDBC diver,基本上支持所有的 engine 的数据写入,ClickHouseFile 支持的 engine 相对较少;从服务器压力方面来说,ClickHouseFile 的优势在海量数据传输时就体现出来了,不会对服务器造成太大的压力。

但这二者并不是竞争关系,需要根据使用场景来选择。

07 后续计划

目前虽然 SeaTunnel 支持 ClickHouseFile 插件,但是还有很多地方需要优化,主要包括:

  • Rsync 支持;
  • Exactly-Once 支持;
  • 支持 Zero Copy 传输数据文件;
  • 更多 Engine 的支持

· 阅读需 14 分钟

在Apache SeaTunnel(Incubating) 4 月Meetup上,孩子王大数据专家、OLAP平台架构师 袁洪军 为我们带来了《Apache SeaTunnel (Incubating)在孩子王的应用实践》。

本次演讲主要包含五个部分:

  • 孩子王引入Apache SeaTunnel (Incubating)的背景介绍

  • 大数据处理主流工具对比分析

  • Apache SeaTunnel (Incubating)的落地实践

  • Apache SeaTunnel (Incubating)改造中的常见问题

  • 对孩子王未来发展方向的预测展望

袁洪军

孩子王 大数据专家、OLAP 平台架构师。多年大数据平台研发管理经验,在数据资产、血缘图谱、数据治理、OLAP 等领域有着丰富的研究经验。

01 背景介绍

目前孩子王的OLAP平台主要包含元数据层、任务层、存储层、SQL层、调度层、服务层以及监控层七部分,本次分享主要关注任务层中的离线任务。

其实孩子王内部有一套完整的采集推送系统,但由于一些历史遗留问题,公司现有的平台无法快速支持OLAP平台上线,因此当时公司只能选择放弃自身的平台,转而着手研发新的系统。

当时摆在OLAP面前的有三个选择:

1、给予采集推送系统做二次研发;

2、完全自研;

3、参与开源项目。

02 大数据处理主流工具对比分析

而这三项选择却各有优劣。若采基于采集推送做二次研发,其优点是有前人的经验,能够避免重复踩坑。但缺点是代码量大,研读时间、研读周期较长,而且抽象代码较少,与业务绑定的定制化功能较多,这也导致了其二开的难度较大。

若完全自研,其优点第一是开发过程自主可控,第二是可以通过Spark等一些引擎做贴合我们自身的架构,但缺点是可能会遭遇一些未知的问题。

最后如果使用开源框架,其优点一是抽象代码较多,二是经过其他大厂或公司的验证,框架在性能和稳定方面能够得到保障。因此孩子王在OLAP数据同步初期,我们主要研究了DATAX、Sqoop和SeaTunnel这三个开源数据同步工具。

从脑图我们可以看到,Sqoop的主要功能是针对RDB的数据同步,其实现方式是基于MAP/REDUCE。Sqoop拥有丰富的参数和命令行可以去执行各种操作。Sqoop的优点在于它首先贴合Hadoop生态,并已经支持大部分RDB到HIVE任意源的转换,拥有完整的命令集和API的分布式数据同步工具。

但其缺点是Sqoop只支持RDB的数据同步,并且对于数据文件有一定的限制,以及还没有数据清洗的概念。

DataX的主要功能是任意源的数据同步,通过配置化文件+多线程的方式实现,主要分为三个流程:Reader、Framework和Writer,其中Framework主要起到通信和留空的作用。

DataX的优点是它采用了插件式的开发,拥有自己的流控和数据管控,在社区活跃度上,DataX的官网上提供了许多不同源的数据推送。但DataX的缺点在于它基于内存,对数据量可能存在限制。

Apache SeaTunnel (Incubating)做的也是任意源的数据同步,实现流程分为source、transform和sink三步,基于配置文件、Spark或Flink实现。其优点是目前官网2.1.0有非常多的插件和源的推送,基于插件式的思想也使其非常容易扩展,拥抱Spark和Flink的同时也做到了分布式的架构。要说Apache SeaTunnel (Incubating)唯一的缺点可能是目前缺少IP的调用,UI界面需要自己做管控。

综上所述,Sqoop虽然是分布式,但是仅支持RDB和HIVE、Hbase之间的数据同步且扩展能力差,不利于二开。DataX扩展性好,整体性稳定,但由于是单机版,无法分布式集群部署,且数据抽取能力和机器性能有强依赖关系。而SeaTunnel和DataX类似并弥补了DataX非分布式的问题,对于实时流也做了很好的支持,虽然是新产品,但社区活跃度高。基于是否支持分布式、是否需要单独机器部署等诸多因素的考量,最后我们选择了SeaTunnel。

03 Apache SeaTunnel (Incubating)的落地实践

在Apache SeaTunnel (Incubating)的官网我们可以看到Apache SeaTunnel (Incubating)的基础流程包括source、transform和sink三部分。根据官网的指南,Apache SeaTunnel (Incubating)的启动需要配置脚本,但经过我们的研究发现,Apache SeaTunnel (Incubating)的最终执行是依赖config文件的spark-submit提交的一个Application应用。

这种初始化方式虽然简单,但存在必须依赖Config文件的问题,每次运行任务后都会生成再进行清除,虽然可以在调度脚本中动态生成,但也产生了两个问题。1、频繁的磁盘操作是否有意义;2、是否存在更为高效的方式支持Apache SeaTunnel (Incubating)的运行。

基于以上考量,在最终的设计方案中,我们增加了一个统一配置模板平台模块。调度时只需要发起一个提交命令,由Apache SeaTunnel (Incubating)自身去统一配置模板平台中拉取配置信息,再去装载和初始化参数。

上图展示的便是孩子王OLAP的业务流程,主要分为三块。数据从Parquet,即Hive,通过Parquet表的方式到KYLIN和CK source的整体流程。

这是我们建模的页面,主要通过拖拉拽的方式生成最终模型,每个表之间通过一些交易操作,右侧是针对Apache SeaTunnel (Incubating)的微处理。

因此我们最终提交的命令如上,其中标红的首先是【-conf customconfig/jars】,指用户可以再统一配置模板平台进行处理,或者建模时单独指定。最后标红的【421 $start_time $end_time $taskType】Unicode,属于唯一编码。

下方图左就是我们最终调度脚本提交的38个命令,下方图右是针对Apache SeaTunnel (Incubating)做的改造,可以看到一个较为特殊的名为WaterdropContext的工具类。可以首先判断Unicode是否存在,再通过Unicode_code来获取不同模板的配置信息,避免了config文件的操作。

在最后的reportMeta则是用于在任务执行完成后上报一些信息,这也会在Apache SeaTunnel (Incubating)中完成。

在最终完成的config文件如上,值得注意的是在transform方面,孩子王做了一些改造。首先是针对手机或者身份证号等做脱敏处理,如果用户指定字段,就按照字段做,如果不指定字段就扫描所有字段,然后根据模式匹配,进行脱敏加密。

第二transform还支持自定义处理,如上文说道OLAP建模的时候说到。加入了HideStr,可以保留一串字符的前十个字段,加密后方的所有字符,在数据安全上有所保障。

然后,在sink端,我们为了支持任务的幂等性,我们加入了pre_sql,这主要完成的任务是数据的删除,或分区的删除,因为任务在生产过程中不可能只运行一次,一旦出现重跑或补数等操作,就需要这一部分为数据的不同和正确性做考量。

在图右方的一个Clickhouse的Sink端,这里我们加入了一个is_senseless_mode,它组成了一个读写分离的无感模式,用户在查询和补数的时候不感知整体区域,而是用到CK的分区转换,即名为MOVE PARTITION TO TABLE的命令进行操作的。

此处特别说明KYLIN的Sink端,KYLIN是一个非常特殊的源,拥有自己一整套数据录入的逻辑,而且,他有自己的监控页面,因此我们给予KYLIN的改造只是简单地调用其API操作,在使用KYLIN时也只是简单的API调用和不断轮询的状态,所以KYLIN这块的资源在统一模板配置平台就被限制地很小。

04 Apache SeaTunnel (Incubating)改造中的常见问题

1、OOM&Too many Parts

问题通常会出现在Hive到Hive的过程中,即使我们通过了自动资源的分配,但也存在数据突然间变大的情况,比如在举办了多次活动之后。这样的问题其实只能通过手动动态地调参,调整数据同步批量时间来避免。未来我们可能尽力去完成对于数据量的掌握,做到精细的控制。

2、字段、类型不一致问题

模型上线后,任务依赖的上游表或者字段,用户都会做一些修改,这些修改若无法感知,可能导致任务的失败。目前解决方法是依托血缘+快照的方式进行提前感知来避免错误。

3、自定义数据源&自定义分隔符

如财务部门需要单独使用的分割符,或是jar信息,现在用户可以自己在统一配置模板平台指定加载额外jar信息以及分割符信息。

4、数据倾斜问题

这可能因为用户自己设置了并行度,但无法做到尽善尽美。这一块我们暂时还没有完成处理,后续的思路可能在Source模块中添加post处理,对数据进行打散,完成倾斜。

5、KYLIN全局字典锁问题

随着业务发展,一个cube无法满足用户使用,就能需要建立多个cube,如果多个cube之间用了相同的字段,就会遇到KYLIN全局字典锁的问题。目前解决的思路是把两个或多个任务之间的调度时间进行隔开,如果无法隔开,可以做一个分布式锁的控制。KYLIN的sink端必须要拿到锁才能运行。

05 对孩子王未来发展方向的预测展望

  • 多源数据同步,未来可能针对RDB源进行处理

  • 基于实时Flink的实现

  • 接管已有采集调度平台(主要解决分库分表的问题)

  • 数据质量校验,像一些空值、整个数据的空置率、主时间的判断等

我的分享就到这里,希望以后可以和社区多多交流,共同进步,谢谢!

· 阅读需 5 分钟

1

可管理,可调用,可计算,可变现的数据资源才能成为资产,信息系统的互联互通使得多源和多维度的数据集成需求巨大,这就对数据处理和集成的工具提出了严苛的要求。

智能化时代,在“智慧城市”、“智慧治理”、“产品智能化”等的趋势下,企业大多面临如何实现高效数据推送,提高平台质量,以及保障数据安全的挑战。选对数据集成工具和平台,数据才能发挥出做大的作用。

Apache SeaTunnel (Incubating) 作为下一代高性能、分布式、海量数据集成框架,致力于让数据同步更简单,更高效,加快分布式数据处理能力在生产环境落地。

在 Apache SeaTunnel(Incubating) Meetup(2022 年 4 月 16日),Apache SeaTunnel(Incubating) 社区将邀请了 Apache SeaTunnel(Incubating)的资深用户,分享 Apache SeaTunnel(Incubating)在智能化生产环境中落地的最佳实践。此外,还会有贡献者现场进行 Apache SeaTunnel(Incubating)的源码解析,让你对 Apache SeaTunnel(Incubating)有一个更加全面而深入的了解。

无论你是对 Apache SeaTunnel(Incubating)抱有兴趣的初学者,还是在日常的生产实践中遭遇了复杂棘手的部署问题,都可以来到这里,与我们的讲师近距离沟通,得到你想要的答案。

01 报 名 通 道

Apache SeaTunnel (Incubating) Meetup | 4 月线上直播报名通道已开启,赶快预约吧!

时间:2022-4-16 14:00-17:00

形式:线上直播

点击链接或扫码预约报名(免费):

2

扫码预约报名

3

扫码进直播群

02 活 动 亮 点

  • 行业案例详解
  • 特色功能分析
  • 一线企业踩坑心得
  • 开源社区实战攻略
  • 行业技术专家面对面 Q&A
  • 惊喜礼品送不停

03 活 动 议 程

活动当天,将有来自孩子王、oppo 的工程师现场分享来自厂商的一线前沿实践经验,还有来自白鲸开源的高级工程师对 Apache SeaTunnel(Incubating)的重要功能更新进行“硬核”讲解,干货满满。

4

袁洪军 孩子王 大数据专家、OLAP 平台架构师

多年大数据平台研发管理经验,在数据资产、血缘图谱、数据治理、OLAP 等领域有着丰富的研究经验

演讲时间:14:00-14:40

演讲题目:Apache SeaTunnel(Incubating) 在孩子王的应用实践

演讲概要: 如何实现高效数据推送?如何提高平台质量?如何保障数据安全?孩子王又对 Apache SeaTunnel(Incubating)做了哪些改造?

6

范佳 白鲸开源  高级工程师 Apache SeaTunnel Contributor

演讲时间: 14:40-15:20

演讲题目: 基于 Apache SeaTunnel(Incubating)的 Clickhouse Bulk Load 实现

演讲概要: 通过扩展 Apache SeaTunnel(Incubating)的 Connector实现 Clickhouse的 bulk load 数据同步功能。

7

王子超 oppo 高级后端工程师

演讲时间: 15:50-16:30

演讲题目: oppo智能推荐样本中心基于 Apache SeaTunnel(Incubating)的技术革新

演讲概要: 介绍 oppo 智能推荐机器学习样本流程的演进及 Apache  SeaTunnel(Incubating) 在其中发挥的作用。

除了精彩的演讲之外,现场还设置了多个抽奖环节,参与抽奖有机会获得 Apache SeaTunnel(Incubating) 精美定制礼品,敬请期待~

· 阅读需 8 分钟

2021 年 12 月 9 日,Apache SeaTunnel(Incubating) 进入 Apache 孵化器,在经过社区各位贡献者近四个月的努力下,我们于2022年3月18日发布了首个Apache版本,并且保证了首个版本一次性通过检查。这意味着 2.1.0 版本,是经过 Apache SeaTunnel(Incubating) 社区和 Apache 孵化器投票检查发布的官方版本,企业和个人用户可以放心安全使用。

Note: 软件许可协议是一种具有法律性质的合同或指导,目的在于规范受著作权保护的软件的使用或散布行为。通常的许可方式会允许用户来使用单一或多份该软件的复制,因为若无许可而径予使用该软件,将违反著作权法给予该软件开发者的专属保护。效用上来说,软件许可是软件开发者与其用户之间的一份合约,用来保证在符合许可范围的情况下,用户将不会受到控告。进入孵化器前后,我们花费了大量的时间来梳理整个项目的外部依赖以确保整个项目的合规性。需要说明的是,开源软件选择怎样的License并不意外着项目本身就一定合规。而ASF严苛的版本检查最大程度地保证了软件License的合规性,以及软件合理合法的流通分发。

本次发布版本说明

本次发布我们主要带来了以下特性:

  • 对微内核插件化的架构内核部分进行了大量优化,内核以 Java 为主,并对命令行参数解析,插件加载等做了大量改进,同时插件扩展可根据用户(或贡献者)所擅长的语言去做开发,极大程度地降低了插件开发门槛。
  • 全面支持 Flink ,但同时用户也可自由选择底层引擎,本次更新也为大家带来了大量的Flink插件,也欢迎大家后续贡献相关插件。
  • 提供本地开发极速启动环境支持(example),贡献者或用户可以在不更改任何代码的情况下快速丝滑启动,方便本地快速开发调试体验。对于需要自定义插件的贡献者或者用户来讲,这无疑是个令人激动的好消息。事实上,我们在发布前的测试中,也有大量贡献者采用这种方式快速对插件进行测试。
  • 提供了Docker容器安装,用户可以极快地通过Docker部署安装使用Apache SeaTunnel,未来我们也会围绕Docker&K8s做出大量迭代,欢迎大家讨论交流。

具体发布说明: [Feature]

  • 使用 JCommander来做命令行参数解析,使得开发者更关注逻辑本身。
  • Flink从1.9升级至1.13.5,保持兼容旧版本,同时为后续CDC做好铺垫。
  • 支持 Doris 、Hudi、Phoenix、Druid等Connector 插件,完整的插件支持你可以在这里找到 plugins-supported-by-seatunnel.
  • 本地开发极速启动环境支持,你可以在使用example模块,不修改任何代码的前提下快速启动,方便开发者本地调试体验。
  • 支持通过 Docker 容器安装和试用 Apache SeaTunnel。
  • Sql 组件支持 SET语句,支持配置变量。
  • Config模块重构,减少贡献者理解成本,同时保证项目的代码合规(License)。
  • 项目结构重新调整,以适应新的Roadmap。
  • CI&CD的支持,代码质量自动化管控,(后续会有更多的计划来支持CI&CD开发)。

【致谢】

感谢以下参与贡献的同学(为GitHub ID,排名不分先后):

Al-assad, BenJFan, CalvinKirs, JNSimba, JiangTChen, Rianico, TyrantLucifer, Yves-yuan, ZhangchengHu0923, agendazhang, an-shi-chi-fan, asdf2014, bigdataf, chaozwn, choucmei, dailidong, dongzl, felix-thinkingdata, fengyuceNv, garyelephant, kalencaya, kezhenxu94, legendtkl, leo65535, liujinhui1994, mans2singh, marklightning, mosence, nielifeng, ououtt, ruanwenjun, simon824, totalo, wntp, wolfboys, wuchunfu, xbkaishui, xtr1993, yx91490, zhangbutao, zhaomin1423, zhongjiajie, zhuangchong, zixi0825.

同时也诚挚的感谢我们的Mentor:Zhenxu Ke,Willem Jiang, William Guo,LiDong Dai ,Ted Liu, Kevin,JB 在这个过程中给予的帮助

未来几个版本的规划:

  • CDC的支持;
  • 监控体系的支持;
  • UI系统的支持;
  • 更多的 Connector 支持,以及更高效的Sink支持,如ClickHouse,很快会在下个版本跟大家见面。

后续Feature是由社区共同决定的,我们也在这里呼吁大家一同参与社区后续建设。 欢迎大家关注以及贡献:)

社区发展

【近期概况】

自进入Apache孵化器以来,贡献者从13 人增长至 55 人,且持续保持上升趋势,平均周commits维持在20+,来自不同公司的三位贡献者(Lei Xie, HuaJie Wang,Chunfu Wu,)通过他们对社区的贡献被邀请成为Committer。我们举办了两场MeetUp,来自B站,OPPO、唯品会等企业讲师分享了SeaTunnel在他们在企业中的大规模生产落地实践(后续我们也会保持每月一次的meetup,欢迎各位使用SeaTunnel的用户或者贡献者分享SeaTunnel和你们的故事)。

【Apache SeaTunnel(Incubating)的用户】

Note:仅包含已登记用户 Apache SeaTunnel(Incubating) 目前登记用户如下,如果您也在使用Apache SeaTunnel,欢迎在Who is using SeaTunne! 中登记!

【PPMC感言】

Apache SeaTunnel(Incubating) PPMC LiFeng Nie在谈及首个Apache版本发布的时候说,从进入Apache Incubator的第一天,我们就一直在努力学习Apache Way以及各种Apache政策,第一个版本发布的过程花费了大量的时间(主要是合规性),但我们认为这种时间是值得花费的,这也是我们选择进入Apache的一个很重要的原因,我们需要让用户用得放心,而Apache无疑是最佳选择,其 License 近乎苛刻的检查会让用户尽可能地避免相关的合规性问题,保证软件合理合法的流通。另外,其践行Apache Way,例如公益使命、实用主义、社区胜于代码、公开透明与共识决策、任人唯贤等,可以帮助 SeaTunnel 社区更加开放、透明,向多元化方向发展。

· 阅读需 8 分钟

ClickHouse 是面向 OLAP 的分布式列式 DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至 ClickHouse 这个优秀的数据仓库之中,当前日数据量达到了 300 亿。

之前介绍的有关数据处理入库的经验都是基于实时数据流,数据存储在 Kafka 中,我们使用 Java 或者 Golang 将数据从 Kafka 中读取、解析、清洗之后写入 ClickHouse 中,这样可以实现数据的快速接入。然而在很多同学的使用场景中,数据都不是实时的,可能需要将 HDFS 或者是 Hive 中的数据导入 ClickHouse。有的同学通过编写 Spark 程序来实现数据的导入,那么是否有更简单、高效的方法呢。

目前开源社区上有一款工具 Seatunnel,项目地址 https://github.com/apache/incubator-seatunnel,可以快速地将 HDFS 中的数据导入 ClickHouse。

HDFS To ClickHouse

假设我们的日志存储在 HDFS 中,我们需要将日志进行解析并筛选出我们关心的字段,将对应的字段写入 ClickHouse 的表中。

Log Sample

我们在 HDFS 中存储的日志格式如下, 是很常见的 Nginx 日志

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /Apache/Seatunnel HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

ClickHouse Schema

我们的 ClickHouse 建表语句如下,我们的表按日进行分区

CREATE TABLE cms.cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32,
pool String
) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384

Seatunnel with ClickHouse

接下来会给大家详细介绍,我们如何通过 Seatunnel 满足上述需求,将 HDFS 中的数据写入 ClickHouse 中。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Seatunnel 拥有着非常丰富的插件,支持从 Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,并将结果写入 ClickHouse、Elasticsearch 或者 Kafka 中。

Prerequisites

首先我们需要安装 Seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备 Spark 环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

cd /usr/local

wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip

unzip seatunnel-1.1.1.zip

cd seatunnel-1.1.1
vim config/seatunnel-env.sh

# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

seatunnel Pipeline

我们仅需要编写一个 seatunnel Pipeline 的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是 Spark、Input、filter 和 Output。

Spark

这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小。

spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从 HDFS 文件中读取 text 格式数据的配置案例。

input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
}
}

Filter

在 Filter 部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将 HTTPDATE 转化为 ClickHouse 支持的日期格式、对 Number 类型的字段进行类型转换以及通过 SQL 进行字段筛减等

filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
}

# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
}
}

Output

最后我们将处理好的结构化数据写入 ClickHouse

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

Running seatunnel

我们将上述四部分配置组合成为我们的配置文件 config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
}
}

filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
}

# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
}

# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
}
}

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入 ClickHouse。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在这篇文章中,我们介绍了如何使用 Seatunnel 将 HDFS 中的 Nginx 日志文件导入 ClickHouse 中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。除了支持 HDFS 数据源之外,Seatunnel 同样支持将数据从 Kafka 中实时读取处理写入 ClickHouse 中。我们的下一篇文章将会介绍,如何将 Hive 中的数据快速导入 ClickHouse 中。

当然,Seatunnel 不仅仅是 ClickHouse 数据写入的工具,在 Elasticsearch 以及 Kafka等 数据源的写入上同样可以扮演相当重要的角色。

希望了解 Seatunnel 和 ClickHouse、Elasticsearch、Kafka 结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

-- Power by InterestingLab

· 阅读需 6 分钟

ClickHouse是面向OLAP的分布式列式DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至ClickHouse这个优秀的数据仓库之中,当前日数据量达到了300亿。

在之前的文章 如何快速地把HDFS中的数据导入ClickHouse 中我们提到过使用 Seatunnel https://github.com/apache/incubator-seatunnel 对HDFS中的数据经过很简单的操作就可以将数据写入ClickHouse。HDFS中的数据一般是非结构化的数据,那么针对存储在Hive中的结构化数据,我们应该怎么操作呢?

Hive to ClickHouse

假定我们的数据已经存储在Hive中,我们需要读取Hive表中的数据并筛选出我们关心的字段,或者对字段进行转换,最后将对应的字段写入ClickHouse的表中。

Hive Schema

我们在Hive中存储的数据表结构如下,存储的是很常见的Nginx日志

CREATE TABLE `nginx_msg_detail`(
`hostname` string,
`domain` string,
`remote_addr` string,
`request_time` float,
`datetime` string,
`url` string,
`status` int,
`data_size` int,
`referer` string,
`cookie_info` string,
`user_agent` string,
`minute` string)
PARTITIONED BY (
`date` string,
`hour` string)

ClickHouse Schema

我们的ClickHouse建表语句如下,我们的表按日进行分区

CREATE TABLE cms.cms_msg
(
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32
) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384

Seatunnel with ClickHouse

接下来会给大家介绍,我们如何通过 Seatunnel 将Hive中的数据写入ClickHouse中。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Seatunnel 拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。

Seatunnel的环境准备以及安装步骤这里就不一一赘述了,具体安装步骤可以参考上一篇文章或者访问 Seatunnel Docs

Seatunnel Pipeline

我们仅需要编写一个Seatunnel Pipeline的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
// 这个配置必需填写
spark.sql.catalogImplementation = "hive"
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

Input

这一部分定义数据源,如下是从Hive文件中读取text格式数据的配置案例。

input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}

看,很简单的一个配置就可以从Hive中读取数据了。其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。

需要注意的是,必须保证hive的metastore是在服务状态。

在Cluster、Client、Local模式下运行时,必须把hive-site.xml文件置于提交任务节点的$HADOOP_CONF目录下

Filter

在Filter部分,这里我们配置一系列的转化,我们这里把不需要的minute和hour字段丢弃。当然我们也可以在读取Hive的时候通过pre_sql不读取这些字段

filter {
remove {
source_field = ["minute", "hour"]
}
}

Output

最后我们将处理好的结构化数据写入ClickHouse

output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "nginx_log"
fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

Running Seatunnel

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
// 这个配置必需填写
spark.sql.catalogImplementation = "hive"
}
input {
hive {
pre_sql = "select * from access.nginx_msg_detail"
table_name = "access_log"
}
}
filter {
remove {
source_field = ["minute", "hour"]
}
}
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "seatunnel"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入ClickHouse。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在这篇文章中,我们介绍了如何使用 Seatunnel 将Hive中的数据导入ClickHouse中。仅仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码,十分简单。

希望了解 Seatunnel 与ClickHouse、Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

-- Power by InterestingLab

· 阅读需 8 分钟

说到数据写入 Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

今天给大家推荐一款能够实现数据快速写入的黑科技 Seatunnel https://github.com/apache/incubator-seatunnel 一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

Kafka to Elasticsearch

和Logstash一样,Seatunnel同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用 Seatunnel 将数据快速写入Elasticsearch

Log Sample

原始日志格式如下:

127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"

Elasticsearch Document

我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:

domain String
hostname String
status int
datetime String
count int

Seatunnel with Elasticsearch

接下来会给大家详细介绍,我们如何通过 Seatunnel 读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

Seatunnel

Seatunnel 同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

Prerequisites

首先我们需要安装seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.1.1/seatunnel-1.1.1.zip
unzip seatunnel-1.1.1.zip
cd seatunnel-1.1.1

vim config/seatunnel-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

与Logstash一样,我们仅需要编写一个Seatunnel Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手 Seatunnel 配置。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}

Input

这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

kafkaStream {
topics = "seatunnel-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_es_group"
consumer.rebalance.max.retries = 100
}

Filter

在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合

filter {
# 使用正则解析原始日志
# 最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
}
## 利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}

Output

最后我们将处理好的结构化数据写入Elasticsearch。

output {
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}

Running Seatunnel

我们将上述四部分配置组合成为我们的配置文件 config/batch.conf

vim config/batch.conf
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.streaming.batchDuration = 5
}
input {
kafkaStream {
topics = "seatunnel-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "seatunnel_es_group"
consumer.rebalance.max.retries = 100
}
}
filter {
# 使用正则解析原始日志
# 最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
}
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
}
## 利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${now}"
es.batch.size.entries = 100000
index_time_format = "yyyy.MM.dd"
}
}

执行命令,指定配置文件,运行 Seatunnel,即可将数据写入Elasticsearch。这里我们以本地模式为例。

./bin/start-seatunnel.sh --config config/batch.conf -e client -m 'local[2]'

最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^_^.

"_source": {
"domain": "elasticsearch.cn",
"hostname": "localhost",
"status": "200",
"datetime": "2018-11-26T21:54:00.000+08:00",
"count": 26
}

Conclusion

在这篇文章中,我们介绍了如何通过 Seatunnel 将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用 Seatunnel 解决问题。

希望了解 Seatunnel 与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

Contract us

  • 邮件列表 : dev@seatunnel.apache.org. 发送任意内容至 dev-subscribe@seatunnel.apache.org, 按照回复订阅邮件列表。
  • Slack: 发送 Request to join SeaTunnel slack 邮件到邮件列表 (dev@seatunnel.apache.org), 我们会邀请你加入(在此之前请确认已经注册Slack).
  • bilibili B站 视频

· 阅读需 9 分钟

TiDB 是一款定位于在线事务处理/在线分析处理的融合型数据库产品,实现了一键水平伸缩,强一致性的多副本数据安全,分布式事务,实时 OLAP 等重要特性。

TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势。

直接使用 TiSpark 完成 OLAP 操作需要了解 Spark,还需要一些开发工作。那么,有没有一些开箱即用的工具能帮我们更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢?

目前开源社区上有一款工具 Seatunnel,项目地址 https://github.com/apache/incubator-seatunnel ,可以基于Spark,在 TiSpark 的基础上快速实现 TiDB 数据读取和 OLAP 分析。

使用 Seatunnel 操作TiDB

在我们线上有这么一个需求,从 TiDB 中读取某一天的网站访问数据,统计每个域名以及服务返回状态码的访问次数,最后将统计结果写入 TiDB 另外一个表中。 我们来看看 Seatunnel 是如何实现这么一个功能的。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在 Spark 之上。Seatunnel 拥有着非常丰富的插件,支持从 TiDB、Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,然后将结果写入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。

准备工作

1. TiDB 表结构介绍

Input(存储访问日志的表)

CREATE TABLE access_log (
domain VARCHAR(255),
datetime VARCHAR(63),
remote_addr VARCHAR(63),
http_ver VARCHAR(15),
body_bytes_send INT,
status INT,
request_time FLOAT,
url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain | varchar(255) | YES | | NULL | |
| datetime | varchar(63) | YES | | NULL | |
| remote_addr | varchar(63) | YES | | NULL | |
| http_ver | varchar(15) | YES | | NULL | |
| body_bytes_send | int(11) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| request_time | float | YES | | NULL | |
| url | text | YES | | NULL | |
+-----------------+--------------+------+------+---------+-------+

Output(存储结果数据的表)

CREATE TABLE access_collect (
date VARCHAR(23),
domain VARCHAR(63),
status INT,
hit INT
)
+--------+-------------+------+------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date | varchar(23) | YES | | NULL | |
| domain | varchar(63) | YES | | NULL | |
| status | int(11) | YES | | NULL | |
| hit | int(11) | YES | | NULL | |
+--------+-------------+------+------+---------+-------+
2. 安装 Seatunnel

有了 TiDB 输入和输出表之后, 我们需要安装 Seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备 Spark环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

# 下载安装Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
wget
# 下载安装seatunnel
https://github.com/InterestingLab/seatunnel/releases/download/v1.2.0/seatunnel-1.2.0.zip
unzip seatunnel-1.2.0.zip
cd seatunnel-1.2.0

vim config/seatunnel-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}

实现 Seatunnel 处理流程

我们仅需要编写一个 Seatunnel 配置文件即可完成数据的读取、处理、写入。

Seatunnel 配置文件由四个部分组成,分别是 SparkInputFilterOutputInput 部分用于指定数据的输入源,Filter 部分用于定义各种各样的数据处理、聚合,Output 部分负责将处理之后的数据写入指定的数据库或者消息队列。

整个处理流程为 Input -> Filter -> Output,整个流程组成了 Seatunnel 的 处理流程(Pipeline)。

以下是一个具体配置,此配置来源于线上实际应用,但是为了演示有所简化。

Input (TiDB)

这里部分配置定义输入源,如下是从 TiDB 一张表中读取数据。

input {
tidb {
database = "nginx"
pre_sql = "select * from nginx.access_log"
table_name = "spark_nginx_input"
}
}
Filter

在Filter部分,这里我们配置一系列的转化, 大部分数据分析的需求,都是在Filter完成的。Seatunnel 提供了丰富的插件,足以满足各种数据分析需求。这里我们通过 SQL 插件完成数据的聚合操作。

filter {
sql {
table_name = "spark_nginx_log"
sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
}
}
Output (TiDB)

最后, 我们将处理后的结果写入TiDB另外一张表中。TiDB Output是通过JDBC实现的

output {
tidb {
url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
table = "access_collect"
user = "username"
password = "password"
save_mode = "append"
}
}
Spark

这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小以及其他 Spark 配置。

我们的 TiDB Input 插件是基于 TiSpark 实现的,而 TiSpark 依赖于 TiKV 集群和 Placement Driver (PD)。因此我们需要指定 PD 节点信息以及 TiSpark 相关配置spark.tispark.pd.addressesspark.sql.extensions

spark {
spark.app.name = "seatunnel-tidb"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
# Set for TiSpark
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}

运行 Seatunnel

我们将上述四部分配置组合成我们最终的配置文件 conf/tidb.conf

spark {
spark.app.name = "seatunnel-tidb"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
# Set for TiSpark
spark.tispark.pd.addresses = "localhost:2379"
spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
tidb {
database = "nginx"
pre_sql = "select * from nginx.access_log"
table_name = "spark_table"
}
}
filter {
sql {
table_name = "spark_nginx_log"
sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
}
}
output {
tidb {
url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
table = "access_collect"
user = "username"
password = "password"
save_mode = "append"
}
}

执行命令,指定配置文件,运行 Seatunnel ,即可实现我们的数据处理逻辑。

  • Local

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'

  • yarn-client

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-seatunnel.sh --config config/tidb.conf --deploy-mode cluster -master yarn

如果是本机测试验证逻辑,用本地模式(Local)就可以了,一般生产环境下,都是使用yarn-client或者yarn-cluster模式。

检查结果

mysql> select * from access_collect;
+------------+--------+--------+------+
| date | domain | status | hit |
+------------+--------+--------+------+
| 2019-01-20 | b.com | 200 | 63 |
| 2019-01-20 | a.com | 200 | 85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)

总结

在这篇文章中,我们介绍了如何使用 Seatunnel 从 TiDB 中读取数据,做简单的数据处理之后写入 TiDB 另外一个表中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。

除了支持 TiDB 数据源之外,Seatunnel 同样支持Elasticsearch, Kafka, Kudu, ClickHouse等数据源。

于此同时,我们正在研发一个重要功能,就是在 Seatunnel 中,利用 TiDB 的事务特性,实现从 Kafka 到 TiDB 流式数据处理,并且支持端(Kafka)到端(TiDB)的 Exactly-Once 数据一致性。

希望了解 Seatunnel 和 TiDB,ClickHouse、Elasticsearch、Kafka结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

联系我们

  • 邮件列表 : dev@seatunnel.apache.org. 发送任意内容至 dev-subscribe@seatunnel.apache.org, 按照回复订阅邮件列表。
  • Slack: 发送 Request to join SeaTunnel slack 邮件到邮件列表 (dev@seatunnel.apache.org), 我们会邀请你加入(在此之前请确认已经注册Slack).
  • bilibili B站 视频

-- Power by InterestingLab

· 阅读需 11 分钟

前言

StructuredStreaming是Spark 2.0以后新开放的一个模块,相比SparkStreaming,它有一些比较突出的优点:
一、它能做到更低的延迟;
二、可以做实时的聚合,例如实时计算每天每个商品的销售总额;
三、可以做流与流之间的关联,例如计算广告的点击率,需要将广告的曝光记录和点击记录关联。
以上几点如果使用SparkStreaming来实现可能会比较麻烦或者说是很难实现,但是使用StructuredStreaming实现起来会比较轻松。

如何使用StructuredStreaming

可能你没有详细研究过StructuredStreaming,但是发现StructuredStreaming能很好的解决你的需求,如何快速利用StructuredStreaming来解决你的需求?目前社区有一款工具 Seatunnel,项目地址:https://github.com/apache/incubator-seatunnel , 可以高效低成本的帮助你利用StructuredStreaming来完成你的需求。

Seatunnel

Seatunnel 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Seatunnel 拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中

准备工作

首先我们需要安装 Seatunnel,安装十分简单,无需配置系统环境变量

  1. 准备Spark环境
  2. 安装 Seatunnel
  3. 配置 Seatunnel

以下是简易步骤,具体安装可以参照 Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/seatunnel/releases/download/v1.3.0/seatunnel-1.3.0.zip
unzip seatunnel-1.3.0.zip
cd seatunnel-1.3.0

vim config/seatunnel-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Seatunnel Pipeline

我们仅需要编写一个 Seatunnel Pipeline的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

Input

下面是一个从kafka读取数据的例子

kafkaStream {
topics = "seatunnel"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
}

通过上面的配置就可以读取kafka里的数据了 ,topics是要订阅的kafka的topic,同时订阅多个topic可以以逗号隔开,consumer.bootstrap.servers就是Kafka的服务器列表,schema是可选项,因为StructuredStreaming从kafka读取到的值(官方固定字段value)是binary类型的,详见http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 但是如果你确定你kafka里的数据是json字符串的话,你可以指定schema,input插件将按照你指定的schema解析

Filter

下面是一个简单的filter例子

filter{
sql{
table_name = "student"
sql = "select name,age from student"
}
}

table_name是注册成的临时表名,以便于在下面的sql使用

Output

处理好的数据往外输出,假设我们的输出也是kafka

output{
kafka {
topic = "seatunnel"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
}

topic 是你要输出的topic, producer.bootstrap.servers是kafka集群列表,streaming_output_mode是StructuredStreaming的一个输出模式参数,有三种类型append|update|complete,具体使用参见文档http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

checkpointLocation是StructuredStreaming的checkpoint路径,如果配置了的话,这个目录会存储程序的运行信息,比如程序退出再启动的话会接着上次的offset进行消费。

场景分析

以上就是一个简单的例子,接下来我们就来介绍的稍微复杂一些的业务场景

场景一:实时聚合场景

假设现在有一个商城,上面有10种商品,现在需要实时求每天每种商品的销售额,甚至是求每种商品的购买人数(不要求十分精确)。 这么做的巨大的优势就是海量数据可以在实时处理的时候,完成聚合,再也不需要先将数据写入数据仓库,再跑离线的定时任务进行聚合, 操作起来还是很方便的。

kafka的数据如下

{"good_id":"abc","price":300,"user_id":123456,"time":1553216320}

那我们该怎么利用 Seatunnel 来完成这个需求呢,当然还是只需要配置就好了。

#spark里的配置根据业务需求配置
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

#配置input
input {
kafkaStream {
topics = "good_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"good_id\":\"string\",\"price\":\"integer\",\"user_id\":\"Long\",\"time\":\"Long\"}"
}
}

#配置filter
filter {

#在程序做聚合的时候,内部会去存储程序从启动开始的聚合状态,久而久之会导致OOM,如果设置了watermark,程序自动的会去清理watermark之外的状态
#这里表示使用ts字段设置watermark,界限为1天

Watermark {
time_field = "time"
time_type = "UNIX" #UNIX表示时间字段为10为的时间戳,还有其他的类型详细可以查看插件文档
time_pattern = "yyyy-MM-dd" #这里之所以要把ts对其到天是因为求每天的销售额,如果是求每小时的销售额可以对其到小时`yyyy-MM-dd HH`
delay_threshold = "1 day"
watermark_field = "ts" #设置watermark之后会新增一个字段,`ts`就是这个字段的名字
}

#之所以要group by ts是要让watermark生效,approx_count_distinct是一个估值,并不是精确的count_distinct
sql {
table_name = "good_table_2"
sql = "select good_id,sum(price) total, approx_count_distinct(user_id) person from good_table_2 group by ts,good_id"
}
}

#接下来我们选择将结果实时输出到Kafka
output{
kafka {
topic = "seatunnel"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
}

如上配置完成,启动 Seatunnel,就可以获取你想要的结果了。

场景二:多个流关联场景

假设你在某个平台投放了广告,现在要实时计算出每个广告的CTR(点击率),数据分别来自两个topic,一个是广告曝光日志,一个是广告点击日志, 此时我们就需要把两个流数据关联到一起做计算,而 Seatunnel 最近也支持了此功能,让我们一起看一下该怎么做:

点击topic数据格式

{"ad_id":"abc","click_time":1553216320,"user_id":12345}

曝光topic数据格式

{"ad_id":"abc","show_time":1553216220,"user_id":12345}

#spark里的配置根据业务需求配置
spark {
spark.app.name = "seatunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}

#配置input
input {

kafkaStream {
topics = "click_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"click_time\":\"Long\"}"
table_name = "click_table"
}

kafkaStream {
topics = "show_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"show_time\":\"Long\"}"
table_name = "show_table"
}
}

filter {

#左关联右表必须设置watermark
#右关左右表必须设置watermark
#http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking
Watermark {
source_table_name = "click_table" #这里可以指定为某个临时表添加watermark,不指定的话就是为input中的第一个
time_field = "time"
time_type = "UNIX"
delay_threshold = "3 hours"
watermark_field = "ts"
result_table_name = "click_table_watermark" #添加完watermark之后可以注册成临时表,方便后续在sql中使用
}

Watermark {
source_table_name = "show_table"
time_field = "time"
time_type = "UNIX"
delay_threshold = "2 hours"
watermark_field = "ts"
result_table_name = "show_table_watermark"
}


sql {
table_name = "show_table_watermark"
sql = "select a.ad_id,count(b.user_id)/count(a.user_id) ctr from show_table_watermark as a left join click_table_watermark as b on a.ad_id = b.ad_id and a.user_id = b.user_id "
}

}

#接下来我们选择将结果实时输出到Kafka
output {
kafka {
topic = "seatunnel"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "append" #流关联只支持append模式
checkpointLocation = "/your/path"
}
}

通过配置,到这里流关联的案例也完成了。

结语

通过配置能很快的利用StructuredStreaming做实时数据处理,但是还是需要对StructuredStreaming的一些概念了解,比如其中的watermark机制,还有程序的输出模式。

最后,Seatunnel 当然还支持spark streaming和spark 批处理。 如果你对这两个也感兴趣的话,可以阅读我们以前发布的文章《如何快速地将Hive中的数据导入ClickHouse》、 《优秀的数据工程师,怎么用Spark在TiDB上做OLAP分析》、 《如何使用Spark快速将数据写入Elasticsearch

希望了解 Seatunnel 和 HBase, ClickHouse、Elasticsearch、Kafka、MySQL 等数据源结合使用的更多功能和案例,可以直接进入官网 https://seatunnel.apache.org/

联系我们

  • 邮件列表 : dev@seatunnel.apache.org. 发送任意内容至 dev-subscribe@seatunnel.apache.org, 按照回复订阅邮件列表。
  • Slack: 发送 Request to join SeaTunnel slack 邮件到邮件列表 (dev@seatunnel.apache.org), 我们会邀请你加入(在此之前请确认已经注册Slack).
  • bilibili B站 视频