Skip to main content
Version: 2.3.1

IoTDB

IoTDB sink connector

Description

Used to write data to IoTDB.

tip

There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute rm -f $SPARK_HOME/jars/libthrift* and cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/ to resolve it.

Key features

IoTDB supports the exactly-once feature through idempotent writing. If two pieces of data have the same key and timestamp, the new data will overwrite the old one.

Options

nametyperequireddefault value
node_urlslistyes-
usernamestringyes-
passwordstringyes-
key_devicestringyes-
key_timestampstringnoprocessing time
key_measurement_fieldsarraynoexclude device & timestamp
storage_groupstringno-
batch_sizeintno1024
batch_interval_msintno-
max_retriesintno-
retry_backoff_multiplier_msintno-
max_retry_backoff_msintno-
default_thrift_buffer_sizeintno-
max_thrift_frame_sizeintno-
zone_idstringno-
enable_rpc_compressionbooleanno-
connection_timeout_in_msintno-
common-optionsno-

node_urls [list]

IoTDB cluster address, the format is ["host:port", ...]

username [string]

IoTDB user username

password [string]

IoTDB user password

key_device [string]

Specify field name of the IoTDB deviceId in SeaTunnelRow

key_timestamp [string]

Specify field-name of the IoTDB timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp

key_measurement_fields [array]

Specify field-name of the IoTDB measurement list in SeaTunnelRow. If not specified, include all fields but exclude device & timestamp

storage_group [string]

Specify device storage group(path prefix)

example: deviceId = ${storage_group} + "." + ${key_device}

batch_size [int]

For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms, the data will be flushed into the IoTDB

batch_interval_ms [int]

For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms, the data will be flushed into the IoTDB

max_retries [int]

The number of retries to flush failed

retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to IoTDB

default_thrift_buffer_size [int]

Thrift init buffer size in IoTDB client

max_thrift_frame_size [int]

Thrift max frame size in IoTDB client

zone_id [string]

java.time.ZoneId in IoTDB client

enable_rpc_compression [boolean]

Enable rpc compression in IoTDB client

connection_timeout_in_ms [int]

The maximum time (in ms) to wait when connecting to IoTDB

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Examples

Case1

Common options:

sink {
IoTDB {
node_urls = ["localhost:6667"]
username = "root"
password = "root"
batch_size = 1024
batch_interval_ms = 1000
}
}

When you assign key_device is device_name, for example:

sink {
IoTDB {
...
key_device = "device_name"
}
}

Upstream SeaTunnelRow data format is the following:

device_namefield_1field_2
root.test_group.device_a10011002
root.test_group.device_b20012002
root.test_group.device_c30013002

Output to IoTDB data format is the following:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+----------+
| Time| Device| field_1| field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002|
+------------------------+------------------------+----------+-----------+

Case2

When you assign key_devicekey_timestampkey_measurement_fields, for example:

sink {
IoTDB {
...
key_device = "device_name"
key_timestamp = "ts"
key_measurement_fields = ["temperature", "moisture"]
}
}

Upstream SeaTunnelRow data format is the following:

tsdevice_namefield_1field_2temperaturemoisture
1664035200001root.test_group.device_a1001100236.1100
1664035200001root.test_group.device_b2001200236.2101
1664035200001root.test_group.device_c3001300236.3102

Output to IoTDB data format is the following:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+

Changelog

2.2.0-beta 2022-09-26

  • Add IoTDB Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Improve IoTDB Sink Connector (2917)
    • Support align by sql syntax
    • Support sql split ignore case
    • Support restore split offset to at-least-once
    • Support read timestamp from RowRecord
  • [BugFix] Fix IoTDB connector sink NPE (3080)