Rabbitmq
Rabbitmq 源连接器
描述
用于从 Rabbitmq 读取数据。
关键特性
提示
为了实现精确一次,源必须是非并行的(并行度设置为 1)。这个限制主要是由于 RabbitMQ 从单个队列向多个消费者分派消息的方式。
选项
| 参数名 | 类型 | 必须 | 默认值 | 描述 |
|---|---|---|---|---|
| host | string | 是 | - | 连接的默认主机 |
| port | int | 是 | - | 连接的默认端口 |
| virtual_host | string | 是 | - | 虚拟主机 – 连接到代理时使用的虚拟主机 |
| username | string | 是 | - | 连接到代理时使用的 AMQP 用户名 |
| password | string | 是 | - | 连接到代理时使用的密码 |
| queue_name | string | 是 | - | 要发布消息的队列 |
| schema | config | 是 | - | 上游数据的模式 |
| url | string | 否 | - | 便捷方法,用于设置 AMQP URI 中的字段:主机、端口、用户名、密码和虚拟主机 |
| routing_key | string | 否 | - | 要发布消息的路由密钥 |
| exchange | string | 否 | - | 要发布消息的交换机 |
| network_recovery_interval | int | 否 | - | 自动恢复在尝试重新连接之前等待多长时间(毫秒) |
| topology_recovery_enabled | boolean | 否 | - | 如果为 true,启用拓扑恢复 |
| automatic_recovery_enabled | boolean | 否 | - | 如果为 true,启用连接恢复 |
| connection_timeout | int | 否 | - | 连接 tcp 建立超时(毫秒);零表示无限 |
| requested_channel_max | int | 否 | - | 最初请求的最大通道数;零表示无限制。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。 |
| requested_frame_max | int | 否 | - | 请求的最大帧大小 |
| requested_heartbeat | int | 否 | - | 设置请求的心跳超时。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。 |
| prefetch_count | int | 否 | - | 预取计数,无需确认即可接收的最大消息数 |
| delivery_timeout | long | 否 | - | 交付超时,等待下一条消息交付的最大时间(毫秒) |
| durable | boolean | 否 | true | 队列是否在服务器重启时保留 |
| exclusive | boolean | 否 | false | 队列是否仅由当前连接使用 |
| auto_delete | boolean | 否 | false | 队列是否在最后一个消费者取消订阅时自动删除 |
| common-options | 否 | - | 源插件通用参数 |
host [string]
连接的默认主机
port [int]
连接的默认端口
virtual_host [string]
虚拟主机 – 连接到代理时使用的虚拟主机
username [string]
连接到代理时使用的 AMQP 用户名
password [string]
连接到代理时使用的密码
url [string]
便捷方法,用于设置 AMQP URI 中的字段:主机、端口、用户名、密码和虚拟主机
queue_name [string]
要发布消息的队列
routing_key [string]
要发布消息的路由密钥
exchange [string]
要发布消息的交换机
schema [Config]
fields [Config]
上游数据的模式字段。
network_recovery_interval [int]
自动恢复在尝试重新连接之前等待多长时间(毫秒)
topology_recovery_enabled [string]
如果为 true,启用拓扑恢复
automatic_recovery_enabled [string]
如果为 true,启用连接恢复
connection_timeout [int]
连接 tcp 建立超时(毫秒);零表示无限
requested_channel_max [int]
最初请求的最大通道数;零表示无限制。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。
requested_frame_max [int]
请求的最大帧大小
requested_heartbeat [int]
设置请求的心跳超时。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。
prefetch_count [int]
预取计数,无需确认即可接收的最大消息数
delivery_timeout [long]
交付超时,等待下一条消息交付的最大时间(毫秒)
通用选项
源插件通用参数,请参考 源通用选项 详见。
durable
- true:队列将在服务器重启时保留。
- false:队列将在服务器重启时删除。
exclusive
- true:队列仅由当前连接使用,连接关闭时将删除。
- false:队列可以由多个连接使用。
auto-delete
- true:队列将在最后一个消费者取消订阅时自动删除。
- false:队列不会自动删除。
示例
简单:
source {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
}
}
}
}
变更日志
Change Log
| Change | Commit | Version |
|---|---|---|
| [Fix][connector-rabbitmq] Set default value for durable, exclusive and auto-delete (#9631) | https://github.com/apache/seatunnel/commit/5f9492e62a | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] rabbit mq options (#8740) | https://github.com/apache/seatunnel/commit/4eec9be012 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) | https://github.com/apache/seatunnel/commit/aabfc8eb78 | 2.3.8 |
| [Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108) | https://github.com/apache/seatunnel/commit/423a7b142b | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842) | https://github.com/apache/seatunnel/commit/985fb6642a | 2.3.2 |
| [Hotfix][E2E] Fix RabbitmqIT (#4593) | https://github.com/apache/seatunnel/commit/9bd5403d71 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb5 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312) | https://github.com/apache/seatunnel/commit/4b12691a8d | 2.3.0 |