Version: Next

Rabbitmq

Rabbitmq sink connector

Description

Writes data to a RabbitMQ queue.

Key features

Options

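| name                       | type    | required | default value |
|----------------------------|---------|----------|---------------|
| host                       | string  | yes      | -             |
| port                       | int     | yes      | -             |
| virtual_host               | string  | yes      | -             |
| username                   | string  | yes      | -             |
| password                   | string  | yes      | -             |
| queue_name                 | string  | yes      | -             |
| url                        | string  | no       | -             |
| network_recovery_interval  | int     | no       | -             |
| topology_recovery_enabled  | boolean | no       | -             |
| automatic_recovery_enabled | boolean | no       | -             |
| use_correlation_id         | boolean | no       | false         |
| connection_timeout         | int     | no       | -             |
| rabbitmq.config            | map     | no       | -             |
| common-options             |         | no       | -             |
| durable                    | boolean | no       | true          |
| exclusive                  | boolean | no       | false         |
| auto_delete                | boolean | no       | false         |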

host [string]

the default host to use for connections

port [int]

the default port to use for connections

virtual_host [string]

the virtual host to use when connecting to the broker

username [string]

the AMQP user name to use when connecting to the broker

password [string]

the password to use when connecting to the broker

url [string]

convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
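As a rough illustration of what such a URI looks like, the fragment below follows the standard AMQP URI layout (`amqp://username:password@host:port/virtual_host`). The values are placeholders borrowed from the examples on this page. How the connector combines `url` with the individual `host`, `port`, `username`, `password` and `virtual_host` options is not stated here, so treat the precedence as an assumption to verify.

```hocon
# Sketch only: an AMQP URI carrying host, port, username, password and virtual host.
# The default virtual host "/" has to be percent-encoded as %2f inside the URI path.
url = "amqp://guest:guest@rabbitmq-e2e:5672/%2f"
```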

queue_name [string]

the queue to write the message to

durable [boolean]

true: The queue will survive a server restart. false: The queue will be deleted on server restart.

exclusive [boolean]

true: The queue is used only by the current connection and will be deleted when the connection closes. false: The queue can be used by multiple connections.

auto_delete [boolean]

true: The queue will be deleted automatically when the last consumer unsubscribes. false: The queue will not be automatically deleted.

schema [Config]

fields [Config]

the schema fields of upstream data.

network_recovery_interval [int]

how long automatic recovery waits before attempting to reconnect, in milliseconds

topology_recovery_enabled [boolean]

if true, enables topology recovery

automatic_recovery_enabled [boolean]

if true, enables connection recovery

use_correlation_id [boolean]

whether the messages received are supplied with a unique ID that can be used to deduplicate them (for example, after failed acknowledgments)

connection_timeout [int]

TCP connection establishment timeout in milliseconds; zero means no timeout (wait indefinitely)
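The recovery and timeout options described above can be set together. The following is a minimal sketch; the numeric values are illustrative assumptions, not documented defaults, and the connection settings are reused from the examples on this page.

```hocon
sink {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"

    # Reconnect automatically after a network failure (illustrative values)
    automatic_recovery_enabled = true
    # Also restore topology after the connection is recovered
    topology_recovery_enabled = true
    # Wait 5000 ms between reconnection attempts
    network_recovery_interval = 5000
    # Give up on TCP connection establishment after 10000 ms (0 would wait indefinitely)
    connection_timeout = 10000
  }
}
```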

rabbitmq.config [map]

In addition to the required parameters above, you can pass further optional parameters to the RabbitMQ client. Any parameter described in the official RabbitMQ documentation can be set this way.

common options

Common parameters of sink plugins. Please refer to Sink Common Options for details.


Example

simple:

sink {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"
    rabbitmq.config = {
      requested-heartbeat = 10
      connection-timeout = 10
    }
  }
}

Example 2

queue with durable, exclusive, auto_delete:

sink {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"
    durable = true
    exclusive = false
    auto_delete = false
    rabbitmq.config = {
      requested-heartbeat = 10
      connection-timeout = 10
    }
  }
}
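The examples above show only the `sink` block. For context, a complete job might look like the sketch below. The `env` settings, the `FakeSource` connector, and its `row.num` and `schema` values are assumptions added for illustration and are not part of this connector's options; substitute the source you actually use.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # Assumed test source that generates a few rows; replace with a real source
  FakeSource {
    row.num = 10
    schema = {
      fields {
        id = bigint
        name = string
      }
    }
  }
}

sink {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test1"
  }
}
```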

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] rabbit mq options (#8740) | https://github.com/apache/seatunnel/commit/4eec9be012 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) | https://github.com/apache/seatunnel/commit/aabfc8eb78 | 2.3.8 |
| [Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108) | https://github.com/apache/seatunnel/commit/423a7b142b | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842) | https://github.com/apache/seatunnel/commit/985fb6642a | 2.3.2 |
| [Hotfix][E2E] Fix RabbitmqIT (#4593) | https://github.com/apache/seatunnel/commit/9bd5403d71 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb5 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312) | https://github.com/apache/seatunnel/commit/4b12691a8d | 2.3.0 |