
CosFile

CosFile source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Reads all the data of a split in a single pollNext call. The splits that have been read are saved in the snapshot.

Description

Read data from Tencent Cloud COS file system.

:::tip

If you use spark/flink, in order to use this connector you must ensure your spark/flink cluster is already integrated with hadoop. The tested hadoop version is 2.x.

If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you download and install SeaTunnel Engine. You can check the jars under ${SEATUNNEL_HOME}/lib to confirm this.

To use this connector, you need to place hadoop-cos-{hadoop.version}-{version}.jar and cos_api-bundle-{version}.jar in the ${SEATUNNEL_HOME}/lib directory; download them from the Hadoop-Cos release. It only supports hadoop version 2.6.5+ and version 8.0.2+.

:::

Options

| Name | Type | Required | Default |
|------|------|----------|---------|
| path | string | yes | - |
| file_format_type | string | yes | - |
| bucket | string | yes | - |
| secret_id | string | yes | - |
| secret_key | string | yes | - |
| region | string | yes | - |
| read_columns | list | no | - |
| delimiter/field_delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| skip_header_row_number | long | no | 0 |
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| schema | config | no | - |
| sheet_name | string | no | - |
| xml_row_tag | string | no | - |
| xml_use_attr_format | boolean | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
| common-options | | no | - |

path [string]

The source file path.

file_format_type [string]

File type, supported as the following file types:

text csv parquet orc json excel xml binary

If you assign file type to json, you should also assign the schema option to tell the connector how to parse data into the rows you want.

For example:

Upstream data is the following:


{"code": 200, "data": "get success", "success": true}

You can also save multiple pieces of data in one file and split them by newline:


{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}

You should assign the schema as the following:


```hocon
schema {
    fields {
        code = int
        data = string
        success = boolean
    }
}
```

The connector will generate data as the following:

| code | data        | success |
|------|-------------|---------|
| 200  | get success | true    |

If you assign file type to parquet or orc, the schema option is not required; the connector can find the schema of the upstream data automatically.

If you assign file type to text or csv, you can choose whether to specify the schema information or not.

For example, upstream data is the following:


```
tyrantlucifer#26#male
```

If you do not assign the data schema, the connector will treat the upstream data as the following:

| content |
|---------|
| tyrantlucifer#26#male |

If you assign the data schema, you should also assign the field_delimiter option, except for the csv file type.

You should assign the schema and delimiter as the following:


field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

The connector will generate data as the following:

| name          | age | gender |
|---------------|-----|--------|
| tyrantlucifer | 26  | male   |

If you assign file type to binary, SeaTunnel can synchronize files in any format, such as compressed packages, pictures, etc. In short, any file can be synchronized to the target place. Under this requirement, you need to make sure that the source and the sink both use the binary format for file synchronization. You can find the specific usage in the example below.

bucket [string]

The bucket address of the Cos file system, for example: cos://tyrantlucifer-image-bed

secret_id [string]

The secret id of the Cos file system.

secret_key [string]

The secret key of the Cos file system.

region [string]

The region of the Cos file system.

read_columns [list]

The read column list of the data source; users can use it to implement field mapping.
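
For example, a minimal sketch that keeps only two of the three declared fields, reusing the text example above (all connection values are placeholders):

```hocon
CosFile {
    path = "/seatunnel/text"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "text"
    field_delimiter = "#"
    schema {
        fields {
            name = string
            age = int
            gender = string
        }
    }
    // only name and age are emitted downstream; gender is dropped
    read_columns = ["name", "age"]
}
```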

delimiter/field_delimiter [string]

The delimiter parameter is deprecated since version 2.3.5; please use field_delimiter instead.

Only needs to be configured when file_format is text.

The field delimiter, used to tell the connector how to slice and dice fields.

Default \001, the same as hive's default delimiter.

parse_partition_from_path [boolean]

Control whether to parse the partition keys and values from the file path.

For example, if you read a file from the path cosn://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26

Every record read from the file will have these two fields added:

| name          | age |
|---------------|-----|
| tyrantlucifer | 26  |

Tips: Do not define partition fields in the schema option.
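
A minimal sketch for the partitioned layout above (connection values are placeholders; note that name and age are intentionally not declared in a schema, and parquet carries the remaining columns itself):

```hocon
CosFile {
    path = "/tmp/seatunnel/parquet"
    bucket = "cosn://hadoop-cluster"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "parquet"
    // name=.../age=... segments in the path are parsed into columns
    parse_partition_from_path = true
}
```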

skip_header_row_number [long]

Skip the first few lines, but only for the txt and csv file types.

For example, set like the following:

```hocon
skip_header_row_number = 2
```

Then SeaTunnel will skip the first 2 lines from the source files.

date_format [string]

Date type format, used to tell the connector how to convert a string to a date. Supported formats:

yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd

Default yyyy-MM-dd

datetime_format [string]

Datetime type format, used to tell the connector how to convert a string to a datetime. Supported formats:

yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss

Default yyyy-MM-dd HH:mm:ss

time_format [string]

Time type format, used to tell the connector how to convert a string to a time. Supported formats:

HH:mm:ss HH:mm:ss.SSS

Default HH:mm:ss
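
A combined sketch for a text source whose date, datetime and time columns arrive as strings in non-default patterns (column names and all connection values are illustrative):

```hocon
CosFile {
    path = "/seatunnel/text"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "text"
    field_delimiter = ","
    // how string values are converted into the declared types
    date_format = "yyyy/MM/dd"
    datetime_format = "yyyy/MM/dd HH:mm:ss"
    time_format = "HH:mm:ss.SSS"
    schema {
        fields {
            event_date = date
            event_time = timestamp
            event_clock = time
        }
    }
}
```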

schema [config]

Only needs to be configured when file_format_type is text, json, excel, xml or csv (or other formats where the schema cannot be read from metadata).

fields [Config]

The schema of the upstream data.

sheet_name [string]

Only needs to be configured when file_format is excel.

The sheet of the workbook to read.
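
For example, a sketch that reads a single sheet from excel workbooks (the sheet name, schema, and connection values are illustrative):

```hocon
CosFile {
    path = "/seatunnel/excel"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "excel"
    // read only this sheet of each workbook
    sheet_name = "Sheet1"
    schema {
        fields {
            name = string
            age = int
        }
    }
}
```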

xml_row_tag [string]

Only needs to be configured when file_format is xml.

Specifies the tag name of the data rows within the XML file.

xml_use_attr_format [boolean]

Only needs to be configured when file_format is xml. Specifies whether to process data using the tag attribute format.
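
A minimal sketch, assuming an XML layout where each row is a RECORD element carrying its fields as tag attributes, e.g. &lt;RECORD name="tyrantlucifer" age="26"/&gt; (the tag name, schema, and connection values are illustrative):

```hocon
CosFile {
    path = "/seatunnel/xml"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "xml"
    // each <RECORD .../> element is one row
    xml_row_tag = "RECORD"
    // field values are carried as tag attributes rather than child elements
    xml_use_attr_format = true
    schema {
        fields {
            name = string
            age = int
        }
    }
}
```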

file_filter_pattern [string]

Filter pattern, used for filtering files.

The pattern follows standard regular expressions; for details, please refer to https://en.wikipedia.org/wiki/Regular_expression. Here are some examples.

File structure example:

```
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
```

Matching rules example:

Example 1: Match all .txt files. Regular expression:

```
/data/seatunnel/20241001/.*\.txt
```

The result of this example matching is:

```
/data/seatunnel/20241001/report.txt
```

Example 2: Match all files starting with abc. Regular expression:

```
/data/seatunnel/20241002/abc.*
```

The result of this example matching is:

```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
```

Example 3: Match all files starting with abc whose fourth character is h or g. Regular expression:

```
/data/seatunnel/20241007/abc[h,g].*
```

The result of this example matching is:

```
/data/seatunnel/20241007/abch202410.csv
```

Example 4: Match third-level folders starting with 202410 and files ending with .csv. Regular expression:

```
/data/seatunnel/202410\d*/.*\.csv
```

The result of this example matching is:

```
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
```

compress_codec [string]

The compress codec of files; the supported details are as follows:

- txt: lzo none
- json: lzo none
- csv: lzo none
- orc/parquet: automatically recognizes the compression type, no additional settings required.
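
For instance, a sketch for reading lzo-compressed text files (all connection values are placeholders):

```hocon
CosFile {
    path = "/seatunnel/text"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "text"
    // files are decoded with the lzo codec before parsing
    compress_codec = "lzo"
}
```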

archive_compress_codec [string]

The archive compress codec of files; the supported details are as follows:

| archive_compress_codec | file_format         | archive_compress_suffix |
|------------------------|---------------------|-------------------------|
| ZIP                    | txt,json,excel,xml  | .zip                    |
| TAR                    | txt,json,excel,xml  | .tar                    |
| TAR_GZ                 | txt,json,excel,xml  | .tar.gz                 |
| GZ                     | txt,json,excel,xml  | .gz                     |
| NONE                   | all                 | .*                      |

Note: A gz-compressed excel file needs to compress the original file or specify the file suffix, such as e2e.xls -> e2e_test.xls.gz
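
A sketch for reading zipped json files (the codec value follows the table above; whether the connector also accepts it in lowercase is an assumption here, and all other values are placeholders):

```hocon
CosFile {
    path = "/seatunnel/json"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "json"
    // files are expected to carry the matching suffix, e.g. .zip
    archive_compress_codec = "zip"
    schema {
        fields {
            code = int
            data = string
        }
    }
}
```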

encoding [string]

Only used when file_format_type is json, text, csv, xml. The encoding of the file to read. This param will be parsed by Charset.forName(encoding).
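
For instance, a sketch for text files stored in GBK (an illustrative charset name; any value accepted by Charset.forName works):

```hocon
CosFile {
    path = "/seatunnel/text"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "text"
    // parsed via Charset.forName("GBK")
    encoding = "GBK"
}
```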

common options

Source plugin common parameters, please refer to [Source Common Options](../Source-common-Options.md) for details.

Example


```hocon
CosFile {
    path = "/seatunnel/orc"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "orc"
}
```


```hocon
CosFile {
    path = "/seatunnel/json"
    bucket = "cosn://seatunnel-test-1259587829"
    secret_id = "xxxxxxxxxxxxxxxxxxx"
    secret_key = "xxxxxxxxxxxxxxxxxxx"
    region = "ap-chengdu"
    file_format_type = "json"
    schema {
        fields {
            id = int
            name = string
        }
    }
}
```

Transfer Binary File


```hocon
env {
    parallelism = 1
    job.mode = "BATCH"
}

source {
    CosFile {
        bucket = "cosn://seatunnel-test-1259587829"
        secret_id = "xxxxxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxxxxx"
        region = "ap-chengdu"
        path = "/seatunnel/read/binary/"
        file_format_type = "binary"
    }
}
sink {
    // you can transfer local files to s3/hdfs/oss etc.
    CosFile {
        bucket = "cosn://seatunnel-test-1259587829"
        secret_id = "xxxxxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxxxxx"
        region = "ap-chengdu"
        path = "/seatunnel/read/binary2/"
        file_format_type = "binary"
    }
}
```

Filter File

```hocon
env {
    parallelism = 1
    job.mode = "BATCH"
}

source {
    CosFile {
        bucket = "cosn://seatunnel-test-1259587829"
        secret_id = "xxxxxxxxxxxxxxxxxxx"
        secret_key = "xxxxxxxxxxxxxxxxxxx"
        region = "ap-chengdu"
        path = "/seatunnel/read/binary/"
        file_format_type = "binary"
        // file example abcD2024.csv
        file_filter_pattern = "abc[DX]*.*"
    }
}

sink {
    Console {
    }
}
```

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef5 | dev |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db78918 | dev |
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed50 | dev |
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
| [Improve][Connector-V2] Change File Read/WriteStrategy setSeaTunnelRowTypeInfo to setCatalogTable (#7829) | https://github.com/apache/seatunnel/commit/6b5f74e524 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a16 | 2.3.8 |
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c493 | 2.3.6 |
| [Feature][Connector-V2] Supports the transfer of any file (#6826) | https://github.com/apache/seatunnel/commit/c1401787b3 | 2.3.6 |
| [Feature][Tool] Add connector check script for issue 6199 (#6635) | https://github.com/apache/seatunnel/commit/65aedf6a79 | 2.3.5 |
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9a | 2.3.5 |
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b4 | 2.3.4 |
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1aa | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc4 | 2.3.4 |
| [Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a1901096 | 2.3.4 |
| [Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a0 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768d | 2.3.4 |
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59eb | 2.3.3 |
| [Feature][Connector-V2][File] Add cos source&sink (#4979) | https://github.com/apache/seatunnel/commit/1f94676436 | 2.3.3 |