跳到主要内容
版本:2.3.2

RocketMQ

RocketMQ source connector

Description

Source connector for Apache RocketMQ.

Key features

Options

nametyperequireddefault value
topicsStringyes-
name.srv.addrStringyes-
acl.enabledBooleannofalse
access.keyStringno
secret.keyStringno
batch.sizeintno100
consumer.groupStringnoSeaTunnel-Consumer-Group
commit.on.checkpointBooleannotrue
schemano-
formatStringnojson
field.delimiterStringno,
start.modeStringnoCONSUME_FROM_GROUP_OFFSETS
start.mode.offsetsno
start.mode.timestampLongno
partition.discovery.interval.millislongno-1
common-optionsconfigno-

topics [string]

RocketMQ topic name. If there are multiple topics, use , to split, for example: "tpc1,tpc2".

name.srv.addr [string]

RocketMQ name server cluster address.

consumer.group [string]

RocketMQ consumer group id, used to distinguish different consumer groups.

acl.enabled [boolean]

If true, access control is enabled, and access key and secret key need to be configured.

access.key [string]

When ACL_ENABLED is true, access key cannot be empty.

secret.key [string]

When ACL_ENABLED is true, secret key cannot be empty.

batch.size [int]

RocketMQ consumer pull batch size

commit.on.checkpoint [boolean]

If true the consumer's offset will be periodically committed in the background.

partition.discovery.interval.millis [long]

The interval for dynamically discovering topics and partitions.

schema

The structure of the data, including field names and field types.

format

Data format. The default format is json. Optional text format. The default field separator is ", ". If you customize the delimiter, add the "field.delimiter" option.

field.delimiter

Customize the field delimiter for data format.

start.mode

The initial consumption pattern of consumers,there are several types: [CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP] ,[CONSUME_FROM_SPECIFIC_OFFSETS]

start.mode.timestamp

The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".

start.mode.offsets

The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".

for example:

start.mode.offsets = {
topic1-0 = 70
topic1-1 = 10
topic1-2 = 10
}

common-options [config]

Source plugin common parameters, please refer to Source Common Options for details.

Example

Simple

source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test-topic-002"
consumer.group = "consumer-group"
parallelism = 2
batch.size = 20
schema = {
fields {
age = int
name = string
}
}
start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
start.mode.offsets = {
test-topic-002-0 = 20
}

}
}