Paimon changelog-producer 与 merge-engine
2026/4/25 3:34:36 网站建设 项目流程
changelog-producer

changelog producer的主要目的是为了在 Paimon 表上产生读流的changelog,如果只是批读的表是可以不用设置 changelog-producer 的
比如数据库如 Mysql 来说,当执行的语句设计数据的修改例如插入、更新、删除时,Mysql 会将这些数据变动记录在 binlog 中,相当于额外记录一份操作日志,Paimon 中Changelog 来表示对应的参数是:changelog-producer,有四种模式:NoneInputLookupfull-compation
Changelog Producer 的主要作用是生成完整的变更日志,记录数据的插入(+I)、更新(+U/-U)或删除(-D)操作,以便下游消费者(例如流式计算引擎 Flink)能够基于这些变更进行实时分析。
为什么需要完整的变更日志呢?举个例子:创建一张paimon表:user_id 和 order_id 组成主键,amount 表示订单金额,业务需求是通过这张表实时计算每个 user_id 的总消费金额(SUM(amount))。上游对表插入2条数据:第一次插入 +I (user1, order1, 100); 第二次更新数据 +I (user1, order1, 150); 如果没有中间变更日志,这个需求就算不出来。

为什么要有 changelog

Paimon 表需要将数据的增删改操作作为变更数据(changelog),有了变更数据下游才能进行流消费,通过在 with 中设置changelog-producer,Paimon 将会以不同的方式产生变更数据。

None 模式

不查旧值,不额外写 changelog

set'execution.runtime-mode'='streaming';set'table.exec.sink.upsert-materialize'='NONE';set'execution.checkpointing.interval'='10 s';set'parallelism.default'=1;set'sql-client.execution.result-mode'='changelog';
createcatalog paimonwith('type'='paimon','warehouse'='hdfs://hadoop03:50070/bigdata/lakehouse')usecatalog paimon;createdatabaseifnotexiststest;createtableifnotexistspaimon.test.none_init(`id`int,`name`String,`salary`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='deduplicate','changelog-produce'='none')insertpaimon.test.none_initvalues(1,'flink',1000);insertpaimon.test.none_initvalues(1,'flink',2000);insertpaimon.test.none_initvalues(1,'flink',3000);-- 查询 none 在流处理模式下select*frompaimon.test.none_init/*+ OPTION('scan.snapshot-id' = '1') */

默认就是none, 这种模式下 Paimon 不会额外存储数据,Source 读取的时候,就是将 snapshot 的 delta list 文件读取出来,就是本次 snapshot 的增量 Changelog 了,Paimon 主键表将不会产生完整的变更数据。
在 none 模式中,虽然在 Paimon 侧没有占用额外的内存,但是在下游的流任务的状态中其实是有一份全量表的额外存储的开销的(/*+ OPTIONS('scan.snapshot-id'='1') */)
使用场景:Paimon 表通过批作业进行消费。


Input 模式

不查找旧值,额外写 changelog。过来什么传递什么,不涉及额外计算。
Paimon 主键表会直接将输入的消息作为变更数据传递给下游消费者,输入数据流本身是完整的变更数据时(例如数据库的 binlog),input 机制不涉及额外的计算因此其效率最高。

使用场景:关系型数据库 binlog 日志采集

set'execution.runtime-mode'='streaming';set'table.exec.sink.upsert-materialize'='NONE';set'execution.checkpointing.interval'='10 s';set'parallelism.default'=1;set'sql-client.execution.result-mode'='changelog';
createcatalog paimonwith('type'='paimon','warehouse'='hdfs://hadoop03:50070/bigdata/lakehouse')usecatalog paimon;createdatabaseifnotexiststest;createtableifnotexistspaimon.test.input_init(`id`int,`name`String,`salary`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='deduplicate','changelog-produce'='input')insertpaimon.test.input_initvalues(1,'flink',1000);insertpaimon.test.input_initvalues(1,'flink',2000);


Lookup 模式

查找旧值,额外存储 changelog。
Paimon 主键表在 Flink 作业每次创建检查点(checkpoint)时触发小文件合并(compaction), 并利用小文件合并的结果产生完整的变更数据,在 compaction 的过程中,会去向高层查找本次新增 key 的旧值,如果没有查到,那么本次的就是新增 key ,如果有查到那么久生成完整的 UB 和 UA 消息。


使用场景:表在写入过程中有计算逻辑(partial-update/aggregation 合并引擎)使用该模式,用来通过查找旧值来生成正确的changelog,与 full-compation 模式相比,lookup 模式的时效性更好。

