Version: Next

MongoDB

MongoDB sink connector

Supported engines

Spark
Flink
SeaTunnel Zeta

Key features

Tip

  1. If you want to use CDC writes, it is recommended to enable the upsert-enable option.

Introduction

The MongoDB connector can read data from MongoDB and write data to MongoDB.
This document describes how to configure the MongoDB connector to run jobs that write data to MongoDB.

Supported data source information

Using the MongoDB connector requires the following dependency.
It can be downloaded with install-plugin.sh or obtained from the Maven Central repository.

| Data source | Supported versions | Dependency |
|---|---|---|
| MongoDB | universal | Download |

Data type mapping

The following table shows how SeaTunnel data types map to MongoDB BSON types.

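| SeaTunnel data type | MongoDB BSON type |
|---|---|
| STRING | ObjectId |
| STRING | String |
| BOOLEAN | Boolean |
| BINARY | Binary |
| INTEGER | Int32 |
| TINYINT | Int32 |
| SMALLINT | Int32 |
| BIGINT | Int64 |
| DOUBLE | Double |
| FLOAT | Double |
| DECIMAL | Decimal128 |
| Date | Date |
| Timestamp | Timestamp / Date |
| ROW | Object |
| ARRAY | Array |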

Tips

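  1. When SeaTunnel writes Date and Timestamp values to MongoDB, both become BSON Date fields, but their precision differs. Values from the SeaTunnel Date type have second precision. Values from the Timestamp type have millisecond precision.
  2. The precision of a DECIMAL type cannot exceed 34 digits. For example, decimal(34, 18) is within the limit.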
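The 34-digit limit matches the number of significant decimal digits that BSON Decimal128 can store. The Python sketch below illustrates the limit. It checks a `decimal(precision, scale)` declaration and shows what happens when a 35-digit value is forced into 34 digits. The helper `fits_decimal128` is hypothetical and is not part of SeaTunnel. Python's `decimal` context only stands in for Decimal128's digit limit.

```python
from decimal import Context, Decimal

# BSON Decimal128 stores at most 34 significant decimal digits.
DECIMAL128_MAX_DIGITS = 34


def fits_decimal128(precision: int, scale: int) -> bool:
    """Hypothetical check: does decimal(precision, scale) fit Decimal128's digit limit?"""
    return 0 < precision <= DECIMAL128_MAX_DIGITS and 0 <= scale <= precision


# A context limited to 34 significant digits, mimicking Decimal128's precision.
ctx = Context(prec=DECIMAL128_MAX_DIGITS)

original = Decimal("1" * 35)            # 35 significant digits
rounded = ctx.create_decimal(original)  # rounded to 34 digits, so the value changes

print(fits_decimal128(34, 18))  # True
print(fits_decimal128(38, 18))  # False
print(original == rounded)      # False: one digit of information is lost
```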

Sink options

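| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| uri | String | Yes | - | Standard MongoDB connection URI, for example: mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true |
| database | String | Yes | - | Name of the MongoDB database to read from or write to. For multi-table synchronization, the ${database_name} placeholder can be used, for example: database = "${database_name}_test_database" |
| collection | String | Yes | - | Name of the MongoDB collection to read from or write to. For multi-table synchronization, placeholders such as ${table_name} and ${schema_name} can be used, for example: collection = "${database_name}_${schema_name}_${table_name}_check" |
| buffer-flush.max-rows | String | No | 1000 | Maximum number of rows buffered for each batch write request. |
| buffer-flush.interval | String | No | 30000 | Maximum interval between batch writes, in milliseconds. |
| retry.max | String | No | 3 | Maximum number of retries when a write fails. |
| retry.interval | Duration | No | 1000 | Interval between retries after a failed write, in milliseconds. |
| upsert-enable | Boolean | No | false | Whether to write in upsert mode. |
| primary-key | List | No | - | Primary key fields used for upsert or update operations, in the format ["id","name",...]. |
| transaction | Boolean | No | false | Whether MongoSink uses transactions (requires MongoDB 4.2+). |
| common-options | - | No | - | Common sink plugin options. For details, see Sink Common Options. |
| data_save_mode | String | No | APPEND_DATA | Data save mode. DROP_DATA clears the collection before inserting data. APPEND_DATA appends data. ERROR_WHEN_DATA_EXISTS reports an error if the collection already contains data. |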
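In a multi-table job, you can predict the resulting database and collection names by imitating the placeholder substitution with Python's `string.Template`, which uses the same `${name}` syntax. This is only an illustration. It assumes that each placeholder is replaced by the matching identifier of the upstream table. It is not SeaTunnel's resolver. The helper `resolve_names` and the table identifiers below are made up.

```python
from string import Template


def resolve_names(database_tpl: str, collection_tpl: str, table_id: dict) -> tuple:
    """Hypothetical helper: fill ${database_name}, ${schema_name}, ${table_name}."""
    database = Template(database_tpl).substitute(table_id)
    collection = Template(collection_tpl).substitute(table_id)
    return database, collection


# Made-up identifiers of one upstream table.
upstream = {"database_name": "shop", "schema_name": "public", "table_name": "orders"}

db, coll = resolve_names(
    "${database_name}_test_database",
    "${database_name}_${schema_name}_${table_name}_check",
    upstream,
)
print(db)    # shop_test_database
print(coll)  # shop_public_orders_check
```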

Tips

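  1. Three parameters together control when the MongoDB sink connector flushes data: buffer-flush.max-rows, buffer-flush.interval and checkpoint.interval.
    A flush is triggered as soon as any one of these conditions is met.
  2. The legacy parameter upsert-key is still supported for compatibility. If upsert-key is set, do not also set primary-key.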
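The Python sketch below models the flush rule from tip 1, where a flush happens as soon as any one condition holds. It is a simplified model, not the connector's code. The class and method names are hypothetical. Time is passed in explicitly in milliseconds, and the checkpoint is modelled as a boolean flag. The model only evaluates the conditions when a row arrives, whereas a real sink would typically also use a timer.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FlushBuffer:
    """Toy model of a sink buffer that flushes on row count, elapsed time, or checkpoint."""

    max_rows: int = 1000      # plays the role of buffer-flush.max-rows
    interval_ms: int = 30000  # plays the role of buffer-flush.interval
    rows: List[dict] = field(default_factory=list)
    last_flush_ms: int = 0
    flushed_batches: List[List[dict]] = field(default_factory=list)

    def should_flush(self, now_ms: int, checkpoint: bool) -> bool:
        # Any one condition is enough to trigger a flush.
        return (
            len(self.rows) >= self.max_rows
            or now_ms - self.last_flush_ms >= self.interval_ms
            or checkpoint
        )

    def write(self, row: dict, now_ms: int, checkpoint: bool = False) -> None:
        # In this model the conditions are only evaluated when a row arrives.
        self.rows.append(row)
        if self.should_flush(now_ms, checkpoint):
            self.flush(now_ms)

    def flush(self, now_ms: int) -> None:
        if self.rows:
            self.flushed_batches.append(self.rows)
            self.rows = []
        self.last_flush_ms = now_ms


buf = FlushBuffer(max_rows=3, interval_ms=1000)
buf.write({"id": 1}, now_ms=10)
buf.write({"id": 2}, now_ms=20)
buf.write({"id": 3}, now_ms=30)                     # 3 rows buffered: row limit reached
buf.write({"id": 4}, now_ms=1100)                   # 1070 ms since last flush: interval reached
buf.write({"id": 5}, now_ms=1200, checkpoint=True)  # checkpoint forces a flush
print([[r["id"] for r in batch] for batch in buf.flushed_batches])  # [[1, 2, 3], [4], [5]]
```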

How to create a MongoDB data synchronization job

The following example shows a data synchronization job that writes randomly generated data to MongoDB:

# Set the basic configuration of the job
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 1000
}

source {
  FakeSource {
    row.num = 2
    bigint.min = 0
    bigint.max = 10000000
    split.num = 1
    split.read-interval = 300
    schema {
      fields {
        c_bigint = bigint
      }
    }
  }
}

sink {
  MongoDB {
    # Quote the URI: in HOCON an unquoted "//" starts a comment
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test"
    collection = "test"
  }
}

Parameter details

MongoDB connection URI examples

Single node without authentication:

mongodb://127.0.0.1:27017/mydb

Replica set:

mongodb://127.0.0.1:27017/mydb?replicaSet=xxx

Replica set with authentication:

mongodb://admin:password@127.0.0.1:27017/mydb?replicaSet=xxx&authSource=admin

Replica set with multiple nodes:

mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx

Sharded cluster:

mongodb://127.0.0.1:27017/mydb

Multiple mongos nodes:

mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb

Note: URL-encode the username and password before you concatenate them into the URI.
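How you encode the credentials depends on your tooling. In Python, for example, `urllib.parse.quote_plus` percent-encodes characters such as `@`, `:` and `/`, and the PyMongo documentation recommends it for MongoDB credentials. The short sketch below uses made-up credentials and a made-up host. The helper `build_mongo_uri` is hypothetical.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, hosts: str, database: str) -> str:
    """Hypothetical helper: build a MongoDB URI with URL-encoded credentials."""
    # Without encoding, '@', ':' and '/' in a password would be read as URI delimiters.
    return f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{hosts}/{database}"


uri = build_mongo_uri("admin", "p@ss:w/rd", "127.0.0.1:27017", "mydb")
print(uri)  # mongodb://admin:p%40ss%3Aw%2Frd@127.0.0.1:27017/mydb
```

Paste the printed string into the uri option of the sink, wrapped in quotes.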

Buffer flush example

sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    buffer-flush.max-rows = 2000
    buffer-flush.interval = 1000
  }
}

Why are frequent transactions not recommended?

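MongoDB has fully supported multi-document transactions since version 4.2, but that does not mean every scenario should use them.
Transactions bring locking, coordination between nodes, extra overhead and a performance cost.
When designing a system, follow this principle: do not use transactions unless you really need them.
With a sound design, most systems can avoid depending on transactions.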

Idempotent writes

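If you define an explicit primary key and enable upsert mode, you get exactly-once write results. A replayed row overwrites the same document instead of creating a new one.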

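When the configuration defines primary-key and enables upsert-enable, the MongoDB sink uses upsert semantics instead of plain INSERT statements.
SeaTunnel treats the primary key fields as a composite key in MongoDB. In upsert mode, a row whose key already exists updates the existing document, and any other row inserts a new one. This keeps writes idempotent.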

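If a job fails while running, SeaTunnel recovers from the last successful checkpoint and reprocesses the data. This can produce duplicate data.
Enabling upsert mode is strongly recommended to avoid primary key conflicts and duplicate inserts.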

sink {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    upsert-enable = true
    primary-key = ["name","status"]
  }
}
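You can see why upsert makes a replay harmless with an in-memory model. The sketch below is neither SeaTunnel nor MongoDB code. A Python list stands in for a collection written with plain inserts. A dict keyed by the primary-key fields (`name` and `status`, as in the configuration above) stands in for a collection written in upsert mode. The same batch is written twice, which is what happens when a job restarts from the last checkpoint.

```python
from typing import Dict, List, Tuple

# Same fields as primary-key in the configuration above.
PRIMARY_KEY = ("name", "status")


def insert_all(collection: List[dict], rows: List[dict]) -> None:
    # Plain INSERT model: every write adds a new document, so a replay duplicates data.
    collection.extend(dict(r) for r in rows)


def upsert_all(collection: Dict[Tuple, dict], rows: List[dict]) -> None:
    # Upsert model: the primary-key values identify a document; a matching
    # document is updated, otherwise a new one is created.
    for r in rows:
        key = tuple(r[f] for f in PRIMARY_KEY)
        collection.setdefault(key, {}).update(r)


batch = [
    {"name": "alice", "status": "active", "score": 10},
    {"name": "bob", "status": "active", "score": 7},
]

inserted: List[dict] = []
upserted: Dict[Tuple, dict] = {}
for _ in range(2):  # the second pass simulates reprocessing after recovery from a checkpoint
    insert_all(inserted, batch)
    upsert_all(upserted, batch)

print(len(inserted))  # 4: the replay created duplicates
print(len(upserted))  # 2: one document per primary-key value
```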

Changelog

| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [fix][connector-mango] fix split with avgSize zero error (#9255) | https://github.com/apache/seatunnel/commit/564863b933 | 2.3.11 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Fix][MongoDB] The Long type cannot handle string values in scientific notation (#8783) | https://github.com/apache/seatunnel/commit/00f550e3d0 | 2.3.11 |
| [Improve] sink mongodb schema is not required (#8887) | https://github.com/apache/seatunnel/commit/3cfe8c12b9 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Fix][Connector-Mongodb] close MongodbClient when close MongodbReader (#8592) | https://github.com/apache/seatunnel/commit/06b2fc0e06 | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Bug][connectors-v2] fix mongodb bson convert exception (#8044) | https://github.com/apache/seatunnel/commit/b222c13f2f | 2.3.9 |
| [Hotfix][Connector-v2] Fix the ClassCastException for connector-mongodb (#7586) | https://github.com/apache/seatunnel/commit/dc43370e8c | 2.3.8 |
| [Improve][Test][Connector-V2][MongoDB] Add few test cases for BsonToRowDataConverters (#7579) | https://github.com/apache/seatunnel/commit/a797041e5d | 2.3.8 |
| [Improve][Connector-V2][MongoDB] A BsonInt32 will be convert to a long type (#7567) | https://github.com/apache/seatunnel/commit/adf26c20c5 | 2.3.8 |
| [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) | https://github.com/apache/seatunnel/commit/c5159a2760 | 2.3.6 |
| [bugfix][connector-mongodb] fix mongodb null value write (#6967) | https://github.com/apache/seatunnel/commit/c5ecda50f8 | 2.3.6 |
| [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813) | https://github.com/apache/seatunnel/commit/59cccb6097 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [bugfix][mongodb] Fixed unsupported exception caused by bsonNull (#5659) | https://github.com/apache/seatunnel/commit/cab864aa4d | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]sink support transaction update/writing (#5034) | https://github.com/apache/seatunnel/commit/b1203c905e | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (#4997) | https://github.com/apache/seatunnel/commit/31db35bee7 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001) | https://github.com/apache/seatunnel/commit/830196d8b7 | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982) | https://github.com/apache/seatunnel/commit/526197af67 | 2.3.3 |
| [Feature][connector-v2][mongodb] mongodb support cdc sink (#4833) | https://github.com/apache/seatunnel/commit/cb651cd7f3 | 2.3.3 |
| [Feature][Connector-v2][Mongodb]Refactor mongodb connector (#4620) | https://github.com/apache/seatunnel/commit/5b1a843e40 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve] mongodb connector v2 add source query capability (#3697) | https://github.com/apache/seatunnel/commit/8a7fe6fcb6 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][MongoDB] Unified exception for MongoDB source & sink connector (#3522) | https://github.com/apache/seatunnel/commit/5af632e32b | 2.3.0 |
| [Feature][Connector V2] expose configurable options in MongoDB (#3347) | https://github.com/apache/seatunnel/commit/ffd5778efc | 2.3.0 |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f12 | 2.3.0-beta |
| [Improve][Connector-V2] Improve mongodb connector (#2778) | https://github.com/apache/seatunnel/commit/efbf793fa5 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter sink (#2694) | https://github.com/apache/seatunnel/commit/51c28a3387 | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter source (#2596) | https://github.com/apache/seatunnel/commit/3ee8a8a619 | 2.2.0-beta |