# Redis

> Redis source connector

## Description

Used to read data from Redis.

## Key features

## Options
| name | type | required | default value |
|---|---|---|---|
| host | string | yes when mode=single | - |
| port | int | no | 6379 |
| keys | string | yes | - |
| read_key_enabled | boolean | no | false |
| key_field_name | string | yes when read_key_enabled=true | key |
| batch_size | int | yes | 10 |
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
| db_num | int | no | 0 |
| mode | string | no | single |
| hash_key_parse_mode | string | no | all |
| nodes | list | yes when mode=cluster | - |
| schema | config | yes when format=json | - |
| format | string | no | json |
| single_field_name | string | yes when read_key_enabled=true | - |
| common-options | | no | - |
### host [string]

Redis host.

### port [int]

Redis port.

### hash_key_parse_mode [string]

Hash key parse mode, supports `all` and `kv`; it tells the connector how to parse the hash key.

When set to `all`, the connector treats the whole value of the hash key as one row and parses it using the schema config. When set to `kv`, the connector treats each key-value pair inside the hash key as a row and parses each one using the schema config.

For example, suppose the value of the hash key is the following:
```json
{
  "001": {
    "name": "tyrantlucifer",
    "age": 26
  },
  "002": {
    "name": "Zongwen",
    "age": 26
  }
}
```
If `hash_key_parse_mode` is `all` and the schema config is as follows, it will generate the following data:
```hocon
schema {
  fields {
    001 {
      name = string
      age = int
    }
    002 {
      name = string
      age = int
    }
  }
}
```
| 001 | 002 |
|---|---|
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
If `hash_key_parse_mode` is `kv` and the schema config is as follows, it will generate the following data:
```hocon
schema {
  fields {
    hash_key = string
    name = string
    age = int
  }
}
```
| hash_key | name | age |
|---|---|---|
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |
Each key-value pair in the hash key is treated as a row and sent downstream.

Tips: the connector uses the first field of the schema config as the field name for the key of each key-value pair.
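Putting `kv` mode together with the schema above, a complete source block might look like the following sketch (the host, port, and the hash key name `user_hash` are illustrative, not from the original doc):

```hocon
source {
  Redis {
    host = "localhost"      # illustrative value
    port = 6379
    keys = "user_hash"      # illustrative hash key name
    data_type = hash
    hash_key_parse_mode = kv
    format = "json"
    schema {
      fields {
        hash_key = string   # receives "001", "002", ...
        name = string
        age = int
      }
    }
  }
}
```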
### keys [string]

The pattern of keys to read; fuzzy matching such as `key_test*` is supported.
### read_key_enabled [boolean]

This option determines whether the Redis source connector includes the Redis key in each output record when reading data.
When set to `true`, both the key and its associated value are included in the record.
By default (`false`), only the value is read and included.
If you are using a single-value Redis data type (such as string, int, etc.) with `read_key_enabled = true`, you must also specify `single_field_name` to map the value to a schema column, and `key_field_name` to map the Redis key.
Note: when `read_key_enabled = true`, the schema configuration must explicitly include the key field to correctly map the deserialized data.
Example:

```hocon
schema {
  fields {
    key = string
    value = string
  }
}
```
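A sketch of a full source block using this option for single-value keys (the connection values and key pattern are illustrative):

```hocon
source {
  Redis {
    host = "localhost"          # illustrative value
    port = 6379
    keys = "string_test*"       # illustrative pattern
    data_type = key
    read_key_enabled = true
    key_field_name = key        # Redis key goes into this column
    single_field_name = value   # Redis value goes into this column
    schema {
      fields {
        key = string
        value = string
      }
    }
  }
}
```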
### key_field_name [string]

Specifies the field name used to store the Redis key in the output record when `read_key_enabled = true` or `data_type = hash`.
When `read_key_enabled = true`, the default field name is `key`. When `data_type = hash` and this option is not set, the default field name is `hash_key`.
This option is useful when the default field name conflicts with existing schema fields, or when a more descriptive name is preferred.

Example:
```hocon
key_field_name = custom_key
hash_key_parse_mode = kv
format = "json"
schema = {
  fields {
    custom_key = string
    name = string
  }
}
```
### batch_size [int]

Indicates the number of keys to attempt to return per scan iteration; default 10.

Tips: the Redis source connector supports fuzzy key matching; the user needs to ensure that all matched keys are of the same type.
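For example, a larger batch with a fuzzy pattern might be configured as in the following sketch (the host and batch size are illustrative):

```hocon
Redis {
  host = "localhost"   # illustrative value
  port = 6379
  keys = "key_test*"   # fuzzy pattern; matched keys must all be the same type
  data_type = key
  batch_size = 100     # scan up to 100 keys per iteration
  format = text
}
```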
### data_type [string]

Redis data type, supports `key`, `hash`, `list`, `set`, and `zset`.

- key

  The value of each key is sent downstream as a single row of data. For example, if the value of a key is `SeaTunnel test message`, the data received downstream is `SeaTunnel test message`, and only one message will be received.

- hash

  The hash key-value pairs are formatted as JSON and sent downstream as a single row of data. For example, if the value of a hash is `name:tyrantlucifer age:26`, the data received downstream is `{"name":"tyrantlucifer", "age":"26"}`, and only one message will be received.

- list

  Each element in the list is sent downstream as a single row of data. For example, if the value of a list is `[tyrantlucifer, CalvinKirs]`, the data received downstream is `tyrantlucifer` and `CalvinKirs`, and two messages will be received.

- set

  Each element in the set is sent downstream as a single row of data. For example, if the value of a set is `[tyrantlucifer, CalvinKirs]`, the data received downstream is `tyrantlucifer` and `CalvinKirs`, and two messages will be received.

- zset

  Each element in the sorted set is sent downstream as a single row of data. For example, if the value of a sorted set is `[tyrantlucifer, CalvinKirs]`, the data received downstream is `tyrantlucifer` and `CalvinKirs`, and two messages will be received.
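For instance, reading a list so that each element becomes a row could look like the following sketch (the host and the list key name `user_list` are illustrative):

```hocon
Redis {
  host = "localhost"   # illustrative value
  port = 6379
  keys = "user_list"   # illustrative list key
  data_type = list     # each list element becomes one row
  format = text
}
```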
### user [string]

Redis authentication user; you need it when connecting to a secured cluster.

### auth [string]

Redis authentication password; you need it when connecting to a secured cluster.

### db_num [int]

Redis database index ID; connects to db 0 by default.

### mode [string]

Redis mode, `single` or `cluster`; default is `single`.
nodes [list]
redis nodes information, used in cluster mode, must like as the following format:
["host1:port1", "host2:port2"]
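A cluster-mode sketch combining `mode` and `nodes` (the node addresses and key pattern are illustrative):

```hocon
Redis {
  mode = cluster
  nodes = ["host1:6379", "host2:6379"]   # illustrative addresses
  keys = "key_test*"                     # illustrative pattern
  data_type = key
  format = text
}
```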
### format [string]

The format of the upstream data; currently only `json` and `text` are supported, default `json`.

When you set `format` to `json`, you should also set the `schema` option. For example, if the upstream data is the following:

```json
{"code": 200, "data": "get success", "success": true}
```

you should set the schema as follows:

```hocon
schema {
  fields {
    code = int
    data = string
    success = boolean
  }
}
```

The connector will generate data as follows:

| code | data | success |
|---|---|---|
| 200 | get success | true |

When you set `format` to `text`, the connector does nothing to the upstream data. For example, if the upstream data is the following:

```json
{"code": 200, "data": "get success", "success": true}
```

the connector will generate data as follows:

| content |
|---|
| {"code": 200, "data": "get success", "success": true} |
### schema [config]

#### fields [config]

The schema fields of the Redis data.
### single_field_name [string]

Specifies the field name for Redis values when `read_key_enabled = true` and the value is a single primitive (e.g., string, int).
This name is used in the schema to map the value field.
Note: this option has no effect when reading complex Redis data types such as hashes or objects that can be directly mapped to a schema.

Example:
```hocon
read_key_enabled = true
key_field_name = key
single_field_name = value
schema {
  fields {
    key = string
    value = string
  }
}
```
### common options

Source plugin common parameters, please refer to Source Common Options for details.
## Example

simple:

```hocon
Redis {
  host = localhost
  port = 6379
  keys = "key_test*"
  data_type = key
  format = text
}
```

```hocon
Redis {
  host = localhost
  port = 6379
  keys = "key_test*"
  data_type = key
  format = json
  schema {
    fields {
      name = string
      age = int
    }
  }
}
```
Read string-type keys and append the values to a list:

```hocon
source {
  Redis {
    host = "redis-e2e"
    port = 6379
    auth = "U2VhVHVubmVs"
    keys = "string_test*"
    data_type = string
    batch_size = 33
  }
}

sink {
  Redis {
    host = "redis-e2e"
    port = 6379
    auth = "U2VhVHVubmVs"
    key = "string_test_list"
    data_type = list
    batch_size = 33
  }
}
```
## Changelog
| Change | Commit | Version |
|---|---|---|
| [Improve][Connector-V2] Use key_field_name option when reading Redis hash data (#9642) | https://github.com/apache/seatunnel/commit/5d214a7305 | 2.3.12 |
| [Feature][Redis] Add redis key into the result record (#9574) | https://github.com/apache/seatunnel/commit/6e8b7c5da5 | 2.3.12 |
| [Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055) | https://github.com/apache/seatunnel/commit/07510ed937 | 2.3.11 |
| [hotfix][redis] fix npe cause by null host parameter (#8881) | https://github.com/apache/seatunnel/commit/7bd5865165 | 2.3.10 |
| [Improve][Redis] Optimized Redis connection params (#8841) | https://github.com/apache/seatunnel/commit/e56f06cdf0 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [improve] update Redis connector config option (#8631) | https://github.com/apache/seatunnel/commit/f1c313eea6 | 2.3.10 |
| [Feature][Redis] Flush data when the time reaches checkpoint.interval and update test case (#8308) | https://github.com/apache/seatunnel/commit/e15757bcd7 | 2.3.9 |
| Revert "[Feature][Redis] Flush data when the time reaches checkpoint interval" and "[Feature][CDC] Add 'schema-changes.enabled' options" (#8278) | https://github.com/apache/seatunnel/commit/fcb2938286 | 2.3.9 |
| [Feature][Redis] Flush data when the time reaches checkpoint.interval (#8198) | https://github.com/apache/seatunnel/commit/2e24941e6a | 2.3.9 |
| [Hotfix] Fix redis sink NPE (#8171) | https://github.com/apache/seatunnel/commit/6b9074e769 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Connector-Redis] Redis connector support delete data (#7994) | https://github.com/apache/seatunnel/commit/02a35c3979 | 2.3.9 |
| [Improve][Connector-V2] Redis support custom key and value (#7888) | https://github.com/apache/seatunnel/commit/ef2c3c7283 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [improve][Redis]Redis scan command supports versions 5, 6, 7 (#7666) | https://github.com/apache/seatunnel/commit/6e70cbe334 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [Improve][Redis] Redis reader use scan cammnd instead of keys, single mode reader/writer support batch (#7087) | https://github.com/apache/seatunnel/commit/be37f05c07 | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [Improve][Connector-V2]Support multi-table sink feature for redis (#6314) | https://github.com/apache/seatunnel/commit/fed89ae3fc | 2.3.5 |
| [Feature][Core] Upgrade flink source translation (#5100) | https://github.com/apache/seatunnel/commit/5aabb14a94 | 2.3.4 |
| [Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on redis (#5901) | https://github.com/apache/seatunnel/commit/e84dcb8c10 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve][Connector-v2][Redis] Redis support select db (#5570) | https://github.com/apache/seatunnel/commit/77fbbbd0ee | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Feature][Connector-v2][RedisSink]Support redis to set expiration time. (#4975) | https://github.com/apache/seatunnel/commit/b5321ff1d2 | 2.3.3 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Redis] Unified exception for redis source & sink exception (#3517) | https://github.com/apache/seatunnel/commit/205f782585 | 2.3.0 |
| options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba10 | 2.3.0 |
| [feature][api] add option validation for the ReadonlyConfig (#3417) | https://github.com/apache/seatunnel/commit/4f824fea36 | 2.3.0 |
| [Feature][Redis Connector V2] Add Redis Connector Option Rules & Improve Redis Connector doc (#3320) | https://github.com/apache/seatunnel/commit/1c10aacb30 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [Improve][Connector-V2][Redis] Support redis cluster connection & user authentication (#3188) | https://github.com/apache/seatunnel/commit/c7275a49cc | 2.3.0 |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [Feature][Connector-V2] Add redis sink connector (#2647) | https://github.com/apache/seatunnel/commit/71a9e4b019 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [Feature][Connector-V2] Add redis source connector (#2569) | https://github.com/apache/seatunnel/commit/405f7d6f99 | 2.2.0-beta |