set'execution.runtime-mode'='streaming';set'table.exec.sink.upsert-materialize'='NONE';set'execution.checkpointing.interval'='10 s';set'parallelism.default'=1;set'sql-client.execution.result-mode'='changelog';
createcatalog paimonwith('type'='paimon','warehouse'='hdfs://hadoop03:50070/bigdata/lakehouse')usecatalog paimon;createdatabaseifnotexiststest;createtableifnotexistspaimon.test.lookup_input(`id`int,`name`String,`salary`int,`sum_cnt`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='aggregation','fileds.salary.aggregation-function'='max','fileds.sun_cnt.aggregation-function'='sum','changelog-producer'='input')createtableifnotexistspaimon.test.lookup_lookup(`id`int,`name`String,`salary`int,`sum_cnt`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='aggregation','fileds.salary.aggregation-function'='max','fileds.sun_cnt.aggregation-function'='sum','changelog-producer'='lookup')insertpaimon.test.lookup_inputvalues(1,'flink',1000,1000);insertpaimon.test.lookup_inputvalues(1,'flink',2000,2000);insertpaimon.test.lookup_inputvalues(1,'flink',500,500);insertpaimon.test.lookup_lookupvalues(1,'flink',1000,1000);insertpaimon.test.lookup_lookupvalues(1,'flink',2000,1000);insertpaimon.test.lookup_lookupvalues(1,'flink',500,500);-- 查询 none 在流处理模式下select*frompaimon.test.lookup_input/*+ OPTION('scan.snapshot-id' = '1') */-- 查询 none 在流处理模式下select*frompaimon.test.lookup_lookup/*+ OPTION('scan.snapshot-id' = '1') */


full-compaction

查找旧值,额外存储 changelog。
Paimon 主键表会在每一次执行小文件全量合并(full compaction)时,产生完整的变更数据。
这种模式下一般通过设置full-compaction.delta-commits定期进行 full compaction,因为 full compaction 其实代价是比较高的,所以这种模式整体的开销也是比较大的。
full compaction 时间 =full-compaction.delta-commits次数乘以 checkpoint 时间

changelog 生成时机取决于 full-compaction 完成时机

eg:4 * 60 s = 4 分钟

注意:由于小文件全量合并小孩较多计算资源,因此频率不宜过高,建议每30 分钟值一小时强制执行一次。

set'execution.runtime-mode'='streaming';set'table.exec.sink.upsert-materialize'='NONE';set'execution.checkpointing.interval'='10 s';set'parallelism.default'=1;set'sql-client.execution.result-mode'='changelog';
createcatalog paimonwith('type'='paimon','warehouse'='hdfs://hadoop03:50070/bigdata/lakehouse')usecatalog paimon;createdatabaseifnotexiststest;createtableifnotexistspaimon.test.compaction_input(`id`int,`name`String,`age`int)with('connector'='datagen','fields.id.kind'='random','fields.id.max'='100','fields.id.min'='1','fields.name.length'='10','fields.age.min'='18','fields.age.max'='60','rows-per-second'='3')createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='aggregation','fileds.age.aggregation-function'='sum','changelog-producer'='full-compaction','full-compaction.delta-commits'='3')insertintopaimon.test.compactionselect*frompaimon.test.compaction_input


lookup vs full-compaction

无论输入数据流是否为完整的变更数据,都可以使用 lookup、full-compaction。

  • lookup:与 full-compaction 相比较,lookup模式的时效性更好,但总体消耗的资源更多,如果数据时效性要求很高(分钟级)的情况下使用。
  • full-compaction:与 lookup 模式相比,full-compaction 模式的时效性较差,小文件合并流程,不产生额外计算,因此总体来看消耗的资源更少,数据时效性(小时级)的情况下使用,由于小文件全量合并会消耗较多计算资源,因此频率不宜过高,建议每30 分钟值一小时强制执行一次。
merge-engine

相同主键的多条数据, Paimon 会根据merge-engine参数对数据进行合并

deduplicate (默认)

对于相同主键的数据,主键表仅会保留最新的一条数据,如果最新数据时 delete 操作,所有对应主键的数据都会被丢弃

createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='deduplicate')
first-row

Paimon 只会保留相同主键的第一条数据,与 deduplicate 合并机制相比,first-row 只会产生 insert 类型的变更数据。

重点注意:

  • 下游如果需要流式消费 first-row 产生的数据,上游表changelog-producer参数必须是 lookup。
  • first-row 无法处理 delete 与 update_before 的消息,可以设置 ‘first-row.ignore-delete’ = ‘true’ 以忽略这两类消息。
  • first-row 不支持changelog-producer input、full-compaction 模式。

