Version: Next

Apache Pulsar

Apache Pulsar source connector

Description

Source connector for Apache Pulsar.

Key features

Options

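| Name | Type | Default | Description |
|---|---|---|---|
| topic | String | - | Topic name(s) to read in single-table mode. Multiple topics can be separated by commas. Note: configure only one of topic, topic-pattern and tables_configs. |
| topic-pattern | String | - | Regular expression used to match topic names. Note: configure only one of topic, topic-pattern and tables_configs. |
| table_path | String | - | Logical table identifier of a single item in multi-table mode. |
| tables_configs | Array | - | Multi-table read configuration. Each item can override the global defaults. Note: configure only one of topic, topic-pattern and tables_configs. |
| topic-discovery.interval | Long | -1 | Interval (ms) for discovering new topic partitions. A non-positive value disables discovery. Only takes effect when topic-pattern is used. |
| subscription.name | String | - | Consumer subscription name. Required for every effective table config; in multi-table mode it can be defined globally or in an item. |
| client.service-url | String | - | Client URL of the Pulsar service, e.g. pulsar://localhost:6650. |
| admin.service-url | String | - | HTTP URL of the Pulsar admin endpoint, e.g. http://localhost:8080. |
| auth.plugin-class | String | - | Class name of the Pulsar client authentication plugin. |
| auth.params | String | - | Pulsar client authentication parameters. |
| poll.timeout | Integer | 100 | Timeout (ms) for pulling messages from Pulsar. |
| poll.interval | Long | 50 | Interval (ms) between two polls. |
| poll.batch.size | Integer | 500 | Maximum number of messages fetched per poll. |
| cursor.startup.mode | Enum | LATEST | Startup position mode. Valid values: EARLIEST, LATEST, SUBSCRIPTION, TIMESTAMP. |
| cursor.startup.timestamp | Long | - | Start timestamp (ms) when cursor.startup.mode=TIMESTAMP. |
| cursor.reset.mode | Enum | LATEST | Reset mode when cursor.startup.mode=SUBSCRIPTION. Valid values: EARLIEST, LATEST. |
| cursor.stop.mode | Enum | NEVER | Stop position mode. Valid values: NEVER (streaming), LATEST (batch), TIMESTAMP (batch). |
| cursor.stop.timestamp | Long | - | Stop timestamp (ms) when cursor.stop.mode=TIMESTAMP. |
| schema | Config | - | Data schema, including field names and field types. |
| format | String | json | Data format. Defaults to json. Multi-table mode supports only JSON and CANAL_JSON. |
| common-options | | - | Common options of Source plugins; see Source Common Options for details. |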

topic [String]

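Topic name(s) to read in single-table mode. Multiple topics can also be given as a comma-separated list, for example 'topic-1,topic-2'.

Note: configure only one of topic, topic-pattern and tables_configs.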
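A minimal sketch of single-table mode reading two topics through one comma-separated topic value. The topic names, subscription name and localhost URLs are placeholders, and other options a real job may need (such as schema and format) are left out.

```hocon
source {
  Pulsar {
    # Both topics are read in single-table mode: a comma-separated list, not a regex
    topic = "topic-1,topic-2"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
  }
}
```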

topic-pattern [String]

Regular expression used to match topic names. When the job starts, every topic that matches the expression is subscribed.

Note: configure only one of topic, topic-pattern and tables_configs.

table_path [String]

Logical table identifier of a single tables_configs item. This option is mainly used in multi-table mode.

tables_configs [Array]

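Multi-table read configuration. Each item can override global defaults such as format, the cursor options and subscription.name.

Each item must configure exactly one of the following options: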

  • topic
  • topic-pattern

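Additional constraints:

  • When topic-pattern is used, table_path must be set explicitly.

  • subscription.name must be present either globally or in the item.

  • Multi-table mode currently supports only JSON and CANAL_JSON.

  • Explicitly configured topics must not overlap with any topic-pattern.

  • In batch mode, every multi-table config must be bounded. The source is treated as unbounded, and rejected in a batch job, only when more than one table is configured and at least one of them uses cursor.stop.mode = NEVER. Single-table mode and a tables_configs with only one item keep the backward-compatible batch behavior.

  • When several topic-patterns match the same topic, the first matching item in tables_configs declaration order is selected. Place more specific patterns before more general ones.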
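The first-match rule for overlapping patterns can be illustrated with a tables_configs fragment. Assume, hypothetically, topics named users-vip-1 and users-2 in public/default: users-vip-1 matches both patterns and is assigned to the first item, while users-2 matches only the second. The table paths are made up for illustration, and the global options (subscription.name, service URLs, format) and per-item schema are omitted.

```hocon
tables_configs = [
  {
    # More specific pattern first: claims the users-vip-* topics
    table_path = "default.vip_users"
    topic-pattern = "persistent://public/default/users-vip-.*"
  },
  {
    # More general pattern second: users-vip-* topics were already matched above,
    # so this item effectively receives the remaining users-* topics
    table_path = "default.users"
    topic-pattern = "persistent://public/default/users-.*"
  }
]
```

Declaring the two items in the opposite order would let the general pattern match every users-* topic first, leaving the VIP table empty.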

topic-discovery.interval [Long]

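Interval, in milliseconds, at which the Pulsar source discovers new topic partitions. A non-positive value disables topic partition discovery.

Note: this option only takes effect when topic-pattern is used.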
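A sketch combining topic-pattern with periodic partition discovery. The 30000 ms interval, the orders-.* pattern and the URLs are illustrative assumptions, not recommended values.

```hocon
source {
  Pulsar {
    # Subscribe to every topic whose full name matches this regular expression
    topic-pattern = "persistent://public/default/orders-.*"
    # Look for new topic partitions every 30000 ms; a value <= 0 disables discovery
    topic-discovery.interval = 30000
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
  }
}
```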

subscription.name [String]

Consumer subscription name. It is required for every effective table config; in multi-table mode it can be defined globally or overridden per item in tables_configs.

client.service-url [String]

Service URL of the Pulsar service. To connect to Pulsar with the client library, specify a Pulsar protocol URL.

For example: pulsar://localhost:6650,localhost:6651

admin.service-url [String]

HTTP URL of the Pulsar service's admin endpoint.

For example: http://my-broker.example.com:8080, or https://my-broker.example.com:8443 when TLS is enabled.
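For a TLS-enabled cluster, both URLs change together. The sketch below assumes the usual Pulsar convention of the pulsar+ssl:// scheme on port 6651 for the client and https:// for the admin endpoint; my-broker.example.com is a placeholder host. This page lists no trust-store options, so the sketch also assumes the broker certificate is already trusted by the JVM.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    # TLS client connection: pulsar+ssl scheme, conventional TLS port 6651
    client.service-url = "pulsar+ssl://my-broker.example.com:6651"
    # TLS admin endpoint, matching the https example above
    admin.service-url = "https://my-broker.example.com:8443"
  }
}
```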

auth.plugin-class [String]

Class name of the authentication plugin.

auth.params [String]

Parameters passed to the authentication plugin.

For example: key1:val1,key2:val2
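As an example of the two auth options used together, the sketch below assumes JWT token authentication through the Pulsar Java client's built-in org.apache.pulsar.client.impl.auth.AuthenticationToken plugin, whose parameter string takes the form token:<jwt>. The token is a placeholder, and whether the connector also applies these credentials to the admin client is not stated on this page.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Token authentication plugin shipped with the Pulsar Java client
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # Placeholder: replace <your-jwt> with a real token
    auth.params = "token:<your-jwt>"
  }
}
```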

poll.timeout [Integer]

Maximum time to wait in a single poll, in milliseconds. Larger values usually increase throughput but also increase latency.

poll.interval [Long]

Interval between polls, in milliseconds. Smaller values usually increase throughput but also increase CPU overhead.

poll.batch.size [Integer]

Maximum number of records fetched in a single poll. Larger values increase throughput but also increase latency.
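The three polling options trade throughput against latency and CPU usage. A hypothetical throughput-oriented setting might raise the timeout and batch size above their defaults (100 and 500) and shorten the interval below its default (50); the numbers are illustrative only and should be tuned against real traffic.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Wait up to 500 ms per poll (default 100): fuller polls, higher latency
    poll.timeout = 500
    # Poll again after 20 ms (default 50): more throughput, more CPU
    poll.interval = 20
    # Take up to 2000 records per poll (default 500)
    poll.batch.size = 2000
  }
}
```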

cursor.startup.mode [Enum]

Startup mode of the Pulsar consumer. Valid values are 'EARLIEST', 'LATEST', 'SUBSCRIPTION' and 'TIMESTAMP'.

cursor.startup.timestamp [Long]

cursor.startup.mode = TIMESTAMP 时使用的起始时间戳,单位毫秒。

cursor.reset.mode [Enum]

cursor.startup.mode = SUBSCRIPTION 时使用的 cursor reset 策略,可选值为 'EARLIEST''LATEST'

cursor.stop.mode [Enum]

Stop mode. Valid values are 'NEVER', 'LATEST' and 'TIMESTAMP'.

With 'NEVER', the job reads continuously as a real-time stream; the other modes make the read bounded.

cursor.stop.timestamp [Long]

cursor.stop.mode = TIMESTAMP 时使用的停止时间戳,单位毫秒。

schema [Config]

Data schema definition, including field names and field types. See Schema Feature.

format [String]

Data format. The default is json. See formats for details on the supported formats.
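A sketch of a single-table source that declares a schema for JSON messages. It assumes hypothetical messages shaped like {"order_id": 1, "user_id": 42, "status": "paid"} and that schema fields are matched to JSON keys by name; the topic and field names are made up.

```hocon
source {
  Pulsar {
    topic = "orders"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Each message value is parsed as a JSON object
    format = "json"
    # Expected fields and their SeaTunnel types
    schema = {
      fields {
        order_id = "bigint"
        user_id = "int"
        status = "string"
      }
    }
  }
}
```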

Common options

For the common options of Source plugins, see Source Common Options.

Example

source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    plugin_output = "test"
  }
}

Multi-table example

source {
  Pulsar {
    subscription.name = "seatunnel-sub"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    cursor.startup.mode = "EARLIEST"
    cursor.stop.mode = "NEVER"
    format = "json"

    tables_configs = [
      {
        table_path = "default.orders"
        topic = "persistent://public/default/orders"
        schema = {
          fields {
            order_id = "bigint"
            user_id = "int"
          }
        }
      },
      {
        table_path = "default.users"
        topic-pattern = "persistent://public/default/users-.*"
        subscription.name = "users-sub"
        format = "canal_json"
        schema = {
          fields {
            user_id = "int"
            name = "string"
          }
        }
      }
    ]
  }
}

For a batch job, change cursor.stop.mode = "NEVER" to a bounded mode such as LATEST or TIMESTAMP.
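Applied to the multi-table example, the batch variant could look like the sketch below. cursor.stop.mode = "LATEST" is set globally so that both items are bounded, and neither item overrides it with NEVER. The env block is standard SeaTunnel job configuration rather than an option of this connector; topics, table paths and fields are reused from the multi-table example.

```hocon
env {
  job.mode = "BATCH"
}

source {
  Pulsar {
    subscription.name = "seatunnel-sub"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    cursor.startup.mode = "EARLIEST"
    # Bounded global default; no item may override it with NEVER in a batch job
    cursor.stop.mode = "LATEST"
    format = "json"

    tables_configs = [
      {
        table_path = "default.orders"
        topic = "persistent://public/default/orders"
        schema = {
          fields {
            order_id = "bigint"
            user_id = "int"
          }
        }
      },
      {
        table_path = "default.users"
        topic-pattern = "persistent://public/default/users-.*"
        subscription.name = "users-sub"
        format = "canal_json"
        schema = {
          fields {
            user_id = "int"
            name = "string"
          }
        }
      }
    ]
  }
}
```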

Changelog

| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [improve] pulsar options (#9180) | https://github.com/apache/seatunnel/commit/26a2160c80 | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][API] Make sure the table name in TablePath not be null (#7252) | https://github.com/apache/seatunnel/commit/764d8b0bc8 | 2.3.7 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [PulsarSource]Improve pulsar throughput performance. (#6234) | https://github.com/apache/seatunnel/commit/37461f4f3e | 2.3.4 |
| [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) | https://github.com/apache/seatunnel/commit/543d2c5086 | 2.3.4 |
| [Chore] Remove useless DeserializationFormatFactory and its implement (#5880) | https://github.com/apache/seatunnel/commit/f0511544ff | 2.3.4 |
| fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852) | https://github.com/apache/seatunnel/commit/3b6de3743e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Feature][Json-format] support read format for pulsar (#4111) | https://github.com/apache/seatunnel/commit/7d61ae93e7 | 2.3.2 |
| [hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125) | https://github.com/apache/seatunnel/commit/a6705cc5bf | 2.3.2 |
| [Feature] add cdc multiple table support & fix zeta bug | https://github.com/apache/seatunnel/commit/533ff2c2fa | 2.3.1 |
| [hotfix][pulsar] PulsarSource consumer ack exception. (#4237) | https://github.com/apache/seatunnel/commit/9725d675da | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182) | https://github.com/apache/seatunnel/commit/e567203f7d | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989) | https://github.com/apache/seatunnel/commit/aee2c580ea | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590) | https://github.com/apache/seatunnel/commit/4fe9323419 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Hotfix][Connector-V2][Pulsar] fix conditional options (#3504) | https://github.com/apache/seatunnel/commit/0066affacf | 2.3.0 |
| [Feature][Connector][pulsar] expose configurable options in Pulsar (#3341) | https://github.com/apache/seatunnel/commit/200faa7c29 | 2.3.0 |
| [Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0cd | 2.3.0 |
| [chore] fix pulsar consumer comment error (#3356) | https://github.com/apache/seatunnel/commit/91e632c526 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945) | https://github.com/apache/seatunnel/commit/5ad69076b3 | 2.3.0-beta |
| Move Handover to common module (#2877) | https://github.com/apache/seatunnel/commit/d94a874bcb | 2.3.0-beta |
| [hotfix][connector-v2] fix pulsar source exceptions (#2820) | https://github.com/apache/seatunnel/commit/8ff0ba7015 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [SeaTunnel]Simply seatunnel package pipeline. (#2563) | https://github.com/apache/seatunnel/commit/9d88b6221a | 2.2.0-beta |
| [Improve][Connector-V2] Pulsar support user-defined schema (#2436) | https://github.com/apache/seatunnel/commit/16cabe6a35 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef850 | 2.2.0-beta |
| [doc][connector-v2] pulsar source options doc (#2128) | https://github.com/apache/seatunnel/commit/59ce8a2b32 | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b1 | 2.2.0-beta |