Version: Next

Kudu

Kudu source connector

Supported Kudu Versions

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Used to read data from Kudu.

The tested Kudu version is 1.11.1.

Data Type Mapping

| Kudu Data Type | SeaTunnel Data Type |
|---|---|
| BOOL | BOOLEAN |
| INT8, INT16, INT32 | INT |
| INT64 | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |
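
To illustrate how the mapping above might look in a job, here is a minimal sketch of a `schema` block. It assumes the `schema` option accepts SeaTunnel's common `fields` layout; the column names and the decimal precision are hypothetical and chosen only to show one column per mapped type.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    # Assumption: schema uses the common SeaTunnel "fields" layout.
    # All column names below are hypothetical.
    schema = {
      fields {
        is_active  = "boolean"         # Kudu BOOL
        item_count = "int"             # Kudu INT8 / INT16 / INT32
        item_id    = "bigint"          # Kudu INT64
        price      = "decimal(20, 2)"  # Kudu DECIMAL
        ratio      = "float"           # Kudu FLOAT
        score      = "double"          # Kudu DOUBLE
        item_name  = "string"          # Kudu STRING
        updated_at = "timestamp"       # Kudu UNIXTIME_MICROS
        payload    = "bytes"           # Kudu BINARY
      }
    }
  }
}
```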

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by ',', such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of the Kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. The default is twice the current number of CPU cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal. Note that all Zeta nodes must have this file. |
| kerberos_keytab | String | No | - | Kerberos keytab. Note that all Zeta nodes must have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all Zeta nodes must have this file. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting with the scan token. If not set, it is the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes: the maximum number of bytes read at a time. The default is 1 MB. |
| filter | Int | No | 1024 * 1024 | Kudu scan filter expressions. Not supported yet. |
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
| table_list | Array | No | - | The list of tables to be read. You can use this configuration instead of table_name. Example: table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] |
| common-options | | No | - | Source plugin common parameters. Please refer to Source Common Options for details. |
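
As a rough sketch of how the optional client and scan settings above could be combined, the fragment below overrides the defaults for a larger batch read. The host names and the chosen values are illustrative assumptions, not recommendations; only the option names come from the table.

```hocon
source {
  kudu {
    # Several masters in one comma-separated string (host names are hypothetical)
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051,kudu-master-3:7051"
    table_name = "kudu_source_table"

    # Illustrative values only; the defaults are listed in the table above
    client_worker_count = 8
    client_default_operation_timeout_ms = 60000
    client_default_admin_operation_timeout_ms = 60000
    scan_token_query_timeout = 60000
    scan_token_batch_size_bytes = 2097152  # 2 MB per read instead of the 1 MB default
  }
}
```

Larger scan batches mean fewer round trips but more memory held per read, so any change is worth checking against the actual workload.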

Task Example

Simple:

The following example reads a Kudu table named "kudu_source_table". The goal is to print the data from this table on the console and to write it to the Kudu table "kudu_sink_table".

# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  # This is an example source plugin, intended only for testing and demonstrating the source plugin
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    plugin_output = "kudu"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}

transform {
}

sink {
  console {
    plugin_input = "kudu"
  }

  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}
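
The Kerberos options listed under Source Options also include `kerberos_krb5conf`, which the example above does not set. A minimal sketch of a source block that sets all three Kerberos options might look like the following; the file paths are hypothetical placeholders and, per the options table, the files would need to be present on every Zeta node.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    plugin_output = "kudu"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    # Hypothetical paths; replace with the real locations on each node
    kerberos_keytab = "/path/to/xx.keytab"
    kerberos_krb5conf = "/path/to/krb5.conf"
  }
}
```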

Multiple tables:

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin, intended only for testing and demonstrating the source plugin
  kudu {
    kudu_masters = "kudu-master:7051"
    table_list = [
      {
        table_name = "kudu_source_table_1"
      },
      {
        table_name = "kudu_source_table_2"
      }
    ]
    plugin_output = "kudu"
  }
}

transform {
}

sink {
  Assert {
    rules {
      table-names = ["kudu_source_table_1", "kudu_source_table_2"]
    }
  }
}

Changelog

2.2.0-beta 2022-09-26

  • Add Kudu Source Connector

Next Version

  • Change plugin name from KuduSource to Kudu (#3432)