《穿透 MySQL 索引专栏(六):从底层数据结构到千万级慢查询调优》
2026/5/9 11:37:12
消息队列系统需要监控的指标包括队列深度、消费延迟、消息吞吐量、错误率等。
// Package mqmonitor collects message-queue metrics (queue depth, produced and
// consumed message counts, consume latency, error count) using the Prometheus
// client library.
package mqmonitor

import (
	"context"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	// NOTE(review): push is not referenced by any code visible here. Go rejects
	// unused imports, so drop this line unless another part of the file uses it.
	"github.com/prometheus/client_golang/prometheus/push"
)

// defaultCollectInterval is the polling period NewQueueMonitor falls back to
// when it is given a non-positive interval.
const defaultCollectInterval = 10 * time.Second

// MetricsCollector bundles the Prometheus metrics tracked for a message queue.
type MetricsCollector struct {
	queueDepth      prometheus.Gauge
	messageProduced prometheus.Counter
	messageConsumed prometheus.Counter
	consumeLatency  prometheus.Histogram
	errorRate       prometheus.Counter // exported as "errors_total": a running count, not a rate
}

// NewMetricsCollector builds a MetricsCollector whose metric names are
// qualified with namespace and subsystem. The metrics are only created here;
// call Register to register them.
func NewMetricsCollector(namespace, subsystem string) *MetricsCollector {
	return &MetricsCollector{
		queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queue_depth",
			Help:      "Current number of messages in queue",
		}),
		messageProduced: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "messages_produced_total",
			Help:      "Total number of messages produced",
		}),
		messageConsumed: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "messages_consumed_total",
			Help:      "Total number of messages consumed",
		}),
		consumeLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "consume_latency_seconds",
			Help:      "Message consume latency in seconds",
			Buckets:   prometheus.DefBuckets,
		}),
		errorRate: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "errors_total",
			Help:      "Total number of errors",
		}),
	}
}

// Register registers every metric of the collector via prometheus.MustRegister.
//
// NOTE(review): MustRegister panics when registration fails (for example on a
// duplicate registration), so this is expected to be called once per set of
// metric names — confirm against callers.
func (m *MetricsCollector) Register() {
	prometheus.MustRegister(
		m.queueDepth,
		m.messageProduced,
		m.messageConsumed,
		m.consumeLatency,
		m.errorRate,
	)
}

// SetQueueDepth sets the queue-depth gauge to depth.
func (m *MetricsCollector) SetQueueDepth(depth float64) {
	m.queueDepth.Set(depth)
}

// IncProduced adds one to the produced-messages counter.
func (m *MetricsCollector) IncProduced() {
	m.messageProduced.Inc()
}

// IncConsumed adds one to the consumed-messages counter.
func (m *MetricsCollector) IncConsumed() {
	m.messageConsumed.Inc()
}

// ObserveLatency records duration, converted to seconds, in the
// consume-latency histogram.
func (m *MetricsCollector) ObserveLatency(duration time.Duration) {
	m.consumeLatency.Observe(duration.Seconds())
}

// IncErrors adds one to the error counter.
func (m *MetricsCollector) IncErrors() {
	m.errorRate.Inc()
}

// QueueMonitor periodically samples the depth of its configured queues and
// publishes the result through a MetricsCollector.
type QueueMonitor struct {
	collector *MetricsCollector
	interval  time.Duration

	mu     sync.Mutex // guards queues: AddQueue may run while the polling goroutine reads it
	queues []string

	stopCh   chan struct{}
	stopOnce sync.Once // makes Stop safe to call more than once
	wg       sync.WaitGroup
}

// NewQueueMonitor returns a monitor that reports through collector every
// interval. A non-positive interval is replaced by defaultCollectInterval,
// because time.NewTicker panics on such values.
func NewQueueMonitor(collector *MetricsCollector, interval time.Duration) *QueueMonitor {
	if interval <= 0 {
		interval = defaultCollectInterval
	}
	return &QueueMonitor{
		collector: collector,
		interval:  interval,
		stopCh:    make(chan struct{}),
	}
}

// AddQueue adds queueName to the set of monitored queues. It may be called
// before or after Start.
func (m *QueueMonitor) AddQueue(queueName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.queues = append(m.queues, queueName)
}

// Start launches a background goroutine that runs a collection pass on every
// tick of the configured interval. The goroutine exits when Stop is called or
// ctx is cancelled. Start itself returns immediately.
func (m *QueueMonitor) Start(ctx context.Context) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()

		ticker := time.NewTicker(m.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				m.collect()
			case <-m.stopCh:
				return
			case <-ctx.Done():
				return
			}
		}
	}()
}

// Stop signals the polling goroutine to exit and waits for it to finish.
// Repeated calls are safe: the stop channel is closed only once.
func (m *QueueMonitor) Stop() {
	m.stopOnce.Do(func() { close(m.stopCh) })
	m.wg.Wait()
}

// collect runs one sampling pass. Every queue whose depth cannot be read bumps
// the error counter. The depths that were read are summed and written to the
// gauge once, so that with several queues the single unlabelled gauge is not
// overwritten by whichever queue happens to be sampled last. If no queue could
// be read, the gauge keeps its previous value.
//
// NOTE(review): if per-queue visibility is needed, a labelled gauge
// (prometheus.GaugeVec) would be the better fit.
func (m *QueueMonitor) collect() {
	// Snapshot under the lock so the depth lookups run without holding it.
	m.mu.Lock()
	queues := append([]string(nil), m.queues...)
	m.mu.Unlock()

	var total float64
	sampled := false
	for _, queue := range queues {
		depth, err := m.getQueueDepth(queue)
		if err != nil {
			m.collector.IncErrors()
			continue
		}
		total += depth
		sampled = true
	}
	if sampled {
		m.collector.SetQueueDepth(total)
	}
}

// getQueueDepth returns the current depth of queue.
//
// NOTE(review): this is a stub that always returns 0 and a nil error; it
// presumably needs to be wired to the real broker — confirm.
func (m *QueueMonitor) getQueueDepth(queue string) (float64, error) {
	return 0, nil
}

本文介绍了消息队列监控指标采集的实现方法,通过Prometheus客户端库可以方便地采集和展示监控数据。