Python多串口监控:基于pyMultiSerial实现多设备并发通信
2026/6/1 18:12:15 网站建设 项目流程

1. 项目概述

在物联网、工业自动化或者机器人项目中,我们经常会遇到一个核心需求:一台主机需要同时与多个下位机设备进行通信。这些下位机可能是Arduino、ESP32,或者是各种传感器模块。想象一下,你正在搭建一个智能温室系统,需要同时读取分布在各个区域的温湿度、光照和土壤湿度传感器(由多个Arduino控制);或者你在开发一个多轴机械臂控制器,需要实时接收来自多个关节电机的反馈数据。在这些场景下,传统的单串口轮询方式不仅效率低下,还会因为阻塞导致数据丢失和响应延迟。这正是“Python多串口监控”技术要解决的痛点。

简单来说,这个项目就是利用Python,在一台运行Raspberry Pi或普通电脑的主机上,同时、独立地监听多个串口(比如连接了多个Arduino的USB端口),实现数据的并发接收与处理。其核心价值在于,它提供了一种非阻塞、事件驱动的架构,让开发者可以像处理网络请求一样,优雅地管理多个硬件设备的数据流。无论你是物联网开发者、硬件爱好者,还是从事自动化测试的工程师,掌握这项技术都能让你在涉及多设备通信的项目中游刃有余,构建出响应迅速、稳定可靠的数据采集与控制中枢。

2. 核心方案选型与pyMultiSerial库解析

面对多串口监控的需求,通常有几种技术路径。最基础的是单线程轮询,即在一个循环中依次打开每个串口,读取数据,然后处理。这种方法代码简单,但致命缺点是阻塞:当你读取一个没有数据的串口时,程序会卡住,直到超时,其他串口的数据就可能被积压甚至丢失,完全无法满足实时性要求。

进阶一些的方案是使用多线程,为每个串口创建一个独立的线程进行读写操作。这解决了阻塞问题,但线程的创建、销毁、同步(如防止多个线程同时打印到控制台导致信息混乱)会引入额外的复杂度,对开发者的多线程编程能力要求较高,且线程资源开销不容忽视。

更优雅的方案是采用异步I/O(Asyncio)配合串口库。这是现代Python处理高并发I/O的推荐方式,性能好、资源占用低。但对于不熟悉异步编程的开发者来说,其回调、async/await语法的学习曲线较陡峭,且需要所使用的串口库本身支持异步操作。

综合来看,我们需要的是一种既能避免阻塞、简化并发编程,又易于上手和集成的方案。这正是pyMultiSerial库的用武之地。它本质上是一个基于多线程和回调(Callback)机制的高级封装库。你不需要手动创建和管理线程,也不需要深入理解异步编程模型。你只需要告诉它:有哪些串口(或让它自动发现),波特率设多少,然后提供几个回调函数——当串口连接、收到数据、断开连接时应该做什么。库内部会帮你处理好线程调度、数据读取和事件触发这些脏活累活。

注意pyMultiSerial并非Python标准库或最主流的串口库(如pyserial)。它是社区开发的一个专用工具,其优势在于针对“多串口监控”这一特定场景进行了高度抽象和简化。如果你的项目需求非常复杂,需要精细控制每一个字节的收发时序,可能仍需基于pyserial自行构建多线程或异步架构。但对于绝大多数需要同时监控多个设备上报数据的应用来说,pyMultiSerial提供了绝佳的“开箱即用”体验。

2.1 pyMultiSerial的工作原理与核心类

pyMultiSerial的核心是MultiSerial类。当你创建一个MultiSerial对象并启动后,它会在后台做以下几件事:

  1. 端口扫描与监控:库会启动一个独立的监控线程,定期扫描系统可用的串口列表(例如/dev/ttyUSB0,COM3等)。
  2. 动态连接管理:当检测到新的串口设备插入(如连接Arduino的USB线),它会尝试以设定的参数(波特率、超时等)打开该端口。如果打开成功,则触发port_connection_found_callback回调,并将该端口的读写操作交给一个新的后台线程管理。
  3. 数据异步读取:每个已连接端口的后台线程会以非阻塞方式持续读取数据。一旦读到完整的一行(默认以换行符\n为分隔)或达到超时条件,就会将读到的数据通过port_read_callback回调函数传递给主程序。
  4. 断开检测与清理:当某个端口读取失败(通常意味着设备被拔出),该端口的监控线程会关闭连接,并触发port_disconnection_callback回调,同时清理相关资源。

