
IoTDB

IoTDB sink connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Description

Used to write data to IoTDB.

Key Features

  • exactly-once

    IoTDB supports the exactly-once feature through idempotent writing: if multiple records have the same key and timestamp, the latest one overwrites the previous one.
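A minimal sketch of this last-write-wins behavior in plain Python (illustrative only, not the connector's implementation): because a record is identified by its key and timestamp, replaying the same batch after a failure is harmless.

```python
def apply_writes(writes):
    """Apply (device, timestamp, value) records with last-write-wins semantics."""
    store = {}
    for device, ts, value in writes:   # records arrive in order
        store[(device, ts)] = value    # same key + timestamp: latest overwrites
    return store

# Re-sending a record with the same key and timestamp is idempotent:
result = apply_writes([
    ("root.g.device_a", 1664035200001, 36.1),
    ("root.g.device_a", 1664035200001, 36.5),  # duplicate key + timestamp
])
```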

Supported DataSource Info

| Datasource | Supported Versions | Url            |
|------------|--------------------|----------------|
| IoTDB      | 2.0 <= version     | localhost:6667 |

Data Type Mapping

| SeaTunnel Data Type | IoTDB Data Type |
|---------------------|-----------------|
| BOOLEAN             | BOOLEAN         |
| TINYINT             | INT32           |
| SMALLINT            | INT32           |
| INT                 | INT32           |
| BIGINT              | INT64           |
| FLOAT               | FLOAT           |
| DOUBLE              | DOUBLE          |
| STRING              | STRING          |
| TIMESTAMP           | TIMESTAMP       |
| DATE                | DATE            |
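The mapping above, written out as a lookup table for reference (a plain-Python sketch, not connector code):

```python
# SeaTunnel -> IoTDB type mapping, transcribed from the table above.
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN":   "BOOLEAN",
    "TINYINT":   "INT32",   # narrow integer types widen to INT32
    "SMALLINT":  "INT32",
    "INT":       "INT32",
    "BIGINT":    "INT64",
    "FLOAT":     "FLOAT",
    "DOUBLE":    "DOUBLE",
    "STRING":    "STRING",
    "TIMESTAMP": "TIMESTAMP",
    "DATE":      "DATE",
}
```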

Sink Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| node_urls | Array | Yes | - | IoTDB cluster address, the format is ["host1:port"] or ["host1:port","host2:port"] |
| username | String | Yes | - | IoTDB username |
| password | String | Yes | - | IoTDB user password |
| sql_dialect | String | No | tree | The SQL dialect of IoTDB; available options are "tree" and "table" |
| storage_group | String | Yes | - | IoTDB-tree: specify the device storage group (path prefix), example: deviceId = ${storage_group} + "." + ${key_device}<br/>IoTDB-table: specify the database |
| key_device | String | Yes | - | IoTDB-tree: specify the field name in SeaTunnelRow to be used as the device id<br/>IoTDB-table: specify the field name in SeaTunnelRow to be used as the table name |
| key_timestamp | String | No | processing time | IoTDB-tree: specify the field name in SeaTunnelRow to be used as the timestamp (processing time is used by default)<br/>IoTDB-table: specify the field name in SeaTunnelRow to be used as the time column (processing time is used by default) |
| key_measurement_fields | Array | No | refer to description | IoTDB-tree: specify the field names in SeaTunnelRow to be used as measurements (all fields excluding key_device & key_timestamp are used by default)<br/>IoTDB-table: specify the field names in SeaTunnelRow to be used as FIELD columns (all fields excluding key_device, key_timestamp, key_tag_fields and key_attribute_fields are used by default) |
| key_tag_fields | Array | No | - | IoTDB-tree: invalid<br/>IoTDB-table: specify the field names in SeaTunnelRow to be used as TAG columns |
| key_attribute_fields | Array | No | - | IoTDB-tree: invalid<br/>IoTDB-table: specify the field names in SeaTunnelRow to be used as ATTRIBUTE columns |
| batch_size | Integer | No | 1024 | In batch writing, data is flushed to IoTDB when either the number of buffered records reaches batch_size or the elapsed time reaches batch_interval_ms |
| max_retries | Integer | No | - | The number of times to retry a failed flush |
| retry_backoff_multiplier_ms | Integer | No | - | Used as a multiplier for generating the next backoff delay |
| max_retry_backoff_ms | Integer | No | - | The maximum amount of time to wait before retrying a request to IoTDB |
| default_thrift_buffer_size | Integer | No | - | Thrift initial buffer size in the IoTDB client |
| max_thrift_frame_size | Integer | No | - | Thrift maximum frame size in the IoTDB client |
| zone_id | String | No | - | java.time.ZoneId in the IoTDB client |
| enable_rpc_compression | Boolean | No | - | Enable RPC compression in the IoTDB client; only valid in IoTDB-tree |
| connection_timeout_in_ms | Integer | No | - | The maximum time (in ms) to wait when connecting to IoTDB |
| common-options | | No | - | Sink plugin common parameters, please refer to Sink Common Options for details |
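In the tree dialect, the target device path is composed from storage_group and the key_device field, following the formula in the table above. The backoff helper below is an assumption inferred from the option names (the connector's exact retry schedule is not specified here). A plain-Python sketch:

```python
def device_id(storage_group: str, row: dict, key_device: str) -> str:
    # deviceId = ${storage_group} + "." + ${key_device}
    return storage_group + "." + row[key_device]

def next_backoff_ms(attempt: int, multiplier_ms: int, max_backoff_ms: int) -> int:
    # Assumed exponential backoff, capped at max_retry_backoff_ms.
    return min(multiplier_ms * (2 ** attempt), max_backoff_ms)

row = {"device_name": "device_a", "temperature": 36.1}
path = device_id("root.test_group", row, "device_name")
```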

Examples

Example 1: Write data to IoTDB-tree

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 16
    bigint.template = [1664035200001]
    schema = {
      fields {
        device_name = "string"
        temperature = "float"
        moisture = "int"
        event_ts = "bigint"
        c_string = "string"
        c_boolean = "boolean"
        c_tinyint = "tinyint"
        c_smallint = "smallint"
        c_int = "int"
        c_bigint = "bigint"
        c_float = "float"
        c_double = "double"
      }
    }
  }
}
```

The data format from upstream SeaTunnelRow is as follows:

| device_name | temperature | moisture | event_ts | c_string | c_boolean | c_tinyint | c_smallint | c_int | c_bigint | c_float | c_double |
|-------------|-------------|----------|----------|----------|-----------|-----------|------------|-------|----------|---------|----------|
| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | true | 1 | 1 | 1 | 2147483648 | 1.0 | 1.0 |
| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | false | 2 | 2 | 2 | 2147483649 | 2.0 | 2.0 |
| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | false | 3 | 3 | 3 | 2147483649 | 3.0 | 3.0 |

Case 1

Only required options used:

  • use current processing time as timestamp
  • measurement fields include all fields excluding key_device
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name" # specify the `deviceId` use device_name field
  }
}
```
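With only key_device configured, the defaults described above select every remaining field as a measurement. A plain-Python sketch of that selection rule (illustrative only; key_timestamp, when configured, is excluded the same way):

```python
def measurement_fields(all_fields, key_device, key_timestamp=None):
    # Default: every field except those consumed as device id / timestamp,
    # preserving the row's field order.
    excluded = {key_device, key_timestamp}
    return [f for f in all_fields if f not in excluded]

fields = ["device_name", "temperature", "moisture", "event_ts"]
```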

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
|                    Time|                  Device|temperature|moisture|     event_ts|c_string|c_boolean|c_tinyint|c_smallint|c_int|  c_bigint|c_float|c_double|
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a|       36.1|     100|1664035200001|    abc1|     true|        1|         1|    1|2147483648|    1.0|     1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b|       36.2|     101|1664035200001|    abc2|    false|        2|         2|    2|2147483649|    2.0|     2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c|       36.3|     102|1664035200001|    abc3|    false|        3|         3|    3|2147483649|    3.0|     3.0|
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
```

Case 2

Use source event's time:

  • use key_timestamp as timestamp
  • measurement fields include all fields excluding key_device & key_timestamp
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name" # specify the `deviceId` use device_name field
    key_timestamp = "event_ts" # specify the `timestamp` use event_ts field
  }
}
```

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
|                    Time|                  Device|temperature|moisture|     event_ts|c_string|c_boolean|c_tinyint|c_smallint|c_int|  c_bigint|c_float|c_double|
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|       36.1|     100|1664035200001|    abc1|     true|        1|         1|    1|2147483648|    1.0|     1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|       36.2|     101|1664035200001|    abc2|    false|        2|         2|    2|2147483649|    2.0|     2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|       36.3|     102|1664035200001|    abc3|    false|        3|         3|    3|2147483649|    3.0|     3.0|
+------------------------+------------------------+-----------+--------+-------------+--------+---------+---------+----------+-----+----------+-------+--------+
```
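For reference, here is how the epoch-millisecond event_ts value 1664035200001 maps to a wall-clock time. This is a plain-Python check, assuming the session renders times in UTC+8 (zone_id controls how the IoTDB client interprets time zones); the same value reads as 2022-09-24T16:00:00.001 in UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def epoch_ms_to_wall_clock(ms: int, utc_offset_hours: int) -> str:
    # Integer timedelta arithmetic avoids float rounding on milliseconds.
    tz = timezone(timedelta(hours=utc_offset_hours))
    return (EPOCH + timedelta(milliseconds=ms)).astimezone(tz).isoformat()

# event_ts from the examples above, rendered in a UTC+8 session:
wall = epoch_ms_to_wall_clock(1664035200001, 8)
```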

Case 3

Use source event's time and limit measurement fields:

  • use key_timestamp as timestamp
  • measurement fields include only fields specified in key_measurement_fields
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name"
    key_timestamp = "event_ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}
```

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+--------+
|                    Time|                  Device|temperature|moisture|
+------------------------+------------------------+-----------+--------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|       36.1|     100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|       36.2|     101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|       36.3|     102|
+------------------------+------------------------+-----------+--------+
```

Example 2: Write data into IoTDB-table

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  FakeSource {
    ...
    schema = {
      fields {
        ts = timestamp
        model_id = string
        region = string
        tag = string
        status = boolean
        arrival_date = date
        temperature = double
      }
    }
  }
}
```

The data format from upstream SeaTunnelRow is as follows:

| ts | model_id | region | tag | status | arrival_date | temperature |
|----|----------|--------|-----|--------|--------------|-------------|
| 2025-07-30T17:52:34.851 | id1 | 0700HK | tag1 | true | 2024-11-12 | 4.34 |
| 2025-07-29T17:51:34.851 | id2 | 0700HK | tag2 | false | 2024-12-01 | 5.54 |
| 2025-07-28T17:50:34.851 | id3 | 0700HK | tag3 | false | 2024-12-22 | 7.34 |

Case 1

Only required options used:

  • use current processing time as timestamp
  • FIELD columns include all fields excluding key_device
```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    storage_group = "test_database"
    key_device = "region"
  }
}
```

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM "test_database"."0700HK";
+-----------------------------+-----------------------+--------+----+------+------------+-----------+
|                         time|                     ts|model_id| tag|status|arrival_date|temperature|
+-----------------------------+-----------------------+--------+----+------+------------+-----------+
|2025-08-14T17:52:34.851+08:00|2025-07-30T17:52:34.851|     id1|tag1|  true|  2024-11-12|       4.34|
|2025-08-14T17:51:34.851+08:00|2025-07-29T17:51:34.851|     id2|tag2| false|  2024-12-01|       5.54|
|2025-08-14T17:50:34.851+08:00|2025-07-28T17:50:34.851|     id3|tag3| false|  2024-12-22|       7.34|
+-----------------------------+-----------------------+--------+----+------+------------+-----------+
IoTDB> DESC "test_database"."0700HK";
+------------+---------+--------+
|  ColumnName| DataType|Category|
+------------+---------+--------+
|        time|TIMESTAMP|    TIME|
|          ts|TIMESTAMP|   FIELD|
|    model_id|   STRING|   FIELD|
|         tag|   STRING|   FIELD|
|      status|  BOOLEAN|   FIELD|
|arrival_date|     DATE|   FIELD|
| temperature|   DOUBLE|   FIELD|
+------------+---------+--------+
```

Case 2

Use source event's time and limit TAG and ATTRIBUTE columns:

  • use key_timestamp as time column
  • use specified fields as TAG columns and ATTRIBUTE columns
  • FIELD columns include all fields excluding key_device, key_timestamp, key_tag_fields and key_attribute_fields
```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    storage_group = "test_database"
    key_device = "region"
    key_timestamp = "ts"
    key_tag_fields = ["tag"]
    key_attribute_fields = ["model_id"]
  }
}
```
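The column categorization implied by these options can be sketched in plain Python (illustrative only, not connector code): TAG and ATTRIBUTE columns are taken from the configured lists, and every remaining field becomes a FIELD column.

```python
def categorize_columns(all_fields, key_device, key_timestamp,
                       key_tag_fields=(), key_attribute_fields=()):
    # Fields not consumed as device, time, TAG, or ATTRIBUTE become FIELD columns.
    consumed = {key_device, key_timestamp, *key_tag_fields, *key_attribute_fields}
    return {
        "TAG": list(key_tag_fields),
        "ATTRIBUTE": list(key_attribute_fields),
        "FIELD": [f for f in all_fields if f not in consumed],
    }

cols = categorize_columns(
    ["ts", "model_id", "region", "tag", "status", "arrival_date", "temperature"],
    key_device="region", key_timestamp="ts",
    key_tag_fields=["tag"], key_attribute_fields=["model_id"],
)
```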

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM "test_database"."0700HK";
+-----------------------------+----+--------+------+------------+-----------+
|                         time| tag|model_id|status|arrival_date|temperature|
+-----------------------------+----+--------+------+------------+-----------+
|2025-07-30T17:52:34.851+08:00|tag1|     id1|  true|  2024-11-12|       4.34|
|2025-07-29T17:51:34.851+08:00|tag2|     id2| false|  2024-12-01|       5.54|
|2025-07-28T17:50:34.851+08:00|tag3|     id3| false|  2024-12-22|       7.34|
+-----------------------------+----+--------+------+------------+-----------+
IoTDB> DESC "test_database"."0700HK";
+------------+---------+---------+
|  ColumnName| DataType| Category|
+------------+---------+---------+
|        time|TIMESTAMP|     TIME|
|         tag|   STRING|      TAG|
|    model_id|   STRING|ATTRIBUTE|
|      status|  BOOLEAN|    FIELD|
|arrival_date|     DATE|    FIELD|
| temperature|   DOUBLE|    FIELD|
+------------+---------+---------+
```

Case 3

Use source event's time and limit FIELD columns:

  • use key_timestamp as time column
  • use specified fields as FIELD columns
```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    storage_group = "test_database"
    key_device = "region"
    key_timestamp = "ts"
    key_measurement_fields = ["status", "temperature"]
  }
}
```

The data format of IoTDB output is as follows:

```
IoTDB> SELECT * FROM "test_database"."0700HK";
+-----------------------------+------+-----------+
|                         time|status|temperature|
+-----------------------------+------+-----------+
|2025-07-30T17:52:34.851+08:00|  true|       4.34|
|2025-07-29T17:51:34.851+08:00| false|       5.54|
|2025-07-28T17:50:34.851+08:00| false|       7.34|
+-----------------------------+------+-----------+
IoTDB> DESC "test_database"."0700HK";
+-----------+---------+--------+
| ColumnName| DataType|Category|
+-----------+---------+--------+
|       time|TIMESTAMP|    TIME|
|     status|  BOOLEAN|   FIELD|
|temperature|   DOUBLE|   FIELD|
+-----------+---------+--------+
```

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [improve] iotdb options (#8965) | https://github.com/apache/seatunnel/commit/6e073935f4 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Doc] update iotdb document (#5404) | https://github.com/apache/seatunnel/commit/856aedb3c9 | 2.3.4 |
| [Improve][Connector-V2] Remove scheduler in IoTDB sink (#5270) | https://github.com/apache/seatunnel/commit/299637868c | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Improve][SourceConnector] Unified schema parameter, update IoTDB sou… (#3896) | https://github.com/apache/seatunnel/commit/a0959c5fd1 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector (#3557) | https://github.com/apache/seatunnel/commit/7353fed6d6 | 2.3.0 |
| [Feature][Connector V2] expose configurable options in IoTDB (#3387) | https://github.com/apache/seatunnel/commit/06359ea76a | 2.3.0 |
| [Improve][Connector-V2][IotDB]Add IotDB sink parameter check (#3412) | https://github.com/apache/seatunnel/commit/91240a3dcb | 2.3.0 |
| [Bug][Connector-v2] Fix IoTDB connector sink NPE (#3080) | https://github.com/apache/seatunnel/commit/e5edf02433 | 2.3.0-beta |
| [Imporve][Connector-V2] Imporve iotdb connector (#2917) | https://github.com/apache/seatunnel/commit/3da11ce19b | 2.3.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta |
| [Connectors-V2]Support IoTDB Source (#2431) | https://github.com/apache/seatunnel/commit/7b78d6c922 | 2.2.0-beta |
| [Feature][Connector-V2] Support IoTDB sink (#2407) | https://github.com/apache/seatunnel/commit/c1bbbd59d5 | 2.2.0-beta |