Version: 2.3.10

RocketMQ

RocketMQ sink connector

Supported Apache RocketMQ versions

  • 4.9.0 (or later, for reference)

Supported engines

Spark
Flink
SeaTunnel Zeta

Key features

By default, the sink uses two-phase commit (2PC) to guarantee exactly-once delivery of messages to RocketMQ.
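The exactly-once guarantee rests on the two-phase commit pattern. The following is a generic, in-memory sketch of that pattern, not SeaTunnel's actual RocketMQ sink code: rows written between checkpoints are staged under a transaction id (phase 1) and only become visible once the commit step runs (phase 2); an abort discards them. The classes InMemoryBroker and TwoPhaseCommitWriter are hypothetical stand-ins for the broker and the sink writer.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryBroker:
    """Hypothetical broker that supports staged (not yet visible) writes."""
    committed: list = field(default_factory=list)  # messages visible to consumers
    staged: dict = field(default_factory=dict)     # txn_id -> pending messages

    def stage(self, txn_id, messages):
        # Phase 1: store messages under a transaction id, invisible to consumers.
        self.staged[txn_id] = list(messages)

    def commit(self, txn_id):
        # Phase 2 (success): publish the staged messages. Retrying a commit for
        # an already-committed txn_id is a no-op, so no duplicates appear.
        self.committed.extend(self.staged.pop(txn_id, []))

    def abort(self, txn_id):
        # Phase 2 (failure): drop the staged messages.
        self.staged.pop(txn_id, None)


class TwoPhaseCommitWriter:
    """Buffers rows between checkpoints and stages them as one transaction."""

    def __init__(self, broker):
        self.broker = broker
        self.buffer = []
        self.checkpoint_id = 0

    def write(self, row):
        self.buffer.append(row)

    def prepare_commit(self):
        # Called when a checkpoint starts: stage the buffer and return the
        # transaction id that the committer later commits or aborts.
        self.checkpoint_id += 1
        txn_id = f"txn-{self.checkpoint_id}"
        self.broker.stage(txn_id, self.buffer)
        self.buffer = []
        return txn_id
```

Keeping commit idempotent is what lets a committer safely retry after a failure without producing duplicate messages.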

Description

Writes data rows to an Apache RocketMQ topic.

Sink parameters

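| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topic | String | yes | - | RocketMQ topic name. |
| name.srv.addr | String | yes | - | RocketMQ name server cluster address. |
| acl.enabled | Boolean | no | false | Whether to enable ACL authentication. |
| access.key | String | no | - | Access key. Must not be empty when acl.enabled is true. |
| secret.key | String | no | - | Secret key. Must not be empty when acl.enabled is true. |
| producer.group | String | no | SeaTunnel-producer-Group | Producer group name. |
| tag | String | no | - | RocketMQ message tag. |
| partition.key.fields | Array | no | - | Fields whose values are used as the message key (see below). |
| format | String | no | json | Data format: json (default) or text. The default field delimiter for text is ","; to use a custom delimiter, set the field.delimiter option. |
| field.delimiter | String | no | , | Field delimiter for the text format. |
| producer.send.sync | Boolean | no | false | If true, messages are sent synchronously. |
| common-options | config | no | - | Common sink plugin parameters; see [sink common options](../Sink-common-Options.md) for details. |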
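The format and field.delimiter options decide how each row becomes a message body. A minimal sketch, assuming json maps field names to values and text joins the values with the delimiter; the function serialize_row is hypothetical, and SeaTunnel's own serializers treat types such as dates, decimals, maps and arrays more carefully than str() does here.

```python
import json


def serialize_row(field_names, values, fmt="json", field_delimiter=","):
    """Hypothetical serializer mirroring the format / field.delimiter options."""
    if fmt == "json":
        # json: one JSON object per message, keyed by field name.
        return json.dumps(dict(zip(field_names, values)), ensure_ascii=False)
    if fmt == "text":
        # text: field values joined by the delimiter ("," unless overridden).
        return field_delimiter.join(str(v) for v in values)
    raise ValueError(f"unsupported format: {fmt}")


fields = ["name", "age", "data"]
row = ["Jack", 16, "data-example1"]
print(serialize_row(fields, row))                                   # {"name": "Jack", "age": 16, "data": "data-example1"}
print(serialize_row(fields, row, fmt="text"))                       # Jack,16,data-example1
print(serialize_row(fields, row, fmt="text", field_delimiter="|"))  # Jack|16|data-example1
```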

partition.key.fields [array]

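Configures which fields are used as the key of RocketMQ messages. For example, to use a field value from the upstream data as the key, set this property to that field's name. Suppose the upstream data is: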

| name | age | data |
|------|-----|------|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key field, the hash of the name column determines which partition each message is sent to.
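A minimal sketch of hash-based partition (message queue) selection, assuming the sink chooses a queue the way RocketMQ's SelectMessageQueueByHash selector does: take the Java hashCode() of the key, reduce it modulo the number of queues, and take the absolute value. The helper names are hypothetical, the queue count of 4 is illustrative, and the exact key string SeaTunnel builds from partition.key.fields may differ from the raw field value used here.

```python
def java_string_hash(s: str) -> int:
    """Reproduce Java's String.hashCode() for BMP characters (32-bit signed overflow)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Convert the unsigned 32-bit value back to a signed Java int.
    return h - (1 << 32) if h >= (1 << 31) else h


def select_queue(key: str, num_queues: int) -> int:
    """Mimic Java's abs(hashCode % n); Java's % keeps the dividend's sign,
    so abs(h % n) equals abs(h) % n."""
    return abs(java_string_hash(key)) % num_queues


for name in ["Jack", "Mary", "Jack"]:
    # The same key always maps to the same queue, so rows sharing a name stay ordered.
    print(name, "->", select_queue(name, 4))
```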

Task examples

Simple example: Fake to RocketMQ

Data is randomly generated and sent asynchronously to the test_topic topic.

env {
  parallelism = 1
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to learn more about configuring SeaTunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
  }
}

Simple example: RocketMQ to RocketMQ

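Rows are read from RocketMQ and written to another topic. Because partition.key.fields is set to c_int, the hash of each row's c_int value determines which partition (message queue) the message is written to. Messages are sent asynchronously, which is the default write mode.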

env {
  parallelism = 1
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic_sink"
    partition.key.fields = ["c_int"]
  }
}

Timestamp consumption and write example

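This example consumes in streaming mode from a configured start position. When new partitions are added, the job periodically refreshes its view of the topic, picks up and consumes the new partitions, and writes the records to another topic. Messages are sent synchronously.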


env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to learn more about configuring SeaTunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    # Write to a different topic than the source reads from, to avoid re-consuming our own output.
    topic = "test_topic_sink"
    partition.key.fields = ["c_int"]
    producer.send.sync = true
  }
}
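The sink in this example sets producer.send.sync = true, while the default is asynchronous sending. A rough sketch of the difference, using a hypothetical send function instead of the RocketMQ client: synchronous sending waits for each acknowledgement before sending the next message, so a failure surfaces immediately; asynchronous sending submits every message without waiting and receives results through callbacks, trading that immediacy for throughput.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def send(message):
    """Hypothetical network send: the 'broker' acknowledges after a short delay."""
    time.sleep(0.01)
    return f"ack:{message}"


def send_sync(messages):
    # Block on each send; an exception would stop before the next message is sent.
    return [send(m) for m in messages]


def send_async(messages, executor, on_result):
    # Submit every send without waiting; each result is delivered to the callback.
    futures = []
    for m in messages:
        fut = executor.submit(send, m)
        fut.add_done_callback(lambda f: on_result(f.result()))
        futures.append(fut)
    return futures


acks = []
with ThreadPoolExecutor(max_workers=4) as executor:
    send_async(["a", "b", "c"], executor, acks.append)
# Leaving the with-block waits for all sends; callback order is not guaranteed.
print(send_sync(["a", "b"]), sorted(acks))
```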

Change Log
| Change | Commit | Version |
|--------|--------|---------|
| [Improve][Connector-V2] RocketMQ Source add message tag config (#8825) | https://github.com/apache/seatunnel/commit/5913e8c35 | 2.3.10 |
| [Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737) | https://github.com/apache/seatunnel/commit/701f17b5d | 2.3.10 |
| [Improve][Connector-V2] RocketMQ Sink add message tag config (#7996) | https://github.com/apache/seatunnel/commit/97a1b00e4 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Fix][Connector-V2] Fix some throwable error not be caught (#7657) | https://github.com/apache/seatunnel/commit/e19d73282 | 2.3.8 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
| [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) | https://github.com/apache/seatunnel/commit/b7480e1a8 | 2.3.6 |
| [fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625) | https://github.com/apache/seatunnel/commit/6e0c81d49 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f | 2.3.4 |
| [Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287) | https://github.com/apache/seatunnel/commit/cd44b5894 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
| [Improve][pom] Formatting pom (#4761) | https://github.com/apache/seatunnel/commit/1d6d3815e | 2.3.2 |
| [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583) | https://github.com/apache/seatunnel/commit/e711f6ef4 | 2.3.2 |
| [Feature][Connector-V2] Add rocketmq source and sink (#4007) | https://github.com/apache/seatunnel/commit/e33389755 | 2.3.2 |