**工业4.0时代下基于Python的智能制造设备状态实时监控系统设计与实现**在工业4.
2026/4/23 3:18:22 网站建设 项目流程

工业4.0时代下基于Python的智能制造设备状态实时监控系统设计与实现

在工业4.0浪潮中,设备联网、数据驱动决策、边缘计算和数字孪生已成为核心趋势。传统工厂依赖人工巡检与离线报表,难以满足柔性制造与预测性维护的需求。本文将介绍一个基于Python + MQTT + SQLite + Flask 的轻量级设备状态实时监控平台,适用于中小型企业快速部署。


一、系统架构设计(流程图示意)

+-------------------+ | PLC/传感器终端 | ←→ MQTT Broker (Mosquitto) +---------+---------+ | v +---------+---------+ | Python 数据采集层 | ←→ SQLite 存储设备状态日志 +---------+---------+ | v +---------+---------+ | Flask Web API 层 | ←→ 前端可视化界面(Vue.js) +---------+---------+ | v +-------------------+ | 实时告警 & 报表生成 | +-------------------+ ``` > 🔍 核心思想:用 **MQTT 协议实现低延迟通信**,利用 **SQLite 轻量级数据库记录历史状态**,通过 Flask 提供 RESTful 接口供前端调用。 --- ### 二、代码实现详解 #### 1. MQTT 数据订阅脚本(`mqtt_collector.py`) ```python import paho.mqtt.client as mqtt import json import sqlite3 from datetime import datetime # 数据库初始化 def init_db(): conn = sqlite3.connect('device_status.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS device_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT, status TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() # MQTT 回调函数 def on_message(client, userdata, msg): payload = json.loads(msg.payload.decode()) device_id = payload.get("device_id") status = payload.get("status") # 如 "running", "error", "maintenance" conn = sqlite3.connect('device_status.db') cursor = conn.cursor() cursor.execute( "INSERT INTO device_logs (device_id, status) VALUES (?, ?)", (device_id, status) ) conn.commit() conn.close() print(f"[{datetime.now()}] Device {device_id} status: {status}") client = mqtt.Client() client.on_message = on_message client.connect("localhost", 1883, 60) client.subscribe("industrial/device/#") client.loop_forever()

✅ 这段代码可直接运行在边缘设备或服务器上,监听来自现场PLC的JSON格式状态消息,并持久化到本地数据库。


2. Flask API 接口(api.py
fromflaskimportFlask,jsonifyimportsqlite3 app=Flask(__name__)@app.route('/api/device/<device_id>/status',methods=['GET'])defget_latest_status(device_id):conn=sqlite3.connect('device_status.db')cursor=conn.cursor()cursor.execute(""" SELECT status, timestamp FROM device_logs WHERE device_id = ? ORDER BY timestamp DESC LIMIT 1 """,(device_id,))row=cursor.fetchone()ifnotrow:returnjsonify({"error":"Device not found"}),404returnjsonify({"device_id":device_id,"status":row[0],"timestamp":row[1]})@app.route('/api/device/<device_id>/history',methods=['GET'])defget_history(device_id):days=int(request.args.get('days',7))conn=sqlite3.connect('device_status.db')cursor=conn.cursor()cursor.execute(""" SELECT status, timestamp FROM device_logs WHERE device_id = ? AND timestamp > datetime('now', '-{} days') ORDER BY timestamp DESC """.format(days),(device_id,))data=[{"status":r[0],"time":r[1]}forrincursor.fetchall()]returnjsonify(data)if__name__=='__main__':app.run(host='0.0.0.0',port=5000,debug=False)``` 📌 此API支持两个关键功能:-获取某设备最新状态(用于前端展示)--获取过去N天的状态变化(用于趋势分析)---### 三、实际应用场景示例假设你有一个注塑机(设备ID为 `MACHINE_01`),它每5秒上报一次状态: ```json{"device_id":"MACHINE_01","status":"running"}``` 当其变为 `"error"` 时,你可以设置如下规则触发告警: ```python# 在 MQTT 消息处理逻辑中添加判断ifstatus=="error":send_alert_to_slack(f"⚠️ 设备{device_id}出现异常!")``` 这使得整个系统具备**故障自动感知+日志归档+Web 可视化**三位一体的能力。---### 四、为什么选择这个技术栈?|组件|优势||------\------\|**Python**|生态丰富,开发效率高,适合工业场景快速原型||**MQTT**|协议轻量、支持断线重连,适合工业网络不稳定环境||**SQLite**|无需独立DB服务,嵌入式存储,适合单节点部署||**Flask**\ 极简Web框架,便于构建内部管理系统|>💡 特别适合中小型制造企业,无需引入Kafka或elasticsearch这类重型中间件即可实现基础数字化能力。---### 五、未来扩展建议-✅ 加入 Grafana dashboard 展示设备在线率、停机时间等指标--✅ 引入机器学习模型对设备振动数据进行预测性维护(如 LSTM)--✅ 部署到树莓派等边缘设备,形成“边缘采集+中心分析”架构---### 总结本方案以极低成本构建了工业4.0所需的**设备状态感知闭环**,不仅解决了信息孤岛问题,也为后续引入AI预测模型打下坚实基础。对于正在推进数字化转型的企业而言,这是值得优先落地的第一步! 👉 现在就可以动手试试看:安装 mosquitto、运行采集脚本、访问 `/api/device/MACHINE_01/status` 查看结果! 你不需要复杂的硬件,只需要一台带网卡的linux服务器或树莓派,就能迈出智能制造的第一步。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询