这种事件驱动的模型,使得你的主程序逻辑可以保持清晰。你只需要关心“当某事发生时,我该做什么”,而不必纠结于“如何轮询、如何不阻塞、如何管理线程生命周期”这些底层细节。

3. 环境准备与硬件连接

在开始编写代码之前,扎实的准备工作是成功的一半。这个阶段的目标是确保每个硬件设备都能被系统正确识别,并独立、稳定地工作。

3.1 硬件清单与连接要点

你需要准备以下硬件:

  • 主机:一台Raspberry Pi(任何型号,建议Raspberry Pi 3B+或以上)或一台装有Python的Windows/Linux/macOS笔记本电脑。
  • 下位机:至少两个Arduino开发板(Uno、Nano、Mega等均可)。数量可以根据你的需求增加,理论上仅受限于主机的USB端口数量和系统资源。
  • 连接线:对应数量的USB数据线(用于同时为Arduino供电和通信)。

连接时的注意事项

  1. 逐一连接,逐一确认:不要一次性把所有Arduino都插上。建议先只连接一个Arduino到主机,完成从烧录程序到在Python中测试识别的全过程后,再连接第二个。这有助于在出现问题时快速定位是某个特定设备、USB线还是端口的问题。
  2. 电源考量:如果使用笔记本电脑,同时连接多个Arduino(尤其是功耗较高的型号或连接了外设时)可能会接近或超过USB集线器或端口的供电极限,导致设备反复连接断开或工作不稳定。如果出现这种情况,考虑为Arduino使用独立的外部电源(如9V电池适配器),仅将USB线用于数据传输。
  3. 端口标识:物理上,给每个Arduino和对应的USB线贴上标签(如Arduino 1, 2, 3),在后续排查问题时非常有用。

3.2 软件环境配置

在主机侧,你需要安装Python(3.6或以上版本)和必要的库。

对于Raspberry Pi(或Linux/macOS): 打开终端,依次执行以下命令:

# 更新软件包列表(可选,但推荐) sudo apt-get update # 安装Python3和pip(如果尚未安装) sudo apt-get install python3 python3-pip # 使用pip安装pyMultiSerial库 pip3 install pyMultiSerial

对于Windows

  1. 确保已从 python.org 安装Python 3,并在安装时勾选了“Add Python to PATH”。
  2. 打开命令提示符(CMD)或PowerShell。
  3. 执行安装命令:
    pip install pyMultiSerial

验证安装与权限(Linux/macOS特别重要): 在Linux系统(包括Raspberry Pi OS)上,普通用户默认可能没有直接访问串口设备(/dev/ttyUSB*/dev/ttyACM*)的权限。这会导致程序运行时出现Permission denied错误。

# 查看串口设备,连接一个Arduino后,通常会出现类似 /dev/ttyACM0 的设备 ls /dev/ttyACM* /dev/ttyUSB* # 将当前用户添加到 dialout 组,该组通常拥有串口访问权限 sudo usermod -a -G dialout $USER

执行完usermod命令后,必须注销并重新登录,或者重启系统,这个组权限变更才会生效。这是很多新手容易忽略的关键一步。

3.3 Arduino端程序准备

每个Arduino需要运行一个简单的程序,周期性地通过串口发送一条包含自身ID的信息。这模拟了传感器数据上报或状态心跳。

打开Arduino IDE,为每个Arduino分别编写并上传以下代码。关键点在于,每个Arduino的ARDUINO_ID必须不同。

// 定义此Arduino的唯一ID,每个设备必须修改此值! #define ARDUINO_ID 1 // 对于第二个Arduino,改为2,以此类推。 void setup() { // 初始化串口通信,波特率设置为9600,与Python端匹配 Serial.begin(9600); // 可选:等待串口连接,对于某些需要稳定连接的场景有用 // while (!Serial) { ; } } void loop() { // 模拟传感器数据采集或周期任务 delay(5000); // 每5秒发送一次数据 // 构造并发送消息 String message = "Hello from Arduino No." + String(ARDUINO_ID); Serial.println(message); // 使用println自动在末尾添加换行符\n // 在实际项目中,这里可能是读取传感器并发送真实数据,例如: // int sensorValue = analogRead(A0); // Serial.println("Sensor_A0:" + String(sensorValue) + ",ID:" + String(ARDUINO_ID)); }

