Hdfs文件
Hdfs文件 数据源连接器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
在一次 pollNext 调用中读取分片中的所有数据。将读取的分片保存在快照中。
描述
从Hdfs文件系统中读取数据。
支持的数据源信息
数据源 | 支持的版本 |
---|---|
Hdfs文件 | hadoop 2.x 和 3.x |
源选项
名称 | 类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
path | string | 是 | - | 源文件路径。 |
file_format_type | string | 是 | - | 我们支持以下文件类型:text json csv orc parquet excel 。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 txt 。 |
fs.defaultFS | string | 是 | - | 以 hdfs:// 开头的 Hadoop 集群地址,例如:hdfs://hadoopcluster 。 |
read_columns | list | 否 | - | 数据源的读取列列表,用户可以使用它实现字段投影。支持的文件类型的列投影如下所示:[text,json,csv,orc,parquet,excel]。提示:如果用户在读取 text json csv 文件时想要使用此功能,必须配置 schema 选项。 |
hdfs_site_path | string | 否 | - | hdfs-site.xml 的路径,用于加载 namenodes 的 ha 配置。 |
delimiter/field_delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认 \001 ,与 Hive 的默认分隔符相同。 |
parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径中解析分区键和值。例如,如果您从路径 hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26 读取文件,则来自文件的每条记录数据将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。 |
date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持的格式如下:yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd ,默认 yyyy-MM-dd 。日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持的格式如下:yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss ,默认 yyyy-MM-dd HH:mm:ss 。 |
time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持的格式如下:HH:mm:ss HH:mm:ss.SSS ,默认 HH:mm:ss 。 |
remote_user | string | 否 | - | 用于连接 Hadoop 的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。 |
krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径。 |
kerberos_principal | string | 否 | - | kerberos 的 principal。 |
kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径。 |
skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:skip_header_row_number = 2 。然后 Seatunnel 将跳过源文件中的前两行。 |
file_filter_pattern | string | 否 | - | 过滤模式,用于过滤文件。 |
filename_extension | string | 否 | - | 过滤文件扩展名, 用于过滤出指定扩展名的文件。 例如 csv .txt json .xml 。 |
null_format | string | 否 | - | 定义哪些字符串可以表示为 null,但仅适用于 txt 和 csv. 例如: \N |
schema | config | 否 | - | 上游数据的模式字段。 |
sheet_name | string | 否 | - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 |
compress_codec | string | 否 | none | 文件的压缩编解码器。 |
common-options | 否 | - | 源插件通用参数,请参阅 源通用选项 获取详细信息。 |
delimiter/field_delimiter [string]
delimiter 参数在版本 2.3.5 后将被弃用,请改用 field_delimiter。
file_filter_pattern [string]
过滤模式,用于过滤文件。
这个过滤规则遵循正则表达式. 关于详情,请参考 https://en.wikipedia.org/wiki/Regular_expression 学习
这里是一些例子.
文件清单:
/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png
匹配规则:
例子 1: 匹配所有txt为后缀名的文件,匹配正则为:
/data/seatunnel/20241001/.*\.txt
匹配的结果是:
/data/seatunnel/20241001/report.txt
例子 2: 匹配所有文件名以abc开头的文件,匹配正则为:
/data/seatunnel/20241002/abc.*
匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
例子 3: 匹配所有文件名以abc开头,并且文件第四个字母是 h 或者 g 的文件, 匹配正则为:
/data/seatunnel/20241007/abc[h,g].*
匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
例子 4: 匹配所有文件夹第三级以 202410 开头并且文件后缀名是.csv的文件, 匹配正则为:
/data/seatunnel/202410\d*/.*\.csv
匹配的结果是:
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
compress_codec [string]
文件的压缩编解码器及支持的详细信息如下所示:
- txt:
lzo
none
- json:
lzo
none
- csv:
lzo
none
- orc/parquet:
自动识别压缩类型,无需额外设置。
提示
如果您使用 spark/flink,为了
使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 ${SEATUNNEL_HOME}/lib
下的 jar 包来确认这一点。
任务示例
简单示例:
此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 中读取数据并将其发送到 Hdfs。
# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}
source {
HdfsFile {
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
type = "json"
fs.defaultFS = "hdfs://namenode001"
}
# 如果您想获取有关如何配置 seatunnel 和查看源插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source
}
transform {
# 如果您想获取有关如何配置 seatunnel 和查看转换插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}
sink {
HdfsFile {
fs.defaultFS = "hdfs://hadoopcluster"
path = "/tmp/hive/warehouse/test2"
file_format = "orc"
}
# 如果您想获取有关如何配置 seatunnel 和查看接收器插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}
Filter File
env {
parallelism = 1
job.mode = "BATCH"
}
source {
HdfsFile {
path = "/apps/hive/demo/student"
file_format_type = "json"
fs.defaultFS = "hdfs://namenode001"
file_filter_pattern = "abc[DX]*.*"
}
}
sink {
Console {
}
}
变更日志
Change Log
Change | Commit | Version |
---|---|---|
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef5 | dev |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
[Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db78918 | dev |
[Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed50 | dev |
[Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a16 | 2.3.8 |
[Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c493 | 2.3.6 |
Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9a | 2.3.5 |
[Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b4 | 2.3.4 |
[Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc4 | 2.3.4 |
[Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a1901096 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768d | 2.3.4 |
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59eb | 2.3.3 |
[Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d2 | 2.3.2 |
Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3c | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1c | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd76077669 | 2.3.1 |
[Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8f | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e8631 | 2.3.0 |
[Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394b | 2.3.0-beta |
[Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e7 | 2.2.0-beta |
[Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9b | 2.2.0-beta |
[Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701) | https://github.com/apache/seatunnel/commit/6129c02567 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
[chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta |
[Feature][Connector-V2] Add hdfs file json support (#2451) | https://github.com/apache/seatunnel/commit/84f6b17c15 | 2.2.0-beta |
[Improve][Connector-V2] Refactor the package of hdfs file connector (#2402) | https://github.com/apache/seatunnel/commit/87d0624c5b | 2.2.0-beta |
[Feature][Connector-V2] Add hdfs file source connector (#2420) | https://github.com/apache/seatunnel/commit/4fb6f2a216 | 2.2.0-beta |
[Feature][Connector-V2] Add json file sink & json format (#2385) | https://github.com/apache/seatunnel/commit/dd68c06b0a | 2.2.0-beta |
[Imporve][Connector-V2] Remove redundant type judge logic because of pr #2315 (#2370) | https://github.com/apache/seatunnel/commit/42e8c25e50 | 2.2.0-beta |
[Feature][Connector-V2] Support orc file format in file connector (#2369) | https://github.com/apache/seatunnel/commit/f44fe1e033 | 2.2.0-beta |
[improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
[Connector-V2] Add parquet writer in file connector (#2273) | https://github.com/apache/seatunnel/commit/c95cc72cfa | 2.2.0-beta |
[checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f369 | 2.2.0-beta |
[Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee735 | 2.2.0-beta |
[Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64f | 2.2.0-beta |