跳到主要内容
版本:Next

OssFile

Oss文件数据源连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

使用依赖

对于Spark/Flink引擎

  1. 您必须确保您的spark/flink集群已经集成了hadoop。测试过的hadoop版本是2.x。
  2. 您必须确保hadoop-aliyun-xx.jaraliyun-sdk-oss-xx.jarjdom-xx.jar${SEATUNNEL_HOME}/plugins/目录中,并且hadoop-aliyun jar的版本需要与您在spark/flink中使用的hadoop版本相等,aliyun-sdk-oss-xx.jarjdom-xx.jar版本需要是与hadoop-aliyun版本对应的版本。例如:hadoop-aliyun-3.1.4.jar依赖aliyun-sdk-oss-3.4.1.jarjdom-1.1.jar

对于SeaTunnel Zeta引擎

  1. 您必须确保seatunnel-hadoop3-3.1.4-uber.jaraliyun-sdk-oss-3.4.1.jarhadoop-aliyun-3.1.4.jarjdom-1.1.jar${SEATUNNEL_HOME}/lib/目录中。

主要特性

在一次pollNext调用中读取分片中的所有数据。将读取的分片保存在快照中。

数据类型映射

数据类型映射与正在读取的文件类型相关,我们支持以下文件类型:

text csv parquet orc json excel xml

JSON文件类型

如果您将文件类型指定为json,您还应该指定schema选项来告诉连接器如何将数据解析为您想要的行。

例如:

上游数据如下:


{"code": 200, "data": "get success", "success": true}

您也可以在一个文件中保存多条数据,并用换行符分隔:


{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}

您应该按如下方式指定schema:


schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将生成如下数据:

codedatasuccess
200get successtrue

文本或CSV文件类型

如果您将file_format_type设置为textexcelcsvxml。那么需要设置schema字段来告诉连接器如何将数据解析为行。

如果您设置了schema字段,您还应该设置选项field_delimiter,除非file_format_typecsvxmlexcel

您可以按如下方式设置schema和分隔符:


field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

连接器将生成如下数据:

nameagegender
tyrantlucifer26male

Orc文件类型

如果您将文件类型指定为parquet orc,则不需要schema选项,连接器可以自动找到上游数据的schema。

Orc数据类型SeaTunnel数据类型
BOOLEANBOOLEAN
INTINT
BYTEBYTE
SHORTSHORT
LONGLONG
FLOATFLOAT
DOUBLEDOUBLE
BINARYBINARY
STRING
VARCHAR
CHAR
STRING
DATELOCAL_DATE_TYPE
TIMESTAMPLOCAL_DATE_TIME_TYPE
DECIMALDECIMAL
LIST(STRING)STRING_ARRAY_TYPE
LIST(BOOLEAN)BOOLEAN_ARRAY_TYPE
LIST(TINYINT)BYTE_ARRAY_TYPE
LIST(SMALLINT)SHORT_ARRAY_TYPE
LIST(INT)INT_ARRAY_TYPE
LIST(BIGINT)LONG_ARRAY_TYPE
LIST(FLOAT)FLOAT_ARRAY_TYPE
LIST(DOUBLE)DOUBLE_ARRAY_TYPE
Map<K,V>MapType,K和V的类型将转换为SeaTunnel类型
STRUCTSeaTunnelRowType

Parquet文件类型

如果您将文件类型指定为parquet orc,则不需要schema选项,连接器可以自动找到上游数据的schema。