上传与独立测试

  1. ARDUINO_ID改为1,上传到第一个Arduino。
  2. 在Arduino IDE中打开串口监视器(波特率设为9600),确认每隔5秒能收到Hello from Arduino No.1
  3. 关闭这个Arduino IDE的串口监视器窗口。这一点至关重要,因为一个串口在同一时间只能被一个程序打开。如果IDE占用了端口,我们的Python程序将无法访问。
  4. 对第二个Arduino重复步骤1-3,将ID改为2,并确保在独立的IDE实例或上传后关闭监视器进行测试。

4. Python多串口监控程序实现详解

现在,我们将重心转移到主机上的Python程序。我们将一步步构建一个健壮的多串口监控脚本。

4.1 基础监控脚本构建

首先,创建一个名为multi_serial_monitor.py的文件。以下是完整的代码,我们将逐部分拆解。

import pyMultiSerial as p import time from datetime import datetime # 创建全局MultiSerial对象 ms = p.MultiSerial() # 配置串口参数 ms.baudrate = 9600 # 必须与Arduino端设置的波特率一致 ms.timeout = 2 # 读超时时间(秒)。设置一个合理的值,避免无数据时线程长时间阻塞。 ms.encoding = 'utf-8' # 指定编码,确保文本正确解码。根据Arduino发送的数据调整,默认可能是'ascii'。 # 可选:手动指定端口列表,而不是自动发现 # ms.portlist = ['COM3', 'COM5'] # Windows # ms.portlist = ['/dev/ttyACM0', '/dev/ttyACM1'] # Linux/RPi # 回调函数1:当发现并成功连接一个新端口时触发 def on_port_connected(portno, serial_obj): # portno: 字符串,端口名称,如 'COM3' 或 '/dev/ttyACM0' # serial_obj: 该端口对应的串口对象(来自pyserial),可用于进行更底层的操作(如直接write) current_time = datetime.now().strftime("%H:%M:%S") print(f"[{current_time}] 设备已连接至端口: {portno}") # 你可以在这里初始化与该设备相关的数据结构,比如: # device_data[portno] = {'id': 'unknown', 'last_seen': current_time} # 将回调函数注册给MultiSerial对象 ms.port_connection_found_callback = on_port_connected # 回调函数2:当从某个端口读取到数据时触发 def on_data_received(portno, serial_obj, text): # portno: 来源端口 # serial_obj: 来源端口的串口对象 # text: 读取到的字符串(已根据encoding解码)。注意:如果一行数据很长,可能会分多次回调。 current_time = datetime.now().strftime("%H:%M:%S") # 去除接收到的文本两端的空白字符(如换行符) cleaned_text = text.strip() if cleaned_text: # 避免打印空行 print(f"[{current_time}] 端口 {portno} -> {cleaned_text}") # 示例:解析数据并采取行动 # if "Temperature" in cleaned_text: # # 提取温度值并处理... # pass # 你也可以通过serial_obj.write()向该特定端口发送指令 # serial_obj.write(b"ACK\n") # 发送字节数据 ms.port_read_callback = on_data_received # 回调函数3:当某个端口断开连接(设备拔出)时触发 def on_port_disconnected(portno): current_time = datetime.now().strftime("%H:%M:%S") print(f"[{current_time}] 警告: 端口 {portno} 已断开连接!") # 在这里进行资源清理,比如从 device_data 字典中删除 portno 对应的项 ms.port_disconnection_callback = on_port_disconnected # 主程序入口 if __name__ == "__main__": print("=== Python多串口监控程序启动 ===") print(f"配置: 波特率={ms.baudrate}, 超时={ms.timeout}秒") print("正在扫描并监控串口... (按 Ctrl+C 停止)") print("-" * 40) try: # 启动监控!这是一个阻塞调用,程序会停在这里,直到监控被停止。 ms.Start() except KeyboardInterrupt: # 当用户在控制台按下 Ctrl+C 时,会触发此异常 print("\n接收到中断信号,正在停止监控...") finally: # 确保资源被清理(尽管库内部可能已经处理,但显式调用是好的习惯) # 注意:原库的Stop方法可能需要根据版本确认,有些版本在Start()阻塞时无法被外部调用。 # 更常见的做法是Ctrl+C后直接退出,库的析构函数会处理。 print("监控程序已停止。")