为什么first-row 不支持 input、full-compaction 模式?
full-compaction: 每一次执行小文件全量合并(full compaction)时,产生完整的变更数据
input:直接将输入的消息作为变更数据传递给下游消费者(作为 changelog)
input、full-compaction 模式

严格意义上与first-row 相违背,所以 first-row 只支持 none、lookup 模式

流模式: first-row 合并只支持 lookup changelog-producer

createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='first-row','changelog-producer'='none')createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='first-row','changelog-producer'='input')createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='first-row','changelog-producer'='lookup')createtableifnotexistspaimon.test.compaction(`id`int,`name`String,`age`int,PRIMARYKEY(`id`)NOTENFORCED)with('merge-engine'='first-row','changelog-producer'='full-compaction')`
aggregation

具有相同主键的多条数据,主键表将会根据指定的聚合函数,对于不属于主键的每一列都需要通过fileds.<field-name>.aggregation-function指定一个聚合函数,否则将默认使用last_non_null_value聚合函数。

支持的聚合函数如下

  • sum(求和): 支持 DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT或 DOUBLE
  • product(求乘积): 支持 DECIMAL、TINYINT、SMALLINT、INTGER、BIGINT、FLOAT或 DOUBLE
  • count(统计非 null 值总数): 支持INTEGER 和 BIGINT
  • max(最大值) / min(最小值)
  • first_value(返回第一次输入的值) / last_value(返回最新输入的值)
  • first_non_null_value(返回第一次输入非null的值) / last_non_null_value(返回最新输入非null的值)
  • listagg(将输入的字符串依次用英文逗号连接)
  • bool_and / bool_or

注意:
如果下游需要流式消费 aggregation 的结果,需要将changelog-producer参数设置为 input、lookup、full-compaction。

partial update

通过设置 ‘merge-engine’ = ‘partial-update’,支持多数据输入源对于分别更新同一主键部分字段,应用于湖仓宽表开发的业务需求场景。
partial-update 必须跟 lookup 或者 full-compaction changelog producer 结合使用。partial 无法接收 DELETE 消息,可以将partial-update.ignore-delete设置为以忽略 delete 消息。

Sequence Group

sequence-field 并不能解决多流更新的部分更新表的乱序问题,因为多流更新 sequence-field 可能会被另一个流的最新数据覆盖。因此我们引入了部分更新表的序列组 sequence group 机制。 它可以解决:

  • 多流更新时出现混乱。 每个流定义其自己的序列组。
  • 真正做到部分更新,而不仅仅是非空更新。
CREATETABLEt(kINT,aINT,bINT,g_1INT,cINT,dINT,g_2INT,PRIMARYKEY(k)NOTENFORCED)WITH('merge-engine'='partial-update','fields.g_1.sequence-group'='a,b','fields.g_2.sequence-group'='c,d');INSERTINTOtVALUES(1,1,1,1,1,1,1);-- g_2 is null, c, d should not be updatedINSERTINTOtVALUES(1,2,2,2,2,2,CAST(NULLASINT));SELECT*FROMt;-- output 1, 2, 2, 2, 1, 1, 1-- g_1 is smaller, a, b should not be updatedINSERTINTOtVALUES(1,3,3,1,3,3,3);SELECT*FROMt;-- output 1, 2, 2, 2, 3, 3, 3

参考:https://cloud.tencent.com/developer/article/2574738

========================================================

消费端的补全
CREATETABLEorders_no_changelog(order_idBIGINT,priceDOUBLE,PRIMARYKEY(order_id)NOTENFORCED)WITH('bucket'='2');explainSELECTorder_id,SUM(price)AStotalFROMorders_no_changelogGROUPBYorder_id;

changelog-producer = none只有+I,+U,-D消息类型但无-U消息类型;
Flink 的解决方式是:在 Source 和下游算子之间自动注入一个“补全”算子——ChangelogNormalize,由它根据主键的历史状态替我们补出缺失的 -U

==Optimized Physical Plan==ChangelogNormalize(key=[order_id])+- Exchange(distribution=[hash[order_id]])+- TableSourceScan(table=[[paimon_catalog, default, orders_no_changelog]],fields=[order_id, price])
CREATETABLEorders_input_changelog(order_idBIGINT,priceDOUBLE,PRIMARYKEY(order_id)NOTENFORCED)WITH('bucket'='2','changelog-producer'='input');explainSELECTorder_id,SUM(price)AStotalFROMorders_input_changelogGROUPBYorder_id;
==Optimized Physical Plan==DropUpdateBefore +- TableSourceScan(table=[[paimon_catalog, default, orders_input_changelog]],fields=[order_id, price])

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询