Python自动化办公:企业微信机器人定时推送日报实战指南
每天早上9点,团队群聊里准时弹出当天的项目进度、服务器状态和待办事项——这不是行政同事的手动操作,而是Python脚本与企业微信机器人协作的结果。本文将带你从零构建一套可配置、高可用的自动化日报推送系统,让重复性工作彻底交给代码。
1. 企业微信机器人核心功能封装
企业微信机器人提供了丰富的消息类型支持,但直接调用API缺乏工程化设计。我们先构建一个高可用的消息发送基座。
import requests import hashlib import base64 from typing import Union, Optional from pathlib import Path class WeComRobot: def __init__(self, key: str): self.base_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}" self.upload_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file" def _safe_request(self, method: str, url: str, **kwargs) -> dict: """统一处理请求异常""" try: resp = requests.request(method, url, **kwargs) resp.raise_for_status() return resp.json() except requests.exceptions.RequestException as e: print(f"请求失败: {str(e)}") return {"errcode": -1, "errmsg": str(e)}关键改进点:
- 使用Python类型注解提升代码可读性
- 内置异常处理机制
- 支持文件路径的Path对象处理
消息类型封装示例(文本+Markdown):
def send_text(self, content: str, mentioned_list: Optional[list] = None) -> dict: """发送文本消息,支持@特定成员""" payload = { "msgtype": "text", "text": { "content": content, "mentioned_list": mentioned_list or [] } } return self._safe_request("POST", self.base_url, json=payload) def send_markdown(self, content: str) -> dict: """发送Markdown格式消息""" payload = { "msgtype": "markdown", "markdown": { "content": content } } return self._safe_request("POST", self.base_url, json=payload)2. 动态消息模板引擎设计
静态消息模板缺乏灵活性,我们引入Jinja2模板引擎实现动态内容生成。
安装依赖:
pip install jinja2模板示例(保存为daily_report.md.j2):
## {{ date }} 项目日报 **服务器状态**: - 在线率: {{ metrics.uptime }}% - 异常服务: {{ metrics.abnormal_services|length }}个 {% for service in metrics.abnormal_services %} - {{ service.name }} ({{ service.status }}) {% endfor %} **今日重点任务**: {% for task in tasks %} {{ loop.index }}. [{{ "✅" if task.done else "⏳" }}] {{ task.name }} - 负责人: @{{ task.owner }} {% endfor %}模板渲染代码:
from jinja2 import Environment, FileSystemLoader import datetime class ReportGenerator: def __init__(self, template_dir: str = "templates"): self.env = Environment( loader=FileSystemLoader(template_dir), autoescape=True, trim_blocks=True ) def render_daily_report(self, context: dict) -> str: template = self.env.get_template("daily_report.md.j2") context.setdefault("date", datetime.date.today().isoformat()) return template.render(context)使用示例:
context = { "metrics": { "uptime": 99.8, "abnormal_services": [ {"name": "Redis", "status": "high_memory"}, {"name": "MySQL", "status": "slow_query"} ] }, "tasks": [ {"name": "完成用户模块测试", "owner": "zhangsan", "done": False}, {"name": "部署v1.2版本", "owner": "lisi", "done": True} ] } generator = ReportGenerator() markdown_content = generator.render_daily_report(context)3. 定时任务系统集成
选择APScheduler作为定时任务框架,相比简单schedule模块更适合生产环境。
核心调度器实现:
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_ERROR import logging class DailyReporter: def __init__(self, robot: WeComRobot): self.robot = robot self.scheduler = BlockingScheduler() self.logger = logging.getLogger("DailyReporter") # 配置错误监听 self.scheduler.add_listener( self._on_job_error, EVENT_JOB_ERROR ) def _on_job_error(self, event): self.logger.error(f"任务执行失败: {event.exception}") def add_daily_job(self, hour: int, minute: int, job_func): """添加每日定时任务""" self.scheduler.add_job( job_func, 'cron', hour=hour, minute=minute, misfire_grace_time=60 ) def start(self): try: self.scheduler.start() except KeyboardInterrupt: self.logger.info("接收到终止信号,关闭调度器...") self.scheduler.shutdown()完整工作流示例:
def build_and_send_report(): # 1. 获取数据 metrics = fetch_system_metrics() tasks = get_todo_tasks() # 2. 渲染模板 report = ReportGenerator().render_daily_report({ "metrics": metrics, "tasks": tasks }) # 3. 发送消息 robot = WeComRobot("YOUR_KEY") result = robot.send_markdown(report) if result.get("errcode") != 0: raise RuntimeError(f"发送失败: {result}") if __name__ == "__main__": reporter = DailyReporter(WeComRobot("YOUR_KEY")) reporter.add_daily_job(9, 0, build_and_send_report) # 每天9点执行 reporter.start()4. 异常处理与监控增强
生产环境需要完善的错误恢复机制,我们实现三级保障策略:
1. 自动重试机制:
from tenacity import retry, stop_after_attempt, wait_exponential class ResilientWeComRobot(WeComRobot): @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10) ) def send_markdown_with_retry(self, content: str) -> dict: result = self.send_markdown(content) if result.get("errcode") != 0: raise RuntimeError(result.get("errmsg")) return result2. 备用通知通道:
def send_with_fallback(robot: WeComRobot, content: str): try: return robot.send_markdown_with_retry(content) except Exception as e: # 切换到邮件通知 send_email_alert( subject="企业微信机器人通知失败", content=f"原始内容: {content}\n错误信息: {str(e)}" ) return {"status": "fallback_to_email"}3. 心跳检测脚本(可加入定时任务):
def check_robot_health(): robot = WeComRobot("YOUR_KEY") test_result = robot.send_text("健康检查ping") if test_result.get("errcode") != 0: alert_to_slack("企业微信机器人不可用!") return False return True5. 与企业现有系统集成
与CI/CD流水线集成示例:
def on_pipeline_complete(build_info): robot = WeComRobot("CI_KEY") content = f""" 🚀 构建完成: {build_info['project']} #{build_info['build_number']} - 状态: {"成功" if build_info['success'] else "失败"} - 耗时: {build_info['duration']}秒 - 变更: {build_info['commit_message']} """ robot.send_markdown(content)与监控系统Prometheus集成:
from prometheus_client import CollectorRegistry, push_to_gateway def report_metrics(): registry = CollectorRegistry() # ... 添加自定义指标 push_to_gateway( 'prometheus-pushgateway:9091', job='daily_report', registry=registry ) # 同时发送关键指标到企业微信 critical = get_critical_metrics() if critical: WeComRobot("ALERT_KEY").send_markdown( f"⚠️ 监控指标异常:\n{critical}" )6. 高级功能扩展
消息卡片交互:
def send_interactive_card(): payload = { "msgtype": "template_card", "template_card": { "card_type": "button_interaction", "source": { "desc": "每日站会提醒" }, "main_title": { "title": "10:00 每日站会", "desc": "请更新您的任务状态" }, "task_id": "daily_meeting_reminder", "buttons": [ { "text": "我已准备就绪", "key": "ready" }, { "text": "需要延迟", "key": "delay" } ] } } requests.post(robot.base_url, json=payload)消息加密与安全增强:
import hmac from cryptography.fernet import Fernet class SecureWeComRobot(WeComRobot): def __init__(self, key: str, secret: str): self.secret = secret self.cipher = Fernet.generate_key() super().__init__(key) def _sign_payload(self, payload: dict) -> dict: timestamp = str(int(time.time())) sign_str = f"{timestamp}\n{self.secret}" signature = hmac.new( self.secret.encode(), sign_str.encode(), hashlib.sha256 ).hexdigest() payload.update({ "timestamp": timestamp, "sign": signature }) return payload7. 部署与维护方案
Docker化部署:
FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY templates ./templates COPY reporter.py . CMD ["python", "reporter.py"]系统服务配置(systemd示例):
[Unit] Description=Daily Report Bot After=network.target [Service] User=reportbot WorkingDirectory=/opt/reportbot ExecStart=/usr/bin/docker-compose up Restart=always [Install] WantedBy=multi-user.target日志收集配置:
import structlog structlog.configure( processors=[ structlog.processors.JSONRenderer() ], logger_factory=structlog.WriteLoggerFactory( file=open("/var/log/reportbot.json", "a") ) ) logger = structlog.get_logger() logger.info("启动日报机器人", config=current_config)