Rabbitmq
Rabbitmq source connector
Description
Used to read data from Rabbitmq.
Key features
The source must be non-parallel (parallelism set to 1) in order to achieve exactly-once. This limitation is mainly due to RabbitMQ’s approach to dispatching messages from a single queue to multiple consumers.
Options
| name | type | required | default value |
|---|---|---|---|
| host | string | yes | - |
| port | int | yes | - |
| virtual_host | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| queue_name | string | no | - |
| schema | config | no | - |
| tables_configs | array | no | - |
| url | string | no | - |
| routing_key | string | no | - |
| exchange | string | no | - |
| network_recovery_interval | int | no | - |
| topology_recovery_enabled | boolean | no | - |
| automatic_recovery_enabled | boolean | no | - |
| connection_timeout | int | no | - |
| requested_channel_max | int | no | - |
| requested_frame_max | int | no | - |
| requested_heartbeat | int | no | - |
| prefetch_count | int | no | - |
| delivery_timeout | long | no | - |
| common-options | no | - | |
| durable | boolean | no | true |
| exclusive | boolean | no | false |
| auto_delete | boolean | no | false |
host [string]
the default host to use for connections
port [int]
the default port to use for connections
virtual_host [string]
virtual host – the virtual host to use when connecting to the broker
username [string]
the AMQP user name to use when connecting to the broker
password [string]
the password to use when connecting to the broker
url [string]
convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
queue_name [string]
the queue to consume messages from. Note: Required if tables_configs is not configured.
routing_key [string]
the routing key to publish the message to
exchange [string]
the exchange to publish the message to
schema [Config]
fields [Config]
the schema fields of upstream data. For more details, please refer to Schema Feature. Note: Required if tables_configs is not configured.
tables_configs [array]
Used to read from multiple queues simultaneously. Each object in the array must contain a queue_name and a schema.
network_recovery_interval [int]
how long will automatic recovery wait before attempting to reconnect, in ms
topology_recovery_enabled [boolean]
if true, enables topology recovery
automatic_recovery_enabled [boolean]
if true, enables connection recovery
connection_timeout [int]
connection tcp establishment timeout in milliseconds; zero for infinite
requested_channel_max [int]
initially requested maximum channel number; zero for unlimited Note: The value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
requested_frame_max [int]
the requested maximum frame size
requested_heartbeat [int]
Set the requested heartbeat timeout Note: The value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
prefetch_count [int]
prefetchCount the max number of messages to receive without acknowledgement
delivery_timeout [long]
deliveryTimeout maximum wait time, in milliseconds, for the next message delivery
common options
Source plugin common parameters, please refer to Source Common Options for details
durable
- true: The queue will survive on server restart.
- false: The queue will be deleted on server restart.
exclusive
- true: The queue is used only by the current connection and will be deleted when the connection closes.
- false: The queue can be used by multiple connections.
auto-delete
- true: The queue will be deleted automatically when the last consumer unsubscribes.
- false: The queue will not be automatically deleted.
Migration Guide & Configuration Rules
If you are upgrading from a previous version that only supported single-table reads, your existing configuration will work without any changes.
Configuration Priority:
- You cannot configure both
tables_configsand the root-levelqueue_name/schemaat the same time. They are mutually exclusive. Doing so will result in a configuration validation error. - Use
tables_configsfor multi-table mode. - Use root-level
queue_nameandschemafor single-table mode.
Example
Single-table Read Example
source {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
}
}
}
}
Multi-table Read Example
You can use the tables_configs option to consume messages from multiple RabbitMQ queues simultaneously within a single job. The connector will automatically assign the correct table identifier to each row based on the queue it originated from, allowing you to route them to different sinks using plugin_input.
source {
RabbitMQ {
host = "localhost"
port = 5672
username = "guest"
password = "guest"
# Use tables_configs to read from multiple queues
tables_configs = [
{
queue_name = "users_queue"
schema = {
table = "users_table" # Defines the table name for routing
fields {
user_id = bigint
name = string
}
}
},
{
queue_name = "orders_queue"
schema = {
table = "orders_table" # Defines the table name for routing
fields {
order_id = bigint
amount = double
}
}
}
]
}
}
sink {
# The first sink will ONLY receive data from the users_queue
Jdbc {
plugin_input = "users_table"
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/mydb"
query = "insert into users (user_id, name) values (?, ?)"
}
# The second sink will ONLY receive data from the orders_queue
Jdbc {
plugin_input = "orders_table"
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/mydb"
query = "insert into orders (order_id, amount) values (?, ?)"
}
}
Changelog
Change Log
| Change | Commit | Version |
|---|---|---|
| [Fix][connector-rabbitmq] Set default value for durable, exclusive and auto-delete (#9631) | https://github.com/apache/seatunnel/commit/5f9492e62a | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] rabbit mq options (#8740) | https://github.com/apache/seatunnel/commit/4eec9be012 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) | https://github.com/apache/seatunnel/commit/aabfc8eb78 | 2.3.8 |
| [Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108) | https://github.com/apache/seatunnel/commit/423a7b142b | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842) | https://github.com/apache/seatunnel/commit/985fb6642a | 2.3.2 |
| [Hotfix][E2E] Fix RabbitmqIT (#4593) | https://github.com/apache/seatunnel/commit/9bd5403d71 | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb5 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312) | https://github.com/apache/seatunnel/commit/4b12691a8d | 2.3.0 |