论文 figure、图形摘要、学术海报怎么做:研究生从投稿到参会的视觉素材完整指南
2026/5/4 1:58:24
在 Python UDF 里,指标注册通常写在open():
open():每个并行子任务(subtask)初始化时调用一次eval():每条数据调用(或每批数据调用,取决于 UDF 类型)所以推荐模式是:
open()里注册指标(Counter/Gauge/Distribution/Meter)eval()里更新指标示例骨架:
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):defopen(self,function_context):mg=function_context.get_metric_group()# register metrics heredefeval(self,x):# update metrics herereturnxPyFlink 支持四种常用指标类型,各自适用场景不同。
用途:统计处理条数、错误数、某类事件数等
更新方式:inc()/inc(n)/dec()/dec(n)
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):def__init__(self):self.counter=Nonedefopen(self,function_context):self.counter=function_context.get_metric_group().counter("my_counter")defeval(self,i):self.counter.inc(i)# 示例里用 i 递增returni工程建议(更贴近生产):
inc()统计条数error_counter.inc()用途:展示“当前状态值”,例如当前缓存大小、最近一条数据长度、队列长度等
注册方式:gauge(name, Callable[[], int])
限制:Gauge 只支持整数
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):def__init__(self):self.length=0defopen(self,function_context):function_context.get_metric_group().gauge("my_gauge",lambda:self.length)defeval(self,i):self.length=ireturni-1工程建议:
用途:统计某个值的分布特征,比如每条数据大小、处理耗时(毫秒)、某字段长度等
更新方式:update(n: int)
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):def__init__(self):self.distribution=Nonedefopen(self,function_context):self.distribution=function_context.get_metric_group().distribution("my_distribution")defeval(self,i):self.distribution.update(i)returni-1工程建议:
用途:看吞吐趋势,比如每秒处理记录数、某类事件速率
更新方式:mark_event()/mark_event(n)
可配置统计时间窗:默认 60s,可指定 120s 等
frompyflink.table.udfimportScalarFunctionclassMyUDF(ScalarFunction):def__init__(self):self.meter=Nonedefopen(self,function_context):self.meter=function_context.get_metric_group().meter("my_meter",time_span_in_seconds=120)defeval(self,i):self.meter.mark_event(i)returni-1工程建议:
mark_event(1)表示处理 1 条你可以通过MetricGroup.add_group(key, value=None)做分组,形成更清晰的指标层级。
function_context \.get_metric_group()\.add_group("my_metrics")\.counter("my_counter")效果:指标会挂在my_metrics分组下,避免所有指标挤在一个层级。
function_context \.get_metric_group()\.add_group("my_metrics_key","my_metrics_value")\.counter("my_counter")注意点(文档强调):
下面这些是“埋了之后真的能救命”的指标组合(建议你直接套用):
processed_records(Counter):处理总条数error_records(Counter):异常条数(try/except 里 inc)current_cache_size(Gauge):当前缓存/字典大小(如果你在 open 里加载了东西)latency_ms(Distribution):单条处理耗时或某阶段耗时(整数毫秒)throughput_rps(Meter):记录速率(每秒条数)性能注意:
你可以把这段作为自己的标准模板(结构清晰,扩展方便):
(如果你需要,我也可以按你现有的 UDF 样式,给你写一个“带异常计数 + 耗时分布 + 吞吐 meter + 分组”的完整类)