跳到主要内容
版本:Next

OssFile

Oss 文件 sink 连接器

支持引擎

Spark
Flink
SeaTunnel Zeta

使用依赖性

适用于Spark/Flink引擎

  1. 您必须确保您的spark/flink集群已经集成了hadoop。测试的hadoop版本是2.x。
  2. 您必须确保${SEATUNNEL_HOME}/plugins/目录中的hadoop-aliyun-xx.jar, aliyun-sdk-oss-xx.jarjdom-xx.jar的版本与您在spark/flink中使用的hadoop版本匹配,aliyun-sdk-oss-x.x.jarjdom-xx.jar版本需要与hadoop-aliyun版本对应的版本。例如:hadoop-aliyun-3.1.4.jar依赖项aliyun-sdk-oss-3.4.1.jarjdom-1.1.jar

适用于SeaTunnel Zeta引擎

  1. 您必须确保在${seatunnel_HOME}/lib/目录中有seatunnel-hadopp3-3.1.4-uber.jar aliyun-sdk-oss-3.4.1.jar hadoop-aliyun-3.1.4.jarjdom-1.1.jar

关键特性

默认情况下,我们使用2PC commit来确保精确一次

  • 文件格式类型
    • text
    • csv
    • parquet
    • orc
    • json
    • excel
    • xml
    • binary

数据类型映射

如果写入csvtext文件类型,则所有列将为字符串。

Orc 文件类型

SeaTunnel 数据类型Orc 数据类型
STRINGSTRING
BOOLEANBOOLEAN
TINYINTBYTE
SMALLINTSHORT
INTINT
BIGINTLONG
FLOATFLOAT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BYTESBINARY
DATEDATE
TIME
TIMESTAMP
TIMESTAMP
ROWSTRUCT
NULL不支持的数据类型
ARRAYLIST
MapMap

Parquet 文件类型

SeaTunnel 数据类型Parquet 数据类型
STRINGSTRING
BOOLEANBOOLEAN
TINYINTINT_8
SMALLINTINT_16
INTINT32
BIGINTINT64
FLOATFLOAT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BYTESBINARY
DATEDATE
TIME
TIMESTAMP
TIMESTAMP_MILLIS
ROWGroupType
NULL不支持的数据类型
ARRAYLIST
MapMap

选项

名称类型必需默认值描述
pathstring写入文件的oss路径。
tmp_pathstring/tmp/seatunnel结果文件将首先写入tmp路径,然后使用mv将tmp-dir提交到目标dir。因此需要一个OSS目录。
bucketstring-
access_keystring-
access_secretstring-
endpointstring-
custom_filenamebooleanfalse是否需要自定义文件名
file_name_expressionstring"${transactionId}"仅在custom_filename为true时使用
filename_time_formatstring"yyyy.MM.dd"仅在custom_filename为true时使用
file_format_typestring"csv"
field_delimiterstring'\001'仅当file_format_type为文本时使用
row_delimiterstring"\n"仅当file_format_type为文本时使用
have_partitionbooleanfalse是否需要处理分区。
partition_byarray-只有在have_partition为true时才使用
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"只有在have_partition为true时才使用
is_partition_field_write_in_filebooleanfalse只有在have_partition为true时才使用
sink_columnsarray当此参数为空时,所有字段都是接收列
is_enable_transactionbooleantrue
batch_sizeint1000000
compress_codecstringnone
common-optionsobject-
max_rows_in_memoryint-仅当file_format_type为excel时使用。
sheet_namestringSheet${Random number}仅当file_format_type为excel时使用。
csv_string_quote_modeenumMINIMAL仅在file_format为csv时使用。
xml_root_tagstringRECORDS仅在file_format为xml时使用。
xml_row_tagstringRECORD仅在file_format为xml时使用。
xml_use_attr_formatboolean-仅在file_format为xml时使用。
single_file_modebooleanfalse每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀。
create_empty_file_when_no_databooleanfalse当上游没有数据同步时,仍然会生成相应的数据文件。
parquet_avro_write_timestamp_as_int96booleanfalse仅在file_format为parquet时使用。
parquet_avro_write_fixed_as_int96array-仅在file_format为parquet时使用。
enable_header_writebooleanfalse仅当file_format_type为文本、csv时使用
false:不写标头,true:写标头。
encodingstring"UTF-8"仅当file_format_type为json、text、csv、xml时使用。

path [string]

目标目录路径是必需的。

bucket [string]

oss文件系统的bucket地址,例如:oss://tyrantlucifer-image-bed

access_key [string]

oss文件系统的access_key。

access_secret [string]

oss文件系统的access_secret。

endpoint [string]

oss文件系统的endpoint端点。

custom_filename [boolean]

是否自定义文件名

file_name_expression [string]

仅在custom_filenametrue时使用

file_name_expression描述了将在path中创建的文件表达式。我们可以在filename_expression中添加变量${now}${uuid},类似于test${uuid}_${now}${now}表示当前时间,其格式可以通过指定选项filename_time_format`来定义。

请注意,如果is_enable_transactiontrue,我们将自动添加${transactionId}_在文件的开头。

filename_time_format [String]

仅在custom_filenametrue时使用`

file_name_expression参数中的格式为xxxx-${Now}时,filename_time_format可以指定路径的时间格式,默认值为yyyy.MM.dd。常用的时间格式如下:

SymbolDescription
yYear
MMonth
dDay of month
HHour in day (0-23)
mMinute in hour
sSecond in minute

file_format_type [string]

我们支持以下文件类型:

text csv parquet orc json excel xml binary

