Version: Next

MongoDB

MongoDB Source Connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

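The MongoDB connector provides the ability to read data from and write data to MongoDB. This document describes how to set up the MongoDB connector to read data from MongoDB.

Supported DataSource Info

To use the MongoDB connector, the following dependency is required. It can be downloaded via install-plugin.sh or from the Maven central repository.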

| Datasource | Supported Versions | Dependency |
|---|---|---|
| MongoDB | universal | Download |

Data Type Mapping

The following table lists the mapping from MongoDB BSON types to SeaTunnel data types.

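| MongoDB BSON Type | SeaTunnel Data Type |
|---|---|
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |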
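As an illustration of the table above, the following Python sketch renders a flat `fields { ... }` block from a set of BSON field types. The helper name `render_fields` is hypothetical. The lower-case config spellings (`int`, `bytes`, `date`, ...) are assumptions taken from the schema examples later in this document, and `decimal(34, 18)` follows the DECIMAL tip below. Nested Object (ROW) and Array (ARRAY) fields are deliberately left out.

```python
# Hypothetical lookup: BSON type name -> type spelling used in this
# document's SeaTunnel schema examples (flat types only).
BSON_TO_SEATUNNEL = {
    "ObjectId": "string",
    "String": "string",
    "Boolean": "boolean",
    "Binary": "bytes",
    "Int32": "int",
    "Int64": "bigint",
    "Double": "double",
    "Decimal128": "decimal(34, 18)",  # Decimal128 holds at most 34 digits
    "Date": "date",
    "Timestamp": "timestamp",
}


def render_fields(columns: dict) -> str:
    """Render a HOCON `fields { ... }` block for flat (non-nested) columns."""
    lines = ["fields {"]
    for name, bson_type in columns.items():
        if bson_type not in BSON_TO_SEATUNNEL:
            # Object and Array need nested definitions, which this sketch skips.
            raise ValueError(f"{name}: unsupported BSON type in this sketch: {bson_type}")
        st_type = BSON_TO_SEATUNNEL[bson_type]
        # Quote parameterized types such as decimal(34, 18) so HOCON reads one string.
        value = f'"{st_type}"' if "(" in st_type else st_type
        lines.append(f"  {name} = {value}")
    lines.append("}")
    return "\n".join(lines)


print(render_fields({"_id": "ObjectId", "c_int": "Int32", "c_decimal": "Decimal128"}))
```

The generated block can be pasted under `schema = { ... }`. Nested columns still have to be written by hand, as in the full job example.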

For the MongoDB-specific types listed below, SeaTunnel uses the Extended JSON format to map them to the SeaTunnel STRING type.

| MongoDB BSON Type | SeaTunnel STRING |
|---|---|
| Symbol | `{"_value": {"$symbol": "12"}}` |
| RegularExpression | `{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}` |
| JavaScript | `{"_value": {"$code": "function() { return 10; }"}}` |
| DbPointer | `{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}` |
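To make the string layout concrete, the following Python sketch reproduces the `_value` wrapper shown in the table for values already expressed as Extended JSON. It only formats strings. The function name is hypothetical, and this is not the connector's own conversion code.

```python
import json


def special_type_to_string(extended_json: dict) -> str:
    """Wrap an Extended JSON value under "_value" and serialize it,
    matching the layout of the examples in the table above."""
    return json.dumps({"_value": extended_json})


symbol = special_type_to_string({"$symbol": "12"})
regex = special_type_to_string(
    {"$regularExpression": {"pattern": "^9$", "options": "i"}}
)
print(symbol)
print(regex)
```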

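Tip

1. When using the DECIMAL type in SeaTunnel, note that the precision cannot exceed 34 digits, which is the limit of MongoDB's Decimal128 type. For example, use DECIMAL(34, 18).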
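The 34-digit limit can be checked before data reaches the connector. The following Python sketch uses only the standard `decimal` module and performs two checks. First, it tests whether a value's coefficient fits the 34 digits of Decimal128. Second, it tests whether the value fits DECIMAL(34, 18), that is, at most 16 integer digits and 18 fractional digits. The function names are hypothetical, and only finite values are handled.

```python
from decimal import Decimal

# Decimal128 stores a coefficient of at most 34 decimal digits.
DECIMAL128_MAX_DIGITS = 34


def fits_decimal128(value: Decimal) -> bool:
    """Return True if the (finite) value's coefficient has at most 34 digits."""
    _sign, digits, _exponent = value.as_tuple()
    return len(digits) <= DECIMAL128_MAX_DIGITS


def fits_decimal_type(value: Decimal, precision: int = 34, scale: int = 18) -> bool:
    """Return True if a finite value fits DECIMAL(precision, scale) without rounding."""
    _sign, digits, exponent = value.as_tuple()
    frac_digits = max(0, -exponent)               # digits after the decimal point
    int_digits = max(0, len(digits) + exponent)   # digits before the decimal point
    return frac_digits <= scale and int_digits <= precision - scale


print(fits_decimal128(Decimal("683265300")), fits_decimal_type(Decimal("683265300")))
```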

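Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| uri | String | Yes | - | The standard MongoDB connection URI, for example `mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true`. |
| database | String | Yes | - | The name of the MongoDB database to read from or write to. |
| collection | String | Yes | - | The name of the MongoDB collection to read from or write to. |
| schema | String | Yes | - | The mapping between MongoDB BSON types and SeaTunnel data structures. |
| match.query | String | No | - | A MongoDB filter that selects the documents returned by the query. |
| match.projection | String | No | - | A MongoDB projection that controls which fields are included in the query results. |
| partition.split-key | String | No | _id | The field used to split the collection into partitions. |
| partition.split-size | Long | No | `64 * 1024 * 1024` | The size of each split. |
| cursor.no-timeout | Boolean | No | true | The MongoDB server normally times out idle cursors after a period of inactivity (10 minutes) to prevent excessive memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed. |
| fetch.size | Int | No | 2048 | The number of documents fetched from the server per batch. An appropriate batch size improves query performance and avoids the memory pressure caused by fetching a large amount of data at once. |
| max.time-min | Long | No | 600 | A MongoDB query option that limits the maximum execution time of a query, in minutes. If a query runs longer than this limit, MongoDB terminates the operation and returns an error. |
| flat.sync-string | Boolean | No | true | When enabled, the schema may define only one field, and that field must be of type String. Each MongoDB document is then mapped to that field as a single string. |
| common-options | | No | - | Common parameters of source plugins; see Source Common Options. |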

Tip

1. The match.query parameter is compatible with the legacy parameter matchQuery; the two are equivalent.

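How to Create a MongoDB Data Synchronization Job

The following example demonstrates how to create a data synchronization job that reads data from MongoDB and prints it to the local client: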

# Set the basic configuration of the task to be performed
env {
  parallelism = 1
  job.mode = "BATCH"
}

# Create a MongoDB source
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "source_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

# Print the data read from MongoDB to the console
sink {
  Console {
    parallelism = 1
  }
}

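Parameter Description

MongoDB Connection URI Examples

Unauthenticated single-node connection:

mongodb://192.168.0.100:27017/mydb

Replica set connection:

mongodb://192.168.0.100:27017/mydb?replicaSet=xxx

Authenticated replica set connection:

mongodb://admin:password@192.168.0.100:27017/mydb?replicaSet=xxx&authSource=admin

Multi-node replica set connection:

mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb?replicaSet=xxx

Sharded cluster connection (through a mongos):

mongodb://192.168.0.100:27017/mydb

Connection through multiple mongos routers:

mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb

Note: The username and password in the URI must be URL-encoded before they are concatenated into the connection string.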
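The URL-encoding step above can be done with Python's standard `urllib.parse.quote_plus`, which is the approach shown in the PyMongo documentation. The following sketch assembles a connection string with percent-encoded credentials. The function name `build_mongodb_uri` is hypothetical, and the password is a made-up example containing reserved characters.

```python
from urllib.parse import quote_plus, urlencode


def build_mongodb_uri(user, password, hosts, database, options=None):
    """Build a mongodb:// URI, percent-encoding the username and password."""
    credentials = ""
    if user is not None:
        # quote_plus escapes reserved characters such as @ : / so they cannot
        # be confused with the URI's own delimiters.
        credentials = f"{quote_plus(user)}:{quote_plus(password)}@"
    uri = f"mongodb://{credentials}{','.join(hosts)}/{database}"
    if options:
        uri += "?" + urlencode(options)  # e.g. replicaSet=xxx&authSource=admin
    return uri


print(build_mongodb_uri("admin", "p@ss:w/rd", ["192.168.0.100:27017"], "mydb",
                        {"replicaSet": "xxx", "authSource": "admin"}))
# mongodb://admin:p%40ss%3Aw%2Frd@192.168.0.100:27017/mydb?replicaSet=xxx&authSource=admin
```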

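MatchQuery Scan

In data synchronization scenarios, apply a match.query filter as early as possible, that is, in the source. MongoDB then returns fewer documents and downstream operators have less data to process, which improves performance. Below is a simple SeaTunnel example that uses match.query: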

source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "orders"
    match.query = "{status: \"A\"}"
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}

The following are examples of match.query statements for various data types:

# Query a boolean field
"{c_boolean:true}"
# Query a string field
"{c_string:\"OCzCj\"}"
# Query an integer field
"{c_int:2}"
# Query a date field
"{c_date:ISODate(\"2023-06-26T16:00:00.000Z\")}"
# Query a floating-point field
"{c_double:{$gte:1.71763202185342e+308}}"

For the syntax of match.query, see https://www.mongodb.com/docs/manual/tutorial/query-documents
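Hand-escaping the double quotes inside match.query is error-prone. The following sketch builds the option line with Python's `json` module. It assumes the filter is written in plain (strict) JSON rather than shell syntax such as `ISODate(...)`. Because HOCON quoted strings use JSON string syntax, the double-encoded result can be pasted into a config file. The function name is hypothetical.

```python
import json


def match_query_option(query_filter: dict) -> str:
    """Render a MongoDB filter as a HOCON `match.query = "..."` line.

    The filter is serialized to JSON once, then quoted again so that the
    inner double quotes are escaped the way the examples above escape them.
    """
    return "match.query = " + json.dumps(json.dumps(query_filter))


print(match_query_option({"status": "A"}))
print(match_query_option({"c_double": {"$gte": 1.5}}))
```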

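Projection Scan

In MongoDB, a projection controls which fields are included in the query results by specifying which fields to return and which to omit. In the find() method, the projection object is passed as the second argument. Its keys name the fields to include or exclude: a value of 1 includes a field and 0 excludes it. A projection cannot mix inclusion and exclusion. The one exception is _id, which is returned by default and may be excluded in an inclusion projection. Here is a simple example, assuming a collection named users:

# Returns the name and email fields (plus _id, which is included by default)
db.users.find({}, { name: 1, email: 1 });

In data synchronization scenarios, apply a projection as early as possible to reduce the amount of data (the number of fields per document) that downstream operators need to process, thereby improving performance. The following is a simple SeaTunnel example that uses a projection: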

source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    match.projection = "{ name: 1, _id: 0 }"
    schema = {
      fields {
        name = string
      }
    }
  }
}
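The inclusion and exclusion rules described above can be modelled locally. The following Python sketch is a simplified model that handles top-level fields only and is written for illustration; the function name is hypothetical. In a real job, the MongoDB server applies the projection, not code like this.

```python
def apply_projection(doc: dict, projection: dict) -> dict:
    """Simplified model of MongoDB projection for top-level fields only."""
    # _id is special: it is returned unless the projection sets it to 0.
    include_id = bool(projection.get("_id", 1))
    rules = {k: bool(v) for k, v in projection.items() if k != "_id"}
    if len(set(rules.values())) > 1:
        raise ValueError("cannot mix inclusion and exclusion (except for _id)")
    if rules and all(rules.values()):
        # Inclusion projection: keep only the listed fields.
        result = {k: doc[k] for k in rules if k in doc}
    else:
        # Exclusion projection (or no field rules): drop the listed fields.
        result = {k: v for k, v in doc.items() if k != "_id" and k not in rules}
    if include_id and "_id" in doc:
        result = {"_id": doc["_id"], **result}
    return result


user = {"_id": 1, "name": "Ann", "email": "ann@example.com"}
print(apply_projection(user, {"name": 1, "_id": 0}))  # {'name': 'Ann'}
```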

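Partitioned Scan

To speed up data reading in parallel source task instances, SeaTunnel provides a partitioned scan feature for MongoDB collections. Users can control how the data is split by setting partition.split-key (the field used for splitting) and partition.split-size (the size of each split).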

source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    partition.split-key = "id"
    partition.split-size = 1024
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}
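To illustrate the idea of splitting on a key, the following Python sketch implements a generic range-partitioning scheme. It first estimates how many documents fit in one split from a byte budget and an average document size. It then cuts the sorted split-key values into contiguous ranges and turns each range into a `$gte`/`$lte` filter. This is an assumption-laden illustration, not SeaTunnel's actual splitting algorithm. Treating partition.split-size as bytes is only inferred from its `64 * 1024 * 1024` default, and all names are hypothetical.

```python
def docs_per_split(split_size_bytes: int, avg_doc_size_bytes: int) -> int:
    """How many documents of average size fit into one split (at least 1)."""
    return max(1, split_size_bytes // avg_doc_size_bytes)


def range_splits(keys, per_split: int):
    """Cut sorted split-key values into contiguous (low, high) ranges.

    Assumes the key values are distinct (as with _id); duplicates could
    otherwise straddle a boundary and be read twice.
    """
    ordered = sorted(keys)
    return [
        (ordered[i], ordered[min(i + per_split, len(ordered)) - 1])
        for i in range(0, len(ordered), per_split)
    ]


def split_filter(split_key: str, bounds) -> dict:
    """MongoDB filter selecting one split's documents (inclusive bounds)."""
    low, high = bounds
    return {split_key: {"$gte": low, "$lte": high}}


ids = [7, 3, 9, 1, 4, 8, 2]
per = docs_per_split(1024, 300)  # 1024 // 300 = 3 documents per split
for bounds in range_splits(ids, per):
    print(split_filter("id", bounds))
```

Each filter could be read by a separate parallel reader. This is why a split key with evenly distributed, distinct values tends to give balanced splits.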

Flat Sync String

With flat.sync-string enabled, the schema may define only one field, and its type must be string. Each MongoDB document is then mapped to that field as a single string.

env {
  parallelism = 10
  job.mode = "BATCH"
}
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    flat.sync-string = true
    schema = {
      fields {
        data = string
      }
    }
  }
}
sink {
  Console {}
}

With the configuration above, each document is synchronized as a single string value. A sample looks like this:

{
  "_id": { "$oid": "643d41f5fdc6a52e90e59cbf" },
  "c_map": {
    "OQBqH": "jllt",
    "rkvlO": "pbfdf",
    "pCMEX": "hczrdtve",
    "DAgdj": "t",
    "dsJag": "voo"
  },
  "c_array": [
    { "$numberInt": "-865590937" },
    { "$numberInt": "833905600" },
    { "$numberInt": "-1104586446" },
    { "$numberInt": "2076336780" },
    { "$numberInt": "-1028688944" }
  ],
  "c_string": "bddkzxr",
  "c_boolean": false,
  "c_tinyint": { "$numberInt": "39" },
  "c_smallint": { "$numberInt": "23672" },
  "c_int": { "$numberInt": "-495763561" },
  "c_bigint": { "$numberLong": "3768307617923954543" },
  "c_float": { "$numberDouble": "5.284220288280258E37" },
  "c_double": { "$numberDouble": "1.1706091642478246E308" },
  "c_bytes": { "$binary": { "base64": "ZWJ4", "subType": "00" } },
  "c_date": { "$date": { "$numberLong": "1686614400000" } },
  "c_decimal": { "$numberDecimal": "683265300" },
  "c_timestamp": { "$date": { "$numberLong": "1684283772000" } },
  "c_row": {
    "c_map": {
      "OQBqH": "cbrzhsktmm",
      "rkvlO": "qtaov",
      "pCMEX": "tuq",
      "DAgdj": "jzop",
      "dsJag": "vwqyxtt"
    },
    "c_array": [
      { "$numberInt": "1733526799" },
      { "$numberInt": "-971483501" },
      { "$numberInt": "-1716160960" },
      { "$numberInt": "-919976360" },
      { "$numberInt": "727499700" }
    ],
    "c_string": "oboislr",
    "c_boolean": true,
    "c_tinyint": { "$numberInt": "-66" },
    "c_smallint": { "$numberInt": "1308" },
    "c_int": { "$numberInt": "-1573886733" },
    "c_bigint": { "$numberLong": "4877994302999518682" },
    "c_float": { "$numberDouble": "1.5353209063652051E38" },
    "c_double": { "$numberDouble": "1.1952441956458565E308" },
    "c_bytes": { "$binary": { "base64": "cWx5Ymp0Yw==", "subType": "00" } },
    "c_date": { "$date": { "$numberLong": "1686614400000" } },
    "c_decimal": { "$numberDecimal": "656406177" },
    "c_timestamp": { "$date": { "$numberLong": "1684283772000" } }
  },
  "id": { "$numberInt": "2" }
}
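The following Python sketch mimics the shape of the sample above: the whole document becomes one string field (`data` in the example). It hand-rolls only the integer subset of canonical Extended JSON, using `$numberInt` for 32-bit values and `$numberLong` otherwise. Strings, booleans, lists, and nested objects are left as they are, while dates, binaries, doubles, and decimals are out of scope. The function names are hypothetical, and this is not the connector's converter.

```python
import json

INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def to_canonical_ejson(value):
    """Convert a small subset of Python values to canonical Extended JSON v2.

    Integers are assumed to fit in 64 bits.
    """
    if isinstance(value, bool):  # bool is a subclass of int, so test it first
        return value
    if isinstance(value, int):
        key = "$numberInt" if INT32_MIN <= value <= INT32_MAX else "$numberLong"
        return {key: str(value)}
    if isinstance(value, dict):
        return {k: to_canonical_ejson(v) for k, v in value.items()}
    if isinstance(value, list):
        return [to_canonical_ejson(v) for v in value]
    if value is None or isinstance(value, str):
        return value
    raise TypeError(f"not handled in this sketch: {type(value).__name__}")


def flat_sync_row(document: dict, field: str = "data") -> dict:
    """Represent the whole document as a single string field."""
    return {field: json.dumps(to_canonical_ejson(document))}


row = flat_sync_row({"c_string": "bddkzxr", "c_int": -495763561,
                     "c_bigint": 3768307617923954543, "c_boolean": False})
print(row["data"])
```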

Change Log

| Change | Commit | Version |
|---|---|---|
| [Improve] sink mongodb schema is not required (#8887) | https://github.com/apache/seatunnel/commit/3cfe8c12b9 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Fix][Connector-Mongodb] close MongodbClient when close MongodbReader (#8592) | https://github.com/apache/seatunnel/commit/06b2fc0e06 | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Bug][connectors-v2] fix mongodb bson convert exception (#8044) | https://github.com/apache/seatunnel/commit/b222c13f2f | 2.3.9 |
| [Hotfix][Connector-v2] Fix the ClassCastException for connector-mongodb (#7586) | https://github.com/apache/seatunnel/commit/dc43370e8c | 2.3.8 |
| [Improve][Test][Connector-V2][MongoDB] Add few test cases for BsonToRowDataConverters (#7579) | https://github.com/apache/seatunnel/commit/a797041e5d | 2.3.8 |
| [Improve][Connector-V2][MongoDB] A BsonInt32 will be convert to a long type (#7567) | https://github.com/apache/seatunnel/commit/adf26c20c5 | 2.3.8 |
| [Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) | https://github.com/apache/seatunnel/commit/c5159a2760 | 2.3.6 |
| [bugfix][connector-mongodb] fix mongodb null value write (#6967) | https://github.com/apache/seatunnel/commit/c5ecda50f8 | 2.3.6 |
| [Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813) | https://github.com/apache/seatunnel/commit/59cccb6097 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [bugfix][mongodb] Fixed unsupported exception caused by bsonNull (#5659) | https://github.com/apache/seatunnel/commit/cab864aa4d | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]sink support transaction update/writing (#5034) | https://github.com/apache/seatunnel/commit/b1203c905e | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (#4997) | https://github.com/apache/seatunnel/commit/31db35bee7 | 2.3.3 |
| [Improve][Connector-v2][Mongodb]Optimize reading logic (#5001) | https://github.com/apache/seatunnel/commit/830196d8b7 | 2.3.3 |
| [Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982) | https://github.com/apache/seatunnel/commit/526197af67 | 2.3.3 |
| [Feature][connector-v2][mongodb] mongodb support cdc sink (#4833) | https://github.com/apache/seatunnel/commit/cb651cd7f3 | 2.3.3 |
| [Feature][Connector-v2][Mongodb]Refactor mongodb connector (#4620) | https://github.com/apache/seatunnel/commit/5b1a843e40 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve] mongodb connector v2 add source query capability (#3697) | https://github.com/apache/seatunnel/commit/8a7fe6fcb6 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][MongoDB] Unified exception for MongoDB source & sink connector (#3522) | https://github.com/apache/seatunnel/commit/5af632e32b | 2.3.0 |
| [Feature][Connector V2] expose configurable options in MongoDB (#3347) | https://github.com/apache/seatunnel/commit/ffd5778efc | 2.3.0 |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f12 | 2.3.0-beta |
| [Improve][Connector-V2] Improve mongodb connector (#2778) | https://github.com/apache/seatunnel/commit/efbf793fa5 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter sink (#2694) | https://github.com/apache/seatunnel/commit/51c28a3387 | 2.2.0-beta |
| [Feature][Connector-V2] Add mongodb connecter source (#2596) | https://github.com/apache/seatunnel/commit/3ee8a8a619 | 2.2.0-beta |