跳到主要内容
版本:2.3.8

Hudi

Hudi 接收器连接器

描述

用于将数据写入 Hudi。

主要特点

选项

基础配置:

名称名称是否必需默认值
table_dfs_pathstring-
conf_files_pathstring-
table_liststring-
auto_commitbooleantrue
schema_save_modeenumCREATE_SCHEMA_WHEN_NOT_EXIST
common-optionsconfig-

表清单配置:

名称类型是否必需默认值
table_namestringyes-
databasestringnodefault
table_typeenumnoCOPY_ON_WRITE
op_typeenumnoinsert
record_key_fieldsstringno-
partition_fieldsstringno-
batch_interval_msIntno1000
batch_sizeIntno1000
insert_shuffle_parallelismIntno2
upsert_shuffle_parallelismIntno2
min_commits_to_keepIntno20
max_commits_to_keepIntno30
index_typeenumnoBLOOM
index_class_namestringno-
record_byte_sizeIntno1024

注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。

table_name [string]

table_name Hudi 表的名称。

database [string]

database Hudi 表的database.

table_dfs_path [string]

table_dfs_path Hudi 表的 DFS 根路径,例如 "hdfs://nameservice/data/hudi/"。

table_type [enum]

table_type Hudi 表的类型。

record_key_fields [string]

record_key_fields Hudi 表的记录键字段, 当op_type是UPSERT类型时, 必须配置该项.

partition_fields [string]

partition_fields Hudi 表的分区字段.

index_type [string]

index_type Hudi 表的索引类型. 当前只支持BLOOM, SIMPLE, GLOBAL SIMPLE三种类型.

index_class_name [string]

index_class_name Hudi 表自定义索引名称,例如: org.apache.seatunnel.connectors.seatunnel.hudi.index.CustomHudiIndex.

record_byte_size [Int]

record_byte_size Hudi 表单行记录的大小, 该值可用于预估每个hudi数据文件中记录的大致数量。调整此参数与batch_size可以有效减少hudi数据文件写放大次数.

conf_files_path [string]

conf_files_path 环境配置文件路径列表(本地路径),用于初始化 HDFS 客户端以读取 Hudi 表文件。示例:"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"。

op_type [enum]

op_type Hudi 表的操作类型。值可以是 insertupsertbulk_insert

batch_interval_ms [Int]

batch_interval_ms 批量写入 Hudi 表的时间间隔。

batch_size [Int]

batch_size 批量写入 Hudi 表的记录数大小.

insert_shuffle_parallelism [Int]

insert_shuffle_parallelism 插入数据到 Hudi 表的并行度。

upsert_shuffle_parallelism [Int]

upsert_shuffle_parallelism 更新插入数据到 Hudi 表的并行度。

min_commits_to_keep [Int]

min_commits_to_keep Hudi 表保留的最少提交数。

max_commits_to_keep [Int]

max_commits_to_keep Hudi 表保留的最多提交数。

auto_commit [boolean]

auto_commit 是否自动提交.

schema_save_mode [Enum]

在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA:当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST:当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST:当表不存在时将抛出错误
IGNORE :忽略对表的处理

通用选项

数据源插件的通用参数,请参考 Source Common Options 了解详细信息。

示例

单表

sink {
Hudi {
table_dfs_path = "hdfs://nameserivce/data/"
database = "st"
table_name = "test_table"
table_type = "COPY_ON_WRITE"
conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
batch_size = 10000
use.kerberos = true
kerberos.principal = "test_user@xxx"
kerberos.principal.file = "/home/test/test_user.keytab"
}
}

多表

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"

table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}

transform {
}

sink {
Hudi {
table_dfs_path = "hdfs://nameserivce/data/"
conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
table_list = [
{
database = "st1"
table_name = "role"
table_type = "COPY_ON_WRITE"
op_type="INSERT"
batch_size = 10000
},
{
database = "st1"
table_name = "user"
table_type = "COPY_ON_WRITE"
op_type="UPSERT"
# op_type is 'UPSERT', must configured record_key_fields
record_key_fields = "user_id"
batch_size = 10000
},
{
database = "st1"
table_name = "Bucket"
table_type = "MERGE_ON_READ"
}
]
...
}
}