Version: 2.3.3

RocketMQ

RocketMQ source connector

Description​

Source connector for Apache RocketMQ.

Key features​

Options​

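| name                                | type    | required | default value              |
|-------------------------------------|---------|----------|----------------------------|
| topics                              | String  | yes      | -                          |
| name.srv.addr                       | String  | yes      | -                          |
| acl.enabled                         | Boolean | no       | false                      |
| access.key                          | String  | no       |                            |
| secret.key                          | String  | no       |                            |
| batch.size                          | int     | no       | 100                        |
| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   |
| commit.on.checkpoint                | Boolean | no       | true                       |
| schema                              |         | no       | -                          |
| format                              | String  | no       | json                       |
| field.delimiter                     | String  | no       | ,                          |
| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS |
| start.mode.offsets                  |         | no       |                            |
| start.mode.timestamp                | Long    | no       |                            |
| partition.discovery.interval.millis | long    | no       | -1                         |
| common-options                      | config  | no       | -                          |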

topics [string]

RocketMQ topic name. To consume multiple topics, separate the names with commas, for example: "tpc1,tpc2".

name.srv.addr [string]​

RocketMQ name server cluster address.

consumer.group [string]​

RocketMQ consumer group id, used to distinguish different consumer groups.

acl.enabled [boolean]

If true, access control is enabled, and both access.key and secret.key must be configured.

access.key [string]

When acl.enabled is true, access.key must not be empty.

secret.key [string]

When acl.enabled is true, secret.key must not be empty.
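As a minimal sketch of an ACL-enabled source, the three options can be set together as follows. The server address, topic, and credential values are placeholders chosen for illustration, not values taken from a real deployment.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test-topic-002"
    consumer.group = "consumer-group"
    # Turn on access control; both keys below must then be non-empty.
    acl.enabled = true
    # Placeholder credentials (hypothetical), replace with your own.
    access.key = "your-access-key"
    secret.key = "your-secret-key"
  }
}
```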

batch.size [int]

The batch size of each RocketMQ consumer pull.

commit.on.checkpoint [boolean]

If true, the consumer's offset will be periodically committed in the background.

partition.discovery.interval.millis [long]

The interval, in milliseconds, for dynamically discovering topics and partitions.
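For illustration, a source that reads two topics and sets a discovery interval might look like the sketch below. The 10000 ms value is an arbitrary example, and the address and group names are placeholders.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    # Two topics, separated by a comma.
    topics = "tpc1,tpc2"
    consumer.group = "consumer-group"
    # Illustrative value only: look for new topics and partitions every 10 seconds.
    partition.discovery.interval.millis = 10000
  }
}
```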

schema​

The structure of the data, including field names and field types.

format

Data format. The default format is json; text is also supported. For the text format, the default field separator is ",". To use a different delimiter, set the "field.delimiter" option.

field.delimiter

The custom field delimiter used when parsing the data format.
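A sketch of a text-format source with a custom delimiter is shown below. It assumes message bodies such as `18|Alice`, with fields mapped to the schema in declaration order; that mapping is an assumption for illustration and should be checked against your data.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test-topic-002"
    consumer.group = "consumer-group"
    # Parse each message as delimited text instead of the default json.
    format = text
    # Custom delimiter; the pipe character is an arbitrary choice here.
    field.delimiter = "|"
    schema = {
      fields {
        age = int
        name = string
      }
    }
  }
}
```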

start.mode

The initial consumption mode of the consumer. Supported values: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_GROUP_OFFSETS, CONSUME_FROM_TIMESTAMP, CONSUME_FROM_SPECIFIC_OFFSETS.

start.mode.timestamp

The timestamp to start consuming from. Required when start.mode is "CONSUME_FROM_TIMESTAMP".
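A hedged example of timestamp-based startup follows. The option is typed as Long; the sketch assumes the value is an epoch time in milliseconds, which this page does not state, so verify the unit before relying on it.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test-topic-002"
    consumer.group = "consumer-group"
    start.mode = "CONSUME_FROM_TIMESTAMP"
    # Assumption: epoch milliseconds; this value is 2024-01-01T00:00:00Z.
    start.mode.timestamp = 1704067200000
  }
}
```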

start.mode.offsets

The offsets to start consuming from. Required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS".

For example:

start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}

common-options [config]

Common source plugin parameters. See Source Common Options for details.

Example​

Simple​

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test-topic-002"
    consumer.group = "consumer-group"
    parallelism = 2
    batch.size = 20
    # Field names and types of the consumed records.
    schema = {
      fields {
        age = int
        name = string
      }
    }
    # Start from the explicit offsets listed in start.mode.offsets.
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    start.mode.offsets = {
      test-topic-002-0 = 20
    }
  }
}