跳到主要内容
版本:Next

MongoDB CDC

MongoDB CDC 源连接器

支持这些引擎

SeaTunnel Zeta
Flink

关键特性

描述

MongoDB CDC连接器允许从MongoDB数据库读取快照数据和增量数据。

支持的数据源信息

为了使用Mongodb CDC连接器,需要以下依赖关系。 它们可以通过install-plugin.sh或Maven中央存储库下载。

数据源支持的版本Dependency
MongoDBuniversalDownload

可用性设置

1.MongoDB版本:MongoDB版本>=4.0。

2.集群部署:副本集或分片集群。

3.存储引擎:WiredTiger存储引擎。

4.权限:更改流和读取

use admin;
db.createRole(
{
role: "strole",
privileges: [{
resource: { db: "", collection: "" },
actions: [
"splitVector",
"listDatabases",
"listCollections",
"collStats",
"find",
"changeStream" ]
}],
roles: [
{ role: 'read', db: 'config' }
]
}
);

db.createUser(
{
user: 'stuser',
pwd: 'stpw',
roles: [
{ role: 'strole', db: 'admin' }
]
}
);

数据类型映射

下表列出了从MongoDB BSON类型到Seatunnel数据类型的字段数据类型映射。

MongoDB BSON TypeSeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于MongoDB中的特定类型,我们使用扩展JSON格式将其映射到Seatunnel STRING类型。

MongoDB BSON typeSeaTunnel STRING
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}

提示

1.在SeaTunnel中使用DECIMAL类型时,请注意最大范围不能超过34位数字,这意味着您应该使用DECIMAL(34,18)。

源配置项

Name类型必须默认值描述
hostsString-MongoDB服务器的主机名和端口对的逗号分隔列表。如 localhost:27017,localhost:27018
usernameString-连接到MongoDB时要使用的数据库用户的名称。
passwordString-连接到MongoDB时使用的密码。
databaseList-要监视更改的数据库的名称。如果未设置,则将捕获所有数据库。该数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如db1、db2。
collectionList-要监视更改的数据库中集合的名称。如果未设置,则将捕获所有集合。该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。例如db1.coll1、db2.coll2。
schema-数据的结构,包括字段名和字段类型,使用单表cdc。
tables_configs-数据的结构,包括字段名和字段类型,使用多表cdc。
connection.optionsString-与号分隔了MongoDB的连接选项。如。 replicaSet=test&connectTimeoutMS=300000.
batch.sizeLong1024批量大小。
poll.max.batch.sizeEnum1024轮询新数据时,单个批中包含的更改流文档的最大数量。
poll.await.time.msLong1000在检查更改流上的新结果之前等待的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(毫秒)。使用0禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(mb)。
common-options-源插件常用参数,请参考 Source Common Options

提示

1.如果集合更改速度较慢,强烈建议为heartbeat.interval.ms参数设置一个大于0的适当值。当我们从检查点或保存点恢复Seatunnel作业时,心跳事件可以向前推resumeToken以避免其过期。
2.MongoDB对单个文档的限制为16MB。变更文档包含其他信息,因此即使原始文档不超过15MB,变更文档也可能超过16MB的限制,从而导致变更流操作终止。
3.建议使用不可变分片键。在MongoDB中,分片键允许在启用事务后进行修改,但更改分片键可能会导致频繁的分片迁移,从而导致额外的性能开销。此外,修改分片键也可能导致更新查找功能失效,从而导致CDC(变更数据捕获)场景中的结果不一致。
4.“schema”和“tables_config”是互斥的,必须一次配置一个。

更新数据的流

更新流 是MongoDB 3.6为副本集和分片集群提供的一项新功能,允许应用程序访问实时数据更改,而不会出现尾随oplog的复杂性和风险。 应用程序可以使用更改流订阅单个集合、数据库或整个部署上的所有数据更改,并立即对其做出反应。

查找更新操作的完整文档更改流提供的一项功能,它可以配置更改流以返回更新文档的最新多数提交版本。由于此功能,我们可以轻松收集最新的完整文档,并将更改日志转换为Changelog流。

更新流中删除事件捕获的数据格式:delete envet

{
"_id": { <Resume Token> },
"operationType": "delete",
"clusterTime": <Timestamp>,
"ns": {
"db": "engineering",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("599af247bb69cd89961c986d")
}
}

由于在更新流游标向客户端发送删除事件时文档已不存在,因此省略了完整文档。

如何创建MongoDB CDC数据同步作业

CDC数据打印到客户端

以下示例演示了如何创建数据同步作业,该作业从MongoDB读取cdc数据并将其打印到本地客户端:

env {
# 您可以在此处设置engine配置
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
table = "inventory.products"
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
}
}

# 控制台打印读取的Mongodb数据
sink {
Console {
parallelism = 1
}
}

CDC数据写入MysqlDB

以下示例演示了如何创建数据同步作业,该作业从MongoDB读取cdc数据并写入mysql数据库:

env {
# 您可以在此处设置engine配置
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
table = "inventory.products"
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
}
}

sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"

generate_sink_sql = true
# 您需要同时配置数据库和表
database = mongodb_cdc
table = products
primary_keys = ["_id"]
}
}

多表同步

