Rabbitmq
Rabbitmq source connector
Description
Used to read data from Rabbitmq.
Key features
The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQ’s approach to dispatching messages from a single queue to multiple consumers.
Options
name | type | required | default value |
---|---|---|---|
host | string | yes | - |
port | int | yes | - |
virtual_host | string | yes | - |
username | string | yes | - |
password | string | yes | - |
queue_name | string | yes | - |
schema | config | yes | - |
url | string | no | - |
routing_key | string | no | - |
exchange | string | no | - |
network_recovery_interval | int | no | - |
topology_recovery_enabled | boolean | no | - |
automatic_recovery_enabled | boolean | no | - |
connection_timeout | int | no | - |
requested_channel_max | int | no | - |
requested_frame_max | int | no | - |
requested_heartbeat | int | no | - |
prefetch_count | int | no | - |
delivery_timeout | long | no | - |
common-options | no | - |
host [string]
the default host to use for connections
port [int]
the default port to use for connections
virtual_host [string]
virtual host – the virtual host to use when connecting to the broker
username [string]
the AMQP user name to use when connecting to the broker
password [string]
the password to use when connecting to the broker
url [string]
convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
queue_name [string]
the queue to publish the message to
routing_key [string]
the routing key to publish the message to
exchange [string]
the exchange to publish the message to
schema [Config]
fields [Config]
the schema fields of upstream data.
network_recovery_interval [int]
how long will automatic recovery wait before attempting to reconnect, in ms
topology_recovery [string]
if true, enables topology recovery
automatic_recovery [string]
if true, enables connection recovery
connection_timeout [int]
connection tcp establishment timeout in milliseconds; zero for infinite
requested_channel_max [int]
initially requested maximum channel number; zero for unlimited **Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
requested_frame_max [int]
the requested maximum frame size
requested_heartbeat [int]
Set the requested heartbeat timeout **Note: Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
prefetch_count [int]
prefetchCount the max number of messages to receive without acknowledgement
delivery_timeout [long]
deliveryTimeout maximum wait time, in milliseconds, for the next message delivery
common options
Source plugin common parameters, please refer to Source Common Options for details
Example
simple:
source {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
}
}
}
}
Changelog
2.3.0 2022-12-30
- Add Rabbitmq source Connector