事件监听器
介绍
SeaTunnel提供了丰富的事件监听器功能,用于管理数据同步时的状态。此功能在需要监听任务运行状态时十分重要(org.apache.seatunnel.api.event
)。本文档将指导您如何使用这些参数并有效地利用他们。
支持的引擎
SeaTunnel Zeta
Flink
Spark
API
事件(event)API的定义在 org.apache.seatunnel.api.event
包中。
Event Data API
org.apache.seatunnel.api.event.Event
- 事件数据的接口。org.apache.seatunnel.api.event.EventType
- 事件数据的枚举值。
EventType 枚举说明
EventType
枚举定义了系统中所有可能的事件类型,主要包括:
事件类型 | 说明 | 关联事件类 |
---|---|---|
JOB_STATUS | 作业状态变更事件 | JobStateEvent |
SCHEMA_CHANGE_UPDATE_COLUMNS | 表结构更新事件 | AlterTableColumnsEvent |
SCHEMA_CHANGE_ADD_COLUMN | 表添加列事件 | AlterTableAddColumnEvent |
SCHEMA_CHANGE_DROP_COLUMN | 表删除列事件 | AlterTableDropColumnEvent |
SCHEMA_CHANGE_MODIFY_COLUMN | 表修改列事件 | AlterTableModifyColumnEvent |
READER_OPEN | 读取器打开事件 | ReaderOpenEvent |
READER_CLOSE | 读取器关闭事件 | ReaderCloseEvent |
WRITER_OPEN | 写入器打开事件 | WriterOpenEvent |
WRITER_CLOSE | 写入器关闭事件 | WriterCloseEvent |
注意:不同事件类型对应不同的事件数据结构,在自定义事件处理器时需通过
event.getEventType()
进行类型判断,以确保类型安全转换。
Event Listener API
您可以自定义事件处理器,例如将事件发送到外部系统。
org.apache.seatunnel.api.event.EventHandler
- 事件处理器的接口,SPI将会自动从类路径中加载子类。
Event Collect API
org.apache.seatunnel.api.source.SourceSplitEnumerator
- 在SourceSplitEnumerator
加载事件监听器。
package org.apache.seatunnel.api.source;
public interface SourceSplitEnumerator {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this enumerator.
*
* @return
*/
EventListener getEventListener();
}
}
org.apache.seatunnel.api.source.SourceReader
- 在SourceReader
加载事件监听器。
package org.apache.seatunnel.api.source;
public interface SourceReader {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this reader.
*
* @return
*/
EventListener getEventListener();
}
}
org.apache.seatunnel.api.sink.SinkWriter
- 在SinkWriter
加载事件监听器。
package org.apache.seatunnel.api.sink;
public interface SinkWriter {
interface Context {
/**
* Get the {@link org.apache.seatunnel.api.event.EventListener} of this writer.
*
* @return
*/
EventListener getEventListener();
}
}
设置监听器
您需要设置引擎配置以使用事件监听器功能。
Zeta 引擎
配置样例(seatunnel.yaml):
seatunnel:
engine:
event-report-http:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
Flink 引擎
您可以定义 org.apache.seatunnel.api.event.EventHandler
接口并添加到类路径,SPI会自动加载。
支持的flink版本: 1.14.0+
样例: org.apache.seatunnel.api.event.LoggingEventHandler
Spark 引擎
您可以定义 org.apache.seatunnel.api.event.EventHandler
接口并添加到类路径,SPI会自动加载。
自定义事件处理器实现步骤
下面以 JobStateEvent
为例,介绍如何实现一个自定义事件处理器,您可以根据需要扩展此方法以处理其他类型的事件。
1. 添加依赖
在项目 pom.xml
中引入必要依赖:
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-engine-common</artifactId>
<version>${seatunnel.version}</version>
<scope>provided</scope>
</dependency>
注意:需将
${seatunnel.version}
替换为实际使用的 SeaTunnel 版本。
2. 实现事件处理器
自定义类实现 org.apache.seatunnel.api.event.EventHandler
接口,并重写 handle
方法,针对需要处理的事件类型进行业务逻辑处理。
核心逻辑:通过 event.getEventType()
过滤事件类型——由于 SeaTunnel 引擎会分发多种类型的事件,需显式判断事件类型,以确保仅处理目标事件。
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.event.EventHandler;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
import org.apache.seatunnel.api.event.schema.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.event.source.ReaderOpenEvent;
import org.apache.seatunnel.api.event.sink.WriterCloseEvent;
/**
* 自定义多类型事件处理器示例,包含多种事件的处理逻辑
*/
@Slf4j
public class CustomMultiEventHandler implements EventHandler {
@Override
public void handle(Event event) {
// 根据事件类型进行不同处理
EventType eventType = event.getEventType();
switch (eventType) {
case JOB_STATUS:
handleJobStateEvent((JobStateEvent) event);
break;
case SCHEMA_CHANGE_ADD_COLUMN:
handleAddColumnEvent((AlterTableAddColumnEvent) event);
break;
case READER_OPEN:
handleReaderOpenEvent((ReaderOpenEvent) event);
break;
case WRITER_CLOSE:
handleWriterCloseEvent((WriterCloseEvent) event);
break;
// 可根据需要添加其他事件类型的处理
default:
// 忽略不处理的事件类型
log.debug("忽略未处理的事件类型: {}", eventType);
}
}
/**
* 处理作业状态事件
*/
private void handleJobStateEvent(JobStateEvent jobEvent) {
String jobId = jobEvent.getJobId();
String jobName = jobEvent.getJobName();
JobStatus status = jobEvent.getJobStatus();
long eventTime = jobEvent.getCreatedTime();
switch (status) {
case FAILED:
log.error("任务失败 | jobId: {}, jobName: {}, 时间: {}",
jobId, jobName, eventTime);
// 添加失败告警逻辑
sendAlert("任务失败", "jobId: " + jobId);
break;
case FINISHED:
log.info("任务完成 | jobId: {}, jobName: {}, 时间: {}",
jobId, jobName, eventTime);
break;
// 处理其他状态...
default:
log.info("任务状态变更 | jobId: {}, 状态: {}, 时间: {}",
jobId, status, eventTime);
}
}
/**
* 处理表添加列事件
*/
private void handleAddColumnEvent(AlterTableAddColumnEvent event) {
log.info("表添加列 | 表名: {}, 新增列: {}, 时间: {}",
event.getTableName(), event.getAddedColumns(), event.getEventTime());
// 处理表结构变更逻辑
}
/**
* 处理读取器打开事件
*/
private void handleReaderOpenEvent(ReaderOpenEvent event) {
log.info("读取器打开 | 插件ID: {}, 并行度: {}, 时间: {}",
event.getPluginId(), event.getParallelism(), event.getEventTime());
// 处理读取器初始化逻辑
}
/**
* 处理写入器关闭事件
*/
private void handleWriterCloseEvent(WriterCloseEvent event) {
log.info("写入器关闭 | 插件ID: {}, 处理记录数: {}, 时间: {}",
event.getPluginId(), event.getRecordCount(), event.getEventTime());
// 处理写入器资源清理逻辑
}
/**
* 发送告警通知
*/
private void sendAlert(String title, String content) {
// 实现告警逻辑(如调用HTTP接口、发送邮件等)
log.info("[告警] {}: {}", title, content);
}
}
3. 配置 SPI 加载
为使引擎自动发现并加载自定义处理器,需在项目资源目录中添加 SPI 配置文件:
- 创建目录:
src/main/resources/META-INF/services/
- 新建文件:
org.apache.seatunnel.api.event.EventHandler
- 在文件中添加自定义处理器的全类名:
com.example.CustomMultiEventHandler
4. 部署与验证
- 将包含自定义处理器的 JAR 包放入 SeaTunnel 引擎的类路径(如
lib/
目录) - 启动任务后,当对应事件发生时,处理器会自动触发并执行相应的处理逻辑
- 可通过日志输出验证处理器是否生效
注意事项
- 处理器逻辑应尽量轻量,避免阻塞事件处理线程
- 若需网络调用(如发送告警),建议使用异步方式实现,防止超时影响任务本身
- 不同引擎对事件的支持情况可能不同,例如
JobStateEvent
目前仅支持 Zeta 引擎 - 事件类型与事件类是一一对应的,转换时需确保类型匹配,避免
ClassCastException
- 可以根据业务需求,实现多个事件处理器分别处理不同类型的事件,也可以在一个处理器中处理多种事件类型
通过上述步骤,您可以灵活地监听和处理 SeaTunnel 中的各种事件,实现自定义的业务逻辑,如状态监控、告警通知、数据统计等功能。