以下示例演示了如何创建数据同步作业,该作业读取多个库表mongodb的cdc数据并将其打印到本地客户端:

env {
# 您可以在此处设置engine配置
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products", "inventory.orders"]
username = superuser
password = superpw
tables_configs = [
{
schema {
table = "inventory.products"
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : string
}
}
},
{
schema {
table = "inventory.orders"
fields {
"_id" : string,
"order_number" : int,
"order_date" : string,
"quantity" : int,
"product_id" : string
}
}
}
]
}
}

# 控制台打印读取的Mongodb数据
sink {
Console {
}
}

实时流数据格式

{
_id : { <BSON Object> }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream
"operationType" : "<operation>", // The type of change operation that occurred, such as: insert, delete, update, etc.
"fullDocument" : { <document> }, // The full document data involved in the change operation. This field does not exist in delete operations
"ns" : {
"db" : "<database>", // The database where the change operation occurred
"coll" : "<collection>" // The collection where the change operation occurred
},
"to" : { // These fields are displayed only when the operation type is 'rename'
"db" : "<database>", // The new database name after the change
"coll" : "<collection>" // The new collection name after the change
},
"source":{
"ts_ms":"<timestamp>", // The timestamp when the change operation occurred
"table":"<collection>" // The collection where the change operation occurred
"db":"<database>", // The database where the change operation occurred
"snapshot":"false" // Identify the current stage of data synchronization
},
"documentKey" : { "_id" : <value> }, // The _id field value of the document involved in the change operation
"updateDescription" : { // Description of the update operation
"updatedFields" : { <document> }, // The fields and values that the update operation modified
"removedFields" : [ "<field>", ... ] // The fields and values that the update operation removed
}
"clusterTime" : <Timestamp>, // The timestamp of the Oplog log entry corresponding to the change operation
"txnNumber" : <NumberLong>, // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number
"lsid" : { // Represents information related to the Session in which the transaction is located
"id" : <UUID>,
"uid" : <BinData>
}
}

修改日志

Change Log
ChangeCommitVersion
[Fix][Mongo-cdc] Fallback to timestamp startup mode when resume token has expired (#8754)https://github.com/apache/seatunnel/commit/afc990d84e2.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Feature][Mongodb-CDC] Support multi-table read (#8029)https://github.com/apache/seatunnel/commit/49cbaeb9b32.3.9
[Bug][connectors-v2] fix mongodb bson convert exception (#8044)https://github.com/apache/seatunnel/commit/b222c13f2f2.3.9
[Feature][Core] Support cdc task ddl restore for zeta (#7463)https://github.com/apache/seatunnel/commit/8e322281ed2.3.9
[Feature][Transform-v2] Add metadata transform (#7899)https://github.com/apache/seatunnel/commit/699d16552a2.3.9
[Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId (#7935)https://github.com/apache/seatunnel/commit/f3970d61882.3.9
[Improve] Add conditional of start.mode with timestamp in mongo cdc option rule (#6770)https://github.com/apache/seatunnel/commit/65ae7782c92.3.6
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551)https://github.com/apache/seatunnel/commit/5f3c9c36a52.3.5
[Improve][CDC] Optimize memory allocation for snapshot split reading (#6281)https://github.com/apache/seatunnel/commit/48566458372.3.5
[Fix][Connector-V2] Fix mongodb cdc start up mode option values not right (#6338)https://github.com/apache/seatunnel/commit/c07f56fbc42.3.5
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)https://github.com/apache/seatunnel/commit/37fcff347e2.3.4
[Improve][CDC] Clean unused code (#5785)https://github.com/apache/seatunnel/commit/b5a66d3dbe2.3.4
[Dependency]Bump org.apache.avro:avro (#5583)https://github.com/apache/seatunnel/commit/bb791a6d9e2.3.4
[Improve] Remove catalog tag for config file (#5645)https://github.com/apache/seatunnel/commit/dc509aa0802.3.4
[Improve][Pom] Add junit4 to the root pom (#5611)https://github.com/apache/seatunnel/commit/7b4f7db2a22.3.4
[Feature][CDC] Support MongoDB CDC running on flink (#5644)https://github.com/apache/seatunnel/commit/8c569b15412.3.4
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562)https://github.com/apache/seatunnel/commit/41173357f82.3.4
[BUG][Connector-V2][Mongo-cdc] Incremental data kind error in snapshot phase (#5184)https://github.com/apache/seatunnel/commit/ead1c5fd8c2.3.3
[Hotfix]Fix array index anomalies caused by #5057 (#5195)https://github.com/apache/seatunnel/commit/1c334295062.3.3
[Hotfix][MongodbCDC]Refine data format to adapt to universal logic (#5162)https://github.com/apache/seatunnel/commit/4b4b5f96402.3.3
[Hotfix][Mongodb cdc] Solve startup resume token is negative (#5143)https://github.com/apache/seatunnel/commit/e964c03dca2.3.3
[Hotfix]Fix mongodb cdc e2e instability (#5128)https://github.com/apache/seatunnel/commit/6f30b296622.3.3
[Feature][connector-v2][mongodbcdc]Support source mongodb cdc (#4923)https://github.com/apache/seatunnel/commit/d729fcba4c2.3.3