IoTDB

IoTDB source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Description

Used to read data from IoTDB.

Key features

Supported DataSource Info

| Datasource | Supported Versions | Url |
| --- | --- | --- |
| IoTDB | 2.0 <= version | localhost:6667 |

Data Type Mapping

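| IoTDB Data Type | SeaTunnel Data Type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT32 | TINYINT |
| INT32 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TEXT | STRING |
| STRING | STRING |
| TIMESTAMP | BIGINT |
| TIMESTAMP | TIMESTAMP |
| BLOB | STRING |
| DATE | DATE |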

Source Options

| Name | Type | Required | Default Value | Description |
| --- | --- | --- | --- | --- |
| node_urls | Array | Yes | - | IoTDB cluster address, in the format ["host1:port"] or ["host1:port","host2:port"] |
| username | String | Yes | - | IoTDB username |
| password | String | Yes | - | IoTDB user password |
| sql_dialect | String | No | tree | The SQL dialect of IoTDB; available options are "tree" and "table" |
| database | String | No | - | The database to use (only valid when sql_dialect is "table") |
| sql | String | Yes | - | The SQL statement to execute |
| schema | Config | Yes | - | The data schema |
| fetch_size | Integer | No | - | The fetch size used when querying IoTDB |
| lower_bound | Long | No | - | The lower bound of the time range used when querying IoTDB |
| upper_bound | Long | No | - | The upper bound of the time range used when querying IoTDB |
| num_partitions | Integer | No | - | The number of partitions used when querying IoTDB |
| default_thrift_buffer_size | Integer | No | - | The Thrift default buffer size used when querying IoTDB |
| max_thrift_frame_size | Integer | No | - | The Thrift max frame size |
| enable_cache_leader | Boolean | No | - | Whether to enable cache leader when querying IoTDB |
| common-options | | No | - | Source plugin common parameters; refer to Source Common Options for details |
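
The constraints in the options table can be checked before a job is submitted. The following is a minimal Python sketch, not the connector's own validation: the function name `check_iotdb_source_options` and the plain-dict representation of the source block are assumptions made for illustration. It checks only what the table states: the required options, the two `sql_dialect` values, the `host:port` form of `node_urls`, and the fact that `database` applies only to the "table" dialect.

```python
# Hypothetical pre-flight check; the connector performs its own validation.
REQUIRED_OPTIONS = ("node_urls", "username", "password", "sql", "schema")
SQL_DIALECTS = ("tree", "table")


def check_iotdb_source_options(options):
    """Return a list of problems; an empty list means every check passed."""
    problems = []

    # Options marked "Yes" in the Required column.
    for name in REQUIRED_OPTIONS:
        if name not in options:
            problems.append("missing required option: " + name)

    # sql_dialect defaults to "tree" when it is not set.
    dialect = options.get("sql_dialect", "tree")
    if dialect not in SQL_DIALECTS:
        problems.append("sql_dialect must be 'tree' or 'table': " + str(dialect))

    # Each node_urls entry is expected to look like host:port.
    for url in options.get("node_urls", []):
        host, sep, port = url.rpartition(":")
        if not (host and sep and port.isdigit()):
            problems.append("node_urls entry is not host:port: " + url)

    # database is only valid with the table dialect.
    if "database" in options and dialect != "table":
        problems.append("database is only valid when sql_dialect is 'table'")

    return problems
```

Calling it with the options from a source block returns an empty list when nothing in the table is violated, and one message per violation otherwise.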

The time column can be used as the partition key when splitting an SQL query.

num_partitions [int]

the number of partitions

upper_bound [long]

the upper bound of the time range

lower_bound [long]

the lower bound of the time range

The time range is split into num_partitions parts.
If num_partitions = 1, the whole time range is used.
If num_partitions > (upper_bound - lower_bound), (upper_bound - lower_bound) is used as num_partitions.

e.g. lower_bound = 1, upper_bound = 10, num_partitions = 2
sql = "select * from test where age > 0 and age < 10"

split result:
split 1: select * from test where (time >= 1 and time < 6) and ( age > 0 and age < 10 )
split 2: select * from test where (time >= 6 and time < 11) and ( age > 0 and age < 10 )
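
The split result above can be reproduced with a short Python sketch. It is an illustration under stated assumptions, not the connector's implementation: the helper names `split_time_range` and `add_time_filter` are hypothetical, upper_bound is treated as inclusive (the last split ends at `time < 11` for upper_bound = 10), and the partition count is capped when it exceeds (upper_bound - lower_bound), which is the reading consistent with the two-split example. The query rewrite handles only a plain `where` condition with no trailing clauses.

```python
import re


def split_time_range(lower_bound, upper_bound, num_partitions):
    """Split the inclusive range [lower_bound, upper_bound] into half-open (start, end) pairs."""
    if upper_bound < lower_bound:
        raise ValueError("upper_bound must not be less than lower_bound")
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")

    span = upper_bound - lower_bound + 1  # number of distinct time values
    # Assumed cap: no more partitions than (upper_bound - lower_bound), at least one.
    parts = max(1, min(num_partitions, upper_bound - lower_bound))
    size = -(-span // parts)  # ceiling division

    ranges = []
    start = lower_bound
    while start <= upper_bound:
        end = min(start + size, upper_bound + 1)
        ranges.append((start, end))
        start = end
    return ranges  # may contain fewer than `parts` entries when size rounds up


def add_time_filter(sql, start, end):
    """Prepend a time filter to the query's where condition (simple queries only)."""
    time_filter = "(time >= {} and time < {})".format(start, end)
    match = re.search(r"\bwhere\b", sql, flags=re.IGNORECASE)
    if match is None:
        return "{} where {}".format(sql, time_filter)
    head = sql[:match.start()]
    condition = sql[match.end():].strip()
    return "{}where {} and ( {} )".format(head, time_filter, condition)


if __name__ == "__main__":
    query = "select * from test where age > 0 and age < 10"
    for index, (start, end) in enumerate(split_time_range(1, 10, 2), start=1):
        print("split {}: {}".format(index, add_time_filter(query, start, end)))
```

Run as a script, it prints the two split queries from the example. Queries with clauses after the condition (for instance `align by device`) would need a real SQL parser rather than this string handling.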

Examples

Example 1: Read data from IoTDB-tree

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
    schema {
      fields {
        ts = timestamp
        device_name = string
        temperature = float
        moisture = bigint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_string = string
        c_boolean = boolean
      }
    }
  }
}

sink {
  Console {
  }
}

The data format from upstream IoTDB is as follows:

IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|                    Time|                  Device|   temperature|   moisture|   c_int|      c_bigint|   c_float| c_double| c_string| c_boolean|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|       1|   21474836470|      1.0f|     1.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|       2|   21474836470|      2.0f|     2.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|       3|   21474836470|      3.0f|     3.0d|      abc|      true|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+

The data format loaded to SeaTunnelRow is as follows:

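| ts | device_name | temperature | moisture | c_int | c_bigint | c_float | c_double | c_string | c_boolean |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1664035200001 | root.test_group.device_a | 36.1 | 100 | 1 | 21474836470 | 1.0f | 1.0d | abc | true |
| 1664035200001 | root.test_group.device_b | 36.2 | 101 | 2 | 21474836470 | 2.0f | 2.0d | abc | true |
| 1664035200001 | root.test_group.device_c | 36.3 | 102 | 3 | 21474836470 | 3.0f | 3.0d | abc | true |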

Example 2: Read data from IoTDB-table

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    database = "test_database"
    sql = "SELECT time, sn, type, bidprice, bidsize, domain, buyno, askprice FROM test_table"
    schema {
      fields {
        ts = timestamp
        sn = string
        type = string
        bidprice = int
        bidsize = double
        domain = boolean
        buyno = bigint
        askprice = string
      }
    }
  }
}

sink {
  Console {
  }
}

If the database is specified in the SQL query, the database option is not required.

The data format from upstream IoTDB is as follows:

IoTDB> SELECT time, sn, type, bidprice, bidsize, domain, buyno, askprice FROM test_table
+-----------------------------+------+----+--------+------------------+------+-----+-----------+
|                         time|    sn|type|bidprice|           bidsize|domain|buyno|   askprice|
+-----------------------------+------+----+--------+------------------+------+-----+-----------+
|2025-07-30T17:52:34.851+08:00|0700HK|  L1|       9|10.323907796459721|  true|   10|-1064754527|
|2025-07-30T17:52:34.951+08:00|0700HK|  L1|      10| 9.844574317657585| false|    9|-1088662576|
|2025-07-30T17:52:35.051+08:00|0700HK|  L1|       9| 9.272974132434069|  true|    9|  402003616|
+-----------------------------+------+----+--------+------------------+------+-----+-----------+

The data format loaded to SeaTunnelRow is as follows:

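| ts | sn | type | bidprice | bidsize | domain | buyno | askprice |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 2025-07-30T17:52:34.851 | 0700HK | L1 | 9 | 10.323907796459721 | true | 10 | -1064754527 |
| 2025-07-30T17:52:34.951 | 0700HK | L1 | 10 | 9.844574317657585 | false | 9 | -1088662576 |
| 2025-07-30T17:52:35.051 | 0700HK | L1 | 9 | 9.272974132434069 | true | 9 | 402003616 |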

Changelog

| Change | Commit | Version |
| --- | --- | --- |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [improve] iotdb options (#8965) | https://github.com/apache/seatunnel/commit/6e073935f4 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Doc] update iotdb document (#5404) | https://github.com/apache/seatunnel/commit/856aedb3c9 | 2.3.4 |
| [Improve][Connector-V2] Remove scheduler in IoTDB sink (#5270) | https://github.com/apache/seatunnel/commit/299637868c | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Improve][SourceConnector] Unified schema parameter, update IoTDB sou… (#3896) | https://github.com/apache/seatunnel/commit/a0959c5fd1 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector (#3557) | https://github.com/apache/seatunnel/commit/7353fed6d6 | 2.3.0 |
| [Feature][Connector V2] expose configurable options in IoTDB (#3387) | https://github.com/apache/seatunnel/commit/06359ea76a | 2.3.0 |
| [Improve][Connector-V2][IotDB]Add IotDB sink parameter check (#3412) | https://github.com/apache/seatunnel/commit/91240a3dcb | 2.3.0 |
| [Bug][Connector-v2] Fix IoTDB connector sink NPE (#3080) | https://github.com/apache/seatunnel/commit/e5edf02433 | 2.3.0-beta |
| [Imporve][Connector-V2] Imporve iotdb connector (#2917) | https://github.com/apache/seatunnel/commit/3da11ce19b | 2.3.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta |
| [Connectors-V2]Support IoTDB Source (#2431) | https://github.com/apache/seatunnel/commit/7b78d6c922 | 2.2.0-beta |
| [Feature][Connector-V2] Support IoTDB sink (#2407) | https://github.com/apache/seatunnel/commit/c1bbbd59d5 | 2.2.0-beta |