Hdfs文件
Hdfs文件 数据接收器
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
默认情况下,我们使用2PC提交来确保"精确一次"
-  文件格式类型
- 文本
 - CSV
 - Parquet
 - ORC
 - JSON
 - Excel
 
 -  压缩编解码器
- lzo
 
 
描述
将数据输出到Hdfs文件
支持的数据源信息
| 数据源 | 支持的版本 | 
|---|---|
| Hdfs文件 | hadoop 2.x 和 3.x | 
接收器选项
| 名称 | 类型 | 是否必须 | 默认值 | 描述 | 
|---|---|---|---|---|
| fs.defaultFS | string | 是 | - | 以 hdfs:// 开头的 Hadoop 集群地址,例如:hdfs://hadoopcluster | 
| path | string | 是 | - | 目标目录路径是必需的。 | 
| tmp_path | string | 是 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 mv 命令将临时目录提交到目标目录。需要一个Hdfs路径。 | 
| hdfs_site_path | string | 否 | - | hdfs-site.xml 的路径,用于加载 namenodes 的 ha 配置。 | 
| custom_filename | boolean | 否 | false | 是否需要自定义文件名 | 
| file_name_expression | string | 否 | "${transactionId}" | 仅在 custom_filename 为 true 时使用。file_name_expression 描述将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now} 或 ${uuid},例如 test_${uuid}_${now},${now} 表示当前时间,其格式可以通过指定选项 filename_time_format 来定义。请注意,如果 is_enable_transaction 为 true,我们将在文件头部自动添加 ${transactionId}_。 | 
| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在 custom_filename 为 true 时使用。当 file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式如下所示:[y:年,M:月,d:月中的一天,H:一天中的小时(0-23),m:小时中的分钟,s:分钟中的秒] | 
| file_format_type | string | 否 | "csv" | 我们支持以下文件类型:text json csv orc parquet excel。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 txt。 | 
| filename_extension | string | 否 | - | 使用自定义的文件扩展名覆盖默认的文件扩展名。 例如:.xml, .json, dat, .customtype | 
| field_delimiter | string | 否 | '\001' | 仅在 file_format 为 text 时使用,数据行中列之间的分隔符。仅需要 text 文件格式。 | 
| row_delimiter | string | 否 | "\n" | 仅在 file_format 为 text 时使用,文件中行之间的分隔符。仅需要 text 文件格式。 | 
| have_partition | boolean | 否 | false | 是否需要处理分区。 | 
| partition_by | array | 否 | - | 仅在 have_partition 为 true 时使用,根据选定的字段对数据进行分区。 | 
| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 仅在 have_partition 为 true 时使用,如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。默认 partition_dir_expression 为 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/。k0 是第一个分区字段,v0 是第一个分区字段的值。 | 
| is_partition_field_write_in_file | boolean | 否 | false | 仅当 have_partition 为 true 时使用。如果 is_partition_field_write_in_file 为 true,则分区字段及其值将写入数据文件中。例如,如果要写入Hive数据文件,则其值应为 false。 | 
| sink_columns | array | 否 | 当此参数为空时,所有字段都是接收器列。需要写入文件的列,默认值是从 Transform 或 Source 获取的所有列。字段的顺序确定了实际写入文件时的顺序。 | |
| is_enable_transaction | boolean | 否 | true | 如果 is_enable_transaction 为 true,则在将数据写入目标目录时,我们将确保数据不会丢失或重复。请注意,如果 is_enable_transaction 为 true,我们将在文件头部自动添加 ${transactionId}_。目前仅支持 true。 | 
| batch_size | int | 否 | 1000000 | 文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size 和 checkpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,则接收器写入器将在文件中写入行,直到文件中的行大于 batch_size。如果 checkpoint.interval 很小,则接收器写入器将在新检查点触发时创建一个新文件。 | 
| single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 | 
| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,依然生成对应的数据文件。 | 
| compress_codec | string | 否 | none | 文件的压缩编解码器及其支持的细节如下所示:[txt: lzo none,json: lzo none,csv: lzo none,orc: lzo snappy lz4 zlib none,parquet: lzo snappy lz4 gzip brotli zstd none]。提示:excel类型不支持任何压缩格式。 | 
| krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径 | 
| kerberos_principal | string | 否 | - | kerberos 的主体 | 
| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径 | 
| compress_codec | string | 否 | none | 压缩编解码器 | 
| common-options | object | 否 | - | 接收器插件通用参数,请参阅 接收器通用选项 了解详情 | 
| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在文件格式为 CSV 时使用。 | 
| enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 text,csv 时使用。 false:不写入表头,true:写入表头。  | 
| max_rows_in_memory | int | 否 | - | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。 | 
| sheet_name | string | 否 | Sheet${Random number} | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名 | 
| remote_user | string | 否 | - | Hdfs的远端用户名。 | 
提示
如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查
${SEATUNNEL_HOME}/lib下的 jar 包来确认这一点。
任务示例
简单示例:
此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 Hdfs。
# 定义运行时环境
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  # 这是一个示例源插件 **仅用于测试和演示功能源插件**
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        c_map = "map<string, smallint>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
  # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的源端插件列表,
  # 请访问 https://seatunnel.apache.org/docs/connector-v2/source
}
transform {
  # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的转换插件列表,
    # 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}
sink {
    HdfsFile {
      fs.defaultFS = "hdfs://hadoopcluster"
      path = "/tmp/hive/warehouse/test2"
      file_format_type = "orc"
    }
  # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的接收器插件列表,
  # 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}
orc 文件格式的简单配置
HdfsFile {
    fs.defaultFS = "hdfs://hadoopcluster"
    path = "/tmp/hive/warehouse/test2"
    file_format_type = "orc"
}
text 文件格式的配置,包括 have_partition、custom_filename 和 sink_columns
HdfsFile {
    fs.defaultFS = "hdfs://hadoopcluster"
    path = "/tmp/hive/warehouse/test2"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
}
parquet 文件格式的配置,包括 have_partition、custom_filename 和 sink_columns
HdfsFile {
    fs.defaultFS = "hdfs://hadoopcluster"
    path = "/tmp/hive/warehouse/test2"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    file_format_type = "parquet"
    sink_columns = ["name","age"]
    is_enable_transaction = true
}
enable_header_write [boolean]
仅在 file_format_type 为 text,csv 时使用。false:不写入表头,true:写入表头。
csv_string_quote_mode [string]
当文件格式为 CSV 时,CSV 的字符串引号模式。
- ALL:所有字符串字段都会加引号。
 - MINIMAL:仅为包含特殊字符(如字段分隔符、引号字符或行分隔符字符串中的任何字符)的字段加引号。
 - NONE:从不为字段加引号。当数据中包含分隔符时,输出会在前面加上转义字符。如果未设置转义字符,则格式验证会抛出异常。
 
kerberos 的简单配置
HdfsFile {
    fs.defaultFS = "hdfs://hadoopcluster"
    path = "/tmp/hive/warehouse/test2"
    hdfs_site_path = "/path/to/your/hdfs_site_path"
    kerberos_principal = "your_principal@EXAMPLE.COM"
    kerberos_keytab_path = "/path/to/your/keytab/file.keytab"
}
压缩的简单配置
HdfsFile {
    fs.defaultFS = "hdfs://hadoopcluster"
    path = "/tmp/hive/warehouse/test2"
    compress_codec = "lzo"
}
变更日志
Change Log
| Change | Commit | Version | 
|---|---|---|
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13a | 2.3.10 | 
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85 | 2.3.10 | 
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db7891 | 2.3.10 | 
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed5 | 2.3.10 | 
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a1 | 2.3.8 | 
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c49 | 2.3.6 | 
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9 | 2.3.5 | 
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b | 2.3.4 | 
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc | 2.3.4 | 
[Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a190109 | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 | 
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768 | 2.3.4 | 
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59e | 2.3.3 | 
| [Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d | 2.3.2 | 
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3 | 2.3.1 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1 | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 | 
| [Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd7607766 | 2.3.1 | 
| [Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8 | 2.3.0 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e863 | 2.3.0 | 
| [Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394 | 2.3.0-beta | 
| [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e | 2.2.0-beta | 
| [Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9 | 2.2.0-beta | 
| [Improve][Connector-V2] Refactor hdfs file sink connector code structure (#2701) | https://github.com/apache/seatunnel/commit/6129c0256 | 2.2.0-beta | 
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta | 
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a2738 | 2.2.0-beta | 
| [Feature][Connector-V2] Add hdfs file json support (#2451) | https://github.com/apache/seatunnel/commit/84f6b17c1 | 2.2.0-beta | 
| [Improve][Connector-V2] Refactor the package of hdfs file connector (#2402) | https://github.com/apache/seatunnel/commit/87d0624c5 | 2.2.0-beta | 
| [Feature][Connector-V2] Add hdfs file source connector (#2420) | https://github.com/apache/seatunnel/commit/4fb6f2a21 | 2.2.0-beta | 
| [Feature][Connector-V2] Add json file sink & json format (#2385) | https://github.com/apache/seatunnel/commit/dd68c06b0 | 2.2.0-beta | 
| [Imporve][Connector-V2] Remove redundant type judge logic because of pr #2315 (#2370) | https://github.com/apache/seatunnel/commit/42e8c25e5 | 2.2.0-beta | 
| [Feature][Connector-V2] Support orc file format in file connector (#2369) | https://github.com/apache/seatunnel/commit/f44fe1e03 | 2.2.0-beta | 
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta | 
| [Connector-V2] Add parquet writer in file connector (#2273) | https://github.com/apache/seatunnel/commit/c95cc72cf | 2.2.0-beta | 
| [checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f36 | 2.2.0-beta | 
| [Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee73 | 2.2.0-beta | 
| [Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64 | 2.2.0-beta |