RocketMQ
RocketMQ Sink 连接器
支持的 Apache RocketMQ 版本
- 4.9.0 或更新版本
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
描述
将 SeaTunnel 数据行写入 Apache RocketMQ topic。该 Sink 支持 JSON 和文本消息体、消息 tag、同步发送、按字段选择分区,以及在 exactly.once = true 时使用事务消息保证精确一次写入。
Sink 参数
| 参数名 | 类型 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|---|
| topic | String | 是 | - | RocketMQ topic 名称。 |
| name.srv.addr | String | 是 | - | RocketMQ NameServer 地址,例如 localhost:9876。 |
| acl.enabled | Boolean | 否 | false | 是否启用 RocketMQ ACL 鉴权。 |
| access.key | String | 否 | - | 访问密钥。acl.enabled = true 时必填。 |
| secret.key | String | 否 | - | 秘密密钥。acl.enabled = true 时必填。 |
| producer.group | String | 否 | SeaTunnel-Producer-Group | RocketMQ 生产者组 ID。 |
| tag | String | 否 | - | 写入每条消息时使用的 RocketMQ tag。 |
| partition.key.fields | List | 否 | - | 用来选择 RocketMQ 队列的字段名。配置的字段必须存在于上游 schema 中。 |
| format | String | 否 | json | 消息格式。支持 json 和 text。 |
| field.delimiter | String | 否 | , | format = text 时使用的字段分隔符。 |
| producer.send.sync | Boolean | 否 | false | 是否同步发送消息。为 false 时异步发送。 |
| exactly.once | Boolean | 否 | false | 是否使用事务消息实现精确一次写入。 |
| max.message.size | int | 否 | 4194304 | 最大消息体大小,单位字节。 |
| send.message.timeout | int | 否 | 3000 | 发送消息超时时间,单位毫秒。 |
| common-options | config | 否 | - | Sink 连接器通用参数,详情请参考 Sink 通用参数。 |
参数说明
partition.key.fields
partition.key.fields 控制消息写入哪个 RocketMQ 队列。SeaTunnel 会对这些字段的值做哈希,并用哈希结果选择队列。如果不配置该参数,则由 RocketMQ 自行选择队列。
例如,上游字段中有 c_int 时,可以这样配置:
partition.key.fields = ["c_int"]
exactly.once
Sink 支持通过 RocketMQ 事务消息实现精确一次写入。该能力默认关闭。确认 RocketMQ 集群和作业 checkpoint 配置满足事务写入要求后,可设置 exactly.once = true。
任务示例
写入 JSON 消息
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 10
schema = {
fields {
c_string = string
c_int = int
c_timestamp = timestamp
}
}
}
}
sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic"
partition.key.fields = ["c_int"]
producer.send.sync = true
}
}
写入文本消息
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 10
schema = {
fields {
id = bigint
content = string
}
}
}
}
sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_text_topic"
format = text
field.delimiter = ","
producer.send.sync = true
}
}
写入带 tag 的消息
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 10
schema = {
fields {
c_string = string
c_int = int
}
}
}
}
sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic_message_tag"
tag = "test_tag"
partition.key.fields = ["c_string"]
producer.send.sync = true
}
}
RocketMQ 读写 RocketMQ
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topics = "test_topic_source"
plugin_output = "rocketmq_table"
format = json
start.mode = "CONSUME_FROM_FIRST_OFFSET"
consumer.group = "rocketmq_to_rocketmq_group"
schema = {
fields {
id = bigint
c_string = string
}
}
}
}
sink {
Rocketmq {
plugin_input = "rocketmq_table"
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic_sink"
partition.key.fields = ["id"]
exactly.once = true
}
}