LocalFile
本地文件数据源连接器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
在 pollNext 调用中读取分片中的所有数据。读取的分片将保存在快照中。
描述
从本地文件系统读取数据。
如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。
如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 ${SEATUNNEL_HOME}/lib
下的 jar 包来确认这一点。
选项
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
path | string | 是 | - |
file_format_type | string | 是 | - |
read_columns | list | 否 | - |
delimiter/field_delimiter | string | 否 | \001 |
parse_partition_from_path | boolean | 否 | true |
date_format | string | 否 | yyyy-MM-dd |
datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss |
time_format | string | 否 | HH:mm:ss |
skip_header_row_number | long | 否 | 0 |
schema | config | 否 | - |
sheet_name | string | 否 | - |
excel_engine | string | 否 | POI |
xml_row_tag | string | 否 | - |
xml_use_attr_format | boolean | 否 | - |
csv_use_header_line | boolean | 否 | false |
file_filter_pattern | string | 否 | - |
filename_extension | string | 否 | - |
compress_codec | string | 否 | none |
archive_compress_codec | string | 否 | none |
encoding | string | 否 | UTF-8 |
null_format | string | 否 | - |
binary_chunk_size | int | 否 | 1024 |
binary_complete_file_mode | boolean | 否 | false |
common-options | 否 | - | |
tables_configs | list | 否 | 用于定义多表任务 |
path [string]
源文件路径。
file_format_type [string]
文件类型,支持以下文件类型:
text
csv
parquet
orc
json
excel
xml
binary
如果您将文件类型指定为 json
,您还应该指定 schema 选项来告诉连接器如何将数据解析为您想要的行。
例如:
上游数据如下:
{"code": 200, "data": "get success", "success": true}
您也可以在一个文件中保存多条数据并用换行符分割:
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
您应该按如下方式指定 schema:
schema {
fields {
code = int
data = string
success = boolean
}
}
连接器将生成如下数据:
code | data | success |
---|---|---|
200 | get success | true |
如果您将文件类型指定为 parquet
orc
,则不需要 schema 选项,连接器可以自动找到上游数据的 schema。
如果您将文件类型指定为 text
csv
,您可以选择指定或不指定 schema 信息。
例如,上游数据如下:
tyrantlucifer#26#male
如果您不指定数据 schema,连接器将把上游数据视为如下:
content |
---|
tyrantlucifer#26#male |
如果您指定数据 schema,除了 CSV 文件类型外,您还应该指定选项 field_delimiter
您应该按如下方式指定 schema 和分隔符:
field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
连接器将生成如下数据:
name | age | gender |
---|---|---|
tyrantlucifer | 26 | male |
如果您将文件类型指定为 binary
,SeaTunnel 可以同步任何格式的文件,
例如压缩包、图片等。简而言之,任何文件都可以同步到目标位置。
在此要求下,您需要确保源和接收器同时使用 binary
格式进行文件同步。
您可以在下面的示例中找到具体用法。
read_columns [list]
数据源的读取列列表,用户可以使用它来实现字段投影。
delimiter/field_delimiter [string]
delimiter 参数将在 2.3.5 版本后弃用,请使用 field_delimiter 代替。
仅在 file_format 为 text 时需要配置。
字段分隔符,用于告诉连接器如何分割字段。
默认 \001
,与 hive 的默认分隔符相同
parse_partition_from_path [boolean]
控制是否从文件路径解析分区键和值
例如,如果您从路径 file://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
读取文件
文件中的每条记录数据都将添加这两个字段:
name | age |
---|---|
tyrantlucifer | 26 |
提示:不要在 schema 选项中定义分区字段
date_format [string]
日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:
yyyy-MM-dd
yyyy.MM.dd
yyyy/MM/dd
默认 yyyy-MM-dd
datetime_format [string]
日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:
yyyy-MM-dd HH:mm:ss
yyyy.MM.dd HH:mm:ss
yyyy/MM/dd HH:mm:ss
yyyyMMddHHmmss
默认 yyyy-MM-dd HH:mm:ss
time_format [string]
时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:
HH:mm:ss
HH:mm:ss.SSS
默认 HH:mm:ss
skip_header_row_number [long]
跳过前几行,但仅适用于 txt 和 csv。
例如,设置如下:
skip_header_row_number = 2
然后 SeaTunnel 将跳过源文件的前 2 行
schema [config]
仅在 file_format_type 为 text、json、excel、xml 或 csv(或其他我们无法从元数据读取 schema 的格式)时需要配置。
fields [Config]
上游数据的 schema 信息。
sheet_name [string]
仅在 file_format 为 excel 时需要配置。
读取工作簿的工作表。
excel_engine [string]
仅在 file_format 为 excel 时需要配置。
支持以下文件类型:
POI
EasyExcel
默认的 excel 读取引擎是 POI,但当读取超过 65,000 行的 Excel 时,POI 容易导致内存溢出,因此您可以切换到 EasyExcel 作为读取引擎。
xml_row_tag [string]
仅在 file_format 为 xml 时需要配置。
指定 XML 文件中数据行的标签名称。
xml_use_attr_format [boolean]
仅在 file_format 为 xml 时需要配置。
指定是否使用标签属性格式处理数据。
csv_use_header_line [boolean]
是否使用标题行解析文件,仅在 file_format 为 csv
且文件包含符合 RFC 4180 的标题行时使用
file_filter_pattern [string]
过滤模式,用于过滤文件。
该模式遵循标准正则表达式。详情请参考 https://en.wikipedia.org/wiki/Regular_expression。 以下是一些示例。
文件结构示例:
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
匹配规则示例:
示例 1:匹配所有 .txt 文件,正则表达式:
/data/seatunnel/20241001/.*\.txt
此示例匹配的结果是:
/data/seatunnel/20241001/report.txt
示例 2:匹配所有以 abc 开头的文件,正则表达式:
/data/seatunnel/20241002/abc.*
此示例匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
示例 3:匹配所有以 abc 开头,且第四个字符是 h 或 g 的文件,正则表达式:
/data/seatunnel/20241007/abc[h,g].*
此示例匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
示例 4:匹配以 202410 开头的第三级文件夹和以 .csv 结尾的文件,正则表达式:
/data/seatunnel/202410\d*/.*\.csv
此示例匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
filename_extension [string]
过滤文件扩展名,用于过滤具有特定扩展名的文件。示例:csv
.txt
json
.xml
。
compress_codec [string]
文件的压缩编解码器及其支持的详细信息如下所示:
- txt:
lzo
none
- json:
lzo
none
- csv:
lzo
none
- orc/parquet:
自动识别压缩类型,无需额外设置。
archive_compress_codec [string]
归档文件的压缩编解码器及其支持的详细信息如下所示:
archive_compress_codec | file_format | archive_compress_suffix |
---|---|---|
ZIP | txt,json,excel,xml | .zip |
TAR | txt,json,excel,xml | .tar |
TAR_GZ | txt,json,excel,xml | .tar.gz |
GZ | txt,json,excel,xml | .gz |
NONE | all | .* |
注意:gz 压缩的 excel 文件需要压缩原始文件或指定文件后缀,例如 e2e.xls ->e2e_test.xls.gz
encoding [string]
仅在 file_format_type 为 json,text,csv,xml 时使用。
要读取的文件的编码。此参数将由 Charset.forName(encoding)
解析。
null_format [string]
仅在 file_format_type 为 text 时使用。 null_format 定义哪些字符串可以表示为 null。
例如:\N
binary_chunk_size [int]
仅在 file_format_type 为 binary 时使用。
读取二进制文件的块大小(以字节为单位)。默认为 1024 字节。较大的值可能会提高大文件的性能,但会使用更多内存。
binary_complete_file_mode [boolean]
仅在 file_format_type 为 binary 时使用。
是否将完整文件作为单个块读取,而不是分割成块。启用时,整个文件内容将一次性读入内存。默认为 false。
通用选项
数据源插件通用参数,请参阅 数据源通用选项 了解详情
tables_configs
用于定义多表任务,当您有多个表要读取时,可以使用此选项定义多个表。
示例
单表
LocalFile {
path = "/apps/hive/demo/student"
file_format_type = "parquet"
}
LocalFile {
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
file_format_type = "json"
}
对于带有 encoding
的 json、text 或 csv 文件格式
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "text"
encoding = "gbk"
}
多表
LocalFile {
tables_configs = [
{
schema {
table = "student"
}
path = "/apps/hive/demo/student"
file_format_type = "parquet"
},
{
schema {
table = "teacher"
}
path = "/apps/hive/demo/teacher"
file_format_type = "parquet"
}
]
}
LocalFile {
tables_configs = [
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
file_format_type = "json"
},
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/teacher"
file_format_type = "json"
}
}
传输二进制文件
env {
parallelism = 1
job.mode = "BATCH"
}
source {
LocalFile {
path = "/seatunnel/read/binary/"
file_format_type = "binary"
binary_chunk_size = 2048
binary_complete_file_mode = false
}
}
sink {
// 您可以将本地文件传输到 s3/hdfs/oss 等。
LocalFile {
path = "/seatunnel/read/binary2/"
file_format_type = "binary"
}
}
过滤文件
env {
parallelism = 1
job.mode = "BATCH"
}
source {
LocalFile {
path = "/data/seatunnel/"
file_format_type = "csv"
skip_header_row_number = 1
// 文件示例 abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
}
}
sink {
Console {
}
}
变更日志
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Sink] File support new format: maxwell_json,canal_json,debezium_json (#9278) (#9336) | https://github.com/apache/seatunnel/commit/a1bfbb20dd | dev |
[Improve][Connector-V2] Support maxcompute sink writer with timestamp field type (#9234) | https://github.com/apache/seatunnel/commit/a513c495e3 | dev |
[improve] update file connectors config (#9034) | https://github.com/apache/seatunnel/commit/8041d59dc2 | 2.3.11 |
[Improve] Refactor file enumerator to prevent duplicate put split (#8989) | https://github.com/apache/seatunnel/commit/fdf1beae9c | 2.3.11 |
[Improve][File] Add row_delimiter options into text file sink (#9017) | https://github.com/apache/seatunnel/commit/92aa855a34 | 2.3.11 |
Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13ad | 2.3.10 |
[improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85f | 2.3.10 |
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef5 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
[Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db78918 | 2.3.10 |
[Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed50 | 2.3.10 |
[Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
[Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d660 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a16 | 2.3.8 |
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
[feature][connector-file-local] add save mode function for localfile (#7080) | https://github.com/apache/seatunnel/commit/7b2f538310 | 2.3.6 |
[Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c493 | 2.3.6 |
[Chore] Fix file spell errors (#6606) | https://github.com/apache/seatunnel/commit/2599d3b736 | 2.3.5 |
[Feature][Connectors-V2][File]support assign encoding for file source/sink (#6489) | https://github.com/apache/seatunnel/commit/d159fbe086 | 2.3.5 |
Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9a | 2.3.5 |
[Feature][OssFile Connector] Make Oss implement source factory and sink factory (#6062) | https://github.com/apache/seatunnel/commit/1a8e9b4554 | 2.3.4 |
Add multiple table file sink to base (#6049) | https://github.com/apache/seatunnel/commit/085e0e5fc3 | 2.3.4 |
[Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b4 | 2.3.4 |
Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1aa | 2.3.4 |
[Feature] LocalFile sink support multiple table (#5931) | https://github.com/apache/seatunnel/commit/0fdf45f94d | 2.3.4 |
[Feature] LocalFileSource support multiple table | https://github.com/apache/seatunnel/commit/72be6663ad | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
[Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc4 | 2.3.4 |
[Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a0 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768d | 2.3.4 |
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59eb | 2.3.3 |
[Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d2 | 2.3.2 |
Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3c | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1c | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd76077669 | 2.3.1 |
[Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8f | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263c | 2.3.0 |
[Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e8631 | 2.3.0 |
[Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c353881 | 2.3.0 |
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
[Improve][Connector-V2][File] Support parse field from file path (#2985) | https://github.com/apache/seatunnel/commit/0bc12085c2 | 2.3.0-beta |
[Improve][connector][file] Support user-defined schema for reading text file (#2976) | https://github.com/apache/seatunnel/commit/1c05ee0d7e | 2.3.0-beta |
[Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394b | 2.3.0-beta |
[Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e7 | 2.2.0-beta |
[Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9b | 2.2.0-beta |
[Bug][Connector-V2] Fix error option (#2775) | https://github.com/apache/seatunnel/commit/488e561eef | 2.2.0-beta |
[Improve][Connector-V2] Refactor local file sink connector code structure (#2655) | https://github.com/apache/seatunnel/commit/6befd599a1 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
[chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta |
[Feature][Connector-V2] Local file json support (#2465) | https://github.com/apache/seatunnel/commit/65a92f2496 | 2.2.0-beta |
[Feature][Connector-V2] Add local file connector source (#2419) | https://github.com/apache/seatunnel/commit/eff595c452 | 2.2.0-beta |
[Improve][Connector-V2] Refactor the package of local file connector (#2403) | https://github.com/apache/seatunnel/commit/a538daed5c | 2.2.0-beta |
[Feature][Connector-V2] Add json file sink & json format (#2385) | https://github.com/apache/seatunnel/commit/dd68c06b0a | 2.2.0-beta |
[Imporve][Connector-V2] Remove redundant type judge logic because of pr #2315 (#2370) | https://github.com/apache/seatunnel/commit/42e8c25e50 | 2.2.0-beta |
[Feature][Connector-V2] Support orc file format in file connector (#2369) | https://github.com/apache/seatunnel/commit/f44fe1e033 | 2.2.0-beta |
[improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
[Connector-V2] Add parquet writer in file connector (#2273) | https://github.com/apache/seatunnel/commit/c95cc72cfa | 2.2.0-beta |
[checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f369 | 2.2.0-beta |
[Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee735 | 2.2.0-beta |
[Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64f | 2.2.0-beta |