MCPFlow: Building a Structured Workflow Orchestration and Safe Execution Framework for AI Agents
2026/5/4 8:35:53
| Scenario | Consequence |
|---|---|
| Traffic burst (e.g. a promotion) | Service avalanche, 502 errors |
| Slow SQL | Database CPU at 100%, dragging down the whole system |
| Memory leak | Workers crash and need frequent restarts |
| No monitoring | Failures discovered only after the fact; MTTR > 1 hour |
```
[Load test] → [Monitor] → [Analyze] → [Optimize]
      ↑_________________________________↓
```

Principle: don't guess at bottlenecks — let the data speak.
| Tool | Language | Concurrency model | Distributed | Ease of use |
|---|---|---|---|---|
| JMeter | Java | Threads | ✔ | ❌ (complex XML configuration) |
| Gatling | Scala | Actors | ✔ | ⚠️ (steep learning curve) |
| Locust | Python | Coroutines (gevent) | ✔ | ✅ (code is the configuration) |
Advantages:
- User behavior is written in Python, so it is highly flexible
- Real-time web UI showing RPS, response times, and error rates
- Supports distributed load generation (master-worker)
```bash
pip install locust
```

```
perf-test/
├── locustfile.py      ← main load-test script
├── tasks/
│   ├── auth.py        ← login task
│   └── api.py         ← API call tasks
└── utils/
    └── jwt.py         ← token management
```

```python
# perf-test/locustfile.py
from locust import HttpUser, task, between
from tasks.auth import login
from tasks.api import get_profile, create_post

class WebsiteUser(HttpUser):
    wait_time = between(1, 3)  # 1–3 s pause between user actions

    def on_start(self):
        """Log in when each simulated user starts."""
        self.access_token = login(self.client)

    @task(3)
    def view_profile(self):
        get_profile(self.client, self.access_token)

    @task(1)
    def create_new_post(self):
        create_post(self.client, self.access_token, "Hello from Locust!")
```

```python
# perf-test/tasks/auth.py
def login(client):
    response = client.post("/auth/login", json={
        "username": "testuser",
        "password": "secure_password"
    })
    assert response.status_code == 200
    return response.json()["access_token"]
```

```python
# perf-test/tasks/api.py
def get_profile(client, token):
    client.get("/api/profile",
               headers={"Authorization": f"Bearer {token}"})

def create_post(client, token, content):
    client.post("/api/posts",
                json={"content": content},
                headers={"Authorization": f"Bearer {token}"})
```

Key points:
- Each virtual user logs in independently and holds its own token
- `@task(weight)` controls how often each behavior runs (profile:post = 3:1)
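The 3:1 weighting above means `view_profile` is chosen roughly three times as often as `create_new_post`: Locust normalizes task weights into selection probabilities. A minimal sketch of that normalization (helper name hypothetical):

```python
# @task weights normalize into selection probabilities:
# each task is picked with probability weight / sum(weights).
def task_probabilities(weights):
    total = sum(weights.values())
    return {name: w / total for name, w in weights.items()}

probs = task_probabilities({"view_profile": 3, "create_new_post": 1})
print(probs)  # {'view_profile': 0.75, 'create_new_post': 0.25}
```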
```bash
cd perf-test
locust -f locustfile.py --host=http://localhost:5000
```

Then open http://localhost:8089:
Start the master:

```bash
locust -f locustfile.py --master --host=http://your-prod-domain.com
```

Start one or more workers (on separate machines):

```bash
locust -f locustfile.py --worker --master-host=MASTER_IP
```

When to use this: a single machine's network or CPU cannot generate enough load.
| Metric | Healthy threshold | Danger signal |
|---|---|---|
| RPS (requests per second) | ≥ expected peak | Far below expectations |
| P95 response time | < 500 ms | > 2 s |
| Failure rate | 0% | > 0.1% |
| CPU usage | < 70% | Sustained 100% |
Examples:
- RPS rises but response times explode → database bottleneck
- Failure rate suddenly spikes → connection pool exhausted / out of memory
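The P95 response time in the table above is a percentile over raw latency samples. A small standard-library sketch of how it is computed (nearest-rank method):

```python
import math

# P95 = the latency under which 95% of requests complete
# (nearest-rank method over the sorted samples).
def p95(latencies_ms):
    xs = sorted(latencies_ms)
    rank = math.ceil(0.95 * len(xs))   # 1-indexed rank of the P95 sample
    return xs[rank - 1]

samples = [100] * 95 + [2000] * 5      # 95 fast requests, 5 slow ones
print(p95(samples))  # 100
```

This is why P95 is more honest than the mean: the five 2-second outliers above barely move P95, but they would drag the average up to ~195 ms.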
```
[Flask App]  ─(metrics)→ [Prometheus] → [Grafana]
[Celery]     ──────────↗
[PostgreSQL] ──────────↗
[Redis]      ──────────↗
```

Install the dependency:
```bash
pip install prometheus-client
```

Add to the Flask app:
```python
# app/metrics.py
import time

from flask import Response, g, request
from prometheus_client import Counter, Histogram, generate_latest

# `app` is your Flask application instance
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP Requests',
                        ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds',
                            'HTTP Request Latency',
                            ['method', 'endpoint'])

@app.route('/metrics')
def metrics():
    return Response(generate_latest(), mimetype='text/plain')

# Middleware to record every request
@app.before_request
def before_request():
    g.start_time = time.time()

@app.after_request
def after_request(response):
    latency = time.time() - g.start_time
    REQUEST_LATENCY.labels(request.method, request.endpoint).observe(latency)
    REQUEST_COUNT.labels(request.method, request.endpoint,
                         response.status_code).inc()
    return response
```

Install celery-prometheus-exporter:
```bash
pip install celery-prometheus-exporter
```

Start the exporter (as a standalone process):

```bash
celery-prometheus-exporter --broker-url redis://redis:6379/0
```

Metrics are exposed on port 9808.
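Under the hood, a Prometheus Histogram like `REQUEST_LATENCY` above just maintains cumulative bucket counters plus a running sum and count. A simplified pure-Python illustration (class name hypothetical; real code should use `prometheus_client` as shown):

```python
# Conceptual model of a Prometheus Histogram: each observation
# increments every cumulative bucket whose upper bound it fits under.
class MiniHistogram:
    def __init__(self, buckets=(0.1, 0.5, 1.0, float("inf"))):
        self.buckets = buckets
        self.counts = {b: 0 for b in buckets}   # cumulative per-bucket counts
        self.total = 0.0                        # sum of all observed values
        self.n = 0                              # total number of observations

    def observe(self, value):
        self.n += 1
        self.total += value
        for b in self.buckets:
            if value <= b:
                self.counts[b] += 1

h = MiniHistogram()
for latency in (0.05, 0.3, 2.0):
    h.observe(latency)
print(h.counts[0.5], h.n)  # 2 3  → two requests finished within 0.5 s, out of 3
```

Cumulative buckets are what let PromQL's `histogram_quantile()` estimate percentiles like P95 on the server side.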
Enable pg_stat_statements (requires superuser):

```sql
CREATE EXTENSION pg_stat_statements;
```

Use postgres_exporter:
```yaml
# docker-compose.yml
services:
  postgres-exporter:
    image: wrouesnel/postgres_exporter
    environment:
      DATA_SOURCE_NAME: "postgresql://user:pass@postgres:5432/db?sslmode=disable"
    ports:
      - "9187:9187"
```

Redis exposes its stats via the built-in INFO command; use redis_exporter:
```yaml
# docker-compose.yml
services:
  redis-exporter:
    image: oliver006/redis_exporter
    command: --redis.addr redis://redis:6379
    ports:
      - "9121:9121"
```

Create prometheus.yml:
```yaml
scrape_configs:
  - job_name: 'flask-app'
    static_configs:
      - targets: ['web:8000']   # Flask container name
  - job_name: 'celery'
    static_configs:
      - targets: ['celery-exporter:9808']
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']
```

```yaml
# docker-compose.monitoring.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-storage:/var/lib/grafana
volumes:
  grafana-storage:
```

Start the stack:
```bash
docker-compose -f docker-compose.yml -f docker-compose.monitoring.yml up -d
```

In Grafana, import dashboards 11895 (Python HTTP Metrics), 9628, and 763, and chart `celery_queue_length` for Celery.

| Component | Key metrics |
|---|---|
| Flask | QPS, P95 latency, error rate |
| PostgreSQL | Active connections, slow queries (> 100 ms), cache hit ratio |
| Redis | Memory usage, hit ratio, blocked clients |
| Celery | Queue length, task throughput, worker count |
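The PostgreSQL cache hit ratio listed above is conventionally derived from the `blks_hit` / `blks_read` counters in `pg_stat_database`. A tiny helper sketch (function name hypothetical):

```python
# Share of block reads served from shared buffers rather than disk.
def cache_hit_ratio(blks_hit, blks_read):
    total = blks_hit + blks_read
    return blks_hit / total if total else 1.0  # no reads yet → treat as healthy

print(round(cache_hit_ratio(9_900, 100), 2))  # 0.99
```

A ratio persistently below ~0.99 on an OLTP workload usually means `shared_buffers` is too small or the working set no longer fits in memory.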
Examples:
- `pg_stat_statements` shows a query averaging 2 s → add an index on the `WHERE` columns
- `celery_queue_length` keeps growing → scale out Celery workers

Note: Docker Compose has no native HPA; autoscaling requires an external script.
Write a monitoring script, autoscale.sh:

```bash
#!/bin/bash
CPU_THRESHOLD=70
MIN_WORKERS=2
MAX_WORKERS=10

while true; do
    # Current CPU% of the web container and current number of Celery containers
    CPU=$(docker stats --no-stream --format "{{.CPUPerc}}" web | sed 's/%//')
    CURRENT=$(docker-compose ps -q celery | wc -l)

    if (( $(echo "$CPU > $CPU_THRESHOLD" | bc -l) )) && [ "$CURRENT" -lt "$MAX_WORKERS" ]; then
        echo "Scaling up Celery to $((CURRENT + 1))"
        docker-compose up -d --scale celery=$((CURRENT + 1))
    elif (( $(echo "$CPU < 50" | bc -l) )) && [ "$CURRENT" -gt "$MIN_WORKERS" ]; then
        echo "Scaling down Celery to $((CURRENT - 1))"
        docker-compose up -d --scale celery=$((CURRENT - 1))
    fi

    sleep 30
done
```

If you migrate to Kubernetes, you can scale on custom metrics instead:
```yaml
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: celery_queue_length
        target:
          type: AverageValue
          averageValue: "10"   # scale out when queue length exceeds 10
```

This requires deploying prometheus-adapter to expose Prometheus metrics as Kubernetes custom metrics.
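The shell loop in autoscale.sh above reduces to a threshold rule with a dead band between 50% and 70% CPU, which prevents flapping. Extracted as a Python sketch (function name hypothetical):

```python
# Threshold-based scaling with a dead band: scale up above `high`,
# down below `low`, do nothing in between, and respect the floor/ceiling.
def scale_decision(cpu_pct, current, min_workers=2, max_workers=10,
                   high=70.0, low=50.0):
    if cpu_pct > high and current < max_workers:
        return current + 1    # scale up one worker at a time
    if cpu_pct < low and current > min_workers:
        return current - 1    # scale down, never below the floor
    return current            # inside the dead band: hold steady

print(scale_decision(85.0, 4))  # 5
print(scale_decision(30.0, 2))  # 2  (already at the minimum)
```

The gap between the two thresholds is deliberate: with a single threshold, CPU hovering around it would cause the worker count to oscillate every 30 seconds.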
PostgreSQL configuration (postgresql.conf):

```ini
log_min_duration_statement = 100        # log queries slower than 100 ms
shared_preload_libraries = 'pg_stat_statements'
pg_stat_statements.track = all
```

Find the slowest queries:

```sql
SELECT query, calls, total_exec_time, mean_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
```

| Problem | Solution |
|---|---|
| Full table scan | Add an index on the `WHERE` columns |
| N+1 queries | Use SQLAlchemy `joinedload()` |
| Deep pagination | Switch to cursor (keyset) pagination (`WHERE id > last_id`) |
| Write bottleneck | Batch inserts (e.g. `bulk_insert_mappings()`) |
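Cursor (keyset) pagination from the table avoids re-scanning skipped rows the way `OFFSET` does: each page filters on the last id seen. A self-contained sketch over an in-memory list (the SQL equivalent is `WHERE id > :last_id ORDER BY id LIMIT :n`):

```python
# Keyset pagination: filter past the cursor, take one page,
# and return the last id as the cursor for the next page.
def keyset_page(rows, last_id, page_size):
    """rows: list of dicts sorted by ascending 'id'."""
    page = [r for r in rows if r["id"] > last_id][:page_size]
    next_cursor = page[-1]["id"] if page else None
    return page, next_cursor

rows = [{"id": i} for i in range(1, 8)]
page1, cursor = keyset_page(rows, last_id=0, page_size=3)
page2, cursor = keyset_page(rows, last_id=cursor, page_size=3)
print([r["id"] for r in page2])  # [4, 5, 6]
```

Unlike `OFFSET 1000000 LIMIT 20`, the database can jump straight to `id > last_id` via the index, so page cost stays constant no matter how deep the client paginates.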
- Use `pg_stat_statements` to locate slow SQL
- Composite index on `(user_id, created_at)`