Version: 2.3.4

Rabbitmq

Rabbitmq source connector

Description

Used to read data from Rabbitmq.

Key features

tip

The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once semantics. This limitation is mainly due to RabbitMQ's approach to dispatching messages from a single queue to multiple consumers.
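
As a hedged illustration of the tip above, the job parallelism can be pinned to 1 in the env block. This sketch assumes the usual SeaTunnel `env { parallelism = ... }` setting governs this source; the connection values are placeholders taken from the Example section of this page.

```hocon
env {
  # Assumption: job-level parallelism setting; 1 keeps a single consumer on the queue
  parallelism = 1
}

source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```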

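Options

| name                       | type    | required | default value |
|----------------------------|---------|----------|---------------|
| host                       | string  | yes      | -             |
| port                       | int     | yes      | -             |
| virtual_host               | string  | yes      | -             |
| username                   | string  | yes      | -             |
| password                   | string  | yes      | -             |
| queue_name                 | string  | yes      | -             |
| schema                     | config  | yes      | -             |
| url                        | string  | no       | -             |
| routing_key                | string  | no       | -             |
| exchange                   | string  | no       | -             |
| network_recovery_interval  | int     | no       | -             |
| topology_recovery_enabled  | boolean | no       | -             |
| automatic_recovery_enabled | boolean | no       | -             |
| connection_timeout         | int     | no       | -             |
| requested_channel_max      | int     | no       | -             |
| requested_frame_max        | int     | no       | -             |
| requested_heartbeat        | int     | no       | -             |
| prefetch_count             | int     | no       | -             |
| delivery_timeout           | long    | no       | -             |
| common-options             |         | no       | -             |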

host [string]

The default host to use for connections.

port [int]

The default port to use for connections.

virtual_host [string]

The virtual host to use when connecting to the broker.

username [string]

The AMQP user name to use when connecting to the broker.

password [string]

The password to use when connecting to the broker.

url [string]

A convenience option that sets the fields of an AMQP URI in a single value: host, port, username, password, and virtual host.
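
A minimal sketch of the url option, assuming the standard AMQP URI layout `amqp://username:password@host:port/virtual_host`. The options table marks host, port, virtual_host, username, and password as required, so they are kept here as well; whether url takes precedence over them is not stated on this page and should be verified before relying on it.

```hocon
source {
  RabbitMQ {
    # Assumption: standard AMQP URI; the default virtual host "/" is percent-encoded as %2f
    url = "amqp://guest:guest@rabbitmq-e2e:5672/%2f"

    # Listed as required in the options table, so repeated here
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```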

queue_name [string]

The queue to read messages from.

routing_key [string]

The routing key to publish the message to.

exchange [string]

The exchange to publish the message to.

schema [Config]

fields [Config]

The schema fields of the upstream data.

network_recovery_interval [int]

How long automatic recovery waits before attempting to reconnect, in milliseconds.

topology_recovery_enabled [boolean]

If true, enables topology recovery.

automatic_recovery_enabled [boolean]

If true, enables connection recovery.
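
The recovery options can be combined as in the following sketch. It uses the option names and types from the options table (`topology_recovery_enabled` and `automatic_recovery_enabled`, both boolean); the 5000 ms interval is an illustrative value, not a documented default of this connector.

```hocon
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"

    # Reconnect automatically after a connection failure
    automatic_recovery_enabled = true
    # Also restore the topology after the connection is recovered
    topology_recovery_enabled = true
    # Wait 5000 ms between recovery attempts (illustrative value)
    network_recovery_interval = 5000

    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```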

connection_timeout [int]

The TCP connection establishment timeout, in milliseconds; zero for infinite.

requested_channel_max [int]

The initially requested maximum channel number; zero for unlimited. Note: the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).

requested_frame_max [int]

The requested maximum frame size.

requested_heartbeat [int]

The requested heartbeat timeout. Note: the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
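
A hedged sketch of the connection tuning options follows. All values are illustrative. The unit of requested_heartbeat is not stated on this page; the RabbitMQ Java client interprets its heartbeat setting in seconds, and this sketch assumes the connector does the same.

```hocon
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"

    # Give up on TCP connection establishment after 10000 ms; 0 would wait indefinitely
    connection_timeout = 10000
    # 0 requests an unlimited number of channels; must stay within 0..65535
    requested_channel_max = 0
    # Assumption: seconds, as in the RabbitMQ Java client; must stay within 0..65535
    requested_heartbeat = 60

    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```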

prefetch_count [int]

The maximum number of messages to receive without acknowledgement.

delivery_timeout [long]

The maximum wait time, in milliseconds, for the next message delivery.
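
The two consumption options can be set together, as in this sketch. The values are illustrative assumptions: at most 100 unacknowledged messages in flight, and a wait of up to 30000 ms for the next delivery.

```hocon
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"

    # Receive at most 100 messages before acknowledgements are required (illustrative)
    prefetch_count = 100
    # Wait up to 30000 ms for the next message delivery (illustrative)
    delivery_timeout = 30000

    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```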

common options

Source plugin common parameters. Please refer to Source Common Options for details.

Example

simple:

source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
      }
    }
  }
}

Changelog

next version

  • Add Rabbitmq source Connector