Redis
Redis source connector
Description
Used to read data from Redis.
Key Features
Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| host | string | yes when mode=single | - | Redis server host |
| port | int | no | 6379 | Redis server port |
| user | string | no | - | Redis authentication username |
| auth | string | no | - | Redis authentication password |
| db_num | int | no | 0 | Redis database index |
| mode | string | no | single | Redis mode: single or cluster |
| nodes | list | yes when mode=cluster | - | Redis cluster nodes, in the format ["host1:port1", "host2:port2"] |
| tables_configs | list | no | - | List of per-table configurations for multi-table reads |
| common-options | | no | - | Common parameters of the source connector plugin; see Source Common Options for details |
Table-level options
When using tables_configs to read multiple key patterns, each table configuration can contain the following parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| keys | string | yes | - | Redis key pattern to scan |
| data_type | string | yes | - | Redis data type: key, hash, list, set, zset |
| batch_size | int | no | 10 | Batch size of the SCAN operation |
| format | string | no | json | Data format: json or text |
| schema | config | no | - | Schema configuration |
| hash_key_parse_mode | string | no | all | Hash key parse mode: all or kv |
| read_key_enabled | boolean | no | false | Whether to include the Redis key in the output |
| key_field_name | string | no | - | Field name for the Redis key |
| single_field_name | string | no | - | Field name for single-value types |
| field_delimiter | string | no | ',' | Delimiter for the text format |
Note: when the configuration targets a single table, the options inside tables_configs can be flattened to the top level (backward compatible).
Important: in multi-table mode, the table-level parameters above must be configured inside each entry of tables_configs.
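As a sketch of the backward-compatible flattened form mentioned above, the following single-table configuration places the table-level options directly at the top level instead of wrapping them in tables_configs; the host, key pattern, and schema fields are illustrative values:

```hocon
# Single-table read: table-level options (keys, data_type, format, schema)
# are flattened to the top level instead of being listed in tables_configs.
source {
  Redis {
    host = "localhost"   # illustrative host
    port = 6379
    keys = "user:*"      # illustrative key pattern
    data_type = hash
    format = json
    schema {
      fields {
        name = string
        age = int
      }
    }
  }
}
```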
host [string]
Redis host address
port [int]
Redis port
hash_key_parse_mode [string]
Specifies the hash key parse mode; all and kv are supported, controlling how the connector parses hash keys.
When set to all, the connector treats the whole value of the hash key as one row and parses it according to the schema config; when set to kv, the connector treats each kv pair inside the hash key as one row and parses it according to the schema config.
For example, if the value of the hash key is as follows:
{
"001": {
"name": "tyrantlucifer",
"age": 26
},
"002": {
"name": "Zongwen",
"age": 26
}
}
If hash_key_parse_mode is set to all and the schema config is as follows, the following table data will be generated:
schema {
fields {
001 {
name = string
age = int
}
002 {
name = string
age = int
}
}
}
| 001 | 002 |
|---|---|
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
If hash_key_parse_mode is set to kv and the schema config is as follows, the following table data will be generated:
schema {
fields {
hash_key = string
name = string
age = int
}
}
| hash_key | name | age |
|---|---|---|
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |
Each kv pair in the hash key is treated as one row and sent to the downstream.
Tip: the connector uses the first field of the schema config as the field name of the key (k) in each kv pair.
keys [string]
Key pattern
batch_size [int]
The number of keys each SCAN iteration attempts to return; the default is 10.
Tip: the Redis connector supports fuzzy key matching; the user must make sure all matched keys are of the same type.
data_type [string]
Redis data type; key, hash, list, set, and zset are supported.
- key
The value of each key is sent to the downstream as a single row.
For example, if the value of a key is SeaTunnel test message, the downstream receives SeaTunnel test message, and only one message is received.
- hash
The hash key-value pairs are formatted as JSON and sent to the downstream as a single row.
For example, if the hash value is name:tyrantlucifer age:26, the downstream receives {"name":"tyrantlucifer", "age":"26"}, and only one message is received.
- list
Each element of the list is sent to the downstream as a single row.
For example, if the list value is [tyrantlucifer, CalvinKirs], the downstream receives tyrantlucifer and CalvinKirs, two messages in total.
- set
Each element of the set is sent to the downstream as a single row.
For example, if the set value is [tyrantlucifer, CalvinKirs], the downstream receives tyrantlucifer and CalvinKirs, two messages in total.
- zset
Each element of the zset is sent to the downstream as a single row.
For example, if the zset value is [tyrantlucifer, CalvinKirs], the downstream receives tyrantlucifer and CalvinKirs, two messages in total.
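As a minimal sketch tying one of the collection types above to a full source block, the following configuration reads lists matched by an assumed pattern my_list* and emits each element as one row (host and pattern are illustrative):

```hocon
source {
  Redis {
    host = "localhost"   # illustrative host
    port = 6379
    keys = "my_list*"    # assumed key pattern
    data_type = list
    format = text        # each list element arrives as a single text row
  }
}
```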
user [string]
Redis authentication user; required when connecting to a secured cluster.
auth [string]
Redis authentication password; required when connecting to a secured cluster.
db_num [int]
Redis database index; the connector connects to db 0 by default.
mode [string]
Redis mode, single or cluster; the default is single.
nodes [list]
Redis node information, used in cluster mode; must be set in the following format:
["host1:port1", "host2:port2"]
format [string]
Format of the upstream data; currently only json and text are supported, and the default is json.
When the format is json, the schema option must also be specified. For example:
When the upstream data is as follows:
{"code": 200, "data": "get success", "success": true}
the schema should be configured as follows:
schema {
fields {
code = int
data = string
success = boolean
}
}
The connector will generate data in the following format:
| code | data | success |
|---|---|---|
| 200 | get success | true |
When the format is text, the schema option is optional.
For example, when the upstream data is as follows:
200#get success#true
If schema is not specified, the connector treats the upstream data as follows:
| content |
|---|
| 200#get success#true |
If schema is specified, both schema and field_delimiter must be configured, as shown below:
field_delimiter = "#"
schema {
fields {
code = int
data = string
success = boolean
}
}
The connector will generate the following data:
| code | data | success |
|---|---|---|
| 200 | get success | true |
field_delimiter [string]
Field delimiter, which tells the connector how to split fields.
Only required when the format is text. The default is ",".
schema [config]
fields [config]
List of schema fields for the Redis data. See the Schema feature for more details.
common options
Common parameters of the source connector plugin; see Source Common Options for details.
Examples
Single-table mode
A simple usage example:
Redis {
host = localhost
port = 6379
keys = "key_test*"
data_type = key
format = text
}
Redis {
host = localhost
port = 6379
keys = "key_test*"
data_type = key
format = json
schema {
fields {
name = string
age = int
}
}
}
Example of reading string-typed keys and appending them to a list:
source {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
keys = "string_test*"
data_type = string
batch_size = 33
}
}
sink {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
key = "string_test_list"
data_type = list
batch_size = 33
}
}
Multi-table mode
Example 1: read multiple key patterns with different data types
env {
job.mode = "BATCH"
}
source {
Redis {
host = "localhost"
port = 6379
auth = "password"
db_num = 0
tables_configs = [
{
keys = "user:active:*"
data_type = STRING
format = JSON
batch_size = 50
schema {
fields {
id = int
name = string
email = string
created_at = timestamp
}
}
},
{
keys = "session:*"
data_type = HASH
hash_key_parse_mode = KV
read_key_enabled = true
key_field_name = "session_id"
schema {
fields {
session_id = string
user_id = int
ip_address = string
last_active = timestamp
}
}
},
{
keys = "queue:task:*"
data_type = LIST
format = TEXT
field_delimiter = "|"
}
]
}
}
sink {
Console {
parallelism = 1
}
}
Example 2: multi-table configuration in cluster mode
source {
Redis {
mode = CLUSTER
nodes = ["node1:6379", "node2:6379", "node3:6379"]
auth = "cluster_password"
tables_configs = [
{
keys = "metric:cpu:*"
data_type = STRING
format = JSON
batch_size = 10
schema {
fields {
host = string
timestamp = timestamp
usage = double
}
}
},
{
keys = "metric:memory:*"
data_type = STRING
format = JSON
batch_size = 10
schema {
fields {
host = string
timestamp = timestamp
used = long
total = long
}
}
}
]
}
}
sink {
Console {}
}
Changelog
| Change | Commit | Version |
|---|---|---|
| [Improve][Connector-V2] Use key_field_name option when reading Redis hash data (#9642) | https://github.com/apache/seatunnel/commit/5d214a7305 | 2.3.12 |
| [Feature][Redis] Add redis key into the result record (#9574) | https://github.com/apache/seatunnel/commit/6e8b7c5da5 | 2.3.12 |
| [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055) | https://github.com/apache/seatunnel/commit/07510ed937 | 2.3.11 |
| [hotfix][redis] fix npe cause by null host parameter (#8881) | https://github.com/apache/seatunnel/commit/7bd5865165 | 2.3.10 |
| [Improve][Redis] Optimized Redis connection params (#8841) | https://github.com/apache/seatunnel/commit/e56f06cdf0 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [improve] update Redis connector config option (#8631) | https://github.com/apache/seatunnel/commit/f1c313eea6 | 2.3.10 |
| [Feature][Redis] Flush data when the time reaches checkpoint.interval and update test case (#8308) | https://github.com/apache/seatunnel/commit/e15757bcd7 | 2.3.9 |
| Revert "[Feature][Redis] Flush data when the time reaches checkpoint interval" and "[Feature][CDC] Add 'schema-changes.enabled' options" (#8278) | https://github.com/apache/seatunnel/commit/fcb2938286 | 2.3.9 |
| [Feature][Redis] Flush data when the time reaches checkpoint.interval (#8198) | https://github.com/apache/seatunnel/commit/2e24941e6a | 2.3.9 |
| [Hotfix] Fix redis sink NPE (#8171) | https://github.com/apache/seatunnel/commit/6b9074e769 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Connector-Redis] Redis connector support delete data (#7994) | https://github.com/apache/seatunnel/commit/02a35c3979 | 2.3.9 |
| [Improve][Connector-V2] Redis support custom key and value (#7888) | https://github.com/apache/seatunnel/commit/ef2c3c7283 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [improve][Redis]Redis scan command supports versions 5, 6, 7 (#7666) | https://github.com/apache/seatunnel/commit/6e70cbe334 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [Improve][Redis] Redis reader use scan cammnd instead of keys, single mode reader/writer support batch (#7087) | https://github.com/apache/seatunnel/commit/be37f05c07 | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [Improve][Connector-V2]Support multi-table sink feature for redis (#6314) | https://github.com/apache/seatunnel/commit/fed89ae3fc | 2.3.5 |
| [Feature][Core] Upgrade flink source translation (#5100) | https://github.com/apache/seatunnel/commit/5aabb14a94 | 2.3.4 |
| [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on redis (#5901) | https://github.com/apache/seatunnel/commit/e84dcb8c10 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve][Connector-v2][Redis] Redis support select db (#5570) | https://github.com/apache/seatunnel/commit/77fbbbd0ee | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Feature][Connector-v2][RedisSink]Support redis to set expiration time. (#4975) | https://github.com/apache/seatunnel/commit/b5321ff1d2 | 2.3.3 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Redis] Unified exception for redis source & sink exception (#3517) | https://github.com/apache/seatunnel/commit/205f782585 | 2.3.0 |
| options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba10 | 2.3.0 |
| [feature][api] add option validation for the ReadonlyConfig (#3417) | https://github.com/apache/seatunnel/commit/4f824fea36 | 2.3.0 |
| [Feature][Redis Connector V2] Add Redis Connector Option Rules & Improve Redis Connector doc (#3320) | https://github.com/apache/seatunnel/commit/1c10aacb30 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188) | https://github.com/apache/seatunnel/commit/c7275a49cc | 2.3.0 |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [Feature][Connector-V2] Add redis sink connector (#2647) | https://github.com/apache/seatunnel/commit/71a9e4b019 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [Feature][Connector-V2] Add redis source connector (#2569) | https://github.com/apache/seatunnel/commit/405f7d6f99 | 2.2.0-beta |