MongoDB
MongoDB sink connector
Supported engines
Spark
Flink
SeaTunnel Zeta
Key features
Tip
- To write CDC data, enabling the upsert-enable option is recommended.
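Description
The MongoDB connector can read data from MongoDB and write data to MongoDB.
This document describes how to configure the MongoDB connector to write data to MongoDB.
Supported data source info
To use the MongoDB connector, the following dependency is required.
It can be downloaded with install-plugin.sh or obtained from the Maven Central Repository.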
| Data source | Supported versions | Dependency |
|---|---|---|
| MongoDB | universal | Download |
Data type mapping
The following table shows the mapping between MongoDB BSON types and SeaTunnel data types.
| SeaTunnel data type | MongoDB BSON type |
|---|---|
| STRING | ObjectId |
| STRING | String |
| BOOLEAN | Boolean |
| BINARY | Binary |
| INTEGER | Int32 |
| TINYINT | Int32 |
| SMALLINT | Int32 |
| BIGINT | Int64 |
| DOUBLE | Double |
| FLOAT | Double |
| DECIMAL | Decimal128 |
| Date | Date |
| Timestamp | Timestamp / Date |
| ROW | Object |
| ARRAY | Array |
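Tips
- When SeaTunnel writes Date and Timestamp types to MongoDB, both produce a Date field in MongoDB, but with different precision: the SeaTunnel Date type has second-level precision, and the Timestamp type has millisecond-level precision.
- When using the DECIMAL type, the maximum precision cannot exceed 34 digits, which means you should use decimal(34, 18).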
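The decimal(34, 18) limit above leaves at most 16 integer digits and 18 fractional digits. As a minimal sketch, a value can be checked against that shape before it is sent to the sink. The helper name fits_decimal_34_18 is hypothetical and is not part of SeaTunnel.

```python
from decimal import Decimal

MAX_PRECISION = 34  # total digits, the limit stated in the tip above
SCALE = 18          # fractional digits in decimal(34, 18)

def fits_decimal_34_18(value: Decimal) -> bool:
    """Return True if value fits decimal(34, 18) without rounding."""
    _sign, digits, exponent = value.as_tuple()
    if not isinstance(exponent, int):  # NaN or Infinity
        return False
    fractional_digits = max(0, -exponent)
    integer_digits = max(0, len(digits) + exponent)
    return (fractional_digits <= SCALE
            and integer_digits <= MAX_PRECISION - SCALE)

ok = fits_decimal_34_18(Decimal("1234567890123456.123456789012345678"))
too_wide = fits_decimal_34_18(Decimal("12345678901234567.1"))   # 17 integer digits
too_fine = fits_decimal_34_18(Decimal("0.1234567890123456789"))  # 19 fractional digits
print(ok, too_wide, too_fine)  # True False False
```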
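Sink options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| uri | String | Yes | - | Standard MongoDB connection URI, for example: mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true. |
| database | String | Yes | - | Name of the MongoDB database to read from or write to. For multi-table synchronization, the ${database_name} placeholder can be used, for example: database = "${database_name}_test_database". |
| collection | String | Yes | - | Name of the MongoDB collection to read from or write to. For multi-table synchronization, placeholders such as ${table_name} and ${schema_name} can be used, for example: collection = "${database_name}_${schema_name}_${table_name}_check". |
| buffer-flush.max-rows | String | No | 1000 | Maximum number of buffered rows per batch write request. |
| buffer-flush.interval | String | No | 30000 | Maximum interval between batch writes, in milliseconds. |
| retry.max | String | No | 3 | Maximum number of retries when a write fails. |
| retry.interval | Duration | No | 1000 | Interval between retries after a failed write, in milliseconds. |
| upsert-enable | Boolean | No | false | Whether to write in upsert mode. |
| primary-key | List | No | - | Primary key used for upsert or update operations, in the format ["id","name",...]. |
| transaction | Boolean | No | false | Whether to use transactions in MongoSink (requires MongoDB 4.2+). |
| common-options | - | No | - | Common sink plugin options; see Sink Common Options for details. |
| data_save_mode | String | No | APPEND_DATA | Data write mode: - DROP_DATA: clear the collection before inserting data; - APPEND_DATA: append data; - ERROR_WHEN_DATA_EXISTS: raise an error if the collection already contains data. |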
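To make the placeholders in the database and collection rows concrete, the following Python sketch expands the same ${...} patterns with string.Template. It only models the resulting names and is not the connector's implementation; the upstream names shop, public, and orders are hypothetical.

```python
from string import Template

# Hypothetical identity of one upstream table; values are illustrative only.
upstream = {"database_name": "shop", "schema_name": "public", "table_name": "orders"}

def expand(pattern: str, table: dict) -> str:
    """Substitute ${database_name}, ${schema_name}, ${table_name} in an option value."""
    return Template(pattern).substitute(table)

database = expand("${database_name}_test_database", upstream)
collection = expand("${database_name}_${schema_name}_${table_name}_check", upstream)
print(database)    # shop_test_database
print(collection)  # shop_public_orders_check
```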
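Tips
- Flushing in the MongoDB sink connector is controlled by three parameters: buffer-flush.max-rows, buffer-flush.interval, and checkpoint.interval. A flush is triggered as soon as any one of these conditions is met.
- The legacy parameter upsert-key is still supported. If upsert-key is set, do not also set primary-key.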
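The interaction of the three flush triggers can be illustrated with a toy buffer in Python. This is a sketch under simplifying assumptions, not the connector's code: the class BufferedWriter and its methods are hypothetical, and the interval is checked only when a row arrives, whereas a real sink would more likely use a background timer.

```python
import time
from typing import Callable, List

class BufferedWriter:
    """Toy model of a sink buffer with three flush triggers."""

    def __init__(self, max_rows: int, interval_ms: int,
                 write_batch: Callable[[List[dict]], None],
                 clock_ms: Callable[[], float] = lambda: time.monotonic() * 1000):
        self.max_rows = max_rows        # models buffer-flush.max-rows
        self.interval_ms = interval_ms  # models buffer-flush.interval
        self.write_batch = write_batch  # stands in for a bulk write to MongoDB
        self.clock_ms = clock_ms
        self.buffer: List[dict] = []
        self.last_flush = clock_ms()

    def add(self, row: dict) -> None:
        self.buffer.append(row)
        too_many = len(self.buffer) >= self.max_rows
        too_old = self.clock_ms() - self.last_flush >= self.interval_ms
        if too_many or too_old:
            self.flush()

    def on_checkpoint(self) -> None:
        # Models checkpoint.interval: every checkpoint forces a flush.
        self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.write_batch(list(self.buffer))
            self.buffer.clear()
        self.last_flush = self.clock_ms()

# Drive the model with a fake clock so the run is deterministic.
now = [0.0]
batches: List[List[dict]] = []
writer = BufferedWriter(3, 1000, batches.append, clock_ms=lambda: now[0])
for i in range(3):
    writer.add({"id": i})        # third row reaches max_rows
now[0] = 1500.0
writer.add({"id": 3})            # interval has elapsed
writer.add({"id": 4})
writer.on_checkpoint()           # checkpoint flushes the remainder
print([len(b) for b in batches])  # [3, 1, 1]
```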
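How to create a MongoDB data synchronization job
The following example shows a data synchronization job that writes randomly generated data to MongoDB:
# Basic job configuration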
env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000
}
source {
FakeSource {
row.num = 2
bigint.min = 0
bigint.max = 10000000
split.num = 1
split.read-interval = 300
schema {
fields {
c_bigint = bigint
}
}
}
}
sink {
MongoDB {
uri = "mongodb://user:password@127.0.0.1:27017"
database = "test"
collection = "test"
}
}
Parameter details
MongoDB connection URI examples
Single node without authentication:
mongodb://127.0.0.0:27017/mydb
Replica set:
mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
Replica set with authentication:
mongodb://admin:password@127.0.0.0:27017/mydb?replicaSet=xxx&authSource=admin
Replica set with multiple nodes:
mongodb://127.0.0.1:27017,127.0.0.2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
Sharded cluster:
mongodb://127.0.0.0:27017/mydb
Multiple mongos nodes:
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
Note: the username and password must be URL-encoded before they are concatenated into the URI.
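The URL-encoding requirement can be met with Python's standard urllib.parse.quote. The sketch below is one possible way to prepare the uri value before pasting it into the job configuration; the helper build_mongodb_uri and the sample credentials are hypothetical.

```python
from urllib.parse import quote

def build_mongodb_uri(user: str, password: str, hosts: str, database: str) -> str:
    """Percent-encode the credentials, then assemble a MongoDB connection URI."""
    # safe="" also encodes "/" and ":", which would otherwise break URI parsing.
    enc_user = quote(user, safe="")
    enc_password = quote(password, safe="")
    return f"mongodb://{enc_user}:{enc_password}@{hosts}/{database}"

uri = build_mongodb_uri("user@corp", "p@ss:w/rd", "127.0.0.1:27017", "mydb")
print(uri)  # mongodb://user%40corp:p%40ss%3Aw%2Frd@127.0.0.1:27017/mydb
```

Without the encoding, the "@", ":" and "/" characters in the password would be read as URI delimiters.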
Buffer flush example
sink {
MongoDB {
uri = "mongodb://user:password@127.0.0.1:27017"
database = "test_db"
collection = "users"
buffer-flush.max-rows = 2000
buffer-flush.interval = 1000
}
}
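Why is frequent use of transactions not recommended?
Although MongoDB has fully supported multi-document transactions since version 4.2 (replica sets since 4.0, sharded clusters since 4.2), that does not mean they should be used in every scenario.
Transactions involve locking, coordination between nodes, extra overhead, and a performance cost.
The guiding principle when designing a system is: avoid transactions whenever you can.
A well-designed system can avoid depending on transactions in most cases.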
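Idempotent writes
By defining an explicit primary key and enabling upsert mode, exactly-once write semantics can be achieved.
When primary-key is defined in the configuration and upsert-enable is turned on, the MongoDB sink uses upsert semantics instead of plain INSERT statements.
SeaTunnel uses the defined primary key fields as a composite primary key in MongoDB and writes in upsert mode to ensure idempotence.
If a job fails while running, SeaTunnel recovers from the last successful checkpoint and reprocesses the data, which can produce duplicate records.
Enabling upsert mode is strongly recommended to avoid primary key conflicts and duplicate inserts.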
sink {
MongoDB {
uri = "mongodb://user:password@127.0.0.1:27017"
database = "test_db"
collection = "users"
upsert-enable = true
primary-key = ["name","status"]
}
}
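Why upsert makes a replay after recovery harmless can be shown with an in-memory model. This Python sketch is not the connector's code and does not talk to MongoDB: a list stands in for plain inserts, a dict keyed by the primary-key fields stands in for upserts, and the sample rows are hypothetical.

```python
from typing import Dict, List, Tuple

PRIMARY_KEY = ("name", "status")  # mirrors primary-key = ["name","status"]

def insert_all(collection: List[dict], rows: List[dict]) -> None:
    """Plain insert: every replayed row is stored again."""
    collection.extend(dict(r) for r in rows)

def upsert_all(collection: Dict[Tuple, dict], rows: List[dict]) -> None:
    """Upsert: a row replaces any existing row with the same key."""
    for r in rows:
        key = tuple(r[field] for field in PRIMARY_KEY)
        collection[key] = dict(r)

batch = [
    {"name": "ann", "status": "active", "score": 1},
    {"name": "bob", "status": "active", "score": 2},
]

inserted: List[dict] = []
upserted: Dict[Tuple, dict] = {}
for _ in range(2):  # the second pass simulates a replay after checkpoint recovery
    insert_all(inserted, batch)
    upsert_all(upserted, batch)

print(len(inserted))  # 4
print(len(upserted))  # 2
```

The replayed batch doubles the rows under plain insert, while the upserted state is identical after one pass or two.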
Change Log
| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [fix][connector-mango] fix split with avgSize zero error (#9255) | https://github.com/apache/seatunnel/commit/564863b933 | 2.3.11 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Fix][MongoDB] The Long type cannot handle string values in scientific notation (#8783) | https://github.com/apache/seatunnel/commit/00f550e3d0 | 2.3.11 |
| [Improve] sink mongodb schema is not required (#8887) | https://github.com/apache/seatunnel/commit/3cfe8c12b9 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Fix][Connector-Mongodb] close MongodbClient when close MongodbReader (#8592) | https://github.com/apache/seatunnel/commit/06b2fc0e06 | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Bug][connectors-v2] fix mongodb bson convert exception (#8044) | https://github.com/apache/seatunnel/commit/b222c13f2f | 2.3.9 |
| [Hotfix][Connector-v2] Fix the ClassCastException for connector-mongodb (#7586) | https://github.com/apache/seatunnel/commit/dc43370e8c | 2.3.8 |
| [Improve][Test][Connector-V2][MongoDB] Add few test cases for BsonToRowDataConverters (#7579) | https://github.com/apache/seatunnel/commit/a797041e5d | 2.3.8 |
| [Improve][Connector-V2][MongoDB] A BsonInt32 will be convert to a long type (#7567) | https://github.com/apache/seatunnel/commit/adf26c20c5 | 2.3.8 |
| [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) | https://github.com/apache/seatunnel/commit/c5159a2760 | 2.3.6 |
| [bugfix][connector-mongodb] fix mongodb null value write (#6967) | https://github.com/apache/seatunnel/commit/c5ecda50f8 | 2.3.6 |
| [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813) | https://github.com/apache/seatunnel/commit/59cccb6097 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [bugfix][mongodb] Fixed unsupported exception caused by bsonNull (#5659) | https://github.com/apache/seatunnel/commit/cab864aa4d | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]sink support transaction update/writing (#5034) | https://github.com/apache/seatunnel/commit/b1203c905e | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (#4997) | https://github.com/apache/seatunnel/commit/31db35bee7 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001) | https://github.com/apache/seatunnel/commit/830196d8b7 | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982) | https://github.com/apache/seatunnel/commit/526197af67 | 2.3.3 |
| [Feature][connector-v2][mongodb] mongodb support cdc sink (#4833) | https://github.com/apache/seatunnel/commit/cb651cd7f3 | 2.3.3 |
| [Feature][Connector-v2][Mongodb]Refactor mongodb connector (#4620) | https://github.com/apache/seatunnel/commit/5b1a843e40 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve] mongodb connector v2 add source query capability (#3697) | https://github.com/apache/seatunnel/commit/8a7fe6fcb6 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][MongoDB] Unified exception for MongoDB source & sink connector (#3522) | https://github.com/apache/seatunnel/commit/5af632e32b | 2.3.0 |
| [Feature][Connector V2] expose configurable options in MongoDB (#3347) | https://github.com/apache/seatunnel/commit/ffd5778efc | 2.3.0 |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f12 | 2.3.0-beta |
| [Improve][Connector-V2] Improve mongodb connector (#2778) | https://github.com/apache/seatunnel/commit/efbf793fa5 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter sink (#2694) | https://github.com/apache/seatunnel/commit/51c28a3387 | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter source (#2596) | https://github.com/apache/seatunnel/commit/3ee8a8a619 | 2.2.0-beta |