Parquet数据类型SeaTunnel数据类型
INT_8BYTE
INT_16SHORT
DATEDATE
TIMESTAMP_MILLISTIMESTAMP
INT64LONG
INT96TIMESTAMP
BINARYBYTES
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
FIXED_LEN_BYTE_ARRAYTIMESTAMP
DECIMAL
DECIMALDECIMAL
LIST(STRING)STRING_ARRAY_TYPE
LIST(BOOLEAN)BOOLEAN_ARRAY_TYPE
LIST(TINYINT)BYTE_ARRAY_TYPE
LIST(SMALLINT)SHORT_ARRAY_TYPE
LIST(INT)INT_ARRAY_TYPE
LIST(BIGINT)LONG_ARRAY_TYPE
LIST(FLOAT)FLOAT_ARRAY_TYPE
LIST(DOUBLE)DOUBLE_ARRAY_TYPE
Map<K,V>MapType,K和V的类型将转换为SeaTunnel类型
STRUCTSeaTunnelRowType

选项

名称类型是否必需默认值描述
pathstring-需要读取的Oss路径,可以有子路径,但子路径需要满足一定的格式要求。具体要求可以参考"parse_partition_from_path"选项
file_format_typestring-文件类型,支持以下文件类型:text csv parquet orc json excel xml binary
bucketstring-oss文件系统的bucket地址,例如:oss://seatunnel-test
endpointstring-fs oss端点
read_columnslist-数据源的读取列列表,用户可以使用它来实现字段投影。支持列投影的文件类型如下所示:text csv parquet orc json excel xml。如果用户想在读取text json csv文件时使用此功能,必须配置"schema"选项。
access_keystring-
access_secretstring-
delimiterstring\001字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认\001,与hive的默认分隔符相同。
parse_partition_from_pathbooleantrue控制是否从文件路径解析分区键和值。例如,如果您从路径oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26读取文件。文件中的每条记录数据都将添加这两个字段:name="tyrantlucifer",age=16
date_formatstringyyyy-MM-dd日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd。默认yyyy-MM-dd
datetime_formatstringyyyy-MM-dd HH:mm:ss日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss
time_formatstringHH:mm:ss时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:HH:mm:ss HH:mm:ss.SSS
filename_extensionstring-过滤文件名扩展名,用于过滤具有特定扩展名的文件。例如:csv .txt json .xml
skip_header_row_numberlong0跳过前几行,但仅适用于txt和csv。例如,设置如下:skip_header_row_number = 2。然后SeaTunnel将跳过源文件的前2行
csv_use_header_linebooleanfalse是否使用标题行来解析文件,仅在file_format为csv且文件包含符合RFC 4180的标题行时使用
schemaconfig-上游数据的schema。
sheet_namestring-读取工作簿的工作表,仅在file_format为excel时使用。
xml_row_tagstring-指定XML文件中数据行的标签名称,仅在file_format为xml时使用。
xml_use_attr_formatboolean-指定是否使用标签属性格式处理数据,仅在file_format为xml时使用。
compress_codecstringnone文件使用的压缩编解码器。
encodingstringUTF-8
null_formatstring-仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:\N
binary_chunk_sizeint1024仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。
binary_complete_file_modebooleanfalse仅在file_format_type为binary时使用。是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。
file_filter_patternstring过滤模式,用于过滤文件。
common-optionsconfig-数据源插件通用参数,请参考数据源通用选项了解详情。

compress_codec [string]

文件的压缩编解码器,支持的详细信息如下所示:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc/parquet: 自动识别压缩类型,无需额外设置。

encoding [string]

仅在file_format_type为json、text、csv、xml时使用。 要读取的文件的编码。此参数将由Charset.forName(encoding)解析。

binary_chunk_size [int]

仅在file_format_type为binary时使用。

读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。

binary_complete_file_mode [boolean]

仅在file_format_type为binary时使用。

是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为false。

file_filter_pattern [string]

过滤模式,用于过滤文件。

该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 以下是一些示例。

文件结构示例:

/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png

匹配规则示例:

示例1匹配所有.txt文件,正则表达式:

/data/seatunnel/20241001/.*\.txt

此示例匹配的结果是:

/data/seatunnel/20241001/report.txt

示例2匹配所有以abc开头的文件,正则表达式:

/data/seatunnel/20241002/abc.*

