# Hive
Hive sink connector
## Description
Write data to Hive.
To use this connector, you must ensure that your Spark/Flink cluster already has Hive integrated. The tested Hive version is 2.3.9.

Tip: the Hive sink connector does not support the array, map, and struct data types yet.
## Key features
By default, we use 2PC (two-phase commit) to ensure exactly-once delivery; see the configuration sketch after the feature list below.
- schema projection
- file format
  - text
  - parquet
  - orc
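
As a minimal sketch (host and table names are placeholders, not taken from this page): exactly-once writing is governed by the `is_enable_transaction` option described under Options, and since the connector has no format option, the output format is expected to follow the storage format of the target Hive table.

```
sink {
  Hive {
    # placeholder table and metastore host
    table_name = "mydb.target_orc_table"
    metastore_uri = "thrift://metastore-host:9083"
    # true is the default; shown here only to make the exactly-once setting explicit
    is_enable_transaction = true
  }
}
```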
## Options
| name                  | type    | required                                       | default value                                              |
|-----------------------|---------|------------------------------------------------|------------------------------------------------------------|
| table_name            | string  | yes                                            | -                                                          |
| metastore_uri         | string  | yes                                            | -                                                          |
| partition_by          | array   | required if the hive sink table has partitions | -                                                          |
| sink_columns          | array   | no                                             | when this parameter is empty, all fields are sink columns  |
| is_enable_transaction | boolean | no                                             | true                                                       |
| save_mode             | string  | no                                             | "append"                                                   |
### table_name [string]
Target Hive table name, e.g. `db1.table1`.
### metastore_uri [string]
The Hive metastore URI.
### partition_by [array]
Partition data based on the selected fields.
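
A sketch of a sink writing into a table partitioned by two hypothetical string columns, `dt` and `hour` (all names and the URI are placeholders):

```
Hive {
  # placeholder: target table partitioned by (dt STRING, hour STRING)
  table_name = "mydb.events_partitioned"
  metastore_uri = "thrift://metastore-host:9083"
  # the partition columns of the target table
  partition_by = ["dt", "hour"]
}
```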
### sink_columns [array]
Which columns need to be written to Hive; the default value is all of the columns obtained from the transform or source. The order of the fields determines the order in which the file is actually written.
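
A sketch that writes only a subset of the incoming columns, using hypothetical column names; per the note above, the file is written with the columns in exactly this order:

```
Hive {
  # placeholder table and metastore host
  table_name = "mydb.users_copy"
  metastore_uri = "thrift://metastore-host:9083"
  # hypothetical column names; they are written to the file in this order
  sink_columns = ["user_id", "user_name", "signup_date"]
}
```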
### is_enable_transaction [boolean]
If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory. Only `true` is supported now.
### save_mode [string]
Storage mode. We need to support `overwrite` and `append`; only `append` is supported now. Streaming jobs do not support `overwrite`.
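
A sketch setting the mode explicitly (table name and URI are placeholders); `"append"` is currently the only accepted value:

```
Hive {
  # placeholder table and metastore host
  table_name = "mydb.target_table"
  metastore_uri = "thrift://metastore-host:9083"
  # "append" is the default and, for now, the only supported mode
  save_mode = "append"
}
```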
## Example
```
Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}
```
### example 1
We have a source table like this:
```sql
create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY<INT>,
  test_map MAP<STRING, FLOAT>,
  test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
```
We need to read data from the source table and write it to another table:
```sql
create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
```
The job config file can look like this:
```
env {
  # You can set flink configuration here
  execution.parallelism = 3
  job.name = "test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

transform {
}

sink {
  # write the rows read from the source into the Hive sink table
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    partition_by = ["test_par1", "test_par2"]
    sink_columns = ["test_tinyint", "test_smallint", "test_int", "test_bigint", "test_boolean", "test_float", "test_double", "test_string", "test_binary", "test_timestamp", "test_decimal", "test_char", "test_varchar", "test_date", "test_par1", "test_par2"]
  }
}
```