关键代码解析与实操要点

  1. ms.timeout参数:这个值非常重要。它决定了每次尝试从串口读取数据时,等待的最长时间。设置得太短(如0.1秒),在数据发送间隔期内会频繁触发超时,增加CPU开销;设置得太长(如10秒),会延迟设备断开连接的检测。对于我们的示例(5秒发送一次),2秒是一个合理的折中值。它允许在数据发送周期内有一次完整的读取尝试,并在设备拔出后相对快速地感知。

  2. ms.encoding参数:确保Python端解码方式与Arduino端编码方式匹配。Arduino的Serial.println()默认发送ASCII/UTF-8文本。如果传输的是二进制数据,则需要将此参数设为None,并在回调函数中直接处理bytes对象。

  3. 回调函数的执行上下文:务必理解,回调函数是在pyMultiSerial库的内部线程中被调用的。这意味着:

    • 避免耗时操作:不要在回调函数中执行复杂的计算、网络请求或阻塞性I/O,否则会拖慢其他端口的处理速度,甚至导致数据缓冲区溢出。如果必须处理复杂逻辑,应考虑将数据放入一个队列(queue.Queue),由主线程或其他工作线程消费。
    • 线程安全:如果回调函数会修改全局变量(如一个存储所有设备数据的字典),需要考虑使用线程锁(threading.Lock)来防止数据竞争。
  4. ms.Start()的阻塞特性:调用ms.Start()后,主线程就阻塞在那里了。这就是为什么所有逻辑都要写在回调函数里。如果你想在监控的同时执行其他任务(比如一个GUI界面),你需要将ms.Start()放在一个单独的线程中运行。

4.2 运行与效果验证

  1. 确保所有Arduino已上传正确ID的程序,且Arduino IDE的串口监视器已关闭。
  2. 在主机上打开终端或命令提示符,导航到脚本所在目录。
  3. 运行脚本:
    # 在Raspberry Pi或Linux/macOS上 python3 multi_serial_monitor.py
    # 在Windows上 python multi_serial_monitor.py
  4. 观察输出。初始时,程序会开始扫描端口。此时,依次将你的Arduino通过USB线连接到主机。你应该会看到类似以下的输出:
    === Python多串口监控程序启动 === 配置: 波特率=9600, 超时=2秒 正在扫描并监控串口... (按 Ctrl+C 停止) ---------------------------------------- [14:25:31] 设备已连接至端口: COM3 [14:25:36] 端口 COM3 -> Hello from Arduino No.1 [14:25:33] 设备已连接至端口: COM5 [14:25:38] 端口 COM5 -> Hello from Arduino No.2 [14:25:41] 端口 COM3 -> Hello from Arduino No.1 [14:25:43] 端口 COM5 -> Hello from Arduino No.2
  5. 尝试拔掉一个Arduino的USB线,你会看到断开连接的回调信息被触发。
  6. Ctrl+C可以优雅地停止整个监控程序。

5. 进阶应用与功能扩展

基础监控跑通后,我们可以根据实际项目需求,对这个框架进行功能增强。

5.1 数据解析与结构化存储

在实际项目中,Arduino发送的很少是简单的问候语,而是结构化的传感器数据。例如:TEMP:25.6,HUM:60,ID:1。我们需要在回调函数中解析这些数据,并存储起来。

import json import threading # 用于存储各端口最新数据的字典,使用线程锁保证安全 data_lock = threading.Lock() device_status = {} def on_data_received_advanced(portno, serial_obj, text): cleaned_text = text.strip() if not cleaned_text: return print(f"[{datetime.now()}] RAW from {portno}: {cleaned_text}") # 示例1:解析键值对数据 (如 "TEMP:25.6,HUM:60,ID:1") try: # 假设数据格式为逗号分隔的 key:value 对 data_pairs = cleaned_text.split(',') parsed_data = {} for pair in data_pairs: if ':' in pair: key, value = pair.split(':', 1) parsed_data[key.strip()] = value.strip() # 获取设备ID(假设数据中包含ID字段) device_id = parsed_data.get('ID', 'unknown') # 线程安全地更新全局状态 with data_lock: device_status[portno] = { 'id': device_id, 'data': parsed_data, 'last_update': datetime.now().isoformat() } # 示例:根据数据做出反应 temp = parsed_data.get('TEMP') if temp: temp_f = float(temp) if temp_f > 30.0: print(f"警告: 设备 {device_id} (端口 {portno}) 温度过高: {temp_f}°C") # 可以在这里通过 serial_obj.write() 发送控制指令,如打开风扇 # serial_obj.write(b"FAN:ON\n") except Exception as e: print(f"解析来自 {portno} 的数据时出错: {e}, 原始数据: {cleaned_text}") # 示例2:将数据追加到文件(注意:频繁文件IO可能影响性能,生产环境建议用队列+批量写入) # with open('sensor_log.csv', 'a') as f: # f.write(f"{datetime.now().isoformat()},{portno},{cleaned_text}\n")