此示例匹配的结果是:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv

示例3匹配所有以abc开头,且第四个字符是h或g的文件,正则表达式:

/data/seatunnel/20241007/abc[h,g].*

此示例匹配的结果是:

/data/seatunnel/20241007/abch202410.csv

示例4匹配以202410开头的第三级文件夹和以.csv结尾的文件,正则表达式:

/data/seatunnel/202410\d*/.*\.csv

此示例匹配的结果是:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv

schema [config]

仅在file_format_type为text、json、excel、xml或csv时需要配置(或其他我们无法从元数据读取schema的格式)。

fields [Config]

上游数据的schema。

如何创建Oss数据同步作业

以下示例演示如何创建从Oss读取数据并在本地客户端打印的数据同步作业:

# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}

# 创建连接到Oss的数据源
source {
OssFile {
path = "/seatunnel/orc"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
}
}

# 控制台打印读取的Oss数据
sink {
Console {
}
}
# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}

# 创建连接到Oss的数据源
source {
OssFile {
path = "/seatunnel/json"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}
}

# 控制台打印读取的Oss数据
sink {
Console {
}
}

多表

无需配置schema文件类型,例如:orc

env {
parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
OssFile {
tables_configs = [
{
schema = {
table = "fake01"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
},
{
schema = {
table = "fake02"
}
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/orc"
file_format_type = "orc"
}
]
plugin_output = "fake"
}
}

sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}

需要配置schema文件类型,例如:json


