RocketMQ
RocketMQ source connector
Supported Apache RocketMQ Version
- 4.9.0 (or a newer version; listed for reference)
Supported Engines
Spark
Flink
SeaTunnel Zeta
Key Features
Description
Source connector for Apache RocketMQ.
Source Options
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| topics | String | yes | - | RocketMQ topic name. If there are multiple topics, separate them with ",", for example: "tpc1,tpc2". |
| name.srv.addr | String | yes | - | RocketMQ name server cluster address. |
| tags | String | no | - | RocketMQ tag name. If there are multiple tags, separate them with ",", for example: "tag1,tag2". |
| acl.enabled | Boolean | no | false | If true, access control is enabled, and access key and secret key need to be configured. | 
| access.key | String | no | - | When acl.enabled is true, the access key cannot be empty. |
| secret.key | String | no | - | When acl.enabled is true, the secret key cannot be empty. |
| batch.size | int | no | 100 | RocketMQ consumer pull batch size. |
| consumer.group | String | no | SeaTunnel-Consumer-Group | RocketMQ consumer group id, used to distinguish different consumer groups. | 
| commit.on.checkpoint | Boolean | no | true | If true, the consumer's offset will be periodically committed in the background. |
| schema | | no | - | The structure of the data, including field names and field types. |
| format | String | no | json | Data format. The default format is json; the text format is also supported. The default field separator is ",". To use a different delimiter, set the "field.delimiter" option. |
| field.delimiter | String | no | , | Customize the field delimiter for data format | 
| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS | The initial consumption mode of the consumer. One of: [CONSUME_FROM_LAST_OFFSET], [CONSUME_FROM_FIRST_OFFSET], [CONSUME_FROM_GROUP_OFFSETS], [CONSUME_FROM_TIMESTAMP], [CONSUME_FROM_SPECIFIC_OFFSETS]. |
| start.mode.offsets | | no | - | The offsets required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS". |
| start.mode.timestamp | Long | no | - | The time required when start.mode is "CONSUME_FROM_TIMESTAMP". |
| partition.discovery.interval.millis | long | no | -1 | The interval for dynamically discovering topics and partitions. | 
| ignore_parse_errors | Boolean | no | false | Optional flag to skip parse errors instead of failing. | 
| common-options | config | no | - | Source plugin common parameters, please refer to Source Common Options for details. | 
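As a minimal sketch of the ACL-related options in the table above, the fragment below enables access control and supplies both keys. The server address, topic name, and key values are placeholders chosen for illustration, and only the connection and ACL options are shown.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    # With ACL enabled, both keys below must be non-empty
    acl.enabled = true
    # Placeholder credentials, replace with your own
    access.key = "your_access_key"
    secret.key = "your_secret_key"
  }
}
```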
start.mode.offsets
The offsets required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS".
For example:
start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}
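For context, a sketch of how this option might sit inside a full source block is shown below. The keys in the example appear to follow a `<topic>-<queue id>` pattern; that reading, along with the server address and consumer group name, is an assumption made for illustration.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "topic1"
    consumer.group = "test_topic_group"
    # Per the option description, start.mode.offsets is required for this mode
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    start.mode.offsets = {
      # Assumed key pattern: <topic>-<queue id> = <start offset>
      topic1-0 = 70
      topic1-1 = 10
      topic1-2 = 10
    }
  }
}
```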
Task Example
Simple
The consumer reads RocketMQ data and prints it to the console.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    plugin_output = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
  }
}
Specified format consumption example
Consume the topic data in JSON format, pulling 400 records per batch and starting from the first offset.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
  }
}
Specified timestamp example
Start consuming from a specified timestamp, and check for new partitions every 1000 milliseconds.
env {
  parallelism = 1
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    partition.discovery.interval.millis = "1000"
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = "1694508382000"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
  }
}
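Partition discovery is arguably most useful in a long-running job. The fragment below is a sketch of the same source in streaming mode. `job.mode = "STREAMING"` and `checkpoint.interval` are SeaTunnel env settings that this page does not describe, so their use and values here are assumptions. The schema, transform, and sink blocks are omitted for brevity.

```hocon
env {
  parallelism = 1
  # Assumption: a long-running job, so newly discovered partitions can be picked up
  job.mode = "STREAMING"
  # Assumption: checkpoint every 5000 milliseconds
  checkpoint.interval = 5000
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    consumer.group = "test_topic_group"
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = "1694508382000"
    # Look for new topics and partitions every 1000 milliseconds
    partition.discovery.interval.millis = "1000"
    # Default value, written out for clarity
    commit.on.checkpoint = true
    format = "json"
  }
}
```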
Specified tag example
Consume only messages that carry a given tag. If there are multiple tags, separate them with ",", for example: "tag1,tag2".
env {
  parallelism = 1
  job.mode = "BATCH"
  
  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}
source {
  Rocketmq {
    plugin_output = "rocketmq_table"
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    format = text
    # The default field delimiter is ","
    field.delimiter = ","
    tags = "test_tag"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
    plugin_input = "rocketmq_table"
  }
}
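A variation on the tag example, sketched under the assumption that the messages are delimited text with a "|" separator: it filters on two tags, overrides the default delimiter, and skips records that fail to parse. The tag names, delimiter, and two-field schema are illustrative only.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    # Multiple tags are separated by ","
    tags = "tag1,tag2"
    format = text
    # Custom delimiter instead of the default ","
    field.delimiter = "|"
    # Skip records that cannot be parsed instead of failing the job
    ignore_parse_errors = true
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```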
Changelog
| Change | Commit | Version | 
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 | 
| [improve] rocketmq options (#9251) | https://github.com/apache/seatunnel/commit/4cbe3b9172 | 2.3.12 | 
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 | 
| [Improve][Connector-V2] RocketMQ Source add message tag config (#8825) | https://github.com/apache/seatunnel/commit/5913e8c35f | 2.3.10 | 
| [Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737) | https://github.com/apache/seatunnel/commit/701f17b5d4 | 2.3.10 | 
| [Improve][Connector-V2] RocketMQ Sink add message tag config (#7996) | https://github.com/apache/seatunnel/commit/97a1b00e48 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 | 
| [Fix][Connector-V2] Fix some throwable error not be caught (#7657) | https://github.com/apache/seatunnel/commit/e19d73282e | 2.3.8 | 
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 | 
| [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) | https://github.com/apache/seatunnel/commit/b7480e1a89 | 2.3.6 | 
| [fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625) | https://github.com/apache/seatunnel/commit/6e0c81d492 | 2.3.5 | 
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 | 
| [Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287) | https://github.com/apache/seatunnel/commit/cd44b5894e | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 | 
| [Improve][pom] Formatting pom (#4761) | https://github.com/apache/seatunnel/commit/1d6d3815ec | 2.3.2 | 
| [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583) | https://github.com/apache/seatunnel/commit/e711f6ef4c | 2.3.2 | 
| [Feature][Connector-V2] Add rocketmq source and sink (#4007) | https://github.com/apache/seatunnel/commit/e333897552 | 2.3.2 |