5.2 主动查询与双向通信

目前的模型是“被动监听”,即Arduino主动上报。有时我们需要“主动查询”,即主机向特定Arduino发送指令,请求数据。

import threading import time def query_device(serial_obj, command): """向指定串口对象发送查询命令""" if serial_obj and serial_obj.is_open: try: # 命令需要以字节形式发送,末尾常加换行符作为结束标志 serial_obj.write((command + '\n').encode('utf-8')) print(f"已发送命令: {command}") except Exception as e: print(f"发送命令失败: {e}") # 在主线程或另一个定时线程中,可以这样调用 def periodic_query_task(): """一个示例性的定时查询任务""" while True: time.sleep(10) # 每10秒查询一次 # 注意:需要以线程安全的方式获取当前的串口对象列表。 # pyMultiSerial可能没有直接提供这个接口。一种方法是自己维护一个映射。 # 更简单的方式是利用回调:在on_port_connected中将(portno, serial_obj)存入一个字典。 # 这里仅为示意逻辑。 # for port, ser in connected_ports.items(): # if port == '/dev/ttyACM0': # 只查询特定设备 # query_device(ser, "GET_STATUS") # 启动查询线程(需谨慎,确保线程安全) # query_thread = threading.Thread(target=periodic_query_task, daemon=True) # query_thread.start()

重要提示:在回调函数内部(如on_data_received)向同一个serial_obj写入数据通常是安全的,因为pyMultiSerial内部可能已经做了同步。但从外部其他线程直接调用serial_obj.write(),则需要非常小心线程竞争问题。更稳健的做法是,在需要发送指令时,通过一个线程安全的队列将(portno, command)对传递出去,然后由一个专有的发送线程(或就在主回调线程的某个安全时机)统一处理写操作。

5.3 集成到Web API或GUI界面

一个强大的监控系统往往需要一个前端界面。我们可以使用轻量级的Web框架(如Flask)将数据暴露为API,或使用tkinterPyQt创建桌面GUI。

Flask Web API示例片段

from flask import Flask, jsonify import threading app = Flask(__name__) @app.route('/api/status', methods=['GET']) def get_all_status(): """获取所有设备的最新状态""" with data_lock: # 使用之前定义的线程锁 # 返回JSON格式的数据 return jsonify(device_status) @app.route('/api/device/<port>/command', methods=['POST']) def send_command_to_device(port): """向指定端口的设备发送命令(需实现命令队列)""" command = request.json.get('command') if not command: return jsonify({'error': 'No command provided'}), 400 # 这里需要将命令放入一个线程安全的队列,供串口发送线程消费 # command_queue.put((port, command)) return jsonify({'message': f'Command {command} queued for {port}'}) if __name__ == '__main__': # 注意:Flask默认是单线程,不适合处理长时间阻塞任务。 # 必须将ms.Start()放在一个独立的线程中运行,否则Flask服务器无法启动。 serial_thread = threading.Thread(target=ms.Start, daemon=True) serial_thread.start() app.run(host='0.0.0.0', port=5000, debug=False) # 生产环境请关闭debug

这样,你就可以通过浏览器访问http://你的树莓派IP:5000/api/status来实时查看所有连接设备的数据了。

6. 常见问题排查与实战经验

即使按照步骤操作,你也可能会遇到一些坑。以下是我在多次实践中总结的常见问题及其解决方法。

6.1 问题排查速查表

