Version: 2.3.10

Hive

Hive Sink Connector

Description

Write data to Hive.

Tip

To use this connector, you must ensure that your Spark/Flink cluster is already integrated with Hive. The tested Hive versions are 2.3.9 and 3.1.3.

If you use the SeaTunnel Engine, you need to put seatunnel-hadoop3-3.1.4-uber.jar, hive-exec-3.1.3.jar and libfb303-0.9.3.jar in the $SEATUNNEL_HOME/lib/ directory.
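
For example, a minimal shell sketch, assuming the three jars have already been downloaded into the current directory (file names match the tested versions above; adjust them to your environment):

# copy the required jars into the SeaTunnel Engine classpath
cp seatunnel-hadoop3-3.1.4-uber.jar \
   hive-exec-3.1.3.jar \
   libfb303-0.9.3.jar \
   $SEATUNNEL_HOME/lib/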

Key Features

By default, we use 2PC commit to ensure exactly-once semantics.

  • File formats
    • Text
    • CSV
    • Parquet
    • ORC
    • JSON
  • Compression codecs (see the sketch after this list)
    • LZO
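
As a minimal sketch, compression is selected through the compress_codec option from the table below; the table name and metastore URI here are placeholders, and the exact codec value casing is an assumption:

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://metastore:9083"
  # assumed codec value; the default is "none"
  compress_codec = "lzo"
}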

Options

| Name | Type | Required | Default |
|------|------|----------|---------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false |
| common-options | | no | - |

table_name [string]

The target Hive table name, e.g. db1.table1. If the source is a multi-table source, you can use ${database_name}.${table_name} to generate the table name; the sink will replace ${database_name} and ${table_name} with the values from the CatalogTable generated by the source.
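
A minimal sketch of the placeholder form (the metastore URI is a placeholder; the multi-table example at the end of this page shows a complete job):

Hive {
  # ${database_name} and ${table_name} are filled in from each upstream CatalogTable
  table_name = "${database_name}.${table_name}"
  metastore_uri = "thrift://metastore:9083"
}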

metastore_uri [string]

The URI of the Hive Metastore.

hdfs_site_path [string]

The path of hdfs-site.xml, used to load the high-availability configuration of the NameNodes.
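
For instance, a hedged sketch pointing the sink at a cluster-local configuration file (the path is an assumption; use wherever your Hadoop client configuration actually lives):

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://metastore:9083"
  # hypothetical path to the HA NameNode configuration
  hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
}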

hive_site_path [string]

The path of hive-site.xml, used for authentication against the Hive Metastore.

hive.hadoop.conf [map]

Properties in Hadoop configuration files (core-site.xml, hdfs-site.xml, hive-site.xml).

hive.hadoop.conf-path [string]

The path to the directory from which core-site.xml, hdfs-site.xml and hive-site.xml are loaded.

krb5_path [string]

The path of krb5.conf, used for Kerberos authentication.

kerberos_principal [string]

The principal of Kerberos.

kerberos_keytab_path [string]

The keytab file path of Kerberos.

abort_drop_partition_metadata [boolean]

Flag that decides whether to drop partition metadata from the Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore; the data already written into the partition (generated during the synchronization) is always deleted.
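
For instance, a hedged sketch that keeps the partition metadata in the Hive Metastore when a job is aborted (table name and metastore URI are placeholders):

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://metastore:9083"
  # keep partition metadata on abort; the data written during the sync is still removed
  abort_drop_partition_metadata = false
}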

parquet_avro_write_timestamp_as_int96 [boolean]

Support writing Parquet INT96 from a timestamp; it only takes effect for parquet files.
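
A hedged sketch for a Parquet-backed table (table name and metastore URI are placeholders; per the description above, the option only takes effect when the target table stores parquet files):

Hive {
  table_name = "default.seatunnel_parquet"   # hypothetical parquet-backed table
  metastore_uri = "thrift://metastore:9083"
  parquet_avro_write_timestamp_as_int96 = true
}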

Common Options

Common parameters of sink plugins; see Sink Common Options for details.

Example

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}

Example 1

Suppose we have a source table defined as follows:

create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY<INT>,
  test_map MAP<STRING, FLOAT>,
  test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

We need to read data from the source table and write it to another table:

create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

The job configuration file can look like this:

env {
  parallelism = 3
  job.name = "test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
      fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    }
  }
}

Example 2: Kerberos

sink {
  Hive {
    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
    metastore_uri = "thrift://metastore:9083"
    hive_site_path = "/tmp/hive-site.xml"
    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/tmp/hive.keytab"
    krb5_path = "/tmp/krb5.conf"
  }
}

Description:

  • hive_site_path: the path of the hive-site.xml file.
  • kerberos_principal: the principal for Kerberos authentication.
  • kerberos_keytab_path: the keytab file path for Kerberos authentication.
  • krb5_path: the path of the krb5.conf file used for Kerberos authentication.

Run the case:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
    metastore_uri = "thrift://metastore:9083"
    hive_site_path = "/tmp/hive-site.xml"
    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/tmp/hive.keytab"
    krb5_path = "/tmp/krb5.conf"
  }
}

Hive on S3

Step 1

Create the lib directory for EMR's Hive.

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Get the jar files from Maven Central into the lib directory.

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jar files from your EMR environment to the lib directory.

cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 4

Run the case.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
      bucket = "s3://ws-package"
      fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    }
  }
}

Hive on OSS

Step 1

Create the lib directory for EMR's Hive.

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Get the jar files from Maven Central into the lib directory.

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jar files from your EMR environment to the lib directory and delete the conflicting jar.

cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

Step 4

Run the case.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
      bucket = "oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}

Example 3: Multiple tables

Suppose we have multiple source tables like this:

create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...

We need to read data from these source tables and write it to other tables:

The job configuration file can look like this:

env {
  # You can set Flink configuration here
  parallelism = 3
  job.name = "test_hive_source_to_hive"
}

source {
  Hive {
    tables_configs = [
      {
        table_name = "test_hive.test_1"
        metastore_uri = "thrift://ctyun6:9083"
      },
      {
        table_name = "test_hive.test_2"
        metastore_uri = "thrift://ctyun7:9083"
      }
    ]
  }
}

sink {
  Hive {
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13a | 2.3.10 |
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85 | 2.3.10 |
| [Improve][connector-hive] Improved hive file allocation algorithm for subtasks (#8876) | https://github.com/apache/seatunnel/commit/89d1878ad | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [Fix][Hive] Writing parquet files supports the optional timestamp int96 (#8509) | https://github.com/apache/seatunnel/commit/856aea195 | 2.3.10 |
| [Fix] Set all snappy dependency use one version (#8423) | https://github.com/apache/seatunnel/commit/3ac977c8d | 2.3.9 |
| [Fix][Connector-V2] Fix hive krb5 path not work (#8228) | https://github.com/apache/seatunnel/commit/e18a4d07b | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 |
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 |
| [Feature][Core] Rename result_table_name/source_table_name to plugin_input/plugin_output (#8072) | https://github.com/apache/seatunnel/commit/c7bbd322d | 2.3.9 |
| [Feature][E2E] Add hive3 e2e test case (#8003) | https://github.com/apache/seatunnel/commit/9a24fac2c | 2.3.9 |
| [Improve][Connector-V2] Change File Read/WriteStrategy setSeaTunnelRowTypeInfo to setCatalogTable (#7829) | https://github.com/apache/seatunnel/commit/6b5f74e52 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Improve][Zeta] Split the classloader of task group (#7580) | https://github.com/apache/seatunnel/commit/3be0d1cc6 | 2.3.8 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [Improve][Hive] Close resources when exception occurs (#7205) | https://github.com/apache/seatunnel/commit/561171528 | 2.3.6 |
| [Hotfix][Hive Connector] Fix Hive hdfs-site.xml and hive-site.xml not be load error (#7069) | https://github.com/apache/seatunnel/commit/c23a577f3 | 2.3.6 |
| Fix hive load hive_site_path and hdfs_site_path too late (#7017) | https://github.com/apache/seatunnel/commit/e2578a5b4 | 2.3.6 |
| [Bug][connector-hive] Eanble login with kerberos for hive (#6893) | https://github.com/apache/seatunnel/commit/26e433e47 | 2.3.6 |
| [Feature][S3 File] Make S3 File Connector support multiple table write (#6698) | https://github.com/apache/seatunnel/commit/8f2049b2f | 2.3.6 |
| [Feature] Hive Source/Sink support multiple table (#5929) | https://github.com/apache/seatunnel/commit/4d9287fce | 2.3.6 |
| [Improve][Hive] udpate hive3 version (#6699) | https://github.com/apache/seatunnel/commit/1184c05c2 | 2.3.6 |
| [HiveSink]Fix the risk of resource leakage. (#6721) | https://github.com/apache/seatunnel/commit/c23804f13 | 2.3.6 |
| [Improve][Connector-v2] The hive connector support multiple filesystem (#6648) | https://github.com/apache/seatunnel/commit/8a4c01fe3 | 2.3.6 |
| [Fix][Connector-V2] Fix add hive partition error when partition already existed (#6577) | https://github.com/apache/seatunnel/commit/2a0a0b9d1 | 2.3.5 |
| Fix HiveMetaStoreProxy#enableKerberos will return true if doesn't enable kerberos (#6307) | https://github.com/apache/seatunnel/commit/1dad6f706 | 2.3.4 |
| [Feature][Engine] Unify job env parameters (#6003) | https://github.com/apache/seatunnel/commit/2410ab38f | 2.3.4 |
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b | 2.3.4 |
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1a | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Hotfix][Connector-V2][Hive] fix the bug that hive-site.xml can not be injected in HiveConf (#5261) | https://github.com/apache/seatunnel/commit/04ce22ac1 | 2.3.4 |
| [Improve][Connector-v2][HiveSink]remove drop partition when abort. (#4940) | https://github.com/apache/seatunnel/commit/edef87b52 | 2.3.3 |
| [feature][web] hive add option because web need (#5154) | https://github.com/apache/seatunnel/commit/5e1511ff0 | 2.3.3 |
| [Hotfix][Connector-V2][Hive] Support user-defined hive-site.xml (#4965) | https://github.com/apache/seatunnel/commit/2a064bcdb | 2.3.3 |
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3 | 2.3.1 |
| [hotfix] fixed schema options import error | https://github.com/apache/seatunnel/commit/656805f2d | 2.3.1 |
| [chore] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/291214ad6 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| [Imprve][Connector-V2][Hive] Support read text table & Column projection (#4105) | https://github.com/apache/seatunnel/commit/717620f54 | 2.3.1 |
| [Hotfix][Connector-V2][Hive] Fix hive unknownhost (#4141) | https://github.com/apache/seatunnel/commit/f1a1dfe4a | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Improve][Connector-V2][Hive] Support assign partitions (#3842) | https://github.com/apache/seatunnel/commit/6a4a850b4 | 2.3.1 |
| [Improve][Connector-V2][Hive] Improve config check logic (#3886) | https://github.com/apache/seatunnel/commit/b4348f6f4 | 2.3.1 |
| [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840) | https://github.com/apache/seatunnel/commit/055ad9d83 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Improve][Connector-V2] The log outputs detailed exception stack information (#3805) | https://github.com/apache/seatunnel/commit/d0c6217f2 | 2.3.1 |
| [Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8 | 2.3.0 |
| [Feature][Connector-V2][File] Optimize filesystem utils (#3749) | https://github.com/apache/seatunnel/commit/ac4e880fb | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Hotfix][Connector-V2][Hive] Fix npe of getting file system (#3506) | https://github.com/apache/seatunnel/commit/e1fc3d1b0 | 2.3.0 |
| [Improve][Connector-V2][Hive] Unified exceptions for hive source & sink connector (#3541) | https://github.com/apache/seatunnel/commit/12c0fb91d | 2.3.0 |
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e863 | 2.3.0 |
| [Hotfix][Connector-V2][Hive] Fix the bug that when write data to hive throws NullPointerException (#3258) | https://github.com/apache/seatunnel/commit/777bf6b42 | 2.3.0 |
| [Improve][Connector-V2][Hive] Hive Sink Support msck partitions (#3133) | https://github.com/apache/seatunnel/commit/a8738ef3c | 2.3.0-beta |
| unify flatten-maven-plugin version (#3078) | https://github.com/apache/seatunnel/commit/ed743fddc | 2.3.0-beta |
| [Engine][Merge] fix merge problem | https://github.com/apache/seatunnel/commit/0e9ceeefc | 2.3.0-beta |
| Merge remote-tracking branch 'upstream/dev' into st-engine | https://github.com/apache/seatunnel/commit/ca80df779 | 2.3.0-beta |
| update hive.metastore.version to hive.exec.version (#2879) | https://github.com/apache/seatunnel/commit/018ee0a3d | 2.2.0-beta |
| [Bug][Connector-V2] Fix hive sink bug (#2870) | https://github.com/apache/seatunnel/commit/d661fa011 | 2.2.0-beta |
| [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e | 2.2.0-beta |
| [Bug][Connector-V2] Fix hive source text table name (#2797) | https://github.com/apache/seatunnel/commit/563637ebd | 2.2.0-beta |
| [Improve][Connector-V2] Refactor hive source & sink connector (#2708) | https://github.com/apache/seatunnel/commit/a357dca36 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) (#2731) | https://github.com/apache/seatunnel/commit/e8929ab60 | 2.3.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
| [Improve][Connector-V2] Refactor the package of hdfs file connector (#2402) | https://github.com/apache/seatunnel/commit/87d0624c5 | 2.2.0-beta |
| [Feature][Connector-V2] Add orc file support in connector hive sink (#2311) (#2374) | https://github.com/apache/seatunnel/commit/81cb80c05 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta |
| Decide table format using outputFormat in HiveSinkConfig #2303 | https://github.com/apache/seatunnel/commit/3a2586f6d | 2.2.0-beta |
| [Feature][Connector-V2-Hive] Add parquet file format support to Hive Sink (#2310) | https://github.com/apache/seatunnel/commit/4ab3c21b8 | 2.2.0-beta |
| Add BaseHiveCommitInfo for common hive commit info (#2306) | https://github.com/apache/seatunnel/commit/0d2f6f4d7 | 2.2.0-beta |
| Remove same code to independent method in HiveSinkWriter (#2307) | https://github.com/apache/seatunnel/commit/e99e6ee72 | 2.2.0-beta |
| Avoid potential null pointer risk in HiveSinkWriter#snapshotState (#2302) | https://github.com/apache/seatunnel/commit/e7d817f7d | 2.2.0-beta |
| [Connector-V2] Add file type check logic in hive connector (#2275) | https://github.com/apache/seatunnel/commit/5488337c6 | 2.2.0-beta |
| [Connector-V2] Add parquet file reader for Hive Source Connector (#2199) (#2237) | https://github.com/apache/seatunnel/commit/59db97ed3 | 2.2.0-beta |
| Merge from dev to st-engine (#2243) | https://github.com/apache/seatunnel/commit/41e530afd | 2.3.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
| [Bug][connector-hive] filter '_SUCCESS' file in file list (#2235) (#2236) | https://github.com/apache/seatunnel/commit/db0465152 | 2.2.0-beta |
| [Bug][hive-connector-v2] Resolve the schema inconsistency bug (#2229) (#2230) | https://github.com/apache/seatunnel/commit/62ca07591 | 2.2.0-beta |
| [Bug][spark-connector-v2-example] fix the bug of no class found. (#2191) (#2192) | https://github.com/apache/seatunnel/commit/5dbc2df17 | 2.2.0-beta |
| [Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee73 | 2.2.0-beta |
| [Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64 | 2.2.0-beta |
| [Connector-V2]Hive Source (#2123) | https://github.com/apache/seatunnel/commit/ffcf3f59e | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |