MongoDB
MongoDB source connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
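Key Features
Description
The MongoDB connector can read data from and write data to MongoDB. This document describes how to set up the MongoDB connector to read data from MongoDB.
Supported DataSource Info
To use the MongoDB connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven Central Repository.
Datasource | Supported Versions | Dependency |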
---|---|---|
MongoDB | universal | Download |
Data Type Mapping
The following table lists the field data type mapping from MongoDB BSON types to SeaTunnel data types.
MongoDB BSON Type | SeaTunnel Data Type |
---|---|
ObjectId | STRING |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
Int32 | INTEGER |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Date | DATE |
Timestamp | TIMESTAMP |
Object | ROW |
Array | ARRAY |
For MongoDB-specific types, the Extended JSON format is used to map them to the SeaTunnel STRING type.
MongoDB BSON type | SeaTunnel STRING |
---|---|
Symbol | {"_value": {"$symbol": "12"}} |
RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
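As a rough illustration of how a downstream consumer might read these strings, the sketch below parses them with Python's standard json module and reports which Extended JSON type key each one carries. The helper name describe_special_value is hypothetical and not part of SeaTunnel; the `_value` wrapper is taken from the table above.

```python
import json

def describe_special_value(raw: str):
    """Return (type_key, payload) for a string shaped like {"_value": {"$type": ...}}."""
    wrapper = json.loads(raw)
    inner = wrapper["_value"]
    # The inner document carries a single key such as "$symbol" or "$code".
    (type_key, payload), = inner.items()
    return type_key, payload

symbol = describe_special_value('{"_value": {"$symbol": "12"}}')
regex = describe_special_value(
    '{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}'
)
print(symbol)
print(regex[0], regex[1]["pattern"])
```

The type key tells the consumer how to interpret the payload, for example rebuilding a regular expression from its pattern and options.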
Tips
1. When using the DECIMAL type in SeaTunnel, note that the precision cannot exceed 34 digits, which means you should use decimal(34, 18).
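To make the 34-digit limit concrete, here is a minimal sketch that checks whether a value fits a given DECIMAL(precision, scale) declaration. The function fits_decimal is a hypothetical helper written for illustration; it is not how SeaTunnel validates values.

```python
from decimal import Decimal

MAX_DECIMAL128_DIGITS = 34  # limit stated in the tip above

def fits_decimal(value: str, precision: int = 34, scale: int = 18) -> bool:
    """Check that value fits DECIMAL(precision, scale) within the 34-digit limit."""
    if precision > MAX_DECIMAL128_DIGITS:
        return False  # the declaration itself is too wide
    number = Decimal(value)
    if not number.is_finite():
        return False
    _, digits, exponent = number.as_tuple()
    fraction_digits = max(-exponent, 0)
    integer_digits = max(len(digits) + exponent, 0)
    return fraction_digits <= scale and integer_digits <= precision - scale

print(fits_decimal("683265300"))
print(fits_decimal("12345678901234567.5"))
print(fits_decimal("1.5", precision=38, scale=18))
```

With the default (34, 18) declaration there is room for 16 integer digits, so the second call fails on its 17 integer digits and the third fails because the declared precision exceeds 34.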
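Source Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
uri | String | Yes | - | The MongoDB standard connection URI, for example mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true |
database | String | Yes | - | The name of the MongoDB database to read from or write to. |
collection | String | Yes | - | The name of the MongoDB collection to read from or write to. |
schema | String | Yes | - | The mapping between MongoDB BSON fields and SeaTunnel data structures. |
match.query | String | No | - | In MongoDB, a filter selects the documents that a query operation returns. |
match.projection | String | No | - | In MongoDB, a projection controls which fields are included in the query results. |
partition.split-key | String | No | _id | The field used to split the collection. |
partition.split-size | Long | No | 64 * 1024 * 1024 | The size of each split. |
cursor.no-timeout | Boolean | No | true | The MongoDB server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to true to prevent that. However, if the application takes longer than 30 minutes to process the current batch of documents, the session is marked as expired and closed. |
fetch.size | Int | No | 2048 | The number of documents fetched from the server per batch. An appropriate batch size can improve query performance and avoids the memory pressure of fetching a large amount of data at once. |
max.time-min | Long | No | 600 | A MongoDB query option that limits the maximum execution time of a query operation. The value of maxTimeMin is in minutes. If the query runs longer than the specified limit, MongoDB terminates the operation and returns an error. |
flat.sync-string | Boolean | No | true | When flat.sync-string is used, only one field can be defined in the schema, and its type must be String. Each MongoDB document is then mapped to a single string. |
common-options | | No | - | Common source plugin parameters. See Source Common Options for details. |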
Tips
1. The parameter match.query is compatible with the legacy parameter matchQuery; the two are equivalent.
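How to Create a MongoDB Data Synchronization Job
The following example shows how to create a data synchronization job that reads data from MongoDB and prints it to the local client: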
# Basic configuration for the job to run
env {
  parallelism = 1
  job.mode = "BATCH"
}
# Create the MongoDB source
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "source_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(34, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(34, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}
# Print the data read from MongoDB to the console
sink {
  Console {
    parallelism = 1
  }
}
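Parameter Interpretation
MongoDB Database Connection URI Examples
Unauthenticated single-node connection:
mongodb://192.168.0.100:27017/mydb
Replica set connection:
mongodb://192.168.0.100:27017/mydb?replicaSet=xxx
Authenticated replica set connection:
mongodb://admin:password@192.168.0.100:27017/mydb?replicaSet=xxx&authSource=admin
Multi-node replica set connection:
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb?replicaSet=xxx
Sharded cluster connection:
mongodb://192.168.0.100:27017/mydb
Multiple mongos connections:
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
Note: The username and password in the URI must be URL-encoded before they are placed into the connection string.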
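One way to do the encoding is sketched below with Python's standard urllib.parse.quote. The function build_mongo_uri and the sample credentials are made up for illustration; only the percent-encoding step is the point.

```python
from urllib.parse import quote

def build_mongo_uri(user: str, password: str, hosts: str, database: str) -> str:
    """Assemble a MongoDB URI with percent-encoded credentials."""
    # safe="" also encodes "/", which quote() would otherwise leave as-is.
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return f"mongodb://{encoded_user}:{encoded_password}@{hosts}/{database}"

uri = build_mongo_uri("admin", "p@ss:w/rd", "192.168.0.100:27017", "mydb")
print(uri)
```

Characters such as `@`, `:` and `/` have a structural meaning in the URI, so leaving them unencoded in a password would change how the connection string is parsed.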
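Match Query Scan
In data synchronization scenarios, apply match.query as early as possible to reduce the number of documents that downstream operators have to process, which improves performance.
The following is a simple SeaTunnel example that uses match.query: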
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "orders"
    match.query = "{status: \"A\"}"
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}
The following are examples of match.query statements for various data types:
# Query a boolean type
"{c_boolean:true}"
# Query a string type
"{c_string:\"OCzCj\"}"
# Query an integer type
"{c_int:2}"
# Query a date type
"{c_date:ISODate(\"2023-06-26T16:00:00.000Z\")}"
# Query a floating-point type
"{c_double:{$gte:1.71763202185342e+308}}"
For the match.query syntax, see: https://www.mongodb.com/docs/manual/tutorial/query-documents
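Escaping the inner quotes by hand is error-prone, so the following sketch generates the config line from a Python dict. It assumes the connector accepts strict JSON with quoted keys in the same way as the relaxed form shown above; match_query_line is a hypothetical helper, and shell helpers such as ISODate(...) cannot be expressed this way.

```python
import json

def match_query_line(filter_doc: dict) -> str:
    """Render a filter as a quoted match.query config line."""
    query_json = json.dumps(filter_doc)  # strict JSON form of the filter
    # A second dumps wraps the text in double quotes and escapes the inner ones.
    return f"match.query = {json.dumps(query_json)}"

line = match_query_line({"status": "A", "c_int": {"$gte": 2}})
print(line)
```

The printed line can be pasted into the MongoDB source block; the config parser reads the escaped quotes back into the original filter text.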
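Projection Scan
In MongoDB, a projection controls which fields are included in the query results, by specifying which fields to return and which to omit. In the find() method, the projection document is passed as the second parameter. Its keys name the fields to include or exclude: a value of 1 includes a field and 0 excludes it. Apart from _id, a single projection cannot mix inclusion and exclusion. Here is a simple example, assuming a collection named users:
# Returns only the name and email fields (plus _id, which is included by default)
db.users.find({}, { name: 1, email: 1 });
In data synchronization scenarios, apply a projection as early as possible to reduce the amount of data that downstream operators have to process, which improves performance. The following is a simple SeaTunnel example that uses a projection: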
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    match.projection = "{ name: 1 }"
    schema = {
      fields {
        name = string
      }
    }
  }
}
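To show what an inclusion projection does to a single document, here is a small pure-Python emulation. It is only a sketch of the top-level behaviour (nested paths and operators are ignored), and apply_inclusion_projection is a hypothetical name, not a MongoDB or SeaTunnel API.

```python
def apply_inclusion_projection(document: dict, projection: dict) -> dict:
    """Keep fields marked 1; _id is kept unless the projection sets it to 0."""
    included = {field for field, flag in projection.items() if flag == 1}
    if projection.get("_id", 1) != 0:
        included.add("_id")
    return {key: value for key, value in document.items() if key in included}

user = {"_id": 7, "name": "Ann", "email": "ann@example.com", "age": 30}
projected = apply_inclusion_projection(user, {"name": 1})
without_id = apply_inclusion_projection(user, {"_id": 0, "name": 1})
print(projected)
print(without_id)
```

Only the projected fields travel from the server to the job, which is where the saving comes from.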
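Partitioned Scan
To speed up reading in parallel source task instances, SeaTunnel provides a partitioned scan feature for MongoDB collections. Users control how the data is split by setting partition.split-key (the split field) and partition.split-size (the split size).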
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    partition.split-key = "id"
    partition.split-size = 1024
    schema = {
      fields {
        id = bigint
        status = string
      }
    }
  }
}
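For a feel of how partition.split-size relates to the number of splits, the sketch below does the arithmetic under two assumptions that this document does not confirm: that the size is expressed in bytes, and that the split count is roughly the collection size divided by the split size. The connector's actual splitting logic may differ; estimate_split_count is a hypothetical helper.

```python
DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024  # default from the options table, assumed to be bytes

def estimate_split_count(collection_bytes: int, split_size: int = DEFAULT_SPLIT_SIZE) -> int:
    """Rough number of splits: collection size over split size, rounded up."""
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    return max(1, (collection_bytes + split_size - 1) // split_size)

one_gib = 1024 ** 3
print(DEFAULT_SPLIT_SIZE)
print(estimate_split_count(one_gib))
print(estimate_split_count(one_gib, 1024))
```

Under these assumptions a very small split size, such as the 1024 used in the example above, produces a very large number of splits on a big collection, so it is worth sizing the value against the data volume and the job parallelism.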
Flat Sync String
When flat.sync-string is used, only one field can be defined in the schema, and its type must be string. Each MongoDB document is then mapped to a single string.
env {
  parallelism = 10
  job.mode = "BATCH"
}
source {
  MongoDB {
    uri = "mongodb://user:password@127.0.0.1:27017"
    database = "test_db"
    collection = "users"
    flat.sync-string = true
    schema = {
      fields {
        data = string
      }
    }
  }
}
sink {
  Console {}
}
A sample of the data synchronized with the modified parameters:
{
  "_id": {"$oid": "643d41f5fdc6a52e90e59cbf"},
  "c_map": {
    "OQBqH": "jllt",
    "rkvlO": "pbfdf",
    "pCMEX": "hczrdtve",
    "DAgdj": "t",
    "dsJag": "voo"
  },
  "c_array": [
    {"$numberInt": "-865590937"},
    {"$numberInt": "833905600"},
    {"$numberInt": "-1104586446"},
    {"$numberInt": "2076336780"},
    {"$numberInt": "-1028688944"}
  ],
  "c_string": "bddkzxr",
  "c_boolean": false,
  "c_tinyint": {"$numberInt": "39"},
  "c_smallint": {"$numberInt": "23672"},
  "c_int": {"$numberInt": "-495763561"},
  "c_bigint": {"$numberLong": "3768307617923954543"},
  "c_float": {"$numberDouble": "5.284220288280258E37"},
  "c_double": {"$numberDouble": "1.1706091642478246E308"},
  "c_bytes": {"$binary": {"base64": "ZWJ4", "subType": "00"}},
  "c_date": {"$date": {"$numberLong": "1686614400000"}},
  "c_decimal": {"$numberDecimal": "683265300"},
  "c_timestamp": {"$date": {"$numberLong": "1684283772000"}},
  "c_row": {
    "c_map": {
      "OQBqH": "cbrzhsktmm",
      "rkvlO": "qtaov",
      "pCMEX": "tuq",
      "DAgdj": "jzop",
      "dsJag": "vwqyxtt"
    },
    "c_array": [
      {"$numberInt": "1733526799"},
      {"$numberInt": "-971483501"},
      {"$numberInt": "-1716160960"},
      {"$numberInt": "-919976360"},
      {"$numberInt": "727499700"}
    ],
    "c_string": "oboislr",
    "c_boolean": true,
    "c_tinyint": {"$numberInt": "-66"},
    "c_smallint": {"$numberInt": "1308"},
    "c_int": {"$numberInt": "-1573886733"},
    "c_bigint": {"$numberLong": "4877994302999518682"},
    "c_float": {"$numberDouble": "1.5353209063652051E38"},
    "c_double": {"$numberDouble": "1.1952441956458565E308"},
    "c_bytes": {"$binary": {"base64": "cWx5Ymp0Yw==", "subType": "00"}},
    "c_date": {"$date": {"$numberLong": "1686614400000"}},
    "c_decimal": {"$numberDecimal": "656406177"},
    "c_timestamp": {"$date": {"$numberLong": "1684283772000"}}
  },
  "id": {"$numberInt": "2"}
}
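A consumer of the single string field will usually need to turn the Extended JSON wrappers back into plain values. The sketch below does that for a shortened stand-in of the sample above, using only the standard json module; unwrap is a hypothetical helper that handles just $oid, $numberInt and $numberLong.

```python
import json

# Shortened stand-in for the string carried in the single `data` field.
data = """
{
  "_id": {"$oid": "643d41f5fdc6a52e90e59cbf"},
  "c_string": "bddkzxr",
  "c_array": [{"$numberInt": "-865590937"}, {"$numberInt": "833905600"}],
  "c_bigint": {"$numberLong": "3768307617923954543"},
  "id": {"$numberInt": "2"}
}
"""

def unwrap(value):
    """Convert $numberInt / $numberLong / $oid wrappers to plain Python values."""
    if isinstance(value, dict):
        if len(value) == 1:
            (key, inner), = value.items()
            if key in ("$numberInt", "$numberLong"):
                return int(inner)
            if key == "$oid":
                return inner
        return {k: unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap(v) for v in value]
    return value

record = unwrap(json.loads(data))
print(record["id"], record["c_array"])
```

Other wrappers such as $date, $binary or $numberDecimal would need their own branches; a BSON library with Extended JSON support is the more complete option when one is available.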
Change Log
Change | Commit | Version |
---|---|---|
[Improve] sink mongodb schema is not required (#8887) | https://github.com/apache/seatunnel/commit/3cfe8c12b9 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
[Fix][Connector-Mongodb] close MongodbClient when close MongodbReader (#8592) | https://github.com/apache/seatunnel/commit/06b2fc0e06 | 2.3.10 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
[Bug][connectors-v2] fix mongodb bson convert exception (#8044) | https://github.com/apache/seatunnel/commit/b222c13f2f | 2.3.9 |
[Hotfix][Connector-v2] Fix the ClassCastException for connector-mongodb (#7586) | https://github.com/apache/seatunnel/commit/dc43370e8c | 2.3.8 |
[Improve][Test][Connector-V2][MongoDB] Add few test cases for BsonToRowDataConverters (#7579) | https://github.com/apache/seatunnel/commit/a797041e5d | 2.3.8 |
[Improve][Connector-V2][MongoDB] A BsonInt32 will be convert to a long type (#7567) | https://github.com/apache/seatunnel/commit/adf26c20c5 | 2.3.8 |
[Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997) | https://github.com/apache/seatunnel/commit/c5159a2760 | 2.3.6 |
[bugfix][connector-mongodb] fix mongodb null value write (#6967) | https://github.com/apache/seatunnel/commit/c5ecda50f8 | 2.3.6 |
[Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813) | https://github.com/apache/seatunnel/commit/59cccb6097 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
[bugfix][mongodb] Fixed unsupported exception caused by bsonNull (#5659) | https://github.com/apache/seatunnel/commit/cab864aa4d | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
[Improve][Connector-v2][Mongodb]sink support transaction update/writing (#5034) | https://github.com/apache/seatunnel/commit/b1203c905e | 2.3.3 |
[Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (#4997) | https://github.com/apache/seatunnel/commit/31db35bee7 | 2.3.3 |
[Improve][Connector-v2][Mongodb]Optimize reading logic (#5001) | https://github.com/apache/seatunnel/commit/830196d8b7 | 2.3.3 |
[Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982) | https://github.com/apache/seatunnel/commit/526197af67 | 2.3.3 |
[Feature][connector-v2][mongodb] mongodb support cdc sink (#4833) | https://github.com/apache/seatunnel/commit/cb651cd7f3 | 2.3.3 |
[Feature][Connector-v2][Mongodb]Refactor mongodb connector (#4620) | https://github.com/apache/seatunnel/commit/5b1a843e40 | 2.3.2 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
[Improve] mongodb connector v2 add source query capability (#3697) | https://github.com/apache/seatunnel/commit/8a7fe6fcb6 | 2.3.1 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Improve][Connector-V2][MongoDB] Unified exception for MongoDB source & sink connector (#3522) | https://github.com/apache/seatunnel/commit/5af632e32b | 2.3.0 |
[Feature][Connector V2] expose configurable options in MongoDB (#3347) | https://github.com/apache/seatunnel/commit/ffd5778efc | 2.3.0 |
[Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f12 | 2.3.0-beta |
[Improve][Connector-V2] Improve mongodb connector (#2778) | https://github.com/apache/seatunnel/commit/efbf793fa5 | 2.2.0-beta |
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
[Feature][Connector-V2] Add mongodb connecter sink (#2694) | https://github.com/apache/seatunnel/commit/51c28a3387 | 2.2.0-beta |
[Feature][Connector-V2] Add mongodb connecter source (#2596) | https://github.com/apache/seatunnel/commit/3ee8a8a619 | 2.2.0-beta |