# S3File

> S3 File Source Connector
## Supported Engines

- Spark
- Flink
- SeaTunnel Zeta
## Key Features

Reads all the data in a split in a single `pollNext` call. The splits that have been read are saved in the snapshot.
## Description

Reads data from the AWS S3 file system.
## Supported DataSource Info

| Data Source | Supported Versions |
|-------------|--------------------|
| S3          | current            |
## Dependency

If you use Spark/Flink, you must ensure your Spark/Flink cluster already has Hadoop integrated before using this connector. The tested Hadoop version is 2.x.

If you use SeaTunnel Zeta, the Hadoop jars are bundled automatically when you download and install SeaTunnel Zeta; you can verify this by checking the jars under `${SEATUNNEL_HOME}/lib`.

To use this connector, place `hadoop-aws-3.1.4.jar` and `aws-java-sdk-bundle-1.12.692.jar` in the `${SEATUNNEL_HOME}/lib` directory.
## Data Type Mapping

The data type mapping depends on the type of file being read. The following file types are supported:

- text
- csv
- parquet
- orc
- json
- excel
- xml
### JSON File Type

If you assign the file type as `json`, you should also assign the `schema` option to tell the connector how to parse the data into the rows you want.

For example, the upstream data is as follows:

```json
{"code": 200, "data": "get success", "success": true}
```

You can also save multiple records in one file, separated by newlines:

```json
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
```

You should assign the schema as follows:
```hocon
schema {
  fields {
    code = int
    data = string
    success = boolean
  }
}
```
The connector will generate data as follows:

| code | data        | success |
|------|-------------|---------|
| 200  | get success | true    |
### Text Or CSV File Type

If you set `file_format_type` to `text`, `excel`, `csv`, or `xml`, you need to set the `schema` field to tell the connector how to parse the data into rows.

If you set the `schema` field, you should also set the `field_delimiter` option, unless `file_format_type` is `csv`, `xml`, or `excel`.

You can set the schema and delimiter as follows:
```hocon
field_delimiter = "#"
schema {
  fields {
    name = string
    age = int
    gender = string
  }
}
```
The connector will generate data as follows:

| name          | age | gender |
|---------------|-----|--------|
| tyrantlucifer | 26  | male   |
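For context, here is a minimal source sketch that combines the schema and delimiter above. The bucket, path, endpoint, and credential values are placeholders, not values prescribed by this document:

```hocon
# A minimal sketch (placeholder bucket/path/credentials): read '#'-delimited
# text files using the schema defined above.
source {
  S3File {
    bucket = "s3a://seatunnel-test"        # placeholder bucket
    path = "/seatunnel/text"               # placeholder path
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxx"
    secret_key = "xxxxxxxx"
    file_format_type = "text"
    field_delimiter = "#"
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
  }
}
```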
### Orc File Type

If you assign the file type as `parquet` or `orc`, the `schema` option is not required; the connector can find the schema of the upstream data automatically.
| Orc Data Type            | SeaTunnel Data Type                                          |
|--------------------------|--------------------------------------------------------------|
| BOOLEAN                  | BOOLEAN                                                      |
| INT                      | INT                                                          |
| BYTE                     | BYTE                                                         |
| SHORT                    | SHORT                                                        |
| LONG                     | LONG                                                         |
| FLOAT                    | FLOAT                                                        |
| DOUBLE                   | DOUBLE                                                       |
| BINARY                   | BINARY                                                       |
| STRING / VARCHAR / CHAR  | STRING                                                       |
| DATE                     | LOCAL_DATE_TYPE                                              |
| TIMESTAMP                | LOCAL_DATE_TIME_TYPE                                         |
| DECIMAL                  | DECIMAL                                                      |
| LIST(STRING)             | STRING_ARRAY_TYPE                                            |
| LIST(BOOLEAN)            | BOOLEAN_ARRAY_TYPE                                           |
| LIST(TINYINT)            | BYTE_ARRAY_TYPE                                              |
| LIST(SMALLINT)           | SHORT_ARRAY_TYPE                                             |
| LIST(INT)                | INT_ARRAY_TYPE                                               |
| LIST(BIGINT)             | LONG_ARRAY_TYPE                                              |
| LIST(FLOAT)              | FLOAT_ARRAY_TYPE                                             |
| LIST(DOUBLE)             | DOUBLE_ARRAY_TYPE                                            |
| Map<K,V>                 | MapType; the K and V types are converted to SeaTunnel types  |
| STRUCT                   | SeaTunnelRowType                                             |
### Parquet File Type

If you assign the file type as `parquet` or `orc`, the `schema` option is not required; the connector can find the schema of the upstream data automatically.
| Parquet Data Type    | SeaTunnel Data Type                                          |
|----------------------|--------------------------------------------------------------|
| INT_8                | BYTE                                                         |
| INT_16               | SHORT                                                        |
| DATE                 | DATE                                                         |
| TIMESTAMP_MILLIS     | TIMESTAMP                                                    |
| INT64                | LONG                                                         |
| INT96                | TIMESTAMP                                                    |
| BINARY               | BYTES                                                        |
| FLOAT                | FLOAT                                                        |
| DOUBLE               | DOUBLE                                                       |
| BOOLEAN              | BOOLEAN                                                      |
| FIXED_LEN_BYTE_ARRAY | TIMESTAMP / DECIMAL                                          |
| DECIMAL              | DECIMAL                                                      |
| LIST(STRING)         | STRING_ARRAY_TYPE                                            |
| LIST(BOOLEAN)        | BOOLEAN_ARRAY_TYPE                                           |
| LIST(TINYINT)        | BYTE_ARRAY_TYPE                                              |
| LIST(SMALLINT)       | SHORT_ARRAY_TYPE                                             |
| LIST(INT)            | INT_ARRAY_TYPE                                               |
| LIST(BIGINT)         | LONG_ARRAY_TYPE                                              |
| LIST(FLOAT)          | FLOAT_ARRAY_TYPE                                             |
| LIST(DOUBLE)         | DOUBLE_ARRAY_TYPE                                            |
| Map<K,V>             | MapType; the K and V types are converted to SeaTunnel types  |
| STRUCT               | SeaTunnelRowType                                             |
## Options

| Name                            | Type    | Required | Default                                               | Description |
|---------------------------------|---------|----------|-------------------------------------------------------|-------------|
| path                            | string  | yes      | -                                                     | The S3 path to read. It may contain subpaths, but the subpaths must follow certain format requirements; see the `parse_partition_from_path` option for details. |
| file_format_type                | string  | yes      | -                                                     | File type. Supported types: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` |
| bucket                          | string  | yes      | -                                                     | The bucket address of the S3 file system, for example `s3n://seatunnel-test`. If you use the `s3a` protocol, this parameter should be `s3a://seatunnel-test`. |
| fs.s3a.endpoint                 | string  | yes      | -                                                     | The fs s3a endpoint. |
| fs.s3a.aws.credentials.provider | string  | yes      | com.amazonaws.auth.InstanceProfileCredentialsProvider | The s3a authentication method. Currently only `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` are supported. For more information about credential providers, see the Hadoop AWS documentation. |
| read_columns                    | list    | no       | -                                                     | The list of columns to read from the data source; users can use it to implement field projection. The file types that support column projection are: `text` `csv` `parquet` `orc` `json` `excel` `xml`. To use this feature with `text`, `json`, or `csv` files, the `schema` option must be configured. |
| access_key                      | string  | no       | -                                                     | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` |
| access_secret                   | string  | no       | -                                                     | Only used when `fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` |
| hadoop_s3_properties            | map     | no       | -                                                     | If you need to add other options, you can add them here (see the sketch after this table) and refer to the Hadoop AWS documentation for the available properties. |
| delimiter/field_delimiter       | string  | no       | \001                                                  | Field delimiter, used to tell the connector how to split fields when reading text files. Defaults to `\001`, the same as Hive's default delimiter. |
| parse_partition_from_path       | boolean | no       | true                                                  | Controls whether partition keys and values are parsed from the file path. For example, if you read a file from the path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`, every record read from the file will have these two fields added: name="tyrantlucifer", age=26 |
| date_format                     | string  | no       | yyyy-MM-dd                                            | Date type format, used to tell the connector how to convert strings to dates. Supported formats: `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`. Defaults to `yyyy-MM-dd` |
| datetime_format                 | string  | no       | yyyy-MM-dd HH:mm:ss                                   | Datetime type format, used to tell the connector how to convert strings to datetimes. Supported formats: `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` |
| time_format                     | string  | no       | HH:mm:ss                                              | Time type format, used to tell the connector how to convert strings to times. Supported formats: `HH:mm:ss` `HH:mm:ss.SSS` |
| skip_header_row_number          | long    | no       | 0                                                     | Skips the first few lines; only applies to txt and csv. For example, with `skip_header_row_number = 2`, SeaTunnel will skip the first 2 lines of the source files. |
| csv_use_header_line             | boolean | no       | false                                                 | Whether to use the header line to parse the file; only used when file_format is `csv` and the file contains a header line that conforms to RFC 4180. |
| schema                          | config  | no       | -                                                     | The schema of the upstream data. |
| sheet_name                      | string  | no       | -                                                     | The sheet of the workbook to read; only used when file_format is excel. |
| xml_row_tag                     | string  | no       | -                                                     | The tag name of the data rows in the XML file; only valid for XML files. |
| xml_use_attr_format             | boolean | no       | -                                                     | Whether to process data using the tag attribute format; only valid for XML files. |
| compress_codec                  | string  | no       | none                                                  | The compression codec of the files; see the details below. |
| archive_compress_codec          | string  | no       | none                                                  | The archive compression codec of the files; see the details below. |
| encoding                        | string  | no       | UTF-8                                                 | The encoding of the files to read; see the details below. |
| null_format                     | string  | no       | -                                                     | Only used when file_format_type is text. null_format defines which strings can be represented as null. For example: `\N` |
| binary_chunk_size               | int     | no       | 1024                                                  | Only used when file_format_type is binary. The chunk size (in bytes) for reading binary files. Defaults to 1024 bytes. Larger values may improve performance for large files but use more memory. |
| binary_complete_file_mode       | boolean | no       | false                                                 | Only used when file_format_type is binary. Whether to read the complete file as a single chunk instead of splitting it into chunks. When enabled, the entire file content is read into memory at once. Defaults to false. |
| file_filter_pattern             | string  | no       | -                                                     | Filter pattern, used to filter files. |
| filename_extension              | string  | no       | -                                                     | Filters by filename extension, i.e. reads only files with the given extension. For example: `csv` `.txt` `json` `.xml`. |
| common-options                  |         | no       | -                                                     | Common parameters of source plugins; see Source Common Options for details. |
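As a sketch of the `hadoop_s3_properties` map: the two keys below are standard Hadoop S3A properties, but the values are illustrative, not taken from this document:

```hocon
# Illustrative only: pass extra Hadoop S3A settings through to the underlying client.
hadoop_s3_properties {
  "fs.s3a.buffer.dir" = "/data/st_test/s3a"    # local directory for S3A buffering (example value)
  "fs.s3a.fast.upload.buffer" = "disk"         # S3A upload buffering mechanism (example value)
}
```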
### delimiter/field_delimiter [string]

The `delimiter` parameter is deprecated after version 2.3.5; please use `field_delimiter` instead.
### file_filter_pattern [string]

Filter pattern, used to filter files.

The pattern follows standard regular expressions. For details, see https://en.wikipedia.org/wiki/Regular_expression. Here are some examples.

File structure example:

```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
```
Matching rule examples:

**Example 1**: Match all `.txt` files. Regular expression:

```
/data/seatunnel/20241001/.*\.txt
```

The result of this match is:

```
/data/seatunnel/20241001/report.txt
```

**Example 2**: Match all files whose names start with `abc` (the date directory is wildcarded so both matching directories are covered). Regular expression:

```
/data/seatunnel/202410\d*/abc.*
```

The result of this match is:

```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```

**Example 3**: Match all files whose names start with `abc` and whose fourth character is `h` or `g`. Regular expression:

```
/data/seatunnel/20241007/abc[hg].*
```

The result of this match is:

```
/data/seatunnel/20241007/abch202410.csv
```

**Example 4**: Match third-level folders starting with `202410` and files ending with `.csv`. Regular expression:

```
/data/seatunnel/202410\d*/.*\.csv
```

The result of this match is:

```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
```
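A minimal sketch of wiring one of these patterns into a source definition. The bucket, endpoint, and credentials provider are placeholders in the style of the examples further below; the pattern follows the bare-filename style used in the "filter files" example at the end of this page:

```hocon
# Sketch: keep only files whose names start with "abc" and whose fourth
# character is h or g (Example 3's character class).
source {
  S3File {
    bucket = "s3a://seatunnel-test"      # placeholder bucket
    path = "/data/seatunnel/20241007"    # one of the example directories above
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "csv"
    file_filter_pattern = "abc[hg].*"
  }
}
```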
### compress_codec [string]

The compression codec of the files. Supported details are as follows:

- txt: `lzo` `none`
- json: `lzo` `none`
- csv: `lzo` `none`
- orc/parquet: automatically recognizes the compression type; no additional setting is required.
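For example, reading LZO-compressed text files only requires setting the codec alongside the file type (a sketch, not from the original document):

```hocon
file_format_type = "text"
compress_codec = "lzo"    # the files on S3 are LZO-compressed
```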
### archive_compress_codec [string]

The archive compression codec of the files. Supported details are as follows:

| archive_compress_codec | file_format           | archive_compress_suffix |
|------------------------|-----------------------|-------------------------|
| ZIP                    | txt, json, excel, xml | .zip                    |
| TAR                    | txt, json, excel, xml | .tar                    |
| TAR_GZ                 | txt, json, excel, xml | .tar.gz                 |
| GZ                     | txt, json, excel, xml | .gz                     |
| NONE                   | all                   | .*                      |

Note: for gz-compressed Excel files, you need to compress the original file or specify the file suffix, for example `e2e.xls` -> `e2e_test.xls.gz`.
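For instance, reading zipped JSON files would pair the codec with the file type like this (a sketch; values are illustrative):

```hocon
file_format_type = "json"
archive_compress_codec = "ZIP"    # the files on S3 are .zip archives of JSON files
```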
### encoding [string]

Only used when file_format_type is json, text, csv, or xml.

The encoding of the files to read. This parameter is parsed by `Charset.forName(encoding)`.
### binary_chunk_size [int]

Only used when file_format_type is binary.

The chunk size (in bytes) for reading binary files. Defaults to 1024 bytes. Larger values may improve performance for large files but use more memory.
### binary_complete_file_mode [boolean]

Only used when file_format_type is binary.

Whether to read the complete file as a single chunk instead of splitting it into chunks. When enabled, the entire file content is read into memory at once. Defaults to false.
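A minimal binary-read sketch combining both options; the bucket and path are placeholders, not values from this document:

```hocon
source {
  S3File {
    bucket = "s3a://seatunnel-test"     # placeholder bucket
    path = "/seatunnel/binary"          # placeholder path
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "binary"
    binary_chunk_size = 4096            # read in 4 KB chunks instead of the 1024-byte default
    binary_complete_file_mode = false   # stream chunks rather than loading whole files into memory
  }
}
```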
## Example

- In this example, we read data from the S3 path `s3a://seatunnel-test/seatunnel/text`, where the file type is orc. We use `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider` for authentication, so `access_key` and `secret_key` are required. All columns in the files will be read and sent to the sink.

```hocon
# Define the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  S3File {
    path = "/seatunnel/text"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxx"
    bucket = "s3a://seatunnel-test"
    file_format_type = "orc"
  }
}

transform {
  # If you would like to get more information about how to configure SeaTunnel
  # and see the full list of transform plugins, please visit
  # https://seatunnel.apache.org/docs/transform-v2
}

sink {
  Console {}
}
```
- Use `InstanceProfileCredentialsProvider` for authentication. The file type in S3 is json, so the schema option must be configured.

```hocon
S3File {
  path = "/seatunnel/json"
  bucket = "s3a://seatunnel-test"
  fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
  fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
  file_format_type = "json"
  schema {
    fields {
      id = int
      name = string
    }
  }
}
```
- Use `InstanceProfileCredentialsProvider` for authentication. The file type in S3 is json and it has five fields (`id`, `name`, `age`, `sex`, `type`), so the schema option must be configured. In this job, we only need to send the `id` and `name` columns to MySQL.

```hocon
# Define the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  S3File {
    path = "/seatunnel/json"
    bucket = "s3a://seatunnel-test"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "json"
    read_columns = ["id", "name"]
    schema {
      fields {
        id = int
        name = string
        age = int
        sex = int
        type = string
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure SeaTunnel
  # and see the full list of transform plugins, please visit
  # https://seatunnel.apache.org/docs/transform-v2
}

sink {
  Console {}
}
```
- Filter files:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  S3File {
    path = "/seatunnel/json"
    bucket = "s3a://seatunnel-test"
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    file_format_type = "json"
    read_columns = ["id", "name"]
    // Example file name: abcD2024.csv
    file_filter_pattern = "abc[DX]*.*"
  }
}

sink {
  Console {}
}
```
## Changelog
Change | Commit | Version |
---|---|---|
[Improve][Connector-V2] Support maxcompute sink writer with timestamp field type (#9234) | https://github.com/apache/seatunnel/commit/a513c495e3 | dev |
[improve] update file connectors config (#9034) | https://github.com/apache/seatunnel/commit/8041d59dc2 | 2.3.11 |
[Improve][File] Add row_delimiter options into text file sink (#9017) | https://github.com/apache/seatunnel/commit/92aa855a34 | 2.3.11 |
Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13ad | 2.3.10 |
[improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85f | 2.3.10 |
[Fix][Connector-V2] Fixed incorrectly setting s3 key in some cases (#8885) | https://github.com/apache/seatunnel/commit/cf4bab5be2 | 2.3.10 |
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef5 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
[improve] update S3File connector config option (#8615) | https://github.com/apache/seatunnel/commit/80cc9fa6ff | 2.3.10 |
[Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db78918 | 2.3.10 |
[Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed50 | 2.3.10 |
[Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
[Hotfix][Zeta] Fix the dependency conflict between the guava in hadoop-aws and hive-exec (#7986) | https://github.com/apache/seatunnel/commit/a7837f1f19 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a16 | 2.3.8 |
[Improve] Refactor S3FileCatalog and it's factory (#7457) | https://github.com/apache/seatunnel/commit/d928e8b113 | 2.3.8 |
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
[Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c493 | 2.3.6 |
[Feature][S3 File] Make S3 File Connector support multiple table write (#6698) | https://github.com/apache/seatunnel/commit/8f2049b2f1 | 2.3.6 |
[Improve][Connector-v2] The hive connector support multiple filesystem (#6648) | https://github.com/apache/seatunnel/commit/8a4c01fe35 | 2.3.6 |
[bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization (#6717) | https://github.com/apache/seatunnel/commit/79bb70101a | 2.3.6 |
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9a | 2.3.5 |
[Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
[Feature][Connector]add s3file save mode function (#6131) | https://github.com/apache/seatunnel/commit/81c51073bf | 2.3.4 |
[Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b4 | 2.3.4 |
Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1aa | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
[Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc4 | 2.3.4 |
[Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a1901096 | 2.3.4 |
[Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a0 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768d | 2.3.4 |
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59eb | 2.3.3 |
[chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ecec | 2.3.2 |
[Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d2 | 2.3.2 |
Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3c | 2.3.1 |
[Chore] Upgrade guava to 27.0-jre (#4238) | https://github.com/apache/seatunnel/commit/4851bee575 | 2.3.1 |
Add redshift datatype convertor (#4245) | https://github.com/apache/seatunnel/commit/b19011517f | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
Add S3Catalog (#4121) | https://github.com/apache/seatunnel/commit/7d7f506547 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1c | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd76077669 | 2.3.1 |
[Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8f | 2.3.0 |
[Engine][Checkpoint]Unified naming style (#3714) | https://github.com/apache/seatunnel/commit/bc0bd3bec3 | 2.3.0 |
[Connector][File-S3]Set AK is not required (#3713) | https://github.com/apache/seatunnel/commit/da3c526172 | 2.3.0 |
[Connector&Engine]Set S3 AK to optional (#3688) | https://github.com/apache/seatunnel/commit/4710918b02 | 2.3.0 |
[Connector][S3]Support s3a protocol (#3632) | https://github.com/apache/seatunnel/commit/ae4cc9c1ec | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263c | 2.3.0 |
[Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e8631 | 2.3.0 |
[Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c353881 | 2.3.0 |
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
[Feature][Connector-V2][S3] Add S3 file source & sink connector (#3119) | https://github.com/apache/seatunnel/commit/f27d68ca9c | 2.3.0-beta |