env {
execution.parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
OssFile {
tables_configs = [
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake01"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
},
{
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/test/seatunnel/read/json"
file_format_type = "json"
schema = {
table = "fake02"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
]
plugin_output = "fake"
}
}

sink {
Assert {
rules {
table-names = ["fake01", "fake02"]
}
}
}

过滤文件

env {
parallelism = 1
job.mode = "BATCH"
}

source {
OssFile {
path = "/seatunnel/orc"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
// 文件示例 abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
}
}

sink {
Console {
}
}

变更日志

Change Log
ChangeCommitVersion
[Improve][Connector-V2] Support maxcompute sink writer with timestamp field type (#9234)https://github.com/apache/seatunnel/commit/a513c495e3dev
[Doc][Connector-V2] Update save mode config for OssFileSink (#9303)https://github.com/apache/seatunnel/commit/40097d7f3e2.3.11
[improve] update file connectors config (#9034)https://github.com/apache/seatunnel/commit/8041d59dc22.3.11
[Improve][File] Add row_delimiter options into text file sink (#9017)https://github.com/apache/seatunnel/commit/92aa855a342.3.11
Revert " [improve] update localfile connector config" (#9018)https://github.com/apache/seatunnel/commit/cdc79e13ad2.3.10
[improve] update localfile connector config (#8765)https://github.com/apache/seatunnel/commit/def369a85f2.3.10
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769)https://github.com/apache/seatunnel/commit/78b23c0ef52.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Feature][Connector-V2] Support create emtpy file when no data (#8543)https://github.com/apache/seatunnel/commit/275db789182.3.10
[Feature][Connector-V2] Support single file mode in file sink (#8518)https://github.com/apache/seatunnel/commit/e893deed502.3.10
[Feature][File] Support config null format for text file read (#8109)https://github.com/apache/seatunnel/commit/2dbf02df472.3.9
[Improve][API] Unified tables_configs and table_list (#8100)https://github.com/apache/seatunnel/commit/84c0b8d6602.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Improve][Connector-V2] Support read archive compress file (#7633)https://github.com/apache/seatunnel/commit/3f98cd8a162.3.8
[Improve] Added OSSFileCatalog and it's factory (#7458)https://github.com/apache/seatunnel/commit/9006a205db2.3.8
[Improve][Connector] Add multi-table sink option check (#7360)https://github.com/apache/seatunnel/commit/2489f6446b2.3.7
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131)https://github.com/apache/seatunnel/commit/c4ca74122c2.3.6
[Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971)https://github.com/apache/seatunnel/commit/1a48a9c4932.3.6
[Chore] Fix file spell errors (#6606)https://github.com/apache/seatunnel/commit/2599d3b7362.3.5
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551)https://github.com/apache/seatunnel/commit/5f3c9c36a52.3.5
Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327)https://github.com/apache/seatunnel/commit/ec533ecd9a2.3.5
[Feature][OssFile Connector] Make Oss implement source factory and sink factory (#6062)https://github.com/apache/seatunnel/commit/1a8e9b45542.3.4
[Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033)https://github.com/apache/seatunnel/commit/c324d663b42.3.4
[Hotfix][Oss File Connector] fix oss connector can not run bug (#6010)https://github.com/apache/seatunnel/commit/755bc2a7302.3.4
Support using multiple hadoop account (#5903)https://github.com/apache/seatunnel/commit/d69d88d1aa2.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Improve][connector-file] unifiy option between file source/sink and update document (#5680)https://github.com/apache/seatunnel/commit/8d87cf8fc42.3.4
[Feature] Support LZO compress on File Read (#5083)https://github.com/apache/seatunnel/commit/a4a19010962.3.4
[Feature][Connector-V2][File] Support read empty directory (#5591)https://github.com/apache/seatunnel/commit/1f58f224a02.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567)https://github.com/apache/seatunnel/commit/0e02db768d2.3.4
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153)https://github.com/apache/seatunnel/commit/a3c13e59eb2.3.3
[Fix][Connector-V2] Fix file-oss config check bug and amend file-oss-jindo factoryIdentifier (#4581)https://github.com/apache/seatunnel/commit/5c4f17df202.3.2
[Feature][ConnectorV2]add file excel sink and source (#4164)https://github.com/apache/seatunnel/commit/e3b97ae5d22.3.2
Change file type to file_format_type in file source/sink (#4249)https://github.com/apache/seatunnel/commit/973a2fae3c2.3.1
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Feature][Connector-V2][File] Support compress (#3899)https://github.com/apache/seatunnel/commit/55602f6b1c2.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Improve][Connector-V2][File] Improve file connector option rule and document (#3812)https://github.com/apache/seatunnel/commit/bd760776692.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525)https://github.com/apache/seatunnel/commit/031e8e263c2.3.0
[Feature][Connector-V2][File] Add option and factory for file connectors (#3375)https://github.com/apache/seatunnel/commit/db286e86312.3.0
[Improve][Connector-V2][File] Improve code structure (#3238)https://github.com/apache/seatunnel/commit/dd5c3538812.3.0
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)https://github.com/apache/seatunnel/commit/38254e3f262.3.0
[Improve][Connector-V2][File] Support parse field from file path (#2985)https://github.com/apache/seatunnel/commit/0bc12085c22.3.0-beta
[Improve][connector][file] Support user-defined schema for reading text file (#2976)https://github.com/apache/seatunnel/commit/1c05ee0d7e2.3.0-beta
[Improve][Connector] Improve write parquet (#2943)https://github.com/apache/seatunnel/commit/8fd966394b2.3.0-beta
[Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)https://github.com/apache/seatunnel/commit/61720306e72.2.0-beta
[Improve][Connector-V2] Improve read parquet (#2841)https://github.com/apache/seatunnel/commit/e19bc82f9b2.2.0-beta
[Feature][Connector-V2] Add oss sink (#2629)https://github.com/apache/seatunnel/commit/bb2ad404872.2.0-beta
[#2606]Dependency management split (#2630)https://github.com/apache/seatunnel/commit/fc047be69b2.2.0-beta
[chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538)https://github.com/apache/seatunnel/commit/7dc2a273882.2.0-beta
[Feature][Connector-V2] Add oss source connector (#2467)https://github.com/apache/seatunnel/commit/712b77744e2.2.0-beta