Kafka
Kafka source connector
Supported engines
Spark
Flink
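SeaTunnel Zeta
Key features
Description
Source connector for Apache Kafka.
Supported data source info
The following dependencies are required to use the Kafka connector.
They can be downloaded via install-plugin.sh or from the Maven Central Repository.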
| Data source | Supported versions | Maven download link |
|---|---|---|
| Kafka | Universal | Download |
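Source options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | String | Yes | - | Name of the topic to read data from when the table is used as a source. A comma-separated list of topics is also supported, for example 'topic-1,topic-2'. |
| table_list | Map | No | - | Topic list configuration. A table_list and a topic can be configured at the same time. |
| bootstrap.servers | String | Yes | - | Comma-separated list of Kafka brokers. |
| pattern | Boolean | No | false | If pattern is set to true, the specified regular expression is used to match topic names, and all matching topics are subscribed. |
| consumer.group | String | No | SeaTunnel-Consumer-Group | Kafka consumer group ID, used to distinguish different consumer groups. |
| commit_on_checkpoint | Boolean | No | true | If true, the consumer's offsets are periodically committed in the background. |
| poll.timeout | Long | No | 10000 | Interval, in milliseconds, at which Kafka is actively polled. |
| kafka.config | Map | No | - | In addition to the required parameters above, users can specify any number of optional consumer client parameters, covering all consumer parameters listed in the official Kafka documentation. |
| schema | Config | No | - | Data structure, including field names and field types. |
| format | String | No | json | Data format. The default is json. Available formats include text, canal_json, debezium_json, ogg_json, maxwell_json, avro, protobuf, and native. The default field delimiter is ", ". To use a custom delimiter, add the "field_delimiter" option. For the canal format, see canal-json for details. For the debezium format, see debezium-json. For details on some of the formats, see formats. |
| format_error_handle_way | String | No | fail | How data format errors are handled. The default is fail; available values are fail and skip. With fail, a data format error blocks processing and throws an exception. With skip, the row that contains the format error is skipped. |
| debezium_record_table_filter | Config | No | - | Filters data in debezium format. Used only when format is set to debezium_json. See debezium_record_table_filter below. |
| field_delimiter | String | No | , | Custom field delimiter for the data format. |
| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | Initial consumption mode of the consumer. |
| start_mode.offsets | Config | No | - | Offsets used for the specific_offsets consumption mode. |
| start_mode.timestamp | Long | No | - | Time used for the "timestamp" consumption mode. |
| partition-discovery.interval-millis | Long | No | -1 | Interval, in milliseconds, for dynamically discovering topics and partitions. |
| common-options | | No | - | Common parameters of source plugins. See Source Common Options for details. |
| protobuf_message_name | String | No | - | Effective when format is set to protobuf. Specifies the message name. |
| protobuf_schema | String | No | - | Effective when format is set to protobuf. Specifies the schema definition. |
| is_native | Boolean | No | false | Supports retaining the source information of the record. |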
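The start_mode options can be combined as in the following sketch. It assumes that the keys of start_mode.offsets take the form <topic>-<partition> and that start_mode.timestamp is an epoch timestamp in milliseconds. The topic name info, the offsets, and the broker address are placeholders chosen for illustration.

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    # Hypothetical topic with three partitions.
    topic = "info"
    consumer.group = "seatunnel_group"

    # Start each partition from an explicit offset.
    # Keys are assumed to be "<topic>-<partition>"; values are starting offsets.
    start_mode = "specific_offsets"
    start_mode.offsets = {
      info-0 = 70
      info-1 = 10
      info-2 = 10
    }

    # Alternative (assumed epoch milliseconds): start from a point in time.
    # start_mode = "timestamp"
    # start_mode.timestamp = 1672531200000
  }
}
```

Only one start mode applies per source, so the timestamp variant is left commented out.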
debezium_record_table_filter
debezium_record_table_filter can be used to filter data in debezium format. It is configured as follows:
debezium_record_table_filter {
  database_name = "test"
  schema_name = "public" // null if it does not exist
  table_name = "products"
}
Only data from the test.public.products table will be consumed.
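For context, the filter might sit inside a complete source block like the sketch below. The topic name debezium_topic, the broker address, and the schema fields are assumptions made for illustration. Only format and debezium_record_table_filter come from the options described above.

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    # Hypothetical topic that carries Debezium change events.
    topic = "debezium_topic"
    consumer.group = "seatunnel_group"

    # The filter is only used when format is debezium_json.
    format = debezium_json
    debezium_record_table_filter {
      database_name = "test"
      schema_name = "public"
      table_name = "products"
    }

    # Assumed columns of the products table.
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
  }
}
```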
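Task example
Simple example
This example reads data from the Kafka topics topic_1, topic_2, and topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the installation guide to install and deploy it. Then follow the quick start guide to run this job.
# Define the runtime environment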
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Kafka {
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }  
}
sink {
  Console {}
}
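A variant of the simple example, sketched here under the assumption that the topic carries JSON messages with name and age fields, uses the default json format and skips malformed rows instead of failing the job. The topic and broker address are placeholders.

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "topic_1"

    # json is the default format; it is set explicitly here for clarity.
    format = json
    # Skip rows that cannot be parsed rather than throwing an exception.
    format_error_handle_way = skip

    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
```

With format_error_handle_way left at its default of fail, the same job would stop on the first malformed message.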
Regular expression topic
source {
    Kafka {
          topic = ".*seatunnel*."
          pattern = "true" 
          bootstrap.servers = "localhost:9092"
          consumer.group = "seatunnel_group"
    }
}
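Pattern subscription can be paired with partition-discovery.interval-millis so that a streaming job picks up topics and partitions created after it starts. The following is a sketch only: the topic naming scheme, the 10-second interval, and the assumption that the default of -1 leaves discovery disabled are illustrative and not confirmed by this page.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    # Regular expression over a hypothetical topic naming scheme.
    topic = "seatunnel_.*"
    pattern = "true"
    consumer.group = "seatunnel_group"
    # Check for new topics and partitions every 10 seconds (milliseconds).
    partition-discovery.interval-millis = 10000
  }
}

sink {
  Console {}
}
```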
AWS MSK SASL/SCRAM
Replace the username and password placeholders in the following configuration with the values configured in AWS MSK.
source {
    Kafka {
        topic = "seatunnel"
        bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096"
        consumer.group = "seatunnel_group"
        kafka.config = {
            security.protocol=SASL_SSL
            sasl.mechanism=SCRAM-SHA-512
            sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
        }
    }
}
AWS MSK IAM
Download aws-msk-iam-auth-1.1.5.jar from here and place it in the $SEATUNNEL_HOME/plugin/kafka/lib directory.
Make sure the IAM policy includes the "kafka-cluster:Connect" permission, as shown below:
"Effect": "Allow",
"Action": [
    "kafka-cluster:Connect",
    "kafka-cluster:AlterCluster",
    "kafka-cluster:DescribeCluster"
],
Source configuration example:
source {
    Kafka {
        topic = "seatunnel"
        bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098"
        consumer.group = "seatunnel_group"
        kafka.config = {
            security.protocol=SASL_SSL
            sasl.mechanism=AWS_MSK_IAM
            sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
            sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    }
}
Kerberos authentication example
Source configuration example:
source {
    Kafka {
        topic = "seatunnel"
        bootstrap.servers = "127.0.0.1:9092"
        consumer.group = "seatunnel_group"
        kafka.config = {
            security.protocol=SASL_PLAINTEXT
            sasl.kerberos.service.name=kafka
            sasl.mechanism=GSSAPI
            java.security.krb5.conf="/etc/krb5.conf"
            sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n        useKeyTab=true \n        storeKey=true  \n        keyTab=\"/path/to/xxx.keytab\" \n        principal=\"user@xxx.com\";"
        }
    }
}
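Multiple Kafka source example
Parse data according to different Kafka topics and formats, and perform upsert operations based on the ID.
Note: Kafka is an unstructured data source and should use tables_configs. table_list will be removed in the future.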
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    tables_configs = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}
sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}
source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    table_list = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}
sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}
Protobuf configuration
Set format to protobuf and describe the protobuf data structure with the protobuf_message_name and protobuf_schema parameters.
Usage example:
source {
  Kafka {
    topic = "test_protobuf_topic_fake_source"
    format = protobuf
    protobuf_message_name = Person
    protobuf_schema = """
              syntax = "proto3";
              package org.apache.seatunnel.format.protobuf;
              option java_outer_classname = "ProtobufE2E";
              message Person {
                int32 c_int32 = 1;
                int64 c_int64 = 2;
                float c_float = 3;
                double c_double = 4;
                bool c_bool = 5;
                string c_string = 6;
                bytes c_bytes = 7;
                message Address {
                  string street = 1;
                  string city = 2;
                  string state = 3;
                  string zip = 4;
                }
                Address address = 8;
                map<string, float> attributes = 9;
                repeated string phone_numbers = 10;
              }
              """
    bootstrap.servers = "kafkaCluster:9092"
    start_mode = "earliest"
    plugin_output = "kafka_table"
  }
}
Native format
To retain Kafka's native record information, refer to the following configuration.
Configuration example:
source {
  Kafka {
    topic = "test_topic_native_source"
    bootstrap.servers = "kafkaCluster:9092"
    start_mode = "earliest"
    format_error_handle_way = skip
    format = "NATIVE"
    value_converter_schema_enabled = false
    consumer.group = "native_group"
  }
}
The returned data has the following format:
{
  "headers": {
    "header1": "header1",
    "header2": "header2"
  },
  "key": "dGVzdF9ieXRlc19kYXRh",  
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}
Note: key and value are of type byte[].
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Feature][Kafka] Support native format read/write kafka record (#8724) | https://github.com/apache/seatunnel/commit/86e2d6fcf | 2.3.10 | 
| [improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642) | https://github.com/apache/seatunnel/commit/db6e2994d | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [improve] kafka connector options (#8616) | https://github.com/apache/seatunnel/commit/aadfe99f8 | 2.3.10 | 
| [Fix][Kafka Source] kafka source use topic as table name instead of fullName (#8401) | https://github.com/apache/seatunnel/commit/3d4f4bb33 | 2.3.10 | 
| [Feature][Kafka] Add debezium_record_table_filter and fix error (#8391) | https://github.com/apache/seatunnel/commit/b27a30a5a | 2.3.9 |
| [Bug][Kafka] kafka reads repeatedly (#8465) | https://github.com/apache/seatunnel/commit/f67f27279 | 2.3.9 | 
| [Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception (#7857) | https://github.com/apache/seatunnel/commit/92b3253a5 | 2.3.9 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 | 
| [Improve][Kafka] Support custom topic for debezium compatible format (#8145) | https://github.com/apache/seatunnel/commit/deefe8762 | 2.3.9 | 
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 | 
| [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) | https://github.com/apache/seatunnel/commit/a0eeeb9b6 | 2.3.9 | 
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281e | 2.3.9 | 
| [Fix][Connector-V2] Fix kafka format_error_handle_way not work (#7838) | https://github.com/apache/seatunnel/commit/63c7b4e9c | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Feature][kafka] Add arg poll.timeout for interval poll messages (#7606) | https://github.com/apache/seatunnel/commit/09d12fc40 | 2.3.8 | 
| [Improve][Kafka] kafka source refactored some reader read logic (#6408) | https://github.com/apache/seatunnel/commit/10598b6ae | 2.3.8 | 
| [Feature][connector-v2]Add Kafka Protobuf Data Parsing Support (#7361) | https://github.com/apache/seatunnel/commit/51c8e1a83 | 2.3.8 | 
| [Hotfix][Connector] Fix kafka consumer log next startup offset (#7312) | https://github.com/apache/seatunnel/commit/891652399 | 2.3.7 | 
| [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) | https://github.com/apache/seatunnel/commit/7dc3fa8a1 | 2.3.6 | 
| [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) | https://github.com/apache/seatunnel/commit/1784c01a3 | 2.3.6 | 
| [Feature][connector][kafka] Support read Maxwell format message from kafka #4415 (#4428) | https://github.com/apache/seatunnel/commit/4281b867a | 2.3.6 | 
| [Hotfix][Connector-V2][kafka]Kafka consumer group automatically commits offset logic error fix (#6961) | https://github.com/apache/seatunnel/commit/181f01ee5 | 2.3.6 | 
| [Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac95352 | 2.3.6 | 
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 | 
| [Fix][Kafka-Sink] fix kafka sink factory option rule (#6657) | https://github.com/apache/seatunnel/commit/37578e103 | 2.3.5 | 
| [Feature][Connector-V2] Remove useless code for kafka connector (#6157) | https://github.com/apache/seatunnel/commit/0f286d162 | 2.3.4 | 
| [Feature] support avro format (#5084) | https://github.com/apache/seatunnel/commit/93a006156 | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Feature][formats][ogg] Support read ogg format message #4201 (#4225) | https://github.com/apache/seatunnel/commit/7728e241e | 2.3.4 | 
| [Improve] Remove all useless prepare, getProducedType method (#5741) | https://github.com/apache/seatunnel/commit/ed94fffbb | 2.3.4 |
| [Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
| KafkaSource use Factory to create source (#5635) | https://github.com/apache/seatunnel/commit/1c6176e51 | 2.3.4 | 
| [Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 | 
| [Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516) | https://github.com/apache/seatunnel/commit/bd7498909 | 2.3.3 | 
| [Feature][connector][kafka] Support read debezium format message from kafka (#5066) | https://github.com/apache/seatunnel/commit/53a1f0c6c | 2.3.3 | 
| [hotfix][kafka] Fix the problem that the partition information cannot be obtained when kafka is restored (#4764) | https://github.com/apache/seatunnel/commit/c203ef5f8 | 2.3.2 | 
| Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687) | https://github.com/apache/seatunnel/commit/228257b2e | 2.3.2 | 
| [hotfix][e2e][kafka] Fix the job not stopping (#4600) | https://github.com/apache/seatunnel/commit/93471c9ad | 2.3.2 | 
| [Improve][connector][kafka] Set default value for partition option (#4524) | https://github.com/apache/seatunnel/commit/884f733c3 | 2.3.2 | 
| [chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ece | 2.3.2 | 
| [Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424) | https://github.com/apache/seatunnel/commit/38f1903be | 2.3.2 | 
| [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364) | https://github.com/apache/seatunnel/commit/e1ed22b15 | 2.3.2 | 
| [Bug][Connector-v2][KafkaSource]Fix KafkaConsumerThread exit caused by commit offset error. (#4379) | https://github.com/apache/seatunnel/commit/71f4d0c78 | 2.3.2 | 
| [Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246) | https://github.com/apache/seatunnel/commit/3cdb7cfa4 | 2.3.2 | 
| Fix KafkaProducer resources have never been released. (#4302) | https://github.com/apache/seatunnel/commit/f99f02caa | 2.3.2 | 
| [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f59049 | 2.3.1 | 
| [Hotfix][Zeta] Fix TaskExecutionService Deploy Failed The Job Can't Stop (#4265) | https://github.com/apache/seatunnel/commit/cf55b070b | 2.3.1 | 
| [Feature][CDC] Support export debezium-json format to kafka (#4339) | https://github.com/apache/seatunnel/commit/5817ec07b | 2.3.1 | 
| [Improve]]Connector-V2[Kafka] Set kafka consumer default group (#4271) | https://github.com/apache/seatunnel/commit/82c784a3e | 2.3.1 | 
| [chore] Fix the words of canal&kafka(#4261) | https://github.com/apache/seatunnel/commit/077a8d27a | 2.3.1 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 | 
| [Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 | 
| [Imprve][Connector-V2][Hive] Support read text table & Column projection (#4105) | https://github.com/apache/seatunnel/commit/717620f54 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 | 
| Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 | 
| Add Kafka catalog (#4106) | https://github.com/apache/seatunnel/commit/34f1f21e4 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Feature][Json-format][canal] Support read canal format message (#3950) | https://github.com/apache/seatunnel/commit/b80be72c8 | 2.3.1 | 
| [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742) | https://github.com/apache/seatunnel/commit/8aff80730 | 2.3.1 | 
| [Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 | 
| [Hotfix][Connector-V2][Kafka] Fix the bug that kafka consumer is not close. (#3836) | https://github.com/apache/seatunnel/commit/344726642 | 2.3.1 | 
| fix commit kafka offset bug. (#3933) | https://github.com/apache/seatunnel/commit/e60ad938b | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 | 
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 | 
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 | 
| [Bug][KafkaSource]Fix the default value of commit_on_checkpoint. (#3831) | https://github.com/apache/seatunnel/commit/df969849f | 2.3.1 | 
| [Bug][KafkaSource]Failed to parse offset format (#3810) | https://github.com/apache/seatunnel/commit/8e1196acc | 2.3.1 | 
| [Improve][Connector-V2] Kafka client user configured clientid is preferred (#3783) | https://github.com/apache/seatunnel/commit/aacf0abc0 | 2.3.1 | 
| [Improve][Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724) | https://github.com/apache/seatunnel/commit/5e3f196e2 | 2.3.0 | 
| [Improve][Connector-V2] fix kafka admin client can't get property config (#3721) | https://github.com/apache/seatunnel/commit/74c335170 | 2.3.0 | 
| [Improve][Connector-V2][Kafka] Add text format for kafka sink connector (#3711) | https://github.com/apache/seatunnel/commit/74bbd76b6 | 2.3.0 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink connector (#3574) | https://github.com/apache/seatunnel/commit/3b573798d | 2.3.0 | 
| options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba1 | 2.3.0 | 
| [Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode (#3125) | https://github.com/apache/seatunnel/commit/999cfd606 | 2.3.0 | 
| [Improve][Connector-V2][Kafka] Support to specify multiple partition keys (#3230) | https://github.com/apache/seatunnel/commit/f65f44f44 | 2.3.0 | 
| [Feature][Connector-V2][Kafka] Add Kafka option rules (#3388) | https://github.com/apache/seatunnel/commit/cc0cb8cdb | 2.3.0 | 
| [Improve][Connector-V2][Kafka]Improve kafka metadata code format (#3397) | https://github.com/apache/seatunnel/commit/379da3097 | 2.3.0 | 
| [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157) | https://github.com/apache/seatunnel/commit/3da19d444 | 2.3.0 | 
| update (#3150) | https://github.com/apache/seatunnel/commit/2b4499275 | 2.3.0-beta | 
| [Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783) | https://github.com/apache/seatunnel/commit/6506e306e | 2.3.0-beta | 
| [feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085) | https://github.com/apache/seatunnel/commit/385e1f42c | 2.3.0-beta | 
| [Improve][connector][kafka] sink support custom partition (#3041) | https://github.com/apache/seatunnel/commit/ebddc18c4 | 2.3.0-beta | 
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta | 
| [Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866) | https://github.com/apache/seatunnel/commit/254223fdb | 2.3.0-beta | 
| [Connector-V2][Kafka] Fix Kafka Streaming problem (#2759) | https://github.com/apache/seatunnel/commit/e92e7b728 | 2.2.0-beta | 
| [Improve][Connector-V2] Fix kafka connector (#2745) | https://github.com/apache/seatunnel/commit/90ce3851d | 2.2.0-beta | 
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta | 
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta | 
| StateT of SeaTunnelSource should extend Serializable(#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta | 
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |