跳到主要内容
版本:Next

Hdfs文件

Hdfs文件 数据源连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

在一次 pollNext 调用中读取分片中的所有数据。将读取的分片保存在快照中。

描述

从Hdfs文件系统中读取数据。

支持的数据源信息

数据源支持的版本
Hdfs文件hadoop 2.x 和 3.x

源选项

名称类型是否必须默认值描述
pathstring-源文件路径。
file_format_typestring-我们支持以下文件类型:text json csv orc parquet excel。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 txt
fs.defaultFSstring-hdfs:// 开头的 Hadoop 集群地址,例如:hdfs://hadoopcluster
read_columnslist-数据源的读取列列表,用户可以使用它实现字段投影。支持的文件类型的列投影如下所示:[text,json,csv,orc,parquet,excel]。提示:如果用户在读取 text json csv 文件时想要使用此功能,必须配置 schema 选项。
hdfs_site_pathstring-hdfs-site.xml 的路径,用于加载 namenodes 的 ha 配置。
delimiter/field_delimiterstring\001字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认 \001,与 Hive 的默认分隔符相同。
parse_partition_from_pathbooleantrue控制是否从文件路径中解析分区键和值。例如,如果您从路径 hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26 读取文件,则来自文件的每条记录数据将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。
date_formatstringyyyy-MM-dd日期类型格式,用于告诉连接器如何将字符串转换为日期,支持的格式如下:yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd,默认 yyyy-MM-dd。日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持的格式如下:yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss,默认 yyyy-MM-dd HH:mm:ss
time_formatstringHH:mm:ss时间类型格式,用于告诉连接器如何将字符串转换为时间,支持的格式如下:HH:mm:ss HH:mm:ss.SSS,默认 HH:mm:ss
remote_userstring-用于连接 Hadoop 的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。
krb5_pathstring/etc/krb5.confkerberos 的 krb5 路径。
kerberos_principalstring-kerberos 的 principal。
kerberos_keytab_pathstring-kerberos 的 keytab 路径。
skip_header_row_numberlong0跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:skip_header_row_number = 2。然后 Seatunnel 将跳过源文件中的前两行。
file_filter_patternstring-过滤模式,用于过滤文件。
schemaconfig-上游数据的模式字段。
sheet_namestring-读取工作簿的表格,仅在文件格式为 excel 时使用。
compress_codecstringnone文件的压缩编解码器。
common-options-源插件通用参数,请参阅 源通用选项 获取详细信息。

delimiter/field_delimiter [string]

delimiter 参数在版本 2.3.5 后将被弃用,请改用 field_delimiter

file_filter_pattern [string]

过滤模式,用于过滤文件。

这个过滤规则遵循正则表达式. 关于详情,请参考 https://en.wikipedia.org/wiki/Regular_expression 学习

这里是一些例子.

文件清单:

/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png

匹配规则:

例子 1: 匹配所有txt为后缀名的文件,匹配正则为:

/data/seatunnel/20241001/.*\.txt

匹配的结果是:

/data/seatunnel/20241001/report.txt

例子 2: 匹配所有文件名以abc开头的文件,匹配正则为:

/data/seatunnel/20241002/abc.*

匹配的结果是:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv

例子 3: 匹配所有文件名以abc开头,并且文件第四个字母是 h 或者 g 的文件, 匹配正则为:

/data/seatunnel/20241007/abc[h,g].*

匹配的结果是:

/data/seatunnel/20241007/abch202410.csv

例子 4: 匹配所有文件夹第三级以 202410 开头并且文件后缀名是.csv的文件, 匹配正则为:

/data/seatunnel/202410\d*/.*\.csv

匹配的结果是:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv

compress_codec [string]

文件的压缩编解码器及支持的详细信息如下所示:

  • txt:lzo none
  • json:lzo none
  • csv:lzo none
  • orc/parquet:
    自动识别压缩类型,无需额外设置。

提示

如果您使用 spark/flink,为了

使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 ${SEATUNNEL_HOME}/lib 下的 jar 包来确认这一点。

任务示例

简单示例:

此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 中读取数据并将其发送到 Hdfs。

# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}

source {
HdfsFile {
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
type = "json"
fs.defaultFS = "hdfs://namenode001"
}
# 如果您想获取有关如何配置 seatunnel 和查看源插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
# 如果您想获取有关如何配置 seatunnel 和查看转换插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}

sink {
HdfsFile {
fs.defaultFS = "hdfs://hadoopcluster"
path = "/tmp/hive/warehouse/test2"
file_format = "orc"
}
# 如果您想获取有关如何配置 seatunnel 和查看接收器插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}

Filter File

env {
parallelism = 1
job.mode = "BATCH"
}

source {
HdfsFile {
path = "/apps/hive/demo/student"
file_format_type = "json"
fs.defaultFS = "hdfs://namenode001"
file_filter_pattern = "abc[DX]*.*"
}
}

sink {
Console {
}
}