跳到主要内容
版本:Next

Elasticsearch

描述

输出数据到 Elasticsearch

主要特性

提示

引擎支持

  • 支持 ElasticSearch 版本 >= 2.x 并且 <= 8.x

选项

名称类型是否必须默认值
hostsarray-
indexstring-
schema_save_modestringCREATE_SCHEMA_WHEN_NOT_EXIST
data_save_modestringAPPEND_DATA
index_typestring
primary_keyslist
key_delimiterstring_
usernamestring
passwordstring
max_retry_countint3
max_batch_sizeint10
tls_verify_certificatebooleantrue
tls_verify_hostnamesbooleantrue
tls_keystore_pathstring-
tls_keystore_passwordstring-
tls_truststore_pathstring-
tls_truststore_passwordstring-
common-options-

hosts [array]

Elasticsearch 集群http地址,格式为 host:port ,允许指定多个主机。例如 ["host1:9200", "host2:9200"]

index [string]

Elasticsearchindex 名称。索引支持包含字段名变量,例如 seatunnel_${age},并且该字段必须出现在 seatunnel Row 中。如果没有,我们将把它视为普通索引

index_type [string]

Elasticsearch 索引类型,elasticsearch 6及以上版本建议不要指定

primary_keys [list]

主键字段用于生成文档 _id ,这是 CDC 必需的选项。

key_delimiter [string]

设定复合键的分隔符(默认为 _),例如,如果使用 $ 作为分隔符,那么文档的 _id 将呈现为 KEY1$KEY2$KEY3 的格式

username [string]

x-pack 用户名

password [string]

x-pack 密码

max_retry_count [int]

批次批量请求最大尝试大小

max_batch_size [int]

批次批量文档最大大小

tls_verify_certificate [boolean]

为 HTTPS 端点启用证书验证

tls_verify_hostname [boolean]

为 HTTPS 端点启用主机名验证

tls_keystore_path [string]

指向 PEM 或 JKS 密钥存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件

tls_keystore_password [string]

指定的密钥存储的密钥密码

tls_truststore_path [string]

指向 PEM 或 JKS 信任存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件

tls_truststore_password [string]

指定的信任存储的密钥密码

common options

Sink插件常用参数,请参考 Sink常用选项 了解详情

schema_save_mode

在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA :当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST :当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST :当表不存在时将抛出错误

data_save_mode

在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案
选项介绍:
DROP_DATA: 保留数据库结构,删除数据
APPEND_DATA:保留数据库结构,保留数据
ERROR_WHEN_DATA_EXISTS:当有数据时抛出错误

示例

简单示例

sink {
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
}
}

变更数据捕获 (Change data capture) 事件

sink {
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"

# CDC required options
primary_keys = ["key1", "key2", ...]
}
}

SSL 禁用证书验证

sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"

tls_verify_certificate = false
}
}

SSL 禁用主机名验证

sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"

tls_verify_hostname = false
}
}

SSL 启用证书验证

通过设置 tls_keystore_pathtls_keystore_password 指定证书路径及密码

sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"

tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
}

配置表生成策略 (schema_save_mode)

通过设置 schema_save_mode 配置为 CREATE_SCHEMA_WHEN_NOT_EXIST 来支持不存在表时创建表

sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"

schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}

变更日志

下一版本

  • [Feature] Support CDC write DELETE/UPDATE/INSERT events (3673)
  • [Feature] Support https protocol & compatible with opensearch (3997)