Kafka
Kafka sink connector
Supported engines
Spark
Flink
SeaTunnel Zeta
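Key features
Exactly-once delivery to Kafka is implemented with two-phase commit (2PC) and applies when semantics is set to EXACTLY_ONCE; the default semantics is NON.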
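Description
Writes row content to a Kafka topic.
Supported data source information
To use the Kafka connector, the following dependency is required. It can be downloaded via install-plugin.sh or from the Maven Central Repository.
Data source | Supported versions | Maven |
---|---|---|
Kafka | Universal | Download |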
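Sink options
Name | Type | Required | Default | Description |
---|---|---|---|---|
topic | String | Yes | - | When the table is used as a sink, the topic name is the topic to write data to |
bootstrap.servers | String | Yes | - | Comma-separated list of Kafka brokers |
kafka.config | Map | No | - | In addition to the parameters above that the Kafka producer client requires, you can specify any number of optional producer client parameters, covering all producer parameters listed in the official Kafka documentation |
semantics | String | No | NON | One of EXACTLY_ONCE, AT_LEAST_ONCE, or NON. The default is NON. |
partition_key_fields | Array | No | - | Fields used as the key of the Kafka message |
partition | Int | No | - | A fixed partition; if set, all messages are sent to this partition |
assign_partitions | Array | No | - | Chooses the partition based on message content, so that messages can be distributed across partitions |
transaction_prefix | String | No | - | If semantics is EXACTLY_ONCE, the producer writes all messages in a Kafka transaction. Kafka distinguishes transactions by transactionId; this parameter is the prefix of the Kafka transactionId. Make sure different jobs use different prefixes |
format | String | No | json | Data format. The default is json. Other options are text, canal-json, debezium-json, avro, protobuf, and native. With json or text, the default field delimiter is ","; to use a custom delimiter, add the field_delimiter option. For the canal format, see canal-json. For the debezium format, see debezium-json |
field_delimiter | String | No | , | Custom field delimiter for the data format |
common-options | | No | - | Common sink plugin parameters; see Sink Common Options for details |
protobuf_message_name | String | No | - | Takes effect when format is protobuf; specifies the message name |
protobuf_schema | String | No | - | Takes effect when format is protobuf; specifies the schema definition |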
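As a hedged illustration of the format and field_delimiter options, the fragment below writes rows as delimited text with a custom separator. The topic name, the broker address, and the "|" delimiter are placeholder values chosen for this sketch, not defaults.

```hocon
sink {
  kafka {
    # Placeholder topic and broker address; replace with your own.
    topic = "test_text_topic"
    bootstrap.servers = "localhost:9092"
    # Write each row as delimited text instead of the default json.
    format = text
    # Assumed custom delimiter for this sketch; the default is ",".
    field_delimiter = "|"
  }
}
```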
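Parameter explanation
Topic format
Two formats are currently supported:
1. A literal topic name.
2. The value of a field in the upstream data, written as ${your field name}. The topic is then the value of that column in each row.
For example, given the following upstream data: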
name | age | data |
---|---|---|
Jack | 16 | data-example1 |
Mary | 23 | data-example2 |
If topic is set to ${name}, the first row is sent to the Jack topic and the second row is sent to the Mary topic.
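A minimal sketch of the field-based form, assuming the upstream rows carry the name column from the sample data; the broker address is a placeholder.

```hocon
sink {
  kafka {
    # The topic is resolved per row from the value of the name field,
    # so the two sample rows go to the Jack and Mary topics.
    topic = "${name}"
    bootstrap.servers = "localhost:9092"
    format = json
  }
}
```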
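Semantics
EXACTLY_ONCE: the producer writes all messages in a Kafka transaction that is committed to Kafka on checkpoint. Data is written to Kafka exactly once; even if the job fails and retries, no data is duplicated or lost.
AT_LEAST_ONCE: on checkpoint, the producer waits until all outstanding messages in the Kafka buffer have been acknowledged by Kafka. Data is written to Kafka at least once even if the job fails, although a retry may produce duplicates.
NON: no guarantees. Messages may be lost if a Kafka broker has problems, and a job that fails and retries may lose or duplicate data.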
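The fragment below sketches how exactly-once delivery might be configured with the semantics and transaction_prefix options. The topic, broker address, and prefix value are hypothetical. Since transactions are committed on checkpoints, this mode relies on checkpointing being active for the job.

```hocon
sink {
  kafka {
    # Placeholder topic and broker address.
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    # Write all messages inside Kafka transactions committed on checkpoint.
    semantics = EXACTLY_ONCE
    # Hypothetical prefix; use a different one for each job.
    transaction_prefix = "orders_job_"
  }
}
```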
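Partition key fields
To use the values of fields in the upstream data as the message key, assign those field names to this property.
The upstream data is as follows: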
name | age | data |
---|---|---|
Jack | 16 | data-example1 |
Mary | 23 | data-example2 |
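If name is set as the key, the hash of the name column determines which partition the message is sent to.
If no partition key fields are set, messages are sent with a null key.
The message key is formatted as JSON. With name set as the key, a key looks like {"name":"Jack"}.
The selected fields must already exist in the upstream data.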
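A short sketch of keying messages by the name column from the sample data; the topic and broker address are placeholders.

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    # Each message key is built from the name field, e.g. {"name":"Jack"}.
    partition_key_fields = ["name"]
  }
}
```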
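Partition assignment
Suppose there are five partitions in total and assign_partitions is configured as:
assign_partitions = ["shoe", "clothing"]
In this case, messages containing "shoe" are sent to partition 0, because "shoe" has index 0 in assign_partitions, and messages containing "clothing" are sent to partition 1. All other messages are distributed evenly across the remaining partitions by a hash algorithm.
This feature is implemented by the MessageContentPartitioner class, which implements the org.apache.kafka.clients.producer.Partitioner interface. To customize partitioning, implement this interface.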
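Job examples
Simple:
This example defines a SeaTunnel synchronization job that generates data with FakeSource and sends it to the Kafka sink. FakeSource generates 16 rows in total (row.num=16), each with two fields: name (string) and age (int). The data is sent to the topic named test_topic, so that topic will also contain 16 rows. If you have not yet installed and deployed SeaTunnel, follow the Install SeaTunnel guide first. Once that is done, follow the Quick Start With SeaTunnel Engine guide to run the job.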
# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
AWS MSK SASL/SCRAM
Replace ${username} and ${password} below with the values configured in AWS MSK.
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
    }
  }
}
AWS MSK IAM
Download aws-msk-iam-auth-1.1.5.jar from https://github.com/aws/aws-msk-iam-auth/releases and place it in the $SEATUNNEL_HOME/plugin/kafka/lib directory.
Make sure the IAM policy grants kafka-cluster:Connect, configured as follows:
"Effect": "Allow",
"Action": [
  "kafka-cluster:Connect",
  "kafka-cluster:AlterCluster",
  "kafka-cluster:DescribeCluster"
],
Sink configuration
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
      sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  }
}
Protobuf configuration
Set format to protobuf and describe the protobuf data structure with the protobuf_message_name and protobuf_schema parameters.
Example:
sink {
  kafka {
    topic = "test_protobuf_topic_fake_source"
    bootstrap.servers = "kafkaCluster:9092"
    format = protobuf
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
    protobuf_message_name = Person
    protobuf_schema = """
      syntax = "proto3";
      package org.apache.seatunnel.format.protobuf;
      option java_outer_classname = "ProtobufE2E";
      message Person {
        int32 c_int32 = 1;
        int64 c_int64 = 2;
        float c_float = 3;
        double c_double = 4;
        bool c_bool = 5;
        string c_string = 6;
        bytes c_bytes = 7;
        message Address {
          string street = 1;
          string city = 2;
          string state = 3;
          string zip = 4;
        }
        Address address = 8;
        map<string, float> attributes = 9;
        repeated string phone_numbers = 10;
      }
    """
  }
}
Native format
To write Kafka's native record information, refer to the following configuration.
Configuration example:
sink {
  kafka {
    topic = "test_topic_native_sink"
    bootstrap.servers = "kafkaCluster:9092"
    format = "NATIVE"
  }
}
The input data must have the following structure:
{
  "headers": {
    "header1": "header1",
    "header2": "header2"
  },
  "key": "dGVzdF9ieXRlc19kYXRh",
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}
Note: key and value must be of type byte[].
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Kafka] Support native format read/write kafka record (#8724) | https://github.com/apache/seatunnel/commit/86e2d6fcf | 2.3.10 |
[improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642) | https://github.com/apache/seatunnel/commit/db6e2994d | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[improve] kafka connector options (#8616) | https://github.com/apache/seatunnel/commit/aadfe99f8 | 2.3.10 |
[Fix][Kafka Source] kafka source use topic as table name instead of fullName (#8401) | https://github.com/apache/seatunnel/commit/3d4f4bb33 | 2.3.10 |
[Feature][Kafka] Add debezium_record_table_filter and fix error (#8391) | https://github.com/apache/seatunnel/commit/b27a30a5a | 2.3.9 |
[Bug][Kafka] kafka reads repeatedly (#8465) | https://github.com/apache/seatunnel/commit/f67f27279 | 2.3.9 |
[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception (#7857) | https://github.com/apache/seatunnel/commit/92b3253a5 | 2.3.9 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Improve][Kafka] Support custom topic for debezium compatible format (#8145) | https://github.com/apache/seatunnel/commit/deefe8762 | 2.3.9 |
[Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 |
[Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) | https://github.com/apache/seatunnel/commit/a0eeeb9b6 | 2.3.9 |
[Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281e | 2.3.9 |
[Fix][Connector-V2] Fix kafka format_error_handle_way not work (#7838) | https://github.com/apache/seatunnel/commit/63c7b4e9c | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Feature][kafka] Add arg poll.timeout for interval poll messages (#7606) | https://github.com/apache/seatunnel/commit/09d12fc40 | 2.3.8 |
[Improve][Kafka] kafka source refactored some reader read logic (#6408) | https://github.com/apache/seatunnel/commit/10598b6ae | 2.3.8 |
[Feature][connector-v2]Add Kafka Protobuf Data Parsing Support (#7361) | https://github.com/apache/seatunnel/commit/51c8e1a83 | 2.3.8 |
[Hotfix][Connector] Fix kafka consumer log next startup offset (#7312) | https://github.com/apache/seatunnel/commit/891652399 | 2.3.7 |
[Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) | https://github.com/apache/seatunnel/commit/7dc3fa8a1 | 2.3.6 |
[Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) | https://github.com/apache/seatunnel/commit/1784c01a3 | 2.3.6 |
[Feature][connector][kafka] Support read Maxwell format message from kafka #4415 (#4428) | https://github.com/apache/seatunnel/commit/4281b867a | 2.3.6 |
[Hotfix][Connector-V2][kafka]Kafka consumer group automatically commits offset logic error fix (#6961) | https://github.com/apache/seatunnel/commit/181f01ee5 | 2.3.6 |
[Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac95352 | 2.3.6 |
[Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
[Fix][Kafka-Sink] fix kafka sink factory option rule (#6657) | https://github.com/apache/seatunnel/commit/37578e103 | 2.3.5 |
[Feature][Connector-V2] Remove useless code for kafka connector (#6157) | https://github.com/apache/seatunnel/commit/0f286d162 | 2.3.4 |
[Feature] support avro format (#5084) | https://github.com/apache/seatunnel/commit/93a006156 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
[Feature][formats][ogg] Support read ogg format message #4201 (#4225) | https://github.com/apache/seatunnel/commit/7728e241e | 2.3.4 |
[Improve] Remove all useless prepare , getProducedType method (#5741) | https://github.com/apache/seatunnel/commit/ed94fffbb | 2.3.4 |
[Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
KafkaSource use Factory to create source (#5635) | https://github.com/apache/seatunnel/commit/1c6176e51 | 2.3.4 |
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
[Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516) | https://github.com/apache/seatunnel/commit/bd7498909 | 2.3.3 |
[Feature][connector][kafka] Support read debezium format message from kafka (#5066) | https://github.com/apache/seatunnel/commit/53a1f0c6c | 2.3.3 |
[hotfix][kafka] Fix the problem that the partition information cannot be obtained when kafka is restored (#4764) | https://github.com/apache/seatunnel/commit/c203ef5f8 | 2.3.2 |
Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687) | https://github.com/apache/seatunnel/commit/228257b2e | 2.3.2 |
[hotfix][e2e][kafka] Fix the job not stopping (#4600) | https://github.com/apache/seatunnel/commit/93471c9ad | 2.3.2 |
[Improve][connector][kafka] Set default value for partition option (#4524) | https://github.com/apache/seatunnel/commit/884f733c3 | 2.3.2 |
[chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ece | 2.3.2 |
[Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424) | https://github.com/apache/seatunnel/commit/38f1903be | 2.3.2 |
[Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364) | https://github.com/apache/seatunnel/commit/e1ed22b15 | 2.3.2 |
[Bug][Connector-v2][KafkaSource]Fix KafkaConsumerThread exit caused by commit offset error. (#4379) | https://github.com/apache/seatunnel/commit/71f4d0c78 | 2.3.2 |
[Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246) | https://github.com/apache/seatunnel/commit/3cdb7cfa4 | 2.3.2 |
Fix KafkaProducer resources have never been released. (#4302) | https://github.com/apache/seatunnel/commit/f99f02caa | 2.3.2 |
[Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f59049 | 2.3.1 |
[Hotfix][Zeta] Fix TaskExecutionService Deploy Failed The Job Can't Stop (#4265) | https://github.com/apache/seatunnel/commit/cf55b070b | 2.3.1 |
[Feature][CDC] Support export debezium-json format to kafka (#4339) | https://github.com/apache/seatunnel/commit/5817ec07b | 2.3.1 |
[Improve]]Connector-V2[Kafka] Set kafka consumer default group (#4271) | https://github.com/apache/seatunnel/commit/82c784a3e | 2.3.1 |
[chore] Fix the words of canal & kafka (#4261) | https://github.com/apache/seatunnel/commit/077a8d27a | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
[Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
[Imprve][Connector-V2][Hive] Support read text table & Column projection (#4105) | https://github.com/apache/seatunnel/commit/717620f54 | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 |
Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 |
Add Kafka catalog (#4106) | https://github.com/apache/seatunnel/commit/34f1f21e4 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Json-format][canal] Support read canal format message (#3950) | https://github.com/apache/seatunnel/commit/b80be72c8 | 2.3.1 |
[Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742) | https://github.com/apache/seatunnel/commit/8aff80730 | 2.3.1 |
[Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 |
[Hotfix][Connector-V2][Kafka] Fix the bug that kafka consumer is not close. (#3836) | https://github.com/apache/seatunnel/commit/344726642 | 2.3.1 |
fix commit kafka offset bug. (#3933) | https://github.com/apache/seatunnel/commit/e60ad938b | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
[Bug][KafkaSource]Fix the default value of commit_on_checkpoint. (#3831) | https://github.com/apache/seatunnel/commit/df969849f | 2.3.1 |
[Bug][KafkaSource]Failed to parse offset format (#3810) | https://github.com/apache/seatunnel/commit/8e1196acc | 2.3.1 |
[Improve][Connector-V2] Kafka client user configured clientid is preferred (#3783) | https://github.com/apache/seatunnel/commit/aacf0abc0 | 2.3.1 |
[Improve][Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724) | https://github.com/apache/seatunnel/commit/5e3f196e2 | 2.3.0 |
[Improve][Connector-V2] fix kafka admin client can't get property config (#3721) | https://github.com/apache/seatunnel/commit/74c335170 | 2.3.0 |
[Improve][Connector-V2][Kafka] Add text format for kafka sink connector (#3711) | https://github.com/apache/seatunnel/commit/74bbd76b6 | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
[Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink connector (#3574) | https://github.com/apache/seatunnel/commit/3b573798d | 2.3.0 |
options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba1 | 2.3.0 |
[Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode (#3125) | https://github.com/apache/seatunnel/commit/999cfd606 | 2.3.0 |
[Improve][Connector-V2][Kafka] Support to specify multiple partition keys (#3230) | https://github.com/apache/seatunnel/commit/f65f44f44 | 2.3.0 |
[Feature][Connector-V2][Kafka] Add Kafka option rules (#3388) | https://github.com/apache/seatunnel/commit/cc0cb8cdb | 2.3.0 |
[Improve][Connector-V2][Kafka]Improve kafka metadata code format (#3397) | https://github.com/apache/seatunnel/commit/379da3097 | 2.3.0 |
[Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157) | https://github.com/apache/seatunnel/commit/3da19d444 | 2.3.0 |
update (#3150) | https://github.com/apache/seatunnel/commit/2b4499275 | 2.3.0-beta |
[Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783) | https://github.com/apache/seatunnel/commit/6506e306e | 2.3.0-beta |
[feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085) | https://github.com/apache/seatunnel/commit/385e1f42c | 2.3.0-beta |
[Improve][connector][kafka] sink support custom partition (#3041) | https://github.com/apache/seatunnel/commit/ebddc18c4 | 2.3.0-beta |
[Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta |
[Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866) | https://github.com/apache/seatunnel/commit/254223fdb | 2.3.0-beta |
[Connector-V2][Kafka] Fix Kafka Streaming problem (#2759) | https://github.com/apache/seatunnel/commit/e92e7b728 | 2.2.0-beta |
[Improve][Connector-V2] Fix kafka connector (#2745) | https://github.com/apache/seatunnel/commit/90ce3851d | 2.2.0-beta |
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
[api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |