跳到主要内容
版本:Next

Rabbitmq

Rabbitmq 源连接器

描述

用于从 Rabbitmq 读取数据。

关键特性

提示

为了实现精确一次,源必须是非并行的(并行度设置为 1)。这个限制主要是由于 RabbitMQ 从单个队列向多个消费者分派消息的方式。

选项

参数名类型必须默认值描述
hoststring-连接的默认主机
portint-连接的默认端口
virtual_hoststring-虚拟主机 – 连接到代理时使用的虚拟主机
usernamestring-连接到代理时使用的 AMQP 用户名
passwordstring-连接到代理时使用的密码
queue_namestring-要发布消息的队列
schemaconfig-上游数据的模式
urlstring-便捷方法,用于设置 AMQP URI 中的字段:主机、端口、用户名、密码和虚拟主机
routing_keystring-要发布消息的路由密钥
exchangestring-要发布消息的交换机
network_recovery_intervalint-自动恢复在尝试重新连接之前等待多长时间(毫秒)
topology_recovery_enabledboolean-如果为 true,启用拓扑恢复
automatic_recovery_enabledboolean-如果为 true,启用连接恢复
connection_timeoutint-连接 tcp 建立超时(毫秒);零表示无限
requested_channel_maxint-最初请求的最大通道数;零表示无限制。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。
requested_frame_maxint-请求的最大帧大小
requested_heartbeatint-设置请求的心跳超时。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。
prefetch_countint-预取计数,无需确认即可接收的最大消息数
delivery_timeoutlong-交付超时,等待下一条消息交付的最大时间(毫秒)
durablebooleantrue队列是否在服务器重启时保留
exclusivebooleanfalse队列是否仅由当前连接使用
auto_deletebooleanfalse队列是否在最后一个消费者取消订阅时自动删除
common-options-源插件通用参数

host [string]

连接的默认主机

port [int]

连接的默认端口

virtual_host [string]

虚拟主机 – 连接到代理时使用的虚拟主机

username [string]

连接到代理时使用的 AMQP 用户名

password [string]

连接到代理时使用的密码

url [string]

便捷方法,用于设置 AMQP URI 中的字段:主机、端口、用户名、密码和虚拟主机

queue_name [string]

要发布消息的队列

routing_key [string]

要发布消息的路由密钥

exchange [string]

要发布消息的交换机

schema [Config]

fields [Config]

上游数据的模式字段。

network_recovery_interval [int]

自动恢复在尝试重新连接之前等待多长时间(毫秒)

topology_recovery_enabled [string]

如果为 true,启用拓扑恢复

automatic_recovery_enabled [string]

如果为 true,启用连接恢复

connection_timeout [int]

连接 tcp 建立超时(毫秒);零表示无限

requested_channel_max [int]

最初请求的最大通道数;零表示无限制。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。

requested_frame_max [int]

请求的最大帧大小

requested_heartbeat [int]

设置请求的心跳超时。**注意:值必须在 0 到 65535 之间(AMQP 0-9-1 中的无符号短整数)。

prefetch_count [int]

预取计数,无需确认即可接收的最大消息数

delivery_timeout [long]

交付超时,等待下一条消息交付的最大时间(毫秒)

通用选项

源插件通用参数,请参考 源通用选项 详见。

durable

  • true:队列将在服务器重启时保留。
  • false:队列将在服务器重启时删除。

exclusive

  • true:队列仅由当前连接使用,连接关闭时将删除。
  • false:队列可以由多个连接使用。

auto-delete

  • true:队列将在最后一个消费者取消订阅时自动删除。
  • false:队列不会自动删除。

示例

简单:

source {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
}
}
}
}

变更日志

Change Log
ChangeCommitVersion
[Fix][connector-rabbitmq] Set default value for durable, exclusive and auto-delete (#9631)https://github.com/apache/seatunnel/commit/5f9492e62a2.3.12
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118)https://github.com/apache/seatunnel/commit/4f5adeb1c72.3.11
[Improve] rabbit mq options (#8740)https://github.com/apache/seatunnel/commit/4eec9be0122.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef8000162.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365)https://github.com/apache/seatunnel/commit/aabfc8eb782.3.8
[Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108)https://github.com/apache/seatunnel/commit/423a7b142b2.3.6
[Feature][Kafka] Support multi-table source read (#5992)https://github.com/apache/seatunnel/commit/60104602d12.3.6
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755)https://github.com/apache/seatunnel/commit/8de74081002.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)https://github.com/apache/seatunnel/commit/985fb6642a2.3.2
[Hotfix][E2E] Fix RabbitmqIT (#4593)https://github.com/apache/seatunnel/commit/9bd5403d712.3.2
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719)https://github.com/apache/seatunnel/commit/ef1b8b1bb52.3.1
[Feature][API &amp; Connector &amp; Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba12.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312)https://github.com/apache/seatunnel/commit/4b12691a8d2.3.0