IoTDB
IoTDB sink connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Description
Writes data to IoTDB.
Using Dependency
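For Spark/Flink engine
- You need to ensure that the jdbc driver jar package has been placed in directory `${SEATUNNEL_HOME}/plugins/`.
For SeaTunnel Zeta engine
- You need to ensure that the jdbc driver jar package has been placed in directory `${SEATUNNEL_HOME}/lib/`.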
 
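Key Features
IoTDB supports the exactly-once feature through idempotent writes: if two pieces of data have the same key and timestamp, the new data overwrites the old data.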
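The overwrite rule is what makes replays safe. If a job restarts and re-sends rows it already wrote, the stored data ends up the same. The Python sketch below models this with a plain dictionary. It is not IoTDB's storage engine, the helper `write_point` is hypothetical, and it assumes the key is the series path (device plus measurement).

```python
from typing import Dict, Tuple

# Hypothetical in-memory model of IoTDB storage: one value per
# (series path, timestamp), where the series path is device + "." + measurement.
Store = Dict[Tuple[str, int], object]


def write_point(store: Store, device: str, measurement: str, ts_ms: int, value: object) -> None:
    """Write one point; a value with the same key and timestamp is overwritten."""
    store[(f"{device}.{measurement}", ts_ms)] = value


store: Store = {}
# First attempt of a batch.
write_point(store, "root.test_group.device_a", "temperature", 1664035200001, 36.1)
# The same batch replayed after a failure: same key and timestamp, no duplicate.
write_point(store, "root.test_group.device_a", "temperature", 1664035200001, 36.1)
# A newer value for the same key and timestamp replaces the old one.
write_point(store, "root.test_group.device_a", "temperature", 1664035200001, 36.5)

print(len(store))
print(store[("root.test_group.device_a.temperature", 1664035200001)])
```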
:::tip
There is a Thrift version conflict between IoTDB and Spark. To resolve it, run `rm -f $SPARK_HOME/jars/libthrift*` and then `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/`.
:::
Supported DataSource Info
| Datasource | Supported Versions | Url | 
|---|---|---|
| IoTDB | >= 0.13.0 | localhost:6667 | 
Data Type Mapping
| IoTDB Data Type | SeaTunnel Data Type | 
|---|---|
| BOOLEAN | BOOLEAN | 
| INT32 | TINYINT | 
| INT32 | SMALLINT | 
| INT32 | INT | 
| INT64 | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| TEXT | STRING | 
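On the write side this table is a many-to-one mapping: TINYINT, SMALLINT and INT all land in IoTDB as INT32. The following Python sketch turns the table into a lookup. The dictionary and `iotdb_type` are hypothetical helpers, not SeaTunnel APIs. Treating every type missing from the table as unsupported is an assumption of the sketch.

```python
# SeaTunnel column type -> IoTDB data type, transcribed from the table above.
# Hypothetical helper, not a SeaTunnel API.
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN": "BOOLEAN",
    "TINYINT": "INT32",
    "SMALLINT": "INT32",
    "INT": "INT32",
    "BIGINT": "INT64",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "STRING": "TEXT",
}


def iotdb_type(seatunnel_type: str) -> str:
    """Return the IoTDB type for a SeaTunnel type; types not in the table are rejected."""
    try:
        return SEATUNNEL_TO_IOTDB[seatunnel_type.upper()]
    except KeyError:
        raise ValueError(f"unsupported SeaTunnel type for IoTDB sink: {seatunnel_type}") from None


# Some column types from the FakeSource schema in the example on this page.
schema = {"temperature": "float", "moisture": "int", "c_tinyint": "tinyint",
          "c_bigint": "bigint", "c_string": "string"}
for name, st_type in schema.items():
    print(name, "->", iotdb_type(st_type))
```

Because TINYINT and SMALLINT are widened, reading the data back from IoTDB yields INT32 values for those columns.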
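Sink Options
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| node_urls | String | Yes | - | IoTDB cluster address, in the format "host1:port" or "host1:port,host2:port" | 
| username | String | Yes | - | Username of the IoTDB user | 
| password | String | Yes | - | Password of the IoTDB user | 
| key_device | String | Yes | - | Name of the SeaTunnelRow field that specifies the IoTDB device ID | 
| key_timestamp | String | No | processing time | Name of the SeaTunnelRow field that specifies the IoTDB timestamp. If not specified, the processing time is used as the timestamp | 
| key_measurement_fields | Array | No | exclude device & timestamp | Names of the SeaTunnelRow fields that specify the IoTDB measurement list. If not specified, all fields are included except the device and timestamp fields | 
| storage_group | Array | No | - | Specify the device storage group (path prefix), for example: deviceId = ${storage_group} + "." + ${key_device} | 
| batch_size | Integer | No | 1024 | For batch writing, data is flushed to IoTDB when the number of buffered records reaches batch_size or the elapsed time reaches batch_interval_ms | 
| max_retries | Integer | No | - | Number of retries when a flush fails | 
| retry_backoff_multiplier_ms | Integer | No | - | Multiplier used to generate the next backoff delay | 
| max_retry_backoff_ms | Integer | No | - | Maximum amount of time to wait before retrying a request to IoTDB | 
| default_thrift_buffer_size | Integer | No | - | Thrift init buffer size in the IoTDB client | 
| max_thrift_frame_size | Integer | No | - | Thrift max frame size in the IoTDB client | 
| zone_id | String | No | - | java.time.ZoneId of the IoTDB client | 
| enable_rpc_compression | Boolean | No | - | Enable RPC compression in the IoTDB client | 
| connection_timeout_in_ms | Integer | No | - | Maximum time (ms) to wait when connecting to IoTDB | 
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink common Options](../Sink common Options.md) for details | 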
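Two of the options above combine values in ways that are easy to misread. The Python sketch below covers both. It shows how `storage_group` prefixes the `key_device` value, following the formula in the table. It also shows one plausible retry backoff schedule. The backoff formula (multiplier times attempt number, capped at `max_retry_backoff_ms`) is an assumption and is not stated on this page. `device_id`, `backoff_ms` and the sample values are hypothetical.

```python
from typing import Optional


def device_id(key_device_value: str, storage_group: Optional[str] = None) -> str:
    """deviceId = ${storage_group} + "." + ${key_device}; the field value alone if unset."""
    if storage_group:
        return f"{storage_group}.{key_device_value}"
    return key_device_value


def backoff_ms(attempt: int, retry_backoff_multiplier_ms: int, max_retry_backoff_ms: int) -> int:
    """Delay before retry number `attempt` (1-based), ASSUMING linear growth capped at the max."""
    return min(retry_backoff_multiplier_ms * attempt, max_retry_backoff_ms)


print(device_id("device_a", storage_group="root.test_group"))
print(device_id("root.test_group.device_a"))
# Example values only: max_retries = 5, multiplier 200 ms, cap 700 ms.
print([backoff_ms(i, 200, 700) for i in range(1, 6)])
```

With these example values, the delays are 200, 400, 600, 700 and 700 ms, so the cap bounds the wait on later retries.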
Task Example
```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  FakeSource {
    row.num = 16
    bigint.template = [1664035200001]
    schema = {
      fields {
        device_name = "string"
        temperature = "float"
        moisture = "int"
        event_ts = "bigint"
        c_string = "string"
        c_boolean = "boolean"
        c_tinyint = "tinyint"
        c_smallint = "smallint"
        c_int = "int"
        c_bigint = "bigint"
        c_float = "float"
        c_double = "double"
      }
    }
  }
}
```
The upstream SeaTunnelRow data looks like this:
| device_name | temperature | moisture | event_ts | c_string | c_boolean | c_tinyint | c_smallint | c_int | c_bigint | c_float | c_double | 
|---|---|---|---|---|---|---|---|---|---|---|---|
| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | true | 1 | 1 | 1 | 2147483648 | 1.0 | 1.0 | 
| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | false | 2 | 2 | 2 | 2147483649 | 2.0 | 2.0 | 
| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | false | 3 | 3 | 3 | 2147483649 | 3.0 | 3.0 | 
Case 1
Fill in only the required options.
Use the current processing time as the timestamp, and include all fields except device & timestamp as measurement fields.
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name" # use the device_name field as the `deviceId`
  }
}
```
The data written to IoTDB looks like this:
```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|                    Time|                  Device|   temperature|   moisture|      event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int|   c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a|          36.1|        100| 1664035200001|     abc1|      true|         1|          1|     1| 2147483648|     1.0|      1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b|          36.2|        101| 1664035200001|     abc2|     false|         2|          2|     2| 2147483649|     2.0|      2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c|          36.3|        102| 1664035200001|     abc3|     false|         3|          3|     3| 2147483649|     3.0|      3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
```
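In Case 1, `key_timestamp` is not set, so `event_ts` is not treated as the timestamp field. It is written as an ordinary measurement, which is why it appears as a column in the output above. A minimal Python sketch of these defaults follows. The helper names are hypothetical, and the processing time is approximated here by the local wall clock in epoch milliseconds.

```python
import time
from typing import Dict, List, Optional

# Field names of the FakeSource schema in the example above.
FIELDS = ["device_name", "temperature", "moisture", "event_ts", "c_string", "c_boolean",
          "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double"]


def default_measurements(fields: List[str], key_device: str,
                         key_timestamp: Optional[str] = None) -> List[str]:
    """All fields except the device field and, if configured, the timestamp field."""
    excluded = {key_device}
    if key_timestamp:
        excluded.add(key_timestamp)
    return [f for f in fields if f not in excluded]


def row_timestamp_ms(row: Dict[str, object], key_timestamp: Optional[str] = None) -> int:
    """Epoch ms: the configured field if set, otherwise the current processing time."""
    if key_timestamp:
        return int(row[key_timestamp])
    return time.time_ns() // 1_000_000


# Case 1: only key_device is set, so event_ts stays a measurement.
print(default_measurements(FIELDS, key_device="device_name"))
# With key_timestamp = "event_ts", it is removed from the measurements.
print(default_measurements(FIELDS, key_device="device_name", key_timestamp="event_ts"))
```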
Case 2
Use the time of the source event as the timestamp.
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name" # use the device_name field as the `deviceId`
    key_timestamp = "event_ts" # use the event_ts field as the `timestamp`
  }
}
```
The data written to IoTDB looks like this:
```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|                    Time|                  Device|   temperature|   moisture|      event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int|   c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100| 1664035200001|     abc1|      true|         1|          1|     1| 2147483648|     1.0|      1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101| 1664035200001|     abc2|     false|         2|          2|     2| 2147483649|     2.0|      2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102| 1664035200001|     abc3|     false|         3|          3|     3| 2147483649|     3.0|      3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
```
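In Case 2 the Time column is derived from `event_ts`, assuming IoTDB's default millisecond timestamp precision. The wall-clock value shown then depends on the time zone used for display. 1664035200001 is 2022-09-24 16:00:00.001 in UTC and 2022-09-25 00:00:00.001 in UTC+8. The Python sketch below checks this using integer arithmetic only, which avoids float rounding. `epoch_ms_to_datetime` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_datetime(ts_ms: int, tz: timezone) -> datetime:
    """Convert epoch milliseconds to an aware datetime in `tz` (no float division)."""
    return (EPOCH + timedelta(milliseconds=ts_ms)).astimezone(tz)


event_ts = 1664035200001  # value of event_ts in the example rows
utc_plus_8 = timezone(timedelta(hours=8))
print(epoch_ms_to_datetime(event_ts, timezone.utc).isoformat(timespec="milliseconds"))
print(epoch_ms_to_datetime(event_ts, utc_plus_8).isoformat(timespec="milliseconds"))
```

The first line prints `2022-09-24T16:00:00.001+00:00` and the second prints `2022-09-25T00:00:00.001+08:00`.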
Case 3
Use the time of the source event and limit the measurement fields.
```hocon
sink {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    key_device = "device_name"
    key_timestamp = "event_ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}
```
The data written to IoTDB looks like this:
```
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
|                    Time|                  Device|   temperature|   moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|
+------------------------+------------------------+--------------+-----------+
```
Change Log
| Change | Commit | Version | 
|---|---|---|
| [improve] iotdb options (#8965) | https://github.com/apache/seatunnel/commit/6e073935f | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 | 
| [Doc] update iotdb document (#5404) | https://github.com/apache/seatunnel/commit/856aedb3c | 2.3.4 | 
| [Improve][Connector-V2] Remove scheduler in IoTDB sink (#5270) | https://github.com/apache/seatunnel/commit/299637868 | 2.3.4 | 
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 | 
| [Improve][SourceConnector] Unified schema parameter, update IoTDB sou… (#3896) | https://github.com/apache/seatunnel/commit/a0959c5fd | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 | 
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 | 
| [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector (#3557) | https://github.com/apache/seatunnel/commit/7353fed6d | 2.3.0 | 
| [Feature][Connector V2] expose configurable options in IoTDB (#3387) | https://github.com/apache/seatunnel/commit/06359ea76 | 2.3.0 | 
| [Improve][Connector-V2][IotDB]Add IotDB sink parameter check (#3412) | https://github.com/apache/seatunnel/commit/91240a3dc | 2.3.0 | 
| [Bug][Connector-v2] Fix IoTDB connector sink NPE (#3080) | https://github.com/apache/seatunnel/commit/e5edf0243 | 2.3.0-beta | 
| [Imporve][Connector-V2] Imporve iotdb connector (#2917) | https://github.com/apache/seatunnel/commit/3da11ce19 | 2.3.0-beta | 
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta | 
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta | 
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a2738 | 2.2.0-beta | 
| [Connectors-V2]Support IoTDB Source (#2431) | https://github.com/apache/seatunnel/commit/7b78d6c92 | 2.2.0-beta | 
| [Feature][Connector-V2] Support IoTDB sink (#2407) | https://github.com/apache/seatunnel/commit/c1bbbd59d | 2.2.0-beta |