OssFile
Oss 文件 sink 连接器
支持引擎
Spark
Flink
SeaTunnel Zeta
使用依赖性
适用于Spark/Flink引擎
- 您必须确保您的spark/flink集群已经集成了hadoop。测试的hadoop版本是2.x。
 - 您必须确保
${SEATUNNEL_HOME}/plugins/目录中的hadoop-aliyun-xx.jar,aliyun-sdk-oss-xx.jar和jdom-xx.jar的版本与您在spark/flink中使用的hadoop版本匹配,aliyun-sdk-oss-x.x.jar和jdom-xx.jar版本需要与hadoop-aliyun版本对应的版本。例如:hadoop-aliyun-3.1.4.jar依赖项aliyun-sdk-oss-3.4.1.jar和jdom-1.1.jar。 
适用于SeaTunnel Zeta引擎
- 您必须确保在
${seatunnel_HOME}/lib/目录中有seatunnel-hadopp3-3.1.4-uber.jar、aliyun-sdk-oss-3.4.1.jar、hadoop-aliyun-3.1.4.jar和jdom-1.1.jar。 
关键特性
默认情况下,我们使用2PC commit来确保精确一次
-  文件格式类型
- text
 - csv
 - parquet
 - orc
 - json
 - excel
 - xml
 - binary
 
 
