
InfluxDB

InfluxDB sink connector

Description​

Write data to InfluxDB.

Key features​

Options​

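| name                        | type   | required | default value            |
|-----------------------------|--------|----------|--------------------------|
| url                         | string | yes      | -                        |
| database                    | string | yes      | -                        |
| measurement                 | string | yes      | -                        |
| username                    | string | no       | -                        |
| password                    | string | no       | -                        |
| key_time                    | string | no       | processing time          |
| key_tags                    | array  | no       | exclude field & key_time |
| batch_size                  | int    | no       | 1024                     |
| max_retries                 | int    | no       | -                        |
| retry_backoff_multiplier_ms | int    | no       | -                        |
| max_retry_backoff_ms        | int    | no       | -                        |
| connect_timeout_ms          | long   | no       | 15000                    |
| common-options              | config | no       | -                        |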

url [string]​

The URL used to connect to InfluxDB, e.g.

http://influxdb-host:8086

database [string]​

The name of the InfluxDB database.

measurement [string]​

The name of the InfluxDB measurement.

username [string]​

The username of the InfluxDB user.

password [string]​

The password of the InfluxDB user.

key_time [string]​

The name of the SeaTunnelRow field to use as the InfluxDB measurement timestamp. If not specified, the processing time is used as the timestamp.

key_tags [array]​

The names of the SeaTunnelRow fields to use as InfluxDB measurement tags. If not specified, all fields are written as InfluxDB measurement fields.
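The effect of key_time and key_tags can be pictured with a minimal Python sketch. It is not the connector's implementation: the row is modeled as a plain dict, and `split_row` and its `now_ms` parameter are hypothetical names introduced here for illustration. It assumes that the key_time field becomes the timestamp, the key_tags fields become string tags, and everything else becomes a measurement field.

```python
import time


def split_row(row, key_time=None, key_tags=None, now_ms=None):
    """Split a row (field name -> value) into (timestamp, tags, fields).

    Hypothetical helper: illustrates the documented option semantics only.
    """
    key_tags = key_tags or []

    if key_time is not None:
        # key_time names the row field that carries the timestamp.
        timestamp = row[key_time]
    else:
        # No key_time: fall back to processing time (milliseconds assumed).
        timestamp = now_ms if now_ms is not None else int(time.time() * 1000)

    # InfluxDB tag values are always strings.
    tags = {name: str(row[name]) for name in key_tags}

    # Every remaining field is written as a measurement field.
    fields = {
        name: value
        for name, value in row.items()
        if name != key_time and name not in key_tags
    }
    return timestamp, tags, fields


row = {"time": 1700000000000, "label": "a", "value": 3.5}
print(split_row(row, key_time="time", key_tags=["label"]))
# (1700000000000, {'label': 'a'}, {'value': 3.5})
```

With neither option set, the whole row ends up in the fields part and the timestamp comes from the clock.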

batch_size [int]​

For batch writing, buffered records are flushed to InfluxDB when the number of buffered records reaches batch_size or when the checkpoint.interval elapses.
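The two flush triggers can be sketched as follows. This is an illustrative Python model, not the connector's code: `BatchBuffer`, `flush_fn`, and `on_checkpoint` are hypothetical names, and the checkpoint is simulated by an explicit method call rather than a timer.

```python
class BatchBuffer:
    """Buffers records and flushes on size or on checkpoint (illustrative only)."""

    def __init__(self, batch_size, flush_fn):
        self.batch_size = batch_size
        self.flush_fn = flush_fn  # called with a list of buffered records
        self.pending = []

    def write(self, record):
        self.pending.append(record)
        # Trigger 1: the buffer has reached batch_size.
        if len(self.pending) >= self.batch_size:
            self.flush()

    def on_checkpoint(self):
        # Trigger 2: a checkpoint fires, so flush whatever is buffered.
        self.flush()

    def flush(self):
        if self.pending:
            self.flush_fn(list(self.pending))
            self.pending.clear()


flushed = []
buf = BatchBuffer(batch_size=2, flush_fn=flushed.append)
for record in ["a", "b", "c"]:
    buf.write(record)
buf.on_checkpoint()
print(flushed)  # [['a', 'b'], ['c']]
```

A small batch_size such as 1 sends every record on its own, which is convenient for testing but costs one request per record.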

max_retries [int]​

The number of times to retry a failed flush.

retry_backoff_multiplier_ms [int]​

The multiplier, in milliseconds, used to generate the next backoff delay.

max_retry_backoff_ms [int]​

The maximum amount of time, in milliseconds, to wait before retrying a request to InfluxDB.
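One plausible way the three retry options interact is sketched below in Python. The exact formula is an assumption, not taken from this page: the delay is assumed to grow linearly as retry_backoff_multiplier_ms times the attempt number, capped at max_retry_backoff_ms. The names `backoff_delay_ms` and `flush_with_retry` are hypothetical.

```python
import time


def backoff_delay_ms(attempt, multiplier_ms, max_backoff_ms):
    """Delay before retry number `attempt` (1-based); linear growth is assumed."""
    return min(multiplier_ms * attempt, max_backoff_ms)


def flush_with_retry(flush_fn, max_retries, multiplier_ms, max_backoff_ms,
                     sleep=time.sleep):
    """Call flush_fn, retrying up to max_retries times on any exception."""
    for attempt in range(max_retries + 1):
        try:
            return flush_fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            delay_ms = backoff_delay_ms(attempt + 1, multiplier_ms, max_backoff_ms)
            sleep(delay_ms / 1000)


print([backoff_delay_ms(n, 100, 250) for n in (1, 2, 3)])  # [100, 200, 250]
```

Under these assumptions, max_retries = 3 allows up to four write attempts in total before the error propagates.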

connect_timeout_ms [long]​

The timeout for connecting to InfluxDB, in milliseconds.

common options​

Common sink plugin parameters. Refer to Sink Common Options for details.

Examples​

sink {
  InfluxDB {
    url = "http://influxdb-host:8086"
    database = "test"
    measurement = "sink"
    key_time = "time"
    key_tags = ["label"]
    batch_size = 1
  }
}

Multiple tables​

example1​

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  InfluxDB {
    url = "http://influxdb-host:8086"
    database = "test"
    measurement = "${table_name}_test"
  }
}
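To make the measurement template above concrete, the following Python sketch shows how it might resolve for each source table. It assumes that ${table_name} is replaced by the table name without its database prefix. `resolve_measurement` is a hypothetical helper, not part of the connector.

```python
def resolve_measurement(template, table_name):
    """Substitute the ${table_name} placeholder (illustrative only)."""
    return template.replace("${table_name}", table_name)


source_tables = ["seatunnel.role", "seatunnel.user", "galileo.Bucket"]

for qualified in source_tables:
    # Assumption: only the part after "database." is used as the table name.
    table = qualified.split(".", 1)[1]
    print(qualified, "->", resolve_measurement("${table_name}_test", table))
# seatunnel.role -> role_test
# seatunnel.user -> user_test
# galileo.Bucket -> Bucket_test
```

Under that assumption, each source table is written to its own measurement in the test database.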

Changelog​

next version​

  • Add InfluxDB Sink Connector