Version: Next

Kudu

Kudu sink connector

Supported Kudu Versions

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Data Type Mapping

| SeaTunnel Data Type | Kudu Data Type |
|---|---|
| BOOLEAN | BOOL |
| INT | INT8, INT16, INT32 |
| BIGINT | INT64 |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |

Sink Options

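| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by ",", e.g. "192.168.88.110:7051". |
| table_name | String | Yes | - | Name of the Kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Number of Kudu client worker threads. Defaults to twice the number of CPU cores on the current machine. |
| client_default_operation_timeout_ms | Long | No | 30000 | Default timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Default timeout for Kudu administrative operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal. |
| kerberos_keytab | String | No | - | Path to the Kerberos keytab file. Note that this file must be present on all Zeta nodes. |
| kerberos_krb5conf | String | No | - | Path to the Kerberos krb5.conf file. Note that this file must be present on all Zeta nodes. |
| save_mode | String | No | - | Save mode; supports overwrite and append. |
| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu session flush mode. Defaults to AUTO_FLUSH_SYNC. |
| batch_size | Int | No | 1024 | Maximum number of buffered records (including all insert, upsert and delete records); once this number is exceeded, the data is flushed. |
| buffer_flush_interval | Int | No | 10000 | Flush interval in milliseconds; when it elapses, an asynchronous thread flushes the buffered data. |
| ignore_not_found | Bool | No | false | If true, all rows reported as not found are ignored. |
| ignore_not_duplicate | Bool | No | false | If true, all duplicate rows are ignored. |
| common-options | | No | - | Common parameters of sink plugins; see [Sink Common Options](../sink common-Options.md) for details. |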
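The optional settings above can be combined in one sink block. The following is a minimal sketch rather than a recommended configuration: the master host names are hypothetical, the numeric values are illustrative, and it assumes that session_flush_mode accepts the Kudu client's flush mode names (AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH) and that batch_size and buffer_flush_interval govern when the connector flushes under MANUAL_FLUSH. Verify this behavior against your SeaTunnel version.

```hocon
sink {
  kudu {
    plugin_input = "kudu"
    # Hypothetical master addresses, separated by ","
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051,kudu-master-3:7051"
    table_name = "kudu_sink_table"

    # Kudu client settings; both timeouts are the documented defaults (ms)
    client_worker_count = 8
    client_default_operation_timeout_ms = 30000
    client_default_admin_operation_timeout_ms = 30000

    # Assumption: under MANUAL_FLUSH the connector flushes after 500 buffered
    # records or every 5000 ms, whichever comes first
    session_flush_mode = "MANUAL_FLUSH"
    batch_size = 500
    buffer_flush_interval = 5000
  }
}
```

Buffering generally trades some write latency for fewer round trips to the tablet servers; with the default AUTO_FLUSH_SYNC mode, each operation is instead sent synchronously.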

Task Example

Simple example:

The following example uses FakeSource to generate data and writes it to the Kudu table kudu_sink_table.


env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "kudu"
    schema = {
      fields {
        id = int
        val_bool = boolean
        val_int8 = tinyint
        val_int16 = smallint
        val_int32 = int
        val_int64 = bigint
        val_float = float
        val_double = double
        val_decimal = "decimal(16, 1)"
        val_string = string
        val_unixtime_micros = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, true, 2, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      }
    ]
  }
}

sink {
  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}
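The example above enables Kerberos but leaves kerberos_krb5conf unset. Below is a sketch of a sink block that also points at a krb5.conf file. The principal and both file paths are hypothetical placeholders, and because the option table states that every Zeta node needs these files, the sketch assumes they are deployed at the same path on each node.

```hocon
sink {
  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"

    enable_kerberos = true
    # Hypothetical principal
    kerberos_principal = "seatunnel@EXAMPLE.COM"
    # Hypothetical paths; both files are assumed to exist at these paths on every Zeta node
    kerberos_keytab = "/etc/security/keytabs/seatunnel.keytab"
    kerberos_krb5conf = "/etc/krb5.conf"
  }
}
```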

Multiple Tables

Example 1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role", "seatunnel.user", "galileo.Bucket"]
  }
}

transform {
}

sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "${database_name}_${table_name}_test"
  }
}
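In this job each captured table is routed to its own Kudu table through the ${database_name} and ${table_name} placeholders. Because a CDC stream also carries UPDATE and DELETE events, a variant of the sink could enable the two ignore options from the option table. This sketch assumes that ignore_not_found skips errors raised when an update or delete targets a row missing from the Kudu table, and that ignore_not_duplicate skips duplicate-key errors on insert. The comments list the target names the template would produce, assuming each placeholder is replaced with the source table's own database and table name.

```hocon
# Assumed resolution of "${database_name}_${table_name}_test":
#   seatunnel.role  -> seatunnel_role_test
#   seatunnel.user  -> seatunnel_user_test
#   galileo.Bucket  -> galileo_Bucket_test
sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "${database_name}_${table_name}_test"
    # Assumption: tolerate CDC events for rows that are missing or already present
    ignore_not_found = true
    ignore_not_duplicate = true
  }
}
```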

Example 2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "${schema_name}_${table_name}_test"
  }
}
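Here the template uses ${schema_name} rather than ${database_name}, which fits Oracle, where the source tables are addressed as SCHEMA.TABLE. Assuming each placeholder is replaced with the schema and table name of the upstream table, the two entries in table_list would be written to the Kudu tables noted below.

```hocon
# Assumed resolution of "${schema_name}_${table_name}_test":
#   TESTSCHEMA.TABLE_1  -> TESTSCHEMA_TABLE_1_test
#   TESTSCHEMA.TABLE_2  -> TESTSCHEMA_TABLE_2_test
```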

Changelog

2.2.0-beta 2022-09-26

  • Add Kudu Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Kudu Sink Connector supports upserting rows (2881)

Next Version

  • Change plugin name from "KuduSink" to "Kudu" (3432)