Paimon
Paimon source connector
Description
Reads data from Apache Paimon.
Key features
Options
| Name | Type | Required | Default |
|---|---|---|---|
| warehouse | String | Yes | - |
| catalog_type | String | No | filesystem |
| catalog_uri | String | No | - |
| database | String | Yes | - |
| table | String | Yes | - |
| hdfs_site_path | String | No | - |
| query | String | No | - |
| paimon.hadoop.conf | Map | No | - |
| paimon.hadoop.conf-path | String | No | - |
warehouse [string]
Path of the Paimon warehouse.
catalog_type [string]
Type of the Paimon catalog. Supported values are filesystem and hive.
catalog_uri [string]
URI of the Paimon catalog. Required only when catalog_type is hive.
database [string]
The database to read from.
table [string]
The table to read from.
hdfs_site_path [string]
Path to the hdfs-site.xml file.
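The following minimal sketch reads from an HDFS warehouse and passes the HDFS client settings through hdfs_site_path. The file path shown is an assumed location, not a default. Replace it with the path to your own cluster's hdfs-site.xml.

```hocon
source {
  Paimon {
    warehouse = "hdfs:///tmp/paimon"
    database = "default"
    table = "st_test"
    # Hypothetical location; point this at your cluster's hdfs-site.xml
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
  }
}
```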
query [string]
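Filter condition used when reading the table, for example: select * from st_test where id > 100. If it is not specified, all records are read.
Currently, the where clause supports <, <=, >, >=, =, !=, or, and, is null, is not null, and between...and. Other operators are not yet supported.
Because of Paimon limitations, Having, Group By, and Order By are not currently supported. Projection and limit will be supported in a future release.
Note: when a field in the where clause is a string or boolean, its value must be enclosed in single quotes, otherwise an error is raised. For example: name='abc' or tag='true'.
The where clause currently supports fields of the following data types: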
- string
- boolean
- tinyint
- smallint
- int
- bigint
- float
- double
- date
- timestamp
- time
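The fragment below sketches a query that combines several of the supported predicates. The column names id, name, is_active, and remark are hypothetical. The string and boolean literals are single-quoted, as the note above requires. The numeric bounds of between...and are not quoted.

```hocon
source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "default"
    table = "st_test"
    # Hypothetical columns: id (int), name (string), is_active (boolean), remark (string).
    # String and boolean values are single-quoted; numeric values are not.
    query = "select * from st_test where id between 100 and 200 and name = 'abc' and is_active = 'true' and remark is not null"
  }
}
```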
paimon.hadoop.conf [map]
Hadoop configuration properties.
paimon.hadoop.conf-path [string]
Path from which the core-site.xml, hdfs-site.xml, and hive-site.xml files are loaded.
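The following sketch loads Hadoop and Hive settings from files instead of listing them inline in paimon.hadoop.conf. It assumes paimon.hadoop.conf-path points at a directory that holds core-site.xml, hdfs-site.xml, and hive-site.xml. The directory name is hypothetical. The catalog values reuse those of the Hive catalog example in the Examples section.

```hocon
source {
  Paimon {
    catalog_type = "hive"
    catalog_uri = "thrift://hadoop04:9083"
    warehouse = "hdfs:///tmp/seatunnel"
    database = "seatunnel_test"
    table = "st_test3"
    # Hypothetical directory assumed to contain core-site.xml, hdfs-site.xml and hive-site.xml
    paimon.hadoop.conf-path = "/home/seatunnel/hadoop-conf"
  }
}
```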
Filesystems
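The Paimon connector supports reading data from multiple file systems. Currently, hdfs and s3 are supported.
When using the s3 file system, set the fs.s3a.access-key, fs.s3a.secret-key, fs.s3a.endpoint, fs.s3a.path.style.access, and fs.s3a.aws.credentials.provider properties in paimon.hadoop.conf. The warehouse path must start with s3a://.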
Examples
Simple example
source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "default"
    table = "st_test"
  }
}
Filter example
source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
  }
}
S3 example
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
        fs.s3a.access-key=G52pnxg67819khOZ9ezX
        fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
        fs.s3a.endpoint="http://minio4:9000"
        fs.s3a.path.style.access=true
        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}
sink {
  Console{}
}
Hadoop configuration example
source {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="hdfs:///tmp/paimon"
    database="seatunnel_namespace1"
    table="st_test"
    query = "select * from st_test where pk_id is not null and pk_id < 3"
    paimon.hadoop.conf = {
      hadoop_user_name = "hdfs"
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}
Hive catalog example
source {
  Paimon {
    catalog_name="seatunnel_test"
    catalog_type="hive"
    catalog_uri="thrift://hadoop04:9083"
    warehouse="hdfs:///tmp/seatunnel"
    database="seatunnel_test"
    table="st_test3"
    paimon.hadoop.conf = {
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}
Changelog
To read the changelog of a Paimon table, first set changelog-producer on the Paimon source table, then read the table with a SeaTunnel streaming job.
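changelog-producer is an option of the Paimon table being read, not of this source. As a sketch, if that table is created by a SeaTunnel Paimon sink job, the option could be passed as a table write property when the sink creates the table. This assumes the sink option paimon.table.write-props (see the Paimon sink documentation). It also assumes a primary-key table, since Paimon applies changelog-producer to primary-key tables. input is one of Paimon's changelog-producer modes; the others are none, lookup, and full-compaction.

```hocon
sink {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    paimon.table.primary-keys = "c_tinyint"
    # Assumed sink option: table properties applied when the sink creates the Paimon table
    paimon.table.write-props = {
      changelog-producer = "input"
    }
  }
}
```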
Note
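Currently, a batch read always reads the latest snapshot. To read more complete changelog data, use a streaming read and start it before any data is written into the Paimon table. To preserve ordering, set the parallelism of the streaming read job to 1.
Streaming read example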
env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
  }
}
sink {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test_sink"
    paimon.table.primary-keys = "c_tinyint"
  }
}
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Feature][Connector-V2] Support between predicate pushdown in paimon (#8962) | https://github.com/apache/seatunnel/commit/3b141cf62 | 2.3.10 | 
| [Feature][Connector-V2] Suppor Time type in paimon connector (#8880) | https://github.com/apache/seatunnel/commit/9f1e59009 | 2.3.10 | 
| [Feature][Paimon] Customize the hadoop user (#8888) | https://github.com/apache/seatunnel/commit/2657626f9 | 2.3.10 | 
| [Improve][Connector-v2][Paimon]PaimonCatalog close error message update (#8640) | https://github.com/apache/seatunnel/commit/48253da8d | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 | 
| [Improve][Connector-v2] Support checkpoint in batch mode for paimon sink (#8333) | https://github.com/apache/seatunnel/commit/f22d4ebd4 | 2.3.9 | 
| [Feature][Connector-v2] Support schema evolution for paimon sink (#8211) | https://github.com/apache/seatunnel/commit/57190e2a3 | 2.3.9 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 | 
| [Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036) | https://github.com/apache/seatunnel/commit/e2a477293 | 2.3.9 | 
| [Feature][transform] transform support explode (#7928) | https://github.com/apache/seatunnel/commit/132278c06 | 2.3.9 | 
| [Feature][Connector-V2] Piamon Sink supports changelog-procuder is lookup and full-compaction mode (#7834) | https://github.com/apache/seatunnel/commit/c0f27c2f7 | 2.3.9 | 
| [Fix][connector-v2]Fix Paimon table connector Error log information. (#7873) | https://github.com/apache/seatunnel/commit/a3b49e635 | 2.3.9 | 
| [Improve][Connector-v2] Use checkpointId as the commit's identifier instead of the hash for streaming write of paimon sink (#7835) | https://github.com/apache/seatunnel/commit/c7a384af2 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 | 
| [Fix][Connecotr-V2] Fix paimon dynamic bucket tale in primary key is not first (#7728) | https://github.com/apache/seatunnel/commit/dc7f69553 | 2.3.8 | 
| [Improve][Connector-v2] Remove useless code and add changelog doc for paimon sink (#7748) | https://github.com/apache/seatunnel/commit/846d876dc | 2.3.8 | 
| [Hotfix][Connector-V2] Release resources even the task is crashed for paimon sink (#7726) | https://github.com/apache/seatunnel/commit/5ddf8d461 | 2.3.8 | 
| [Fix][Connector-V2] Fix paimon e2e error (#7721) | https://github.com/apache/seatunnel/commit/61d196436 | 2.3.8 | 
| [Feature][Connector-Paimon] Support dynamic bucket splitting improves Paimon writing efficiency (#7335) | https://github.com/apache/seatunnel/commit/bc0326cba | 2.3.8 | 
| [Feature][Connector-v2] Support streaming read for paimon (#7681) | https://github.com/apache/seatunnel/commit/4a2e27291 | 2.3.8 | 
| [Hotfix][Seatunnel-common] Fix the CommonError msg for paimon sink (#7591) | https://github.com/apache/seatunnel/commit/d1f5db925 | 2.3.8 | 
| [Feature][CONNECTORS-V2-Paimon] Paimon Sink supported truncate table (#7560) | https://github.com/apache/seatunnel/commit/4f3df2212 | 2.3.8 | 
| [Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549) | https://github.com/apache/seatunnel/commit/7d31e5668 | 2.3.8 | 
| [Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527) | https://github.com/apache/seatunnel/commit/df210ea73 | 2.3.8 | 
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 | 
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 | 
| The isNullable attribute is true when the primary key field in the Paimon table converts the Column object. #7231 (#7242) | https://github.com/apache/seatunnel/commit/b0fe432e9 | 2.3.6 | 
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 | 
| [Paimon]support projection for paimon source (#6343) | https://github.com/apache/seatunnel/commit/6c1577267 | 2.3.6 | 
| [Improve][Paimon] Add check for the base type between source and sink before write. (#6953) | https://github.com/apache/seatunnel/commit/d56d64fc0 | 2.3.6 | 
| [Improve][Connector-V2] Improve the paimon source (#6887) | https://github.com/apache/seatunnel/commit/658643ae5 | 2.3.6 | 
| [Hotfix][Connector-V2] Close the tableWrite when task is close (#6897) | https://github.com/apache/seatunnel/commit/23a744b9b | 2.3.6 | 
| [Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion (#6767) | https://github.com/apache/seatunnel/commit/6cf6e41da | 2.3.6 | 
| [Improve][Connector-V2] Support hive catalog for paimon sink (#6833) | https://github.com/apache/seatunnel/commit/4969c91dc | 2.3.6 | 
| [Hotfix][Connector-V2] Fix the batch write with paimon (#6865) | https://github.com/apache/seatunnel/commit/9ec971d94 | 2.3.6 | 
| [Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/518999184 | 2.3.6 | 
| [Improve][Connector-V2] Support hadoop ha and kerberos for paimon sink (#6585) | https://github.com/apache/seatunnel/commit/20b62f3bf | 2.3.5 | 
| [Feature][Paimon] Support specify paimon table write properties, partition keys and primary keys (#6535) | https://github.com/apache/seatunnel/commit/2b1234c7a | 2.3.5 | 
| [Feature][Connector-V2] Support multi-table sink feature for paimon #5652 (#6449) | https://github.com/apache/seatunnel/commit/b0abbd2d8 | 2.3.5 | 
| [Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061) | https://github.com/apache/seatunnel/commit/b32df930e | 2.3.4 | 
| [Fix][Connectors-v2-Paimon] Flink table store failed to prepare commit (#6057) | https://github.com/apache/seatunnel/commit/c8dcefc3b | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedTypemethod and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 | 
| [Hotfix][Connector-V2][Paimon] Bump paimon-bundle version to 0.4.0-incubating (#5219) | https://github.com/apache/seatunnel/commit/2917542bf | 2.3.3 | 
| [Improve] Documentation and partial word optimization. (#4936) | https://github.com/apache/seatunnel/commit/6e8de0e2a | 2.3.3 | 
| [Connector-V2][Paimon] Introduce paimon connector (#4178) | https://github.com/apache/seatunnel/commit/da507bbe0 | 2.3.2 |