跳到主要内容
版本:Next

S3File

S3 文件 Sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

默认情况下,我们使用 2PC 提交来确保 精确一次

  • 文件格式类型
    • text
    • csv
    • parquet
    • orc
    • json
    • excel
    • xml
    • binary

描述

将数据输出到 AWS S3 文件系统。

支持的数据源信息

数据源支持的版本
S3当前版本

数据库依赖

如果您使用 Spark/Flink,为了使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hadoop。测试的 Hadoop 版本为 2.x。

如果您使用 SeaTunnel引擎,当您下载并安装 SeaTunnel引擎时,它会自动集成 Hadoop jar 包。您可以在 ${SEATUNNEL_HOME}/lib 下检查 jar 包以确认这一点。 要使用此连接器,您需要将 hadoop-aws-3.1.4.jaraws-java-sdk-bundle-1.12.692.jar 放在 ${SEATUNNEL_HOME}/lib 目录下。

数据类型映射

如果写入 csvtext 文件类型,所有列都将为字符串类型。

Orc 文件类型

SeaTunnel 数据类型Orc 数据类型
STRINGSTRING
BOOLEANBOOLEAN
TINYINTBYTE
SMALLINTSHORT
INTINT
BIGINTLONG
FLOATFLOAT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BYTESBINARY
DATEDATE
TIME
TIMESTAMP
TIMESTAMP
ROWSTRUCT
NULL不支持的数据类型
ARRAYLIST
MapMap

Parquet 文件类型

SeaTunnel 数据类型Parquet 数据类型
STRINGSTRING
BOOLEANBOOLEAN
TINYINTINT_8
SMALLINTINT_16
INTINT32
BIGINTINT64
FLOATFLOAT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BYTESBINARY
DATEDATE
TIME
TIMESTAMP
TIMESTAMP_MILLIS
ROWGroupType
NULL不支持的数据类型
ARRAYLIST
MapMap

Sink 选项

名称类型是否必填默认值描述
pathstring-
tmp_pathstring/tmp/seatunnel结果文件将首先写入临时路径,然后使用 mv 将临时目录提交到目标目录。需要一个 S3 目录。
bucketstring-
fs.s3a.endpointstring-
fs.s3a.aws.credentials.providerstringcom.amazonaws.auth.InstanceProfileCredentialsProvider认证 s3a 的方式。目前仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider
access_keystring-仅当 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
access_secretstring-仅当 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
custom_filenamebooleanfalse是否需要自定义文件名
file_name_expressionstring"${transactionId}"仅当 custom_filename 为 true 时使用
filename_time_formatstring"yyyy.MM.dd"仅当 custom_filename 为 true 时使用
file_format_typestring"csv"
field_delimiterstring'\001'仅当 file_format 为 text 时使用
row_delimiterstring"\n"仅当 file_format 为 text 时使用
have_partitionbooleanfalse是否需要处理分区。
partition_byarray-仅当 have_partition 为 true 时使用
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"仅当 have_partition 为 true 时使用
is_partition_field_write_in_filebooleanfalse仅当 have_partition 为 true 时使用
sink_columnsarray当此参数为空时,所有字段均为 sink 列
is_enable_transactionbooleantrue
batch_sizeint1000000
compress_codecstringnone
common-optionsobject-
max_rows_in_memoryint-仅当 file_format 为 excel 时使用
sheet_namestringSheet${Random number}仅当 file_format 为 excel 时使用
csv_string_quote_modeenumMINIMAL仅当 file_format 为 csv 时使用
xml_root_tagstringRECORDS仅当 file_format 为 xml 时使用,指定 XML 文件中根元素的标签名称。
xml_row_tagstringRECORD仅当 file_format 为 xml 时使用,指定 XML 文件中数据行的标签名称。
xml_use_attr_formatboolean-仅当 file_format 为 xml 时使用,指定是否使用标签属性格式处理数据。
single_file_modebooleanfalse每个并行度只会输出一个文件。当此参数开启时,batch_size 将不会生效。输出文件名不会有文件块后缀。
create_empty_file_when_no_databooleanfalse当上游没有数据同步时,仍然会生成相应的数据文件。
parquet_avro_write_timestamp_as_int96booleanfalse仅当 file_format 为 parquet 时使用
parquet_avro_write_fixed_as_int96array-仅当 file_format 为 parquet 时使用
hadoop_s3_propertiesmap如果您需要添加其他选项,可以在此处添加,并参考此链接
schema_save_modeEnumCREATE_SCHEMA_WHEN_NOT_EXIST在开启同步任务之前,对目标路径进行不同的处理
data_save_modeEnumAPPEND_DATA在开启同步任务之前,对目标路径中的数据文件进行不同的处理
enable_header_writebooleanfalse仅当 file_format_type 为 text,csv 时使用。
false: 不写入表头, true: 写入表头。
encodingstring"UTF-8"仅当 file_format_type 为 json,text,csv,xml 时使用。

path [string]

存储数据文件的路径,支持变量替换。例如:path=/test/${database_name}/${schema_name}/${table_name}

hadoop_s3_properties [map]

如果您需要添加其他选项,可以在此处添加,并参考此链接

hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}

custom_filename [boolean]

是否自定义文件名

file_name_expression [string]

仅当 custom_filenametrue 时使用

file_name_expression 描述了将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now}${uuid},例如 test_${uuid}_${now}${now} 表示当前时间,其格式可以通过指定选项 filename_time_format 来定义。

请注意,如果 is_enable_transactiontrue,我们会在文件头部自动添加 ${transactionId}_

filename_time_format [string]

仅当 custom_filenametrue 时使用

file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式如下:

符号描述
y
M
d
H小时 (0-23)
m分钟
s

file_format_type [string]

