Apache Pulsar
Apache Pulsar source connector
Description
Source connector for Apache Pulsar.
Key Features
Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | String | No | - | Topic name(s) to read in single-table mode. Multiple topics can be separated by commas. Note: configure only one of topic, topic-pattern and tables_configs |
| topic-pattern | String | No | - | Regular expression used to match topic names. Note: configure only one of topic, topic-pattern and tables_configs |
| table_path | String | No | - | Logical table identifier of a single item in multi-table mode |
| tables_configs | Array | No | - | Multi-table read configuration. Each item can override the global defaults. Note: configure only one of topic, topic-pattern and tables_configs |
| topic-discovery.interval | Long | No | -1 | Interval (ms) for discovering new topic partitions. A non-positive value disables discovery. Only takes effect with topic-pattern |
| subscription.name | String | No | - | Consumer subscription name. In multi-table mode it can be defined globally or per item |
| client.service-url | String | Yes | - | Client URL of the Pulsar service, e.g. pulsar://localhost:6650 |
| admin.service-url | String | Yes | - | HTTP URL of the Pulsar admin endpoint, e.g. http://localhost:8080 |
| auth.plugin-class | String | No | - | Class name of the Pulsar client authentication plugin |
| auth.params | String | No | - | Pulsar client authentication parameters |
| poll.timeout | Integer | No | 100 | Timeout (ms) for polling messages from Pulsar |
| poll.interval | Long | No | 50 | Interval (ms) between two polls |
| poll.batch.size | Integer | No | 500 | Maximum number of messages fetched in a single poll |
| cursor.startup.mode | Enum | No | LATEST | Startup position mode. Valid values: EARLIEST, LATEST, SUBSCRIPTION, TIMESTAMP |
| cursor.startup.timestamp | Long | No | - | Start timestamp (ms) when cursor.startup.mode=TIMESTAMP |
| cursor.reset.mode | Enum | No | LATEST | Reset mode when cursor.startup.mode=SUBSCRIPTION. Valid values: EARLIEST, LATEST |
| cursor.stop.mode | Enum | No | NEVER | Stop position mode. Valid values: NEVER (streaming), LATEST (batch), TIMESTAMP (batch) |
| cursor.stop.timestamp | Long | No | - | Stop timestamp (ms) when cursor.stop.mode=TIMESTAMP |
| schema | Config | No | - | Data schema, including field names and field types |
| format | String | No | json | Data format. Defaults to json. Multi-table mode supports only JSON and CANAL_JSON |
| common-options | - | No | - | Common source plugin parameters; see Source Common Options for details |
topic [String]
Topic name(s) to read in single-table mode. Multiple topics can be separated by commas, for example 'topic-1,topic-2'.
Note that only one of topic, topic-pattern and tables_configs may be configured.
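A minimal single-table sketch that reads two topics through one comma-separated topic value. The topic names, subscription name and URLs are placeholders, not values the connector requires:

```hocon
source {
  Pulsar {
    # Two topics in a single string, separated by a comma (placeholder names)
    topic = "topic-1,topic-2"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
  }
}
```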
topic-pattern [String]
Regular expression used to match topic names. When the job starts, every topic matching the expression is subscribed.
Note that only one of topic, topic-pattern and tables_configs may be configured.
table_path [String]
Logical table identifier of a single tables_configs item. This option is mainly used in multi-table mode.
tables_configs [Array]
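Multi-table read configuration. Each item can override the global defaults, such as format, the cursor-related options and subscription.name.

Each item must configure exactly one of the following options:

- topic
- topic-pattern

Additional constraints:

- When topic-pattern is used, table_path must be set explicitly.
- subscription.name must be present either globally or in the item.
- Multi-table mode currently supports only JSON and CANAL_JSON.
- An explicitly configured topic must not overlap with any topic-pattern.
- In batch mode, every multi-table configuration must be bounded. The whole source is treated as unbounded, and rejected in batch jobs, only when more than one table is configured and at least one of them uses cursor.stop.mode = NEVER. Single-table mode and a tables_configs containing only one item keep the backward-compatible batch behavior.
- When several topic-pattern entries match the same topic, the first matching item in the declaration order of tables_configs is selected. Place more specific patterns before more general ones.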
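The first-match rule can be illustrated with two overlapping patterns. The topic naming (users-vip-* and users-*), table paths and schemas are hypothetical: a topic such as users-vip-1 matches both patterns and is therefore assigned to the first item, default.vip_users, while users-1 matches only the second. Because cursor.stop.mode defaults to NEVER, this two-table source is unbounded and suits a streaming job.

```hocon
source {
  Pulsar {
    subscription.name = "seatunnel-sub"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    format = "json"
    tables_configs = [
      {
        # More specific pattern first; table_path is mandatory with topic-pattern
        table_path = "default.vip_users"
        topic-pattern = "persistent://public/default/users-vip-.*"
        schema = {
          fields {
            user_id = "int"
          }
        }
      },
      {
        # General pattern second; it only receives topics the first item did not claim
        table_path = "default.users"
        topic-pattern = "persistent://public/default/users-.*"
        schema = {
          fields {
            user_id = "int"
          }
        }
      }
    ]
  }
}
```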
topic-discovery.interval [Long]
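Interval (ms) at which the Pulsar source discovers new topic partitions. A non-positive value disables topic partition discovery.
Note that this option only takes effect when topic-pattern is used.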
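A sketch combining topic-pattern with periodic partition discovery. The 30000 ms interval is an arbitrary illustrative value, and the pattern reuses the naming from the multi-table example:

```hocon
source {
  Pulsar {
    topic-pattern = "persistent://public/default/users-.*"
    # Look for new topic partitions every 30 seconds; a value <= 0 disables discovery
    topic-discovery.interval = 30000
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
  }
}
```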
subscription.name [String]
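Consumer subscription name. It is required for every effective table configuration; in multi-table mode it can be defined globally or overridden per item in tables_configs.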
client.service-url [String]
Service URL of the Pulsar service. To connect to Pulsar with a client library, you must specify a Pulsar protocol URL.
For example: pulsar://localhost:6650,localhost:6651.
admin.service-url [String]
HTTP URL of the admin endpoint of the Pulsar service.
For example: http://my-broker.example.com:8080, or https://my-broker.example.com:8443 when TLS is enabled.
auth.plugin-class [String]
Class name of the authentication plugin.
auth.params [String]
Parameters of the authentication plugin.
For example: key1:val1,key2:val2
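As an illustration, token (JWT) authentication might be configured as below. This assumes Pulsar's built-in token plugin class org.apache.pulsar.client.impl.auth.AuthenticationToken and its token:<value> parameter form; verify both against the Pulsar documentation for your version, and replace the placeholder token.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Assumption: Pulsar's standard token authentication plugin
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # Placeholder JWT in the assumed "token:<jwt>" form
    auth.params = "token:<your-jwt>"
  }
}
```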
poll.timeout [Integer]
Maximum time, in milliseconds, to wait for data in a single poll. Larger values usually increase throughput but also increase latency.
poll.interval [Long]
Polling interval, in milliseconds. Smaller values usually increase throughput but also increase CPU usage.
poll.batch.size [Integer]
Maximum number of records to fetch per poll. Larger batches usually increase throughput but also increase latency.
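A throughput-leaning tuning sketch. The numbers are illustrative assumptions, not recommended settings, and should be checked against your own latency and CPU budget:

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Wait up to 500 ms per poll (default 100): higher throughput, higher latency
    poll.timeout = 500
    # Poll every 20 ms (default 50): higher throughput, more CPU
    poll.interval = 20
    # Fetch up to 1000 records per poll (default 500)
    poll.batch.size = 1000
  }
}
```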
cursor.startup.mode [Enum]
Startup mode of the Pulsar consumer. Valid values are 'EARLIEST', 'LATEST', 'SUBSCRIPTION' and 'TIMESTAMP'.
cursor.startup.timestamp [Long]
Start timestamp, in milliseconds, used when cursor.startup.mode = TIMESTAMP.
cursor.reset.mode [Enum]
Cursor reset strategy used when cursor.startup.mode = SUBSCRIPTION. Valid values are 'EARLIEST' and 'LATEST'.
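A sketch that starts reading from a point in time. The timestamp 1700000000000 (2023-11-14T22:13:20Z) is an arbitrary example value; the commented lines show the SUBSCRIPTION alternative together with its reset strategy.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Start from an epoch-millisecond timestamp
    cursor.startup.mode = "TIMESTAMP"
    cursor.startup.timestamp = 1700000000000
    # Alternative: start from the subscription position; cursor.reset.mode picks EARLIEST or LATEST
    # cursor.startup.mode = "SUBSCRIPTION"
    # cursor.reset.mode = "EARLIEST"
  }
}
```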
cursor.stop.mode [Enum]
Stop mode. Valid values are 'NEVER', 'LATEST' and 'TIMESTAMP'.
With 'NEVER' the job reads as a real-time stream; the other modes make the read bounded.
cursor.stop.timestamp [Long]
Stop timestamp, in milliseconds, used when cursor.stop.mode = TIMESTAMP.
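A bounded sketch for a batch job: with job.mode = "BATCH" in the env block, the source reads from the earliest position and stops at a fixed timestamp instead of streaming indefinitely. The timestamp is an arbitrary example value and the sink is omitted for brevity.

```hocon
env {
  job.mode = "BATCH"
}

source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    cursor.startup.mode = "EARLIEST"
    # Bounded read: stop at this epoch-millisecond timestamp
    cursor.stop.mode = "TIMESTAMP"
    cursor.stop.timestamp = 1700003600000
  }
}
```

Setting cursor.stop.mode = "LATEST" instead gives the other bounded mode, which needs no cursor.stop.timestamp.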
schema [Config]
Data schema definition, including field names and field types. See Schema Feature.
format [String]
Data format. Defaults to json. See formats for details on the supported formats.
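A sketch that declares a schema for JSON messages. The field names and types are hypothetical and should mirror the keys of your own messages, for example {"order_id": 1, "user_id": 7, "amount": 9.5}:

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    format = "json"
    # Hypothetical fields; each key is a JSON field name, each value a SeaTunnel type
    schema = {
      fields {
        order_id = "bigint"
        user_id = "int"
        amount = "double"
      }
    }
  }
}
```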
Common Options
For the common parameters of source plugins, see Source Common Options.
Example
source {
Pulsar {
topic = "example"
subscription.name = "seatunnel"
client.service-url = "pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
plugin_output = "test"
}
}
Multi-table Example
source {
Pulsar {
subscription.name = "seatunnel-sub"
client.service-url = "pulsar://localhost:6650"
admin.service-url = "http://localhost:8080"
cursor.startup.mode = "EARLIEST"
cursor.stop.mode = "NEVER"
format = "json"
tables_configs = [
{
table_path = "default.orders"
topic = "persistent://public/default/orders"
schema = {
fields {
order_id = "bigint"
user_id = "int"
}
}
},
{
table_path = "default.users"
topic-pattern = "persistent://public/default/users-.*"
subscription.name = "users-sub"
format = "canal_json"
schema = {
fields {
user_id = "int"
name = "string"
}
}
}
]
}
}
For a batch job, change cursor.stop.mode = "NEVER" to a bounded mode such as LATEST or TIMESTAMP.
Change Log
| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [improve] pulsar options (#9180) | https://github.com/apache/seatunnel/commit/26a2160c80 | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][API] Make sure the table name in TablePath not be null (#7252) | https://github.com/apache/seatunnel/commit/764d8b0bc8 | 2.3.7 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [PulsarSource]Improve pulsar throughput performance. (#6234) | https://github.com/apache/seatunnel/commit/37461f4f3e | 2.3.4 |
| [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) | https://github.com/apache/seatunnel/commit/543d2c5086 | 2.3.4 |
| [Chore] Remove useless DeserializationFormatFactory and its implement (#5880) | https://github.com/apache/seatunnel/commit/f0511544ff | 2.3.4 |
| fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852) | https://github.com/apache/seatunnel/commit/3b6de3743e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Feature][Json-format] support read format for pulsar (#4111) | https://github.com/apache/seatunnel/commit/7d61ae93e7 | 2.3.2 |
| [hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125) | https://github.com/apache/seatunnel/commit/a6705cc5bf | 2.3.2 |
| [Feature] add cdc multiple table support & fix zeta bug | https://github.com/apache/seatunnel/commit/533ff2c2fa | 2.3.1 |
| [hotfix][pulsar] PulsarSource consumer ack exception. (#4237) | https://github.com/apache/seatunnel/commit/9725d675da | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182) | https://github.com/apache/seatunnel/commit/e567203f7d | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989) | https://github.com/apache/seatunnel/commit/aee2c580ea | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590) | https://github.com/apache/seatunnel/commit/4fe9323419 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Hotfix][Connector-V2][Pulsar] fix conditional options (#3504) | https://github.com/apache/seatunnel/commit/0066affacf | 2.3.0 |
| [Feature][Connector][pulsar] expose configurable options in Pulsar (#3341) | https://github.com/apache/seatunnel/commit/200faa7c29 | 2.3.0 |
| [Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0cd | 2.3.0 |
| [chore] fix pulsar consumer comment error (#3356) | https://github.com/apache/seatunnel/commit/91e632c526 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945) | https://github.com/apache/seatunnel/commit/5ad69076b3 | 2.3.0-beta |
| Move Handover to common module (#2877) | https://github.com/apache/seatunnel/commit/d94a874bcb | 2.3.0-beta |
| [hotfix][connector-v2] fix pulsar source exceptions (#2820) | https://github.com/apache/seatunnel/commit/8ff0ba7015 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [SeaTunnel]Simply seatunnel package pipeline. (#2563) | https://github.com/apache/seatunnel/commit/9d88b6221a | 2.2.0-beta |
| [Improve][Connector-V2] Pulsar support user-defined schema (#2436) | https://github.com/apache/seatunnel/commit/16cabe6a35 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef850 | 2.2.0-beta |
| [doc][connector-v2] pulsar source options doc (#2128) | https://github.com/apache/seatunnel/commit/59ce8a2b32 | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b1 | 2.2.0-beta |