跳到主要内容
版本:Next

Ogg 格式

Oracle GoldenGate (a.k.a ogg) 是一项托管服务,提供实时数据网格平台,该平台使用复制来保持数据高度可用,并支持实时分析。客户可以设计、执行和监控其数据复制和流数据处理解决方案,而无需分配或管理计算环境。 Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。

SeaTunnel 支持将 Ogg JSON 消息解释为 Seatunnel 系统中的 INSERT/UPDATE/DELETE 消息。在许多情况下,这个特性带来了很多便利,例如

    将增量数据从数据库同步到其他系统
审计日志
数据库的实时物化视图
关联维度数据库的变更历史,等等。

SeaTunnel 还支持将 SeaTunnel 中的 INSERT/UPDATE/DELETE 消息转化为 Ogg JSON 消息,并将其发送到类似 Kafka 这样的存储中。然而,目前 SeaTunnel 无法将 UPDATE_BEFORE 和 UPDATE_AFTER 组合成单个 UPDATE 消息。因此,Seatunnel 将 UPDATE_BEFORE 和 UPDATE_AFTER 转化为 DELETE 和 INSERT Ogg 消息来实现

格式选项

选项默认值是否需要描述
format(none)指定要使用的格式,这里应该是-json
ogg_json.ignore-parse-errorsfalse跳过有解析错误的字段和行而不是失败。如果出现错误,字段将设置为 null
ogg_json.database.include(none)正则表达式,可选,通过正则匹配 Canal 记录中的database元字段来仅读取特定数据库变更日志行。此字符串Pattern模式与Java的Pattern兼容
ogg_json.table.include(none)正则表达式,可选,通过正则匹配 Canal 记录中的 table 元字段来仅读取特定表的更改日志行。此字符串Pattern模式与Java的Pattern兼容

如何使用 Ogg 格式

Kafka 使用示例

Ogg 为变更日志提供了统一的格式,下面是从 Oracle PRODUCTS 表捕获变更操作的简单示例:

{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "U",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}

注:各字段含义请参考 Debezium 文档

此 Oracle PRODUCTS 表有 4 列 (id, name, description 和 weight) 上面的 JSON 消息是 products 表上的更新更改事件,其中 id = 111 的行的字段 weight 的值从 5.18 更改为 5.15。 假设此表的 binlog 的消息已经同步到 Kafka topic,那么我们可以使用下面的 SeaTunnel 示例来消费这个 topic 并体现变更事件。

env {
parallelism = 1
job.mode = "STREAMING"
}
source {
Kafka {
bootstrap.servers = "127.0.0.1:9092"
topic = "ogg"
plugin_output = "kafka_name"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "double"
}
},
format = ogg_json
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "12345678"
table = "ogg"
primary_keys = ["id"]
}
}