Clickhouse
Clickhouse 数据连接器
支持引擎
Spark
Flink
SeaTunnel Zeta
核心特性
Clickhouse sink 插件通过实现幂等写入可以达到精准一次,需要配合 aggregating merge tree 支持重复数据删除的引擎。
描述
用于将数据写入 Clickhouse。
支持的数据源信息
为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。
| 数据源 | 支持的版本 | 依赖 | 
|---|---|---|
| Clickhouse | universal | 下载 | 
数据类型映射
| SeaTunnel 数据类型 | Clickhouse 数据类型 | 
|---|---|
| STRING | String / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygon | 
| INT | Int8 / UInt8 / Int16 / UInt16 / Int32 | 
| BIGINT | UInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond | 
| DOUBLE | Float64 | 
| DECIMAL | Decimal | 
| FLOAT | Float32 | 
| DATE | Date | 
| TIME | DateTime | 
| ARRAY | Array | 
| MAP | Map | 
输出选项
| 名称 | 类型 | 是否必须 | 默认值 | 描述 | 
|---|---|---|---|---|
| host | String | Yes | - | ClickHouse集群地址, 格式是host:port, 允许多个hosts配置. 例如"host1:8123,host2:8123". | 
| database | String | Yes | - | ClickHouse数据库名称. | 
| table | String | Yes | - | 表名称. | 
| username | String | Yes | - | ClickHouse用户账号. | 
| password | String | Yes | - | ClickHouse用户密码. | 
| clickhouse.config | Map | No | 除了上述必须由 clickhouse-jdbc指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了clickhouse-jdbc提供的所有参数. | |
| bulk_size | String | No | 20000 | 每次通过Clickhouse-jdbc 写入的行数,即默认是20000. | 
| split_mode | String | No | false | 此模式仅支持引擎为 Distributed的clickhouse表。选项internal_replication应该是true。他们将在 seatunnel 中拆分分布式表数据,并直接对每个分片进行写入。分片权重定义为clickhouse将计算在内。 | 
| sharding_key | String | No | - | 使用 split_mode时,将数据发送到哪个节点是个问题,默认为随机选择,但可以使用sharding_key参数来指定分片算法的字段。此选项仅在split_mode为true时有效. | 
| primary_key | String | No | - | 标记 clickhouse表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到clickhouse表. | 
| support_upsert | Boolean | No | false | 支持按查询主键更新插入行. | 
| allow_experimental_lightweight_delete | Boolean | No | false | 允许基于 MergeTree表引擎实验性轻量级删除. | 
| common-options | No | - | Sink插件查用参数,详见Sink常用选项. | 
如何创建一个clickhouse 同步任务
以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。
# Set the basic configuration of the task to be performed
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval  = 1000
}
source {
  FakeSource {
      row.num = 2
      bigint.min = 0
      bigint.max = 10000000
      split.num = 1
      split.read-interval = 300
      schema {
        fields {
          c_bigint = bigint
        }
      }
    }
}
sink {
  Clickhouse {
    host = "127.0.0.1:9092"
    database = "default"
    table = "test"
    username = "xxxxx"
    password = "xxxxx"
  }
}
小提示
1.SeaTunnel 部署文档.
2.需要在同步前提前创建要写入的表.
3.当写入 ClickHouse 表,无需设置其结构,因为连接器会在写入前向 ClickHouse 查询当前表的结构信息.
Clickhouse 接收器配置
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    username = "xxxxx"
    password = "xxxxx"
    clickhouse.config = {
      max_rows_to_read = "100"
      read_overflow_mode = "throw"
    }
  }
}
切分模式
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    username = "xxxxx"
    password = "xxxxx"
    
    # split mode options
    split_mode = true
    sharding_key = "age"
  }
}
CDC(Change data capture) Sink
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    username = "xxxxx"
    password = "xxxxx"
    
    # cdc options
    primary_key = "id"
    support_upsert = true
  }
}
CDC(Change data capture) for *MergeTree engine
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    username = "xxxxx"
    password = "xxxxx"
    
    # cdc options
    primary_key = "id"
    support_upsert = true
    allow_experimental_lightweight_delete = true
  }
}