RocketMQ
RocketMQ sink connector
Supported Apache RocketMQ versions
- 4.9.0 (or later, for reference)
Supported engines
Spark
Flink
SeaTunnel Zeta
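Key features
By default, the connector uses two-phase commit (2PC) to guarantee exactly-once delivery of messages to RocketMQ.
Description
Writes data rows to an Apache RocketMQ topic.
Sink options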
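| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | string | yes | - | RocketMQ topic name. |
| name.srv.addr | string | yes | - | RocketMQ name server cluster address. |
| acl.enabled | Boolean | no | false | Whether ACL is enabled. When true, access.key and secret.key must be set. |
| access.key | String | no | - | Access key. Cannot be empty when acl.enabled is true. |
| secret.key | String | no | - | Secret key. Cannot be empty when acl.enabled is true. |
| producer.group | String | no | SeaTunnel-producer-Group | Producer group name. |
| tag | String | no | - | RocketMQ message tag. |
| partition.key.fields | array | no | - | Fields used as the RocketMQ message key. See the partition.key.fields section below. |
| format | String | no | json | Data format. The default is json; text is also supported. For text, the default field delimiter is ",". To use a custom delimiter, set the field.delimiter option. |
| field.delimiter | String | no | , | Custom field delimiter for the data format. |
| producer.send.sync | Boolean | no | false | If true, messages are sent synchronously. |
| common-options | config | no | - | Common sink plugin options. See [sink common options](../Sink-common-Options.md) for details. |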
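As a minimal sketch of how the ACL-related options in the table fit together, the sink block below turns on acl.enabled and supplies both keys. The key values, topic, tag, and producer group name are placeholders chosen for illustration, not values from a real deployment.

```hocon
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # Placeholder tag and producer group, for illustration only
    tag = "example_tag"
    producer.group = "example-producer-group"
    # With acl.enabled = true, both keys below must be non-empty
    acl.enabled = true
    access.key = "your-access-key"
    secret.key = "your-secret-key"
  }
}
```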
partition.key.fields [array]
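Specifies which fields are used as the key of the RocketMQ message. For example, to use the value of a field from the upstream data as the key, assign that field's name to this option. Given the following upstream data: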
| name | age | data | 
|---|---|---|
| Jack | 16 | data-example1 | 
| Mary | 23 | data-example2 | 
If name is set as the key, the hash of the name column's value determines which partition the message is sent to.
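For the upstream table above, a sink block along these lines would key each message by the name column. This is a sketch only: the topic and name server address are placeholders borrowed from the task examples.

```hocon
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # Rows with the same name value (for example "Jack") hash to the same partition
    partition.key.fields = ["name"]
  }
}
```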
Task examples
Simple Fake to RocketMQ example
Data is randomly generated and sent asynchronously to the test topic.
env {
  parallelism = 1
}
source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
    # To learn more about configuring SeaTunnel and to see the full list of transform plugins,
    # see https://seatunnel.apache.org/docs/category/transform
}
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
  }
}
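As a variation on the sink block above, the following sketch writes delimited text instead of JSON and sends synchronously. It uses only options listed in the Sink options table. The ";" delimiter is an arbitrary choice for illustration.

```hocon
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # Emit each row as delimited text rather than the default json
    format = text
    # Arbitrary example delimiter; the default is ","
    field.delimiter = ";"
    # Send synchronously instead of the default asynchronous mode
    producer.send.sync = true
  }
}
```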
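Simple Rocketmq to Rocketmq example
Consumes from RocketMQ and writes to another topic, using the hash of the c_int field to spread messages across partitions. Messages are sent asynchronously, which is the default.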
env {
  parallelism = 1
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic_sink"
    partition.key.fields = ["c_int"]
  }
}
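Timestamp consumption write example
This is a streaming consumption example that starts from a specified position (the configuration below uses start.mode = "CONSUME_FROM_FIRST_OFFSET"). When new partitions are added, the program periodically refreshes to detect and consume them, and writes the data to another topic.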
env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
    # To learn more about configuring SeaTunnel and to see the full list of transform plugins,
    # see https://seatunnel.apache.org/docs/category/transform
}
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic_sink"
    partition.key.fields = ["c_int"]
    producer.send.sync = true
  }
}
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 | 
| [improve] rocketmq options (#9251) | https://github.com/apache/seatunnel/commit/4cbe3b9172 | 2.3.12 | 
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 | 
| [Improve][Connector-V2] RocketMQ Source add message tag config (#8825) | https://github.com/apache/seatunnel/commit/5913e8c35f | 2.3.10 | 
| [Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737) | https://github.com/apache/seatunnel/commit/701f17b5d4 | 2.3.10 | 
| [Improve][Connector-V2] RocketMQ Sink add message tag config (#7996) | https://github.com/apache/seatunnel/commit/97a1b00e48 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 | 
| [Fix][Connector-V2] Fix some throwable error not be caught (#7657) | https://github.com/apache/seatunnel/commit/e19d73282e | 2.3.8 | 
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 | 
| [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) | https://github.com/apache/seatunnel/commit/b7480e1a89 | 2.3.6 | 
| [fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625) | https://github.com/apache/seatunnel/commit/6e0c81d492 | 2.3.5 | 
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 | 
| [Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287) | https://github.com/apache/seatunnel/commit/cd44b5894e | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 | 
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 | 
| [Improve][pom] Formatting pom (#4761) | https://github.com/apache/seatunnel/commit/1d6d3815ec | 2.3.2 | 
| [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583) | https://github.com/apache/seatunnel/commit/e711f6ef4c | 2.3.2 | 
| [Feature][Connector-V2] Add rocketmq source and sink (#4007) | https://github.com/apache/seatunnel/commit/e333897552 | 2.3.2 |