数据类型映射
如果写入csv、text文件类型,则所有列将为字符串。
Orc 文件类型
| SeaTunnel 数据类型 | Orc 数据类型 | 
|---|---|
| STRING | STRING | 
| BOOLEAN | BOOLEAN | 
| TINYINT | BYTE | 
| SMALLINT | SHORT | 
| INT | INT | 
| BIGINT | LONG | 
| FLOAT | FLOAT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| DECIMAL | DECIMAL | 
| BYTES | BINARY | 
| DATE | DATE | 
| TIME  TIMESTAMP  | TIMESTAMP | 
| ROW | STRUCT | 
| NULL | 不支持的数据类型 | 
| ARRAY | LIST | 
| Map | Map | 
Parquet 文件类型
| SeaTunnel 数据类型 | Parquet 数据类型 | 
|---|---|
| STRING | STRING | 
| BOOLEAN | BOOLEAN | 
| TINYINT | INT_8 | 
| SMALLINT | INT_16 | 
| INT | INT32 | 
| BIGINT | INT64 | 
| FLOAT | FLOAT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| DECIMAL | DECIMAL | 
| BYTES | BINARY | 
| DATE | DATE | 
| TIME  TIMESTAMP  | TIMESTAMP_MILLIS | 
| ROW | GroupType | 
| NULL | 不支持的数据类型 | 
| ARRAY | LIST | 
| Map | Map | 
选项
| 名称 | 类型 | 必需 | 默认值 | 描述 | 
|---|---|---|---|---|
| path | string | 是 | 写入文件的oss路径。 | |
| tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入tmp路径,然后使用mv将tmp-dir提交到目标dir。因此需要一个OSS目录。 | 
| bucket | string | 是 | - | |
| access_key | string | 是 | - | |
| access_secret | string | 是 | - | |
| endpoint | string | 是 | - | |
| custom_filename | boolean | 否 | false | 是否需要自定义文件名 | 
| file_name_expression | string | 否 | "${transactionId}" | 仅在custom_filename为true时使用 | 
| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在custom_filename为true时使用 | 
| file_format_type | string | 否 | "csv" | |
| field_delimiter | string | 否 | '\001' | 仅当file_format_type为文本时使用 | 
| row_delimiter | string | 否 | "\n" | 仅当file_format_type为文本时使用 | 
| have_partition | boolean | 否 | false | 是否需要处理分区。 | 
| partition_by | array | 否 | - | 只有在have_partition为true时才使用 | 
| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 只有在have_partition为true时才使用 | 
| is_partition_field_write_in_file | boolean | 否 | false | 只有在have_partition为true时才使用 | 
| sink_columns | array | 否 | 当此参数为空时,所有字段都是接收列 | |
| is_enable_transaction | boolean | 否 | true | |
| batch_size | int | 否 | 1000000 | |
| compress_codec | string | 否 | none | |
| common-options | object | 否 | - | |
| max_rows_in_memory | int | 否 | - | 仅当file_format_type为excel时使用。 | 
| sheet_name | string | 否 | Sheet${Random number} | 仅当file_format_type为excel时使用。 | 
| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在file_format为csv时使用。 | 
| xml_root_tag | string | 否 | RECORDS | 仅在file_format为xml时使用。 | 
| xml_row_tag | string | 否 | RECORD | 仅在file_format为xml时使用。 | 
| xml_use_attr_format | boolean | 否 | - | 仅在file_format为xml时使用。 | 
| single_file_mode | boolean | 否 | false | 每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。 | 
| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,仍然会生成相应的数据文件。 | 
| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅在file_format为parquet时使用。 | 
| parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅在file_format为parquet时使用。 | 
| enable_header_write | boolean | 否 | false | 仅当file_format_type为文本、csv时使用 false:不写标头,true:写标头。  | 
| encoding | string | 否 | "UTF-8" | 仅当file_format_type为json、text、csv、xml时使用。 | 
path [string]
目标目录路径是必需的。
bucket [string]
oss文件系统的bucket地址,例如:oss://tyrantlucifer-image-bed
access_key [string]
oss文件系统的access_key。
access_secret [string]
oss文件系统的access_secret。
endpoint [string]
oss文件系统的endpoint端点。
custom_filename [boolean]
是否自定义文件名
file_name_expression [string]
仅在custom_filename为true时使用
file_name_expression描述了将在path中创建的文件表达式。我们可以在filename_expression中添加变量${now}或${uuid},类似于test${uuid}_${now},${now}表示当前时间,其格式可以通过指定选项filename_time_format`来定义。
请注意,如果is_enable_transaction为true,我们将自动添加${transactionId}_在文件的开头。
filename_time_format [String]
仅在custom_filename为true时使用`
当file_name_expression参数中的格式为xxxx-${Now}时,filename_time_format可以指定路径的时间格式,默认值为yyyy.MM.dd。常用的时间格式如下:
| Symbol | Description | 
|---|---|
| y | Year | 
| M | Month | 
| d | Day of month | 
| H | Hour in day (0-23) | 
| m | Minute in hour | 
| s | Second in minute | 
file_format_type [string]
我们支持以下文件类型:
text csv parquet orc json excel xml binary
请注意,最终文件名将以file_format_type的后缀结尾,文本文件的后缀为txt。
field_delimiter [string]
数据行中列之间的分隔符。只需要文本文件格式。
row_delimiter [string]
文件中行之间的分隔符。只需要text文件格式。
have_partition [boolean]
是否需要处理分区。
partition_by [array]
仅当have_partition为true时使用。
根据所选字段对数据进行分区。
partition_dir_expression [string]
仅在have_partition为true时使用。
如果指定了partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。
默认的partition_dir_expression是`${k0}=${v0}/${k1}=${1v1}//${kn}=${vn}/``k0是第一个分区字段,v0是第一个划分字段的值。
is_partition_field_write_in_file [boolean]
仅在have_partition为true时使用。
如果is_partition_field_write_in_file为true,则分区字段及其值将写入数据文件。
例如,如果你想写一个Hive数据文件,它的值应该是false。
sink_columns [array]
哪些列需要写入文件,默认值是从Transform或Source获取的所有列。
字段的顺序决定了文件实际写入的顺序。
is_enable_transaction [boolean]
如果is_enable_transaction为true,我们将确保数据在写入目标目录时不会丢失或重复。
请注意,如果is_enable_transaction为true,我们将自动添加${transactionId}_在文件的开头。
现在只支持true。
batch_size [int]
文件中的最大行数。对于SeaTunnel引擎,文件中的行数由batch_size和checkpoint.interval共同决定。如果checkpoint.interval的值足够大,sink writer将在文件中写入行,直到文件中的行大于batch_size。如果checkpoint.interval较小,则接收器写入程序将在新的检查点触发时创建一个新文件。
compress_codec [string]
文件的压缩编解码器和支持的详细信息如下所示:
- txt: 
lzonone - json: 
lzonone - csv: 
lzonone - orc: 
lzosnappylz4zlibnone - parquet: 
lzosnappylz4gzipbrotlizstdnone 
提示:excel类型不支持任何压缩格式
通用选项
Sink插件常用参数,请参考[Sink common Options](../Sink common Options.md)了解详细信息。
max_rows_in_memory [int]
当文件格式为Excel时,内存中可以缓存的最大数据项数。
sheet_name [string]
编写工作簿的工作表
csv_string_quote_mode [string]
当文件格式为CSV时,CSV的字符串引用模式。
- ALL: 所有字符串字段都将被引用。
 - MINIMAL: 引号字段包含特殊字符,如字段分隔符、引号字符或行分隔符字符串中的任何字符。
 - NONE: 从不引用字段。当分隔符出现在数据中时,打印机会用转义符作为前缀。如果未设置转义符,格式验证将抛出异常。
 
xml_root_tag [string]
指定XML文件中根元素的标记名。
xml_row_tag [string]
指定XML文件中数据行的标记名称。
xml_use_attr_format [boolean]
指定是否使用标记属性格式处理数据。
parquet_avro_write_timestamp_as_int96 [boolean]
支持从时间戳写入Parquet INT96,仅适用于拼花地板文件。
parquet_avro_write_fixed_as_int96 [array]
支持从12-byte字段写入Parquet INT96,仅适用于拼花地板文件。
encoding [string]
仅当file_format_type为json、text、csv、xml时使用。
要写入的文件的编码。此参数将由Charset.forName(encoding)解析。
如何创建Oss数据同步作业
以下示例演示了如何创建从假数据源读取数据并写入的数据同步作业 把它发送到Oss:
对于具有have_partition、custom_filename和sink_columns的文本文件格式
# 设置要执行的任务的基本配置
env {
  parallelism = 1
  job.mode = "BATCH"
}
# 创建产品数据源
source {
  FakeSource {
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}
# 将数据写入Oss
sink {
  OssFile {
    path="/seatunnel/sink"
    bucket = "oss://tyrantlucifer-image-bed"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "oss-cn-beijing.aliyuncs.com"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["name","age"]
    is_enable_transaction = true
  }
}
适用于带有have_partition和sink_columns的parquet文件格式
# 设置要执行的任务的基本配置
env {
  parallelism = 1
  job.mode = "BATCH"
}
# Create a source to product data
source {
  FakeSource {
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}
# 将数据写入Oss
sink {
  OssFile {
    path = "/seatunnel/sink"
    bucket = "oss://tyrantlucifer-image-bed"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxx"
    endpoint = "oss-cn-beijing.aliyuncs.com"
    have_partition = true
    partition_by = ["age"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_format_type = "parquet"
    sink_columns = ["name","age"]
  }
}
对于orc文件格式的简单配置
# 设置要执行的任务的基本配置
env {
  parallelism = 1
  job.mode = "BATCH"
}
# Create a source to product data
source {
  FakeSource {
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}
# 将数据写入Oss
sink {
  OssFile {
    path="/seatunnel/sink"
    bucket = "oss://tyrantlucifer-image-bed"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "oss-cn-beijing.aliyuncs.com"
    file_format_type = "orc"
  }
}
enable_header_write [boolean]
仅当file_format_type为text csv时使用。false:不写标头,true:写标头。
多表
用于从上游提取source元数据, 您可以在路径中使用${database_name}, ${table_name} 和 ${schema_name}。
env {
  parallelism = 1
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}
source {
  FakeSource {
    tables_configs = [
       {
        schema = {
          table = "fake1"
          fields {
            c_map = "map<string, string>"
            c_array = "array<int>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_bytes = bytes
            c_date = date
            c_decimal = "decimal(38, 18)"
            c_timestamp = timestamp
            c_row = {
              c_map = "map<string, string>"
              c_array = "array<int>"
              c_string = string
              c_boolean = boolean
              c_tinyint = tinyint
              c_smallint = smallint
              c_int = int
              c_bigint = bigint
              c_float = float
              c_double = double
              c_bytes = bytes
              c_date = date
              c_decimal = "decimal(38, 18)"
              c_timestamp = timestamp
            }
          }
        }
       },
       {
       schema = {
         table = "fake2"
         fields {
           c_map = "map<string, string>"
           c_array = "array<int>"
           c_string = string
           c_boolean = boolean
           c_tinyint = tinyint
           c_smallint = smallint
           c_int = int
           c_bigint = bigint
           c_float = float
           c_double = double
           c_bytes = bytes
           c_date = date
           c_decimal = "decimal(38, 18)"
           c_timestamp = timestamp
           c_row = {
             c_map = "map<string, string>"
             c_array = "array<int>"
             c_string = string
             c_boolean = boolean
             c_tinyint = tinyint
             c_smallint = smallint
             c_int = int
             c_bigint = bigint
             c_float = float
             c_double = double
             c_bytes = bytes
             c_date = date
             c_decimal = "decimal(38, 18)"
             c_timestamp = timestamp
           }
         }
       }
      }
    ]
  }
}
sink {
  OssFile {
    bucket = "oss://whale-ops"
    access_key = "xxxxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxxxx"
    endpoint = "https://oss-accelerate.aliyuncs.com"
    path = "/tmp/fake_empty/text/${table_name}"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "text"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    compress_codec = "lzo"
  }
}
提示
变更日志
Change Log
| Change | Commit | Version | 
|---|---|---|
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13a | 2.3.10 | 
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85 | 2.3.10 | 
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db7891 | 2.3.10 | 
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed5 | 2.3.10 | 
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 | 
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a1 | 2.3.8 | 
| [Improve] Added OSSFileCatalog and it's factory (#7458) | https://github.com/apache/seatunnel/commit/9006a205d | 2.3.8 | 
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 | 
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 | 
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c49 | 2.3.6 | 
[Chore] Fix file spell errors (#6606) | https://github.com/apache/seatunnel/commit/2599d3b73 | 2.3.5 | 
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 | 
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9 | 2.3.5 | 
| [Feature][OssFile Connector] Make Oss implement source factory and sink factory (#6062) | https://github.com/apache/seatunnel/commit/1a8e9b455 | 2.3.4 | 
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b | 2.3.4 | 
| [Hotfix][Oss File Connector] fix oss connector can not run bug (#6010) | https://github.com/apache/seatunnel/commit/755bc2a73 | 2.3.4 | 
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1a | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc | 2.3.4 | 
[Feature] Support LZO compress on File Read (#5083) | https://github.com/apache/seatunnel/commit/a4a190109 | 2.3.4 | 
| [Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 | 
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768 | 2.3.4 | 
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59e | 2.3.3 | 
| [Fix][Connector-V2] Fix file-oss config check bug and amend file-oss-jindo factoryIdentifier (#4581) | https://github.com/apache/seatunnel/commit/5c4f17df2 | 2.3.2 | 
| [Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d | 2.3.2 | 
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3 | 2.3.1 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1 | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 | 
| [Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd7607766 | 2.3.1 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263 | 2.3.0 | 
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e863 | 2.3.0 | 
| [Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c35388 | 2.3.0 | 
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 | 
| [Improve][Connector-V2][File] Support parse field from file path (#2985) | https://github.com/apache/seatunnel/commit/0bc12085c | 2.3.0-beta | 
| [Improve][connector][file] Support user-defined schema for reading text file (#2976) | https://github.com/apache/seatunnel/commit/1c05ee0d7 | 2.3.0-beta | 
| [Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394 | 2.3.0-beta | 
| [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e | 2.2.0-beta | 
| [Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9 | 2.2.0-beta | 
| [Feature][Connector-V2] Add oss sink (#2629) | https://github.com/apache/seatunnel/commit/bb2ad4048 | 2.2.0-beta | 
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta | 
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a2738 | 2.2.0-beta | 
| [Feature][Connector-V2] Add oss source connector (#2467) | https://github.com/apache/seatunnel/commit/712b77744 | 2.2.0-beta |