Version: 2.3.5

Ogg Format

Oracle GoldenGate (a.k.a. Ogg) is a managed service that provides a real-time data mesh platform. It uses replication to keep data highly available and to enable real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without having to allocate or manage compute environments. Ogg provides a format schema for changelogs and supports serializing messages as JSON.

SeaTunnel can interpret Ogg JSON messages as INSERT/UPDATE/DELETE messages in the SeaTunnel system. This is useful in many cases, such as:

synchronizing incremental data from databases to other systems
auditing logs
maintaining real-time materialized views on databases
performing temporal joins against the changing history of a database table, and so on.
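
As a rough illustration of what interpreting an Ogg JSON message involves, the Python sketch below maps the `op_type` field to change rows. It is not SeaTunnel's implementation: the function name `decode_ogg` and the row-kind labels are assumptions made for this example, and only the `I`, `U`, and `D` operation types are handled.

```python
import json


def decode_ogg(message):
    """Turn one Ogg JSON message into a list of (row_kind, row) pairs.

    Hypothetical helper for illustration only; the row-kind names are
    assumed labels, not an API of SeaTunnel or GoldenGate.
    """
    record = json.loads(message)
    op = record["op_type"]
    if op == "I":
        # An insert carries the new row image in "after".
        return [("INSERT", record["after"])]
    if op == "U":
        # An update carries both images; emit the old one, then the new one.
        return [
            ("UPDATE_BEFORE", record["before"]),
            ("UPDATE_AFTER", record["after"]),
        ]
    if op == "D":
        # A delete carries the removed row image in "before".
        return [("DELETE", record["before"])]
    raise ValueError("unsupported op_type: %r" % (op,))
```

In this sketch an update yields two rows, which lets a downstream consumer see both the old and the new values of the changed record.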

SeaTunnel can also encode INSERT/UPDATE/DELETE messages as Ogg JSON messages and emit them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, it encodes UPDATE_BEFORE as a DELETE Ogg message and UPDATE_AFTER as an INSERT Ogg message.
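
The encoding rule described above can be sketched in Python as follows. This is a simplified illustration, not SeaTunnel's serializer: the helper name `encode_ogg` is hypothetical, and the output contains only the row image and `op_type`, whereas a real Ogg message may carry more fields.

```python
import json


def encode_ogg(row_kind, row):
    """Encode one change row as a minimal Ogg-style JSON string.

    Hypothetical helper: UPDATE_BEFORE is written as a delete and
    UPDATE_AFTER as an insert, since the two halves of an update are
    not merged into a single "U" message.
    """
    if row_kind in ("INSERT", "UPDATE_AFTER"):
        payload = {"after": row, "op_type": "I"}
    elif row_kind in ("DELETE", "UPDATE_BEFORE"):
        payload = {"before": row, "op_type": "D"}
    else:
        raise ValueError("unknown row kind: %r" % (row_kind,))
    return json.dumps(payload)
```

Under this scheme a single update to a row reaches the consumer as two messages, a delete of the old image followed by an insert of the new one.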

Format Options

| Option | Default | Required | Description |
| --- | --- | --- | --- |
| format | (none) | yes | Specify which format to use; here it should be 'ogg_json'. |
| ogg_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
| ogg_json.database.include | (none) | no | An optional regular expression that restricts reading to the changelog rows of specific databases by matching the "database" meta field in the Ogg record. The pattern string is compatible with Java's Pattern. |
| ogg_json.table.include | (none) | no | An optional regular expression that restricts reading to the changelog rows of specific tables by matching the "table" meta field in the Ogg record. The pattern string is compatible with Java's Pattern. |
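
To make the effect of the table filter and the parse-error option more concrete, here is a minimal Python sketch. It is an approximation under stated assumptions: Python's `re` module stands in for Java's Pattern (the two syntaxes differ in some details), the pattern is assumed to be matched against the whole "table" value, every message is assumed to be a JSON object, and the function `read_rows` and its parameters are hypothetical names.

```python
import json
import re


def read_rows(messages, table_include=None, ignore_parse_errors=False):
    """Parse raw messages and keep those whose "table" matches the pattern.

    Hypothetical helper; full-string matching is an assumption here.
    """
    pattern = re.compile(table_include) if table_include else None
    kept = []
    for raw in messages:
        try:
            record = json.loads(raw)
        except json.JSONDecodeError:
            if ignore_parse_errors:
                # Skip the malformed message instead of failing the job.
                continue
            raise
        if pattern and not pattern.fullmatch(record.get("table", "")):
            # The table name does not match the include filter.
            continue
        kept.append(record)
    return kept
```

For example, calling `read_rows(messages, table_include="PROD.*", ignore_parse_errors=True)` would keep records for a table named PRODUCTS, drop records for other tables, and silently skip messages that are not valid JSON.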

How to use Ogg format

Kafka usage example

Ogg provides a unified format for changelogs. Here is a simple example of an update operation captured from an Oracle products table:

{
    "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.18
    },
    "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter",
        "weight": 5.15
    },
    "op_type": "U",
    "op_ts": "2020-05-13 15:40:06.000000",
    "current_ts": "2020-05-13 15:40:07.000000",
    "primary_keys": [
        "id"
    ],
    "pos": "00000000000000000000143",
    "table": "PRODUCTS"
}

Note: please refer to the Oracle GoldenGate documentation for the meaning of each field.

The Oracle products table has 4 columns (id, name, description and weight). The JSON message above is an update change event on the products table, in which the weight of the row with id = 111 changes from 5.18 to 5.15. Assuming the messages have been synchronized to the Kafka topic ogg, the following SeaTunnel job configuration consumes this topic and interprets the change events.

env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  Kafka {
    bootstrap.servers = "127.0.0.1:9092"
    topic = "ogg"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "double"
      }
    }
    format = ogg_json
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "12345678"
    table = "ogg"
    primary_keys = ["id"]
  }
}
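
Conceptually, a sink keyed on `id` ends up holding the latest image of each row. The Python sketch below shows that idea with an in-memory dictionary. It is not how the JDBC sink is implemented: the helper `apply_change` and the row-kind labels are assumptions made purely for illustration.

```python
def apply_change(table, row_kind, row, key="id"):
    """Apply one change row to a dict that maps primary key -> row.

    Hypothetical helper illustrating keyed upsert/delete semantics.
    """
    if row_kind in ("INSERT", "UPDATE_AFTER"):
        # Upsert: the new image replaces whatever is stored under the key.
        table[row[key]] = row
    elif row_kind in ("DELETE", "UPDATE_BEFORE"):
        # Remove the old image; a missing key is tolerated.
        table.pop(row[key], None)
    else:
        raise ValueError("unknown row kind: %r" % (row_kind,))


# Replay an insert followed by the update from the example message.
products = {}
old = {"id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18}
new = dict(old, weight=5.15)
apply_change(products, "INSERT", old)
apply_change(products, "UPDATE_BEFORE", old)
apply_change(products, "UPDATE_AFTER", new)
```

After the replay, `products` holds a single row for id 111 with weight 5.15, which mirrors the state of the source table after the update.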