问题现象可能原因排查步骤与解决方案
程序报错ModuleNotFoundError: No module named 'pyMultiSerial'1. 库未安装。
2. 使用了错误的Python环境(如系统Python vs 虚拟环境)。
1. 确认安装命令执行成功:pip3 list | grep pyMultiSerial(Linux) 或pip list | findstr pyMultiSerial(Windows)。
2. 检查你运行脚本的Python解释器是否和安装库的是同一个。使用python3 -c "import pyMultiSerial; print(pyMultiSerial.__file__)"来验证。
程序运行但无任何输出,设备插入也无反应1. 端口权限问题(Linux/macOS)。
2.pyMultiSerial扫描的端口列表不对。
3. Arduino程序未运行或波特率不匹配。
1.Linux/macOS:确认用户已在dialout组且已重新登录。运行ls -l /dev/ttyACM*查看权限。
2. 在代码中ms.Start()前添加print(“扫描端口:”, ms.portlist),或尝试手动指定ms.portlist = [‘COM3’, ‘COM4’]
3. 用Arduino IDE串口监视器单独测试每个Arduino,确保其能正常发送数据,并核对波特率是否为9600。
能检测到设备连接,但收不到数据1. Arduino的串口监视器未关闭,端口被占用。
2. Python程序和Arduino的波特率、数据位、停止位、校验位不匹配。
3. 数据未以换行符结尾,而pyMultiSerial默认按行读取。
1. 关闭所有可能占用串口的程序(Arduino IDE, Putty, 其他串口工具)。
2. 确保ms.baudrateSerial.begin()设置一致。其他参数ms.bytesize,ms.parity,ms.stopbits默认通常为8-N-1,与Arduino一致。
3. 检查Arduino代码是否使用Serial.println()(自动加\n)或Serial.print()。如果是print(),需在末尾手动加\n,或调整pyMultiSerial的读取方式(如果库支持设置结束符)。
收到乱码1. 编码不匹配。
2. 波特率误差太大(特别是使用软件串口时)。
1. 尝试在代码中设置ms.encoding = ‘ascii’‘latin-1’,或直接处理bytes(设置encoding=None,回调函数中textbytes)。
2. 使用更稳定的硬件串口,并确保主机和下位机使用相同的标准波特率(如9600, 115200)。
程序运行一段时间后卡死或无响应1. 回调函数中有阻塞或异常,导致监控线程被挂起。
2. 系统USB电源管理导致端口休眠。
3. 内存泄漏(在长时间运行中积累)。
1.确保回调函数尽可能轻量、快速,避免任何可能阻塞或长时间运行的操作。用try...except包裹回调内部逻辑。
2. 在Linux上,可以尝试禁用USB自动挂起:sudo sh -c "echo -1 > /sys/module/usbcore/parameters/autosuspend"(临时)。
3. 检查代码中是否有全局列表或字典在无限增长而未清理。利用on_port_disconnected回调及时清理无效数据。
在Windows上,COM端口号大于COM9时出现问题Windows对于COM10及以上的端口,在pyserialpyMultiSerial底层)中需要特殊的设备名格式。使用\\.\COM10这样的格式来指定端口。例如:ms.portlist = [‘COM3’, ‘COM5’, r’\\.\COM10’]。注意字符串前的r和双反斜杠。

6.2 实战经验与技巧

  1. 从简单开始,逐步复杂化:首先让最基本的“Hello World”示例在两个设备上稳定运行。然后再逐步添加数据解析、文件存储、网络通信等功能。每添加一个功能,都进行测试,便于定位问题。

  2. 日志是你的好朋友:不要只依赖print。使用Python的logging模块,将信息记录到不同级别的日志文件中(DEBUG, INFO, ERROR)。这尤其有助于排查那些偶发性的、在程序运行数小时后才出现的问题。

    import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler('serial_monitor.log'), logging.StreamHandler()]) logger = logging.getLogger(__name__) # 在回调中使用 logger.info(f"Port {portno} connected") 代替 print
  3. 处理“粘包”与“断包”:串口通信是流式的,没有消息边界。pyMultiSerial的回调基于“行”(换行符)。如果你的数据本身包含换行符,或者数据量很大一行发不完,就可能出现消息被拆分或合并。对于复杂协议,建议在Arduino端定义自己的帧结构(例如,以特定起始字节和长度开头),然后在Python端实现一个简单的状态机在回调函数中解析,或者考虑使用更底层的pyserial自己实现读取逻辑。

  4. 为每个设备维护状态:在全局字典中,以端口号或设备ID为键,存储该设备的上次通信时间、数据历史、配置等信息。在on_port_connected中初始化,在on_data_received中更新,在on_port_disconnected中清理。这为制作设备在线状态面板、超时报警等功能打下基础。

  5. 压力测试:在实际部署前,模拟最坏情况。同时插拔多个设备,让Arduino以最高频率发送数据,连续运行监控程序24小时以上,观察内存使用是否稳定,是否有连接泄漏或异常崩溃。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询