我们支持以下文件类型:

text csv parquet orc json excel xml binary

请注意,最终文件名将以文件格式类型的后缀结尾,文本文件的后缀为 txt

field_delimiter [string]

行数据中列之间的分隔符。仅在 text 文件格式中需要。

row_delimiter [string]

文件中行之间的分隔符。仅在 text 文件格式中需要。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅当 have_partitiontrue 时使用。

根据选定的字段对数据进行分区。

partition_dir_expression [string]

仅当 have_partitiontrue 时使用。

如果指定了 partition_by,我们将根据分区信息生成相应的分区目录,最终文件将放置在分区目录中。

默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅当 have_partitiontrue 时使用。

如果 is_partition_field_write_in_filetrue,分区字段及其值将被写入数据文件。

例如,如果您想写入 Hive 数据文件,其值应为 false

sink_columns [array]

哪些列需要写入文件,默认值为从 TransformSource 获取的所有列。 字段的顺序决定了文件实际写入的顺序。

is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,我们将确保在将数据写入目标目录时不会丢失或重复。

请注意,如果 is_enable_transactiontrue,我们会在文件头部自动添加 ${transactionId}_

目前仅支持 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_sizecheckpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,sink writer 将一直写入文件,直到文件中的行数超过 batch_size。如果 checkpoint.interval 较小,sink writer 将在新的 checkpoint 触发时创建一个新文件。

compress_codec [string]

文件的压缩编解码器,支持的详细信息如下:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc: lzo snappy lz4 zlib none
  • parquet: lzo snappy lz4 gzip brotli zstd none

提示:excel 类型不支持任何压缩格式

common options

Sink 插件通用参数,请参考 Sink 通用选项 获取详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,内存中可以缓存的最大数据项数。

sheet_name [string]

写入工作表的名称

csv_string_quote_mode [string]

当文件格式为 CSV 时,CSV 的字符串引用模式。

  • ALL: 所有字符串字段都会被引用。
  • MINIMAL: 引用包含特殊字符的字段,如字段分隔符、引用字符或行分隔符字符串中的任何字符。
  • NONE: 从不引用字段。当数据中出现分隔符时,打印机会在其前面加上转义字符。如果未设置转义字符,格式验证将抛出异常。

xml_root_tag [string]

指定 XML 文件中根元素的标签名称。

xml_row_tag [string]

指定 XML 文件中数据行的标签名称。

xml_use_attr_format [boolean]

指定是否使用标签属性格式处理数据。

parquet_avro_write_timestamp_as_int96 [boolean]

支持将时间戳写入 Parquet INT96,仅对 parquet 文件有效。

parquet_avro_write_fixed_as_int96 [array]

支持将 12-byte 字段写入 Parquet INT96,仅对 parquet 文件有效。

schema_save_mode[Enum]

在开启同步任务之前,对目标路径进行不同的处理。
选项介绍:
RECREATE_SCHEMA :当路径不存在时创建。如果路径已存在,则删除路径并重新创建。
CREATE_SCHEMA_WHEN_NOT_EXIST :当路径不存在时创建,路径存在时使用路径。
ERROR_WHEN_SCHEMA_NOT_EXIST :当路径不存在时报错
IGNORE :忽略表的处理

data_save_mode[Enum]

在开启同步任务之前,对目标路径中的数据文件进行不同的处理。 选项介绍:
DROP_DATA:使用路径但删除路径中的数据文件。 APPEND_DATA:使用路径,并在路径中添加新文件以写入数据。
ERROR_WHEN_DATA_EXISTS:当路径中存在数据文件时,将报错。

encoding [string]

仅当 file_format_type 为 json,text,csv,xml 时使用。 写入文件的编码。此参数将由 Charset.forName(encoding) 解析。

示例

简单示例:

此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 S3File Sink。FakeSource 总共生成 16 行数据 (row.num=16),每行有两个字段,name (字符串类型) 和 age (int 类型)。最终的目标 s3 目录将创建一个文件,并将所有数据写入其中。 在运行此作业之前,您需要创建 s3 路径:/seatunnel/text。如果您尚未安装和部署 SeaTunnel,您需要按照 安装 SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 使用 SeaTunnel Engine 快速入门 中的说明运行此作业。

# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}

source {
# 这是一个示例源插件,仅用于测试和演示功能源插件
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
c_map = "map<string, array<int>>"
c_array = "array<int>"
name = string
c_boolean = boolean
age = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
# 如果您想了解更多关于如何配置SeaTunnel以及查看完整的源插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source
source {
}

transform {
# 如果您想了解更多关于如何配置SeaTunnel以及查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2
}

sink {
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
# 如果您想了解更多关于如何配置SeaTunnel以及查看完整的接收插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink
}

对于文本文件格式,包含 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider

S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}

对于Parquet文件格式,简单配置使用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}

对于ORC文件格式,简单配置使用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}

多表写入和保存模式

env {
"job.name"="SeaTunnel_job"
"job.mode"=STREAMING
}
source {
MySQL-CDC {
database-names=[
"wls_t1"
]
table-names=[
"wls_t1.mysqlcdc_to_s3_t3",
"wls_t1.mysqlcdc_to_s3_t4",
"wls_t1.mysqlcdc_to_s3_t5",
"wls_t1.mysqlcdc_to_s3_t1",
"wls_t1.mysqlcdc_to_s3_t2"
]
password="xxxxxx"
username="xxxxxxxxxxxxx"
base-url="jdbc:mysql://localhost:3306/qa_source"
}
}

transform {
}

sink {
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel/${table_name}"
path="/test/${table_name}"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}

enable_header_write [boolean]

仅在 file_format_type 为 text 或 csv 时使用。false:不写入表头,true:写入表头。