Paimon
Paimon data connector
Description
Data connector for Apache Paimon. It supports CDC writes and automatic table creation.
Supported DataSource Info
| Datasource | Dependency | Maven |
|---|---|---|
| Paimon | hive-exec | Download |
| Paimon | libfb303 | Download |
Datasource Dependency
To stay compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is provided. Therefore, if you use the Flink engine, you may first need to add the following JARs to the <FLINK_HOME>/lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them:
hive-exec-xxx.jar
libfb303-xxx.jar
Some versions of the hive-exec package do not contain libfb303-xxx.jar, so you may also need to import that JAR manually.
Key Features
Connector Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| warehouse | String | yes | - | Paimon warehouse path |
| catalog_type | String | no | filesystem | Catalog type of Paimon; currently only filesystem and hive are supported |
| catalog_uri | String | no | - | URI of the Paimon catalog; only required when catalog_type is hive |
| database | String | yes | - | The database to write to |
| table | String | yes | - | The table to write to |
| hdfs_site_path | String | no | - | Path of the hdfs-site.xml file |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
| data_save_mode | Enum | no | APPEND_DATA | The data save mode |
| paimon.table.primary-keys | String | no | - | Primary key field list; separate a composite primary key with commas (note: partition fields must be included in the primary key fields) |
| paimon.table.partition-keys | String | no | - | Partition field list; separate multiple fields with commas |
| paimon.table.write-props | Map | no | - | Properties passed through to Paimon table initialization; refer to the Paimon documentation for the available table options |
| paimon.hadoop.conf | Map | no | - | Properties in Hadoop conf |
| paimon.hadoop.conf-path | String | no | - | The directory of Hadoop conf files, used to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' configurations |
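Before the full examples below, here is a minimal sketch of a sink block using only the required options plus a catalog name, as seen in the examples that follow. The paimon.hadoop.conf-path line shows how to load an existing Hadoop configuration directory instead of inlining properties through paimon.hadoop.conf; the directory path is purely illustrative.
sink {
  Paimon {
    catalog_name = "seatunnel_test"
    warehouse = "hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
    database = "seatunnel"
    table = "role"
    # Illustrative directory; core-site.xml, hdfs-site.xml and hive-site.xml
    # are loaded from here instead of being listed inline.
    paimon.hadoop.conf-path = "/home/hadoop/conf"
  }
}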
Examples
Single table
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
transform {
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="seatunnel"
    table="role"
  }
}
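This job reads change data from MySQL in streaming mode and writes it to a Paimon warehouse on the local filesystem; because schema_save_mode defaults to CREATE_SCHEMA_WHEN_NOT_EXIST, the target table is created automatically on the first run.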
Single table (with Hadoop HA and Kerberos configuration)
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
transform {
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
    database="seatunnel"
    table="role"
    paimon.hadoop.conf = {
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
      security.kerberos.login.principal = "your-kerberos-principal"
      security.kerberos.login.keytab = "your-kerberos-keytab-path"
    }
  }
}
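The security.kerberos.login.principal and security.kerberos.login.keytab values above are placeholders; replace them with the principal and keytab path of your own Kerberos environment.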
Single table (using Hive catalog)
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_1", 100]
      },
      {
        kind = DELETE
        fields = [2, "B", 100]
      }
    ]
  }
}
sink {
  Paimon {
    schema_save_mode = "RECREATE_SCHEMA"
    catalog_name="seatunnel_test"
    catalog_type="hive"
    catalog_uri="thrift://hadoop04:9083"
    warehouse="hdfs:///tmp/seatunnel"
    database="seatunnel_test"
    table="st_test3"
    paimon.hadoop.conf = {
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}
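When catalog_type is hive, catalog_uri must point at the Hive Metastore thrift endpoint, and the hive-exec and libfb303 JARs from the dependency section above need to be on the engine classpath.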
Single table (with Paimon table write properties)
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role"]
  }
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="seatunnel"
    table="role"
    paimon.table.write-props = {
        bucket = 2
        file.format = "parquet"
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
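Here bucket and file.format are native Paimon table options that are passed through when the table is created. Note that the partition field dt is also listed in paimon.table.primary-keys, which satisfies the constraint from the options table that partition fields must be included in the primary key fields.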
Multiple tables
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}
transform {
}
sink {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
    database="${database_name}"
    table="${table_name}"
  }
}
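In the multi-table case, ${database_name} and ${table_name} are placeholders that are resolved at runtime from the upstream table of each record, so every source table captured by Mysql-CDC is written to its own Paimon table.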