请注意,最终文件名将以file_format_type的后缀结尾,文本文件的后缀为txt

field_delimiter [string]

数据行中列之间的分隔符。只需要文本文件格式。

row_delimiter [string]

文件中行之间的分隔符。只需要text文件格式。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅当have_partitiontrue时使用。

根据所选字段对数据进行分区。

partition_dir_expression [string]

仅在have_partitiontrue时使用。

如果指定了partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。

默认的partition_dir_expression是`${k0}=${v0}/${k1}=${1v1}//${kn}=${vn}/``k0是第一个分区字段,v0是第一个划分字段的值。

is_partition_field_write_in_file [boolean]

仅在have_partitiontrue时使用。

如果is_partition_field_write_in_filetrue,则分区字段及其值将写入数据文件。

例如,如果你想写一个Hive数据文件,它的值应该是false

sink_columns [array]

哪些列需要写入文件,默认值是从TransformSource获取的所有列。 字段的顺序决定了文件实际写入的顺序。

is_enable_transaction [boolean]

如果is_enable_transaction为true,我们将确保数据在写入目标目录时不会丢失或重复。

请注意,如果is_enable_transactiontrue,我们将自动添加${transactionId}_在文件的开头。

现在只支持true

batch_size [int]

文件中的最大行数。对于SeaTunnel引擎,文件中的行数由batch_sizecheckpoint.interval共同决定。如果checkpoint.interval的值足够大,sink writer将在文件中写入行,直到文件中的行大于batch_size。如果checkpoint.interval较小,则接收器写入程序将在新的检查点触发时创建一个新文件。

compress_codec [string]

文件的压缩编解码器和支持的详细信息如下所示:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc: lzo snappy lz4 zlib none
  • parquet: lzo snappy lz4 gzip brotli zstd none

提示:excel类型不支持任何压缩格式

通用选项

Sink插件常用参数,请参考[Sink common Options](../Sink common Options.md)了解详细信息。

max_rows_in_memory [int]

当文件格式为Excel时,内存中可以缓存的最大数据项数。

sheet_name [string]

编写工作簿的工作表

csv_string_quote_mode [string]

当文件格式为CSV时,CSV的字符串引用模式。

  • ALL: 所有字符串字段都将被引用。
  • MINIMAL: 引号字段包含特殊字符,如字段分隔符、引号字符或行分隔符字符串中的任何字符。
  • NONE: 从不引用字段。当分隔符出现在数据中时,打印机会用转义符作为前缀。如果未设置转义符,格式验证将抛出异常。

xml_root_tag [string]

指定XML文件中根元素的标记名。

xml_row_tag [string]

指定XML文件中数据行的标记名称。

xml_use_attr_format [boolean]

指定是否使用标记属性格式处理数据。

parquet_avro_write_timestamp_as_int96 [boolean]

支持从时间戳写入Parquet INT96,仅适用于拼花地板文件。

parquet_avro_write_fixed_as_int96 [array]

支持从12-byte字段写入Parquet INT96,仅适用于拼花地板文件。

encoding [string]

仅当file_format_type为json、text、csv、xml时使用。 要写入的文件的编码。此参数将由Charset.forName(encoding)解析。

如何创建Oss数据同步作业

以下示例演示了如何创建从假数据源读取数据并写入的数据同步作业 把它发送到Oss:

对于具有have_partitioncustom_filenamesink_columns的文本文件格式

# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}

# 创建产品数据源
source {
FakeSource {
schema = {
fields {
name = string
age = int
}
}
}
}

# 将数据写入Oss
sink {
OssFile {
path="/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
}
}

适用于带有have_partitionsink_columns的parquet文件格式

# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}

# Create a source to product data
source {
FakeSource {
schema = {
fields {
name = string
age = int
}
}
}
}

# 将数据写入Oss
sink {
OssFile {
path = "/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_format_type = "parquet"
sink_columns = ["name","age"]
}
}

对于orc文件格式的简单配置

# 设置要执行的任务的基本配置
env {
parallelism = 1
job.mode = "BATCH"
}

# Create a source to product data
source {
FakeSource {
schema = {
fields {
name = string
age = int
}
}
}
}

# 将数据写入Oss
sink {
OssFile {
path="/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
file_format_type = "orc"
}
}

enable_header_write [boolean]

仅当file_format_type为text csv时使用。false:不写标头,true:写标头。

多表

用于从上游提取source元数据, 您可以在路径中使用${database_name}, ${table_name}${schema_name}


env {
parallelism = 1
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "fake1"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
},
{
schema = {
table = "fake2"
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}
]
}
}

sink {
OssFile {
bucket = "oss://whale-ops"
access_key = "xxxxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxx"
endpoint = "https://oss-accelerate.aliyuncs.com"
path = "/tmp/fake_empty/text/${table_name}"
row_delimiter = "\n"
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "lzo"
}
}

Changelog

2.2.0-beta 2022-09-26

  • 添加OSS Sink连接器

2.3.0-beta 2022-10-20

  • [Bug修复] 修复windows环境下路径错误的bug(2980)
  • [Bug修复] 修复文件系统获取错误(3117)
  • [Bug修复] 解决了无法从配置中将\t解析为分隔符的错误文件(3083)

下个版本

  • [Bug修复] 修复了以下无法写入数据的错误文件(3258)
    • 当上游的字段为空时,它将抛出NullPointerException
    • Sink列映射失败
    • 当从状态还原写入程序时,直接获取事务失败
  • [优化] 支持为每个文件设置批量大小(3625)
  • [优化] 支持文件压缩(3899)

提示

1.SeaTunnel部署方案.