# S3File

> S3 file sink connector
## Supported Engines

- Spark
- Flink
- SeaTunnel Zeta
## Key Features

By default, we use 2PC commit to ensure exactly-once semantics.

- File format types
  - text
  - csv
  - parquet
  - orc
  - json
  - excel
  - xml
  - binary
## Description

Output data to the AWS S3 file system.
## Supported DataSource Info

| Datasource | Supported Versions |
|------------|--------------------|
| S3         | current            |
## Database Dependency

If you use Spark/Flink, you must ensure that your Spark/Flink cluster is already integrated with Hadoop before using this connector. The tested Hadoop version is 2.x.

If you use the SeaTunnel Engine, the Hadoop jar is integrated automatically when you download and install SeaTunnel Engine; you can check the jars under `${SEATUNNEL_HOME}/lib` to confirm this. To use this connector, you also need to put `hadoop-aws-3.1.4.jar` and `aws-java-sdk-bundle-1.12.692.jar` into the `${SEATUNNEL_HOME}/lib` directory.
## Data Type Mapping

If the written file format is csv or text, all columns will be written as string type.
### Orc File Type

| SeaTunnel Data Type | Orc Data Type         |
|---------------------|-----------------------|
| STRING              | STRING                |
| BOOLEAN             | BOOLEAN               |
| TINYINT             | BYTE                  |
| SMALLINT            | SHORT                 |
| INT                 | INT                   |
| BIGINT              | LONG                  |
| FLOAT               | FLOAT                 |
| DOUBLE              | DOUBLE                |
| DECIMAL             | DECIMAL               |
| BYTES               | BINARY                |
| DATE                | DATE                  |
| TIME / TIMESTAMP    | TIMESTAMP             |
| ROW                 | STRUCT                |
| NULL                | Unsupported data type |
| ARRAY               | LIST                  |
| MAP                 | MAP                   |
### Parquet File Type

| SeaTunnel Data Type | Parquet Data Type     |
|---------------------|-----------------------|
| STRING              | STRING                |
| BOOLEAN             | BOOLEAN               |
| TINYINT             | INT_8                 |
| SMALLINT            | INT_16                |
| INT                 | INT32                 |
| BIGINT              | INT64                 |
| FLOAT               | FLOAT                 |
| DOUBLE              | DOUBLE                |
| DECIMAL             | DECIMAL               |
| BYTES               | BINARY                |
| DATE                | DATE                  |
| TIME / TIMESTAMP    | TIMESTAMP_MILLIS      |
| ROW                 | GroupType             |
| NULL                | Unsupported data type |
| ARRAY               | LIST                  |
| MAP                 | MAP                   |
## Sink Options

| Name | Type | Required | Default Value | Description |
|------|------|----------|---------------|-------------|
| path | string | yes | - | |
| tmp_path | string | no | /tmp/seatunnel | The result file is written to this temporary path first and then moved to the target directory with `mv` on commit. An S3 directory is required. |
| bucket | string | yes | - | |
| fs.s3a.endpoint | string | yes | - | |
| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | The way to authenticate against s3a. Currently only org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider and com.amazonaws.auth.InstanceProfileCredentialsProvider are supported. |
| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
| custom_filename | boolean | no | false | Whether you need a custom file name |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need to process partitions. |
| partition_by | array | no | - | Only used when have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true |
| sink_columns | array | no | - | When this parameter is empty, all fields are sink columns. |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel |
| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format_type is csv |
| xml_root_tag | string | no | RECORDS | Only used when file_format_type is xml; specifies the tag name of the root element in the XML file. |
| xml_row_tag | string | no | RECORD | Only used when file_format_type is xml; specifies the tag name of the data rows in the XML file. |
| xml_use_attr_format | boolean | no | - | Only used when file_format_type is xml; specifies whether to write data using the tag-attribute format. |
| single_file_mode | boolean | no | false | Each parallelism outputs only one file. When enabled, batch_size does not take effect, and output file names have no file-block suffix. |
| create_empty_file_when_no_data | boolean | no | false | When no data is synchronized from upstream, the corresponding data file is still generated. |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format_type is parquet |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format_type is parquet |
| hadoop_s3_properties | map | no | - | If you need to add other options, you can add them here; see the hadoop_s3_properties section below. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How to handle the target path before the sync task starts |
| data_save_mode | Enum | no | APPEND_DATA | How to handle existing data files in the target path before the sync task starts |
| enable_header_write | boolean | no | false | Only used when file_format_type is text or csv. false: do not write the header; true: write the header. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json, text, csv, or xml. |
### path [string]

The target directory path for the data files; variable substitution is supported. For example: `path=/test/${database_name}/${schema_name}/${table_name}`.
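For instance, a minimal sketch of a sink that relies on this substitution (the bucket, endpoint, and path values below are placeholders):

```hocon
S3File {
  bucket = "s3a://my-bucket"   # placeholder bucket
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  # ${database_name}/${schema_name}/${table_name} are replaced per upstream table at runtime
  path = "/test/${database_name}/${schema_name}/${table_name}"
  file_format_type = "orc"
}
```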
### hadoop_s3_properties [map]

If you need to add other options, you can add them here and refer to the Hadoop S3A documentation for the available properties.

```hocon
hadoop_s3_properties {
  "fs.s3a.buffer.dir" = "/data/st_test/s3a"
  "fs.s3a.fast.upload.buffer" = "disk"
}
```
### custom_filename [boolean]

Whether to customize the file name.

### file_name_expression [string]

Only used when custom_filename is true.

file_name_expression describes the expression for the files that will be created under path. We can add the variables ${now} or ${uuid} to file_name_expression, e.g. test_${uuid}_${now}; ${now} represents the current time, and its format can be defined by specifying the option filename_time_format.

Please note that if is_enable_transaction is true, we will automatically prepend ${transactionId}_ to the head of the file name.
### filename_time_format [string]

Only used when custom_filename is true.

When the format in the file_name_expression parameter is xxxx-${now}, filename_time_format specifies the time format of the path; the default value is yyyy.MM.dd. Commonly used time formats are listed below:

| Symbol | Description        |
|--------|--------------------|
| y      | Year               |
| M      | Month              |
| d      | Day of month       |
| H      | Hour in day (0-23) |
| m      | Minute in hour     |
| s      | Second in minute   |
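Putting custom_filename, file_name_expression, and filename_time_format together, a minimal sketch could look like the following (connection options omitted; the resulting file name shown in the comment is illustrative):

```hocon
S3File {
  # ...connection options omitted...
  custom_filename = true
  # e.g. test_<uuid>_2024.01.15.txt; ${transactionId}_ is prepended when transactions are enabled
  file_name_expression = "test_${uuid}_${now}"
  filename_time_format = "yyyy.MM.dd"
}
```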
### file_format_type [string]

We support the following file types:

`text` `csv` `parquet` `orc` `json` `excel` `xml` `binary`

Please note that the final file name ends with the suffix of the file format type; the suffix of a text file is `txt`.
### field_delimiter [string]

The separator between columns in a row of data. Only needed by the text file format.

### row_delimiter [string]

The separator between rows in a file. Only needed by the text file format.
### have_partition [boolean]

Whether you need to process partitions.

### partition_by [array]

Only used when have_partition is true.

Partition the data based on the selected fields.

### partition_dir_expression [string]

Only used when have_partition is true.

If partition_by is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.

The default partition_dir_expression is ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/, where k0 is the first partition field and v0 is the value of the first partition field.

### is_partition_field_write_in_file [boolean]

Only used when have_partition is true.

If is_partition_field_write_in_file is true, the partition fields and their values will be written into the data file.

For example, if you want to write a Hive data file, its value should be false.
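As a sketch of the partition options above (connection options omitted), files would land under directories like age=20/ while the age column itself stays out of the file contents:

```hocon
S3File {
  # ...connection options omitted...
  have_partition = true
  partition_by = ["age"]
  partition_dir_expression = "${k0}=${v0}"    # k0 = "age", v0 = the row's age value
  is_partition_field_write_in_file = false    # keep partition fields out of the data file (e.g. for Hive)
}
```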
### sink_columns [array]

Which columns need to be written to the file; the default is all columns obtained from the Transform or Source. The order of the fields determines the order in which the file is actually written.

### is_enable_transaction [boolean]

If is_enable_transaction is true, we will ensure that data is neither lost nor duplicated when it is written to the target directory.

Please note that if is_enable_transaction is true, we will automatically prepend ${transactionId}_ to the head of the file name.

Currently, only true is supported.
### batch_size [int]

The maximum number of rows in a file. For the SeaTunnel Engine, the number of rows in a file is determined by batch_size and checkpoint.interval together. If the value of checkpoint.interval is large enough, the sink writer keeps writing to one file until its row count exceeds batch_size. If checkpoint.interval is small, the sink writer creates a new file when a new checkpoint is triggered.
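A hedged sketch of that interaction on the SeaTunnel Engine; the interval value below is illustrative:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 60000   # a checkpoint may roll a new file before batch_size is reached
}

sink {
  S3File {
    # ...connection options omitted...
    batch_size = 1000000        # otherwise a new file is rolled after this many rows
  }
}
```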
### compress_codec [string]

The compression codec of the file. The supported codecs per format are:

- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc: `lzo` `snappy` `lz4` `zlib` `none`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`

Tip: the excel type does not support any compression format.
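For example, a sketch of an orc sink picking one of the codecs listed above:

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "orc"
  compress_codec = "zlib"   # must be one of the codecs supported by the chosen file format
}
```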
### common options

Common parameters of sink plugins; please refer to Sink Common Options for details.
### max_rows_in_memory [int]

When the file format is Excel, the maximum number of data items that can be cached in memory.

### sheet_name [string]

The name of the sheet in the workbook to write to.
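A minimal sketch combining the two excel options above (the values are illustrative):

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "excel"
  max_rows_in_memory = 1024   # cap on rows buffered in memory while writing the workbook
  sheet_name = "result"       # fixed sheet name instead of the random default
}
```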
### csv_string_quote_mode [string]

When the file format is CSV, the string quote mode of CSV:

- ALL: All string fields are quoted.
- MINIMAL: Quotes fields that contain special characters, such as the field delimiter, the quote character, or any of the characters in the line separator string.
- NONE: Never quotes fields. When a delimiter occurs in the data, the printer prefixes it with the escape character; if no escape character is set, format validation throws an exception.
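For instance, to quote every string field in a csv sink (a sketch; other options omitted):

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "csv"
  csv_string_quote_mode = "ALL"   # quote all string fields, not only those containing special characters
}
```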
### xml_root_tag [string]

Specifies the tag name of the root element in the XML file.

### xml_row_tag [string]

Specifies the tag name of the data rows in the XML file.

### xml_use_attr_format [boolean]

Specifies whether to write data using the tag-attribute format.
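A sketch combining the three xml options; the comment on the last line is an assumption about what the tag-attribute format produces:

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "xml"
  xml_root_tag = "RECORDS"    # root element wrapping the whole file
  xml_row_tag = "RECORD"      # one element per data row
  xml_use_attr_format = true  # assumption: row fields become attributes of each <RECORD>
}
```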
### parquet_avro_write_timestamp_as_int96 [boolean]

Supports writing timestamps as Parquet INT96; only valid for parquet files.

### parquet_avro_write_fixed_as_int96 [array]

Supports writing 12-byte fields as Parquet INT96; only valid for parquet files.
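A sketch combining both parquet INT96 options; c_fixed is a hypothetical 12-byte column:

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "parquet"
  parquet_avro_write_timestamp_as_int96 = true      # write timestamps as INT96
  parquet_avro_write_fixed_as_int96 = ["c_fixed"]   # hypothetical 12-byte field written as INT96
}
```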
### schema_save_mode [Enum]

How to handle the target path before the sync task starts. The options are:

- `RECREATE_SCHEMA`: Create the path when it does not exist; if the path already exists, delete it and recreate it.
- `CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the path when it does not exist; use the path when it already exists.
- `ERROR_WHEN_SCHEMA_NOT_EXIST`: Report an error when the path does not exist.
- `IGNORE`: Ignore the handling of the table.

### data_save_mode [Enum]

How to handle existing data files in the target path before the sync task starts. The options are:

- `DROP_DATA`: Use the path but delete the data files in it.
- `APPEND_DATA`: Use the path and add new files to it for writing data.
- `ERROR_WHEN_DATA_EXISTS`: Report an error when data files exist in the path.
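For example, to require that the path already exists but wipe any old data files before writing, a sketch could combine the two modes like this:

```hocon
S3File {
  # ...connection options omitted...
  schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"   # fail if the target path is missing
  data_save_mode = "DROP_DATA"                       # delete existing data files, then write
}
```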
### encoding [string]

Only used when file_format_type is json, text, csv, or xml.

The encoding of the file to write. This parameter is parsed by Charset.forName(encoding).
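A minimal sketch writing GBK-encoded text (any charset name accepted by Charset.forName works):

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "text"
  encoding = "GBK"   # parsed via Charset.forName("GBK")
}
```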
## Example

### Simple Example

This example defines a SeaTunnel sync task that automatically generates data through FakeSource and sends it to the S3File sink. FakeSource generates 16 rows in total (row.num=16), and each row has two fields: name (string type) and age (int type). One file will be created in the final target s3 directory, and all the data will be written into it.

Before running this job, you need to create the s3 path /seatunnel/text. If you have not yet installed and deployed SeaTunnel, follow the instructions in Install SeaTunnel, then run the job following the instructions in Quick Start With SeaTunnel Engine.
```hocon
# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # This is an example source plugin, used only for testing and demonstrating the feature source plugin
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        name = string
        c_boolean = boolean
        age = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
  # If you would like to get more information about how to configure SeaTunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2
}

sink {
  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel"
    path = "/seatunnel/text"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
    hadoop_s3_properties {
      "fs.s3a.buffer.dir" = "/data/st_test/s3a"
      "fs.s3a.fast.upload.buffer" = "disk"
    }
  }
  # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
}
```
For the text file format, with `have_partition`, `custom_filename`, `sink_columns`, and `com.amazonaws.auth.InstanceProfileCredentialsProvider`:
```hocon
S3File {
  bucket = "s3a://seatunnel-test"
  tmp_path = "/tmp/seatunnel"
  path = "/seatunnel/text"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  file_format_type = "text"
  field_delimiter = "\t"
  row_delimiter = "\n"
  have_partition = true
  partition_by = ["age"]
  partition_dir_expression = "${k0}=${v0}"
  is_partition_field_write_in_file = true
  custom_filename = true
  file_name_expression = "${transactionId}_${now}"
  filename_time_format = "yyyy.MM.dd"
  sink_columns = ["name","age"]
  is_enable_transaction = true
  hadoop_s3_properties {
    "fs.s3a.buffer.dir" = "/data/st_test/s3a"
    "fs.s3a.fast.upload.buffer" = "disk"
  }
}
```
For the parquet file format, a simple configuration using `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`:
```hocon
S3File {
  bucket = "s3a://seatunnel-test"
  tmp_path = "/tmp/seatunnel"
  path = "/seatunnel/parquet"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  access_key = "xxxxxxxxxxxxxxxxx"
  access_secret = "xxxxxxxxxxxxxxxxx"
  file_format_type = "parquet"
  hadoop_s3_properties {
    "fs.s3a.buffer.dir" = "/data/st_test/s3a"
    "fs.s3a.fast.upload.buffer" = "disk"
  }
}
```
For the orc file format, a simple configuration using `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`:
```hocon
S3File {
  bucket = "s3a://seatunnel-test"
  tmp_path = "/tmp/seatunnel"
  path = "/seatunnel/orc"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
  access_key = "xxxxxxxxxxxxxxxxx"
  access_secret = "xxxxxxxxxxxxxxxxx"
  file_format_type = "orc"
  schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  data_save_mode = "APPEND_DATA"
}
```
### Multiple Table Writing and Save Mode
```hocon
env {
  "job.name" = "SeaTunnel_job"
  "job.mode" = STREAMING
}

source {
  MySQL-CDC {
    database-names = [
      "wls_t1"
    ]
    table-names = [
      "wls_t1.mysqlcdc_to_s3_t3",
      "wls_t1.mysqlcdc_to_s3_t4",
      "wls_t1.mysqlcdc_to_s3_t5",
      "wls_t1.mysqlcdc_to_s3_t1",
      "wls_t1.mysqlcdc_to_s3_t2"
    ]
    password = "xxxxxx"
    username = "xxxxxxxxxxxxx"
    base-url = "jdbc:mysql://localhost:3306/qa_source"
  }
}

transform {
}

sink {
  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel/${table_name}"
    path = "/test/${table_name}"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxx"
    file_format_type = "orc"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```
### enable_header_write [boolean]

Only used when file_format_type is text or csv. false: do not write the header; true: write the header.
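A csv sketch with the header row enabled (other options omitted):

```hocon
S3File {
  # ...connection options omitted...
  file_format_type = "csv"
  enable_header_write = true   # write the column names as the first row
}
```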
## Changelog
| Change | Commit | Version | 
|---|---|---|
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13a | 2.3.10 | 
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85 | 2.3.10 | 
| [Fix][Connector-V2] Fixed incorrectly setting s3 key in some cases (#8885) | https://github.com/apache/seatunnel/commit/cf4bab5be | 2.3.10 | 
| [Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [improve] update S3File connector config option (#8615) | https://github.com/apache/seatunnel/commit/80cc9fa6f | 2.3.10 | 
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db7891 | 2.3.10 | 
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed5 | 2.3.10 | 
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 | 
| [Hotfix][Zeta] Fix the dependency conflict between the guava in hadoop-aws and hive-exec (#7986) | https://github.com/apache/seatunnel/commit/a7837f1f1 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a1 | 2.3.8 | 
| [Improve] Refactor S3FileCatalog and it's factory (#7457) | https://github.com/apache/seatunnel/commit/d928e8b11 | 2.3.8 | 
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 | 
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 | 
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c49 | 2.3.6 | 
| [Feature][S3 File] Make S3 File Connector support multiple table write (#6698) | https://github.com/apache/seatunnel/commit/8f2049b2f | 2.3.6 | 
| [Improve][Connector-v2] The hive connector support multiple filesystem (#6648) | https://github.com/apache/seatunnel/commit/8a4c01fe3 | 2.3.6 | 
| [bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization (#6717) | https://github.com/apache/seatunnel/commit/79bb70101 | 2.3.6 | 
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 | 
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9 | 2.3.5 | 
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f | 2.3.4 | 
| [Feature][Connector]add s3file save mode function (#6131) | https://github.com/apache/seatunnel/commit/81c51073b | 2.3.4 | 
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b | 2.3.4 | 
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1a | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc | 2.3.4 | 
| [Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a190109 | 2.3.4 | 
| [Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 | 
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768 | 2.3.4 | 
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59e | 2.3.3 | 
| [chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ece | 2.3.2 | 
| [Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d | 2.3.2 | 
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3 | 2.3.1 | 
| [Chore] Upgrade guava to 27.0-jre (#4238) | https://github.com/apache/seatunnel/commit/4851bee57 | 2.3.1 | 
| Add redshift datatype convertor (#4245) | https://github.com/apache/seatunnel/commit/b19011517 | 2.3.1 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| Add S3Catalog (#4121) | https://github.com/apache/seatunnel/commit/7d7f50654 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1 | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 | 
| [Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd7607766 | 2.3.1 | 
| [Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8 | 2.3.0 | 
| [Engine][Checkpoint]Unified naming style (#3714) | https://github.com/apache/seatunnel/commit/bc0bd3bec | 2.3.0 | 
| [Connector][File-S3]Set AK is not required (#3713) | https://github.com/apache/seatunnel/commit/da3c52617 | 2.3.0 | 
| [Connector&Engine]Set S3 AK to optional (#3688) | https://github.com/apache/seatunnel/commit/4710918b0 | 2.3.0 | 
| [Connector][S3]Support s3a protocol (#3632) | https://github.com/apache/seatunnel/commit/ae4cc9c1e | 2.3.0 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263 | 2.3.0 | 
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e863 | 2.3.0 | 
| [Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c35388 | 2.3.0 | 
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 | 
| [Feature][Connector-V2][S3] Add S3 file source & sink connector (#3119) | https://github.com/apache/seatunnel/commit/f27d68ca9 | 2.3.0-beta |