基于Python的程序,结合Nachi机器人和Hikvision视觉系统来实现轨迹涂布功能,且高度可以调整。
项目结构
文件 功能 nachi_robot_controller.py Nachi机器人TCP/IP通信控制模块 hikvision_vision.py Hikvision视觉系统集成模块 trajectory_coating.py 轨迹规划和涂布核心逻辑 main_coating.py 主程序入口 config.py 配置文件
## ✨ 主要功能
1. 机器人控制 - 通过TCP/IP连接Nachi机器人,实现点对点和直线运动控制
2. 视觉系统集成 - 连接Hikvision工业相机,进行边缘检测、轮廓提取和轨迹生成
3. 轨迹生成 - 支持矩形、圆形、螺旋等多种轨迹类型
4. 高度调整 - 基础高度和动态高度偏移可调
## 🚀 使用示例
# 基本矩形涂布
python main_coating.py --width 100 --height 100 --base-height 100 --height-offset 10
# 圆形轨迹涂布
python main_coating.py --trajectory circle --radius 50 --speed 40
# 螺旋轨迹涂布
python main_coating.py --trajectory spiral --radius 80 --turns 3
# 从视觉系统获取轨迹
python main_coating.py --trajectory vision --base-height 100 --height-offset 5
## 关键参数说明
参数 说明 默认值 --base-height 基础高度 (mm) 100 --height-offset 高度偏移量 (mm) 0 --speed 涂布速度 (1-100) 30 --mode 涂布模式 continuous
系统支持连续涂布、间歇涂布和螺旋涂布三种模式,高度可在运行时动态调整。
config.py
# Nachi-Hikvision 轨迹涂布系统配置文件 # 机器人配置 ROBOT_IP = "192.168.1.100" ROBOT_PORT = 8000 # 视觉系统配置 VISION_IP = "192.168.1.64" VISION_PORT = 9000 # 涂布参数 DEFAULT_WIDTH = 100.0 DEFAULT_HEIGHT = 100.0 DEFAULT_BASE_HEIGHT = 100.0 DEFAULT_HEIGHT_OFFSET = 0.0 DEFAULT_SPEED = 30 DEFAULT_MODE = "continuous" # 轨迹参数 TRAJECTORY_RESOLUTION = 1.0 CIRCLE_POINTS = 72 SPIRAL_TURNS = 3 SPIRAL_POINTS_PER_TURN = 36 # 视觉参数 EDGE_THRESHOLD = 128 EXPOSURE_TIME = 5000 CALIBRATION_DISTANCE = 100.0 # 通信超时设置 CONNECT_TIMEOUT = 10 COMMAND_TIMEOUT = 5 # 日志配置 LOG_LEVEL = "INFO" LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
hikvision_vision.py
""" Hikvision Vision System Integration 用于与Hikvision工业相机和视觉系统集成的模块 """ import socket import struct import logging from typing import List, Tuple, Optional, Dict from enum import Enum logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VisionCommand(Enum): """视觉系统命令枚举""" CAPTURE = 0x01 GET_IMAGE = 0x02 DETECT_EDGE = 0x03 FIND_CONTOUR = 0x04 MEASURE_DISTANCE = 0x05 GET_CENTER = 0x06 SET_THRESHOLD = 0x07 SET_EXPOSURE = 0x08 class HikvisionVisionSystem: """Hikvision视觉系统集成类""" def __init__(self, ip: str = "192.168.1.64", port: int = 9000): """ 初始化Hikvision视觉系统 Args: ip: 视觉控制器IP地址 port: 通信端口 """ self.ip = ip self.port = port self.socket = None self.connected = False self calibration_data = {} def connect(self) -> bool: """ 连接到Hikvision视觉系统 Returns: 连接是否成功 """ try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(10) self.socket.connect((self.ip, self.port)) self.connected = True logger.info(f"成功连接到Hikvision视觉系统: {self.ip}:{self.port}") return True except Exception as e: logger.error(f"连接Hikvision视觉系统失败: {e}") self.connected = False return False def disconnect(self): """断开与视觉系统的连接""" if self.socket: try: self.socket.close() logger.info("已断开与Hikvision视觉系统的连接") except Exception as e: logger.error(f"断开连接时出错: {e}") finally: self.socket = None self.connected = False def send_vision_command(self, command: VisionCommand, params: bytes = b'') -> Optional[bytes]: """ 发送视觉命令 Args: command: 命令枚举 params: 命令参数 Returns: 响应数据,失败返回None """ if not self.connected: logger.error("未连接到视觉系统") return None try: header = struct.pack('>HIB', 0xFFFF, 0x0001, command.value) length = len(params) length_bytes = struct.pack('>I', length) packet = header + length_bytes + params self.socket.sendall(packet) response = self.socket.recv(4096) logger.debug(f"视觉命令 {command.name} 响应: {len(response)} bytes") return response except Exception as e: logger.error(f"发送视觉命令失败: {e}") return None def capture_image(self) -> bool: """ 捕获当前图像 Returns: 捕获是否成功 """ response = self.send_vision_command(VisionCommand.CAPTURE) return response is not None def detect_edge_points(self, threshold: int = 128) -> Optional[List[Tuple[float, float]]]: """ 检测边缘点 Args: threshold: 边缘检测阈值 (0-255) Returns: 边缘点列表 [(x, y), ...],失败返回None """ params = struct.pack('>B', threshold) response = self.send_vision_command(VisionCommand.DETECT_EDGE, params) if response and len(response) > 8: try: num_points = struct.unpack('>I', response[8:12])[0] points = [] for i in range(num_points): offset = 12 + i * 8 x = struct.unpack('>f', response[offset:offset+4])[0] y = struct.unpack('>f', response[offset+4:offset+8])[0] points.append((x, y)) logger.info(f"检测到 {num_points} 个边缘点") return points except Exception as e: logger.error(f"解析边缘点数据失败: {e}") return None def find_contour(self) -> Optional[List[Tuple[float, float]]]: """ 查找轮廓 Returns: 轮廓点列表,失败返回None """ response = self.send_vision_command(VisionCommand.FIND_CONTOUR) if response and len(response) > 8: try: num_points = struct.unpack('>I', response[8:12])[0] contour = [] for i in range(num_points): offset = 12 + i * 8 x = struct.unpack('>f', response[offset:offset+4])[0] y = struct.unpack('>f', response[offset+4:offset+8])[0] contour.append((x, y)) logger.info(f"找到包含 {num_points} 个点的轮廓") return contour except Exception as e: logger.error(f"解析轮廓数据失败: {e}") return None def get_center_point(self) -> Optional[Tuple[float, float]]: """ 获取图像中心点 Returns: 中心点坐标 (x, y),失败返回None """ response = self.send_vision_command(VisionCommand.GET_CENTER) if response and len(response) >= 12: try: x = struct.unpack('>f', response[8:12])[0] y = struct.unpack('>f', response[12:16])[0] logger.info(f"中心点: ({x:.2f}, {y:.2f})") return (x, y) except Exception as e: logger.error(f"解析中心点数据失败: {e}") return None def measure_distance(self, point1: Tuple[float, float], point2: Tuple[float, float]) -> Optional[float]: """ 测量两点之间的距离 Args: point1: 第一个点 (x, y) point2: 第二个点 (x, y) Returns: 距离值 (mm),失败返回None """ params = struct.pack('>ffff', point1[0], point1[1], point2[0], point2[1]) response = self.send_vision_command(VisionCommand.MEASURE_DISTANCE, params) if response and len(response) >= 12: try: distance = struct.unpack('>f', response[8:12])[0] logger.info(f"两点间距离: {distance:.2f} mm") return distance except Exception as e: logger.error(f"解析距离数据失败: {e}") return None def set_threshold(self, threshold: int) -> bool: """ 设置检测阈值 Args: threshold: 阈值 (0-255) Returns: 设置是否成功 """ params = struct.pack('>B', threshold) response = self.send_vision_command(VisionCommand.SET_THRESHOLD, params) return response is not None def set_exposure(self, exposure: int) -> bool: """ 设置曝光时间 Args: exposure: 曝光时间 (微秒) Returns: 设置是否成功 """ params = struct.pack('>I', exposure) response = self.send_vision_command(VisionCommand.SET_EXPOSURE, params) return response is not None def calibrate(self, known_distance: float) -> bool: """ 视觉系统标定 Args: known_distance: 已知距离 (mm) Returns: 标定是否成功 """ logger.info(f"开始视觉系统标定,参考距离: {known_distance} mm") response = self.send_vision_command(VisionCommand.MEASURE_DISTANCE) if response and len(response) >= 12: measured_distance = struct.unpack('>f', response[8:12])[0] if measured_distance > 0: self.calibration_data['scale'] = known_distance / measured_distance logger.info(f"标定完成,比例因子: {self.calibration_data['scale']:.4f}") return True return False def apply_calibration(self, pixel_coord: Tuple[float, float]) -> Tuple[float, float]: """ 应用标定转换 Args: pixel_coord: 像素坐标 (x, y) Returns: 转换后的物理坐标 (x, y) """ scale = self.calibration_data.get('scale', 1.0) return (pixel_coord[0] * scale, pixel_coord[1] * scale) def get_trajectory_from_vision(self, resolution: float = 1.0) -> Optional[List[Tuple[float, float]]]: """ 从视觉系统获取轨迹点 Args: resolution: 分辨率 (点间距,mm) Returns: 轨迹点列表,失败返回None """ contour = self.find_contour() if not contour: logger.error("未能获取轮廓") return None if len(contour) < 2: logger.warning("轮廓点太少") return contour trajectory = [contour[0]] accumulated_dist = 0.0 for i in range(1, len(contour)): dist = ((contour[i][0] - contour[i-1][0])**2 + (contour[i][1] - contour[i-1][1])**2)**0.5 accumulated_dist += dist if accumulated_dist >= resolution: trajectory.append(contour[i]) accumulated_dist = 0.0 logger.info(f"生成轨迹,共 {len(trajectory)} 个点") return trajectory
main_coating.py
""" 轨迹涂布系统主程序 Nachi Robot + Hikvision Vision - Trajectory Coating System """ import sys import argparse import logging from trajectory_coating import TrajectoryCoatingSystem, CoatingMode logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='Nachi-Hikvision轨迹涂布系统') parser.add_argument('--robot-ip', type=str, default='192.168.1.100', help='Nachi机器人IP地址') parser.add_argument('--vision-ip', type=str, default='192.168.1.64', help='Hikvision视觉系统IP地址') parser.add_argument('--robot-port', type=int, default=8000, help='Nachi机器人端口') parser.add_argument('--vision-port', type=int, default=9000, help='Hikvision视觉系统端口') parser.add_argument('--width', type=float, default=100, help='涂布宽度 (mm)') parser.add_argument('--height', type=float, default=100, help='涂布高度 (mm)') parser.add_argument('--base-height', type=float, default=100, help='基础高度 (mm)') parser.add_argument('--height-offset', type=float, default=0, help='高度偏移量 (mm)') parser.add_argument('--speed', type=int, default=30, help='涂布速度 (1-100)') parser.add_argument('--mode', type=str, default='continuous', choices=['continuous', 'intermittent', 'spiral'], help='涂布模式') parser.add_argument('--trajectory', type=str, default='rectangle', choices=['rectangle', 'circle', 'spiral', 'vision'], help='轨迹类型') parser.add_argument('--radius', type=float, default=50, help='圆形/螺旋轨迹半径 (mm)') parser.add_argument('--turns', type=int, default=3, help='螺旋轨迹圈数') return parser.parse_args() def main(): """主函数""" args = parse_arguments() logger.info("=" * 60) logger.info("Nachi-Hikvision 轨迹涂布系统") logger.info("=" * 60) coating_system = TrajectoryCoatingSystem( robot_ip=args.robot_ip, vision_ip=args.vision_ip, robot_port=args.robot_port, vision_port=args.vision_port ) mode_map = { 'continuous': CoatingMode.CONTINUOUS, 'intermittent': CoatingMode.INTERMITTENT, 'spiral': CoatingMode.SPIRAL } coating_mode = mode_map.get(args.mode, CoatingMode.CONTINUOUS) if args.trajectory == 'rectangle': coating_system.generate_rectangle_trajectory( width=args.width, height=args.height ) elif args.trajectory == 'circle': coating_system.generate_circle_trajectory( radius=args.radius ) elif args.trajectory == 'spiral': coating_system.generate_spiral_trajectory( inner_radius=args.radius * 0.3, outer_radius=args.radius, turns=args.turns ) elif args.trajectory == 'vision': success = coating_system.load_trajectory_from_vision() if not success: logger.error("从视觉系统加载轨迹失败") sys.exit(1) success = coating_system.run_coating_cycle( width=args.width, height_val=args.height, base_height=args.base_height, height_offset=args.height_offset, speed=args.speed, mode=coating_mode ) if success: logger.info("涂布任务完成") sys.exit(0) else: logger.error("涂布任务失败") sys.exit(1) if __name__ == '__main__': main()
nachi_robot_controller.py
""" Nachi Robot Controller 用于控制Nachi机器人的TCP/IP通信模块 """ import socket import time import logging from typing import List, Tuple, Optional logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class NachiRobotController: """Nachi机器人控制器""" def __init__(self, ip: str = "192.168.1.100", port: int = 8000): """ 初始化Nachi机器人控制器 Args: ip: 机器人控制器IP地址 port: 通信端口 """ self.ip = ip self.port = port self.socket = None self.connected = False def connect(self) -> bool: """ 连接到机器人控制器 Returns: 连接是否成功 """ try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(10) self.socket.connect((self.ip, self.port)) self.connected = True logger.info(f"成功连接到Nachi机器人: {self.ip}:{self.port}") return True except Exception as e: logger.error(f"连接Nachi机器人失败: {e}") self.connected = False return False def disconnect(self): """断开与机器人控制器的连接""" if self.socket: try: self.socket.close() logger.info("已断开与Nachi机器人的连接") except Exception as e: logger.error(f"断开连接时出错: {e}") finally: self.socket = None self.connected = False def send_command(self, command: str) -> Optional[str]: """ 发送命令到机器人 Args: command: 命令字符串 Returns: 机器人响应,如果失败返回None """ if not self.connected: logger.error("未连接到机器人") return None try: self.socket.sendall((command + "\n").encode('utf-8')) response = self.socket.recv(1024).decode('utf-8') logger.debug(f"发送命令: {command}, 响应: {response}") return response except Exception as e: logger.error(f"发送命令失败: {e}") return None def move_to(self, x: float, y: float, z: float, speed: int = 50) -> bool: """ 移动到指定位置(点对点运动) Args: x: X坐标 (mm) y: Y坐标 (mm) z: Z坐标 (mm) speed: 速度百分比 (1-100) Returns: 移动是否成功 """ command = f"MOVE P,{x:.2f},{y:.2f},{z:.2f},0,0,0,{speed}" response = self.send_command(command) return response is not None and "OK" in response def move_linear(self, x: float, y: float, z: float, speed: int = 50) -> bool: """ 直线移动到指定位置 Args: x: X坐标 (mm) y: Y坐标 (mm) z: Z坐标 (mm) speed: 速度百分比 (1-100) Returns: 移动是否成功 """ command = f"MOVE L,{x:.2f},{y:.2f},{z:.2f},0,0,0,{speed}" response = self.send_command(command) return response is not None and "OK" in response def set_tool_height(self, height: float) -> bool: """ 设置工具高度(Z轴位置) Args: height: 高度值 (mm) Returns: 设置是否成功 """ command = f"SET Z,{height:.2f}" response = self.send_command(command) return response is not None and "OK" in response def get_current_position(self) -> Optional[Tuple[float, float, float]]: """ 获取当前位置 Returns: 当前位置 (x, y, z),如果失败返回None """ response = self.send_command("GET POS") if response: try: parts = response.split(",") return float(parts[0]), float(parts[1]), float(parts[2]) except (ValueError, IndexError) as e: logger.error(f"解析位置数据失败: {e}") return None def start_teach_mode(self) -> bool: """进入示教模式""" response = self.send_command("TEACH ON") return response is not None and "OK" in response def stop_teach_mode(self) -> bool: """退出示教模式""" response = self.send_command("TEACH OFF") return response is not None and "OK" in response def execute_trajectory(self, trajectory: List[Tuple[float, float, float]], speed: int = 50, height_offset: float = 0) -> bool: """ 执行轨迹运动 Args: trajectory: 轨迹点列表,每个点为 (x, y, z) speed: 运动速度 (1-100) height_offset: 高度偏移量 Returns: 轨迹执行是否成功 """ logger.info(f"开始执行轨迹,共 {len(trajectory)} 个点") for i, (x, y, z) in enumerate(trajectory): adjusted_z = z + height_offset logger.info(f"移动到点 {i+1}/{len(trajectory)}: ({x:.2f}, {y:.2f}, {adjusted_z:.2f})") if not self.move_linear(x, y, adjusted_z, speed): logger.error(f"轨迹执行在点 {i+1} 失败") return False time.sleep(0.1) logger.info("轨迹执行完成") return True def emergency_stop(self): """紧急停止""" try: self.send_command("ESTOP") logger.warning("紧急停止已触发") except Exception as e: logger.error(f"紧急停止失败: {e}")
trajectory_coating.py
""" Trajectory Coating System 轨迹涂布系统 - 结合Nachi机器人和Hikvision视觉系统 """ import logging import time from typing import List, Tuple, Optional, Dict from enum import Enum from nachi_robot_controller import NachiRobotController from hikvision_vision import HikvisionVisionSystem logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CoatingMode(Enum): """涂布模式""" CONTINUOUS = "continuous" INTERMITTENT = "intermittent" SPIRAL = "spiral" class TrajectoryCoatingSystem: """轨迹涂布系统""" def __init__(self, robot_ip: str = "192.168.1.100", vision_ip: str = "192.168.1.64", robot_port: int = 8000, vision_port: int = 9000): """ 初始化轨迹涂布系统 Args: robot_ip: Nachi机器人IP地址 vision_ip: Hikvision视觉系统IP地址 robot_port: 机器人通信端口 vision_port: 视觉系统通信端口 """ self.robot = NachiRobotController(robot_ip, robot_port) self.vision = HikvisionVisionSystem(vision_ip, vision_port) self.connected = False self.base_height = 100.0 self.current_height_offset = 0.0 self.coating_speed = 30 self.trajectory_points = [] def connect_all(self) -> bool: """ 连接所有设备 Returns: 所有连接是否成功 """ robot_ok = self.robot.connect() vision_ok = self.vision.connect() self.connected = robot_ok and vision_ok if self.connected: logger.info("所有设备连接成功") else: logger.error(f"设备连接失败 - 机器人: {robot_ok}, 视觉: {vision_ok}") return self.connected def disconnect_all(self): """断开所有设备连接""" self.robot.disconnect() self.vision.disconnect() self.connected = False logger.info("所有设备已断开") def set_base_height(self, height: float): """ 设置基础高度 Args: height: 基础高度值 (mm) """ self.base_height = height logger.info(f"基础高度设置为: {height} mm") def adjust_height(self, offset: float): """ 调整高度偏移 Args: offset: 高度偏移量 (mm),正数向上,负数向下 """ self.current_height_offset = offset logger.info(f"高度偏移调整为: {offset} mm") def get_effective_height(self) -> float: """ 获取有效高度 Returns: 有效高度值 """ return self.base_height + self.current_height_offset def set_coating_speed(self, speed: int): """ 设置涂布速度 Args: speed: 速度值 (1-100) """ self.coating_speed = max(1, min(100, speed)) logger.info(f"涂布速度设置为: {self.coating_speed}") def load_trajectory_from_vision(self, resolution: float = 1.0) -> bool: """ 从视觉系统加载轨迹 Args: resolution: 轨迹分辨率 (mm) Returns: 加载是否成功 """ logger.info("从视觉系统获取轨迹...") if not self.vision.connected: logger.error("视觉系统未连接") return False self.vision.capture_image() time.sleep(0.5) trajectory_2d = self.vision.get_trajectory_from_vision(resolution) if not trajectory_2d or len(trajectory_2d) < 2: logger.error("未能获取有效轨迹") return False center = self.vision.get_center_point() if center: offset_x, offset_y = center else: offset_x, offset_y = 0, 0 self.trajectory_points = [ (x - offset_x, y - offset_y, self.get_effective_height()) for x, y in trajectory_2d ] logger.info(f"轨迹加载完成,共 {len(self.trajectory_points)} 个点") return True def set_trajectory(self, points: List[Tuple[float, float]]): """ 手动设置2D轨迹点 Args: points: 2D轨迹点列表 [(x, y), ...] """ self.trajectory_points = [ (x, y, self.get_effective_height()) for x, y in points ] logger.info(f"轨迹已设置,共 {len(self.trajectory_points)} 个点") def generate_rectangle_trajectory(self, width: float, height: float, offset_x: float = 0, offset_y: float = 0, num_points: int = 100) -> List[Tuple[float, float, float]]: """ 生成矩形轨迹 Args: width: 矩形宽度 (mm) height: 矩形高度 (mm) offset_x: X方向偏移 offset_y: Y方向偏移 num_points: 轨迹点数 Returns: 3D轨迹点列表 """ points = [] half_w = width / 2 half_h = height / 2 perimeter = 2 * (width + height) points_per_side = { 'bottom': int(num_points * width / perimeter), 'right': int(num_points * height / perimeter), 'top': int(num_points * width / perimeter), 'left': int(num_points * height / perimeter) } for i in range(points_per_side['bottom']): t = i / points_per_side['bottom'] x = -half_w + offset_x + width * t y = -half_h + offset_y points.append((x, y, self.get_effective_height())) for i in range(points_per_side['right']): t = i / points_per_side['right'] x = half_w + offset_x y = -half_h + offset_y + height * t points.append((x, y, self.get_effective_height())) for i in range(points_per_side['top']): t = i / points_per_side['top'] x = half_w + offset_x - width * t y = half_h + offset_y points.append((x, y, self.get_effective_height())) for i in range(points_per_side['left']): t = i / points_per_side['left'] x = -half_w + offset_x y = half_h + offset_y - height * t points.append((x, y, self.get_effective_height())) self.trajectory_points = points logger.info(f"矩形轨迹生成完成: {len(points)} 点") return points def generate_circle_trajectory(self, radius: float, center_x: float = 0, center_y: float = 0, num_points: int = 72) -> List[Tuple[float, float, float]]: """ 生成圆形轨迹 Args: radius: 圆半径 (mm) center_x: 圆心X坐标 center_y: 圆心Y坐标 num_points: 轨迹点数 Returns: 3D轨迹点列表 """ points = [] for i in range(num_points): angle = 2 * 3.14159 * i / num_points x = center_x + radius * (angle - 0.5) y = center_y + radius * (angle - 0.5) points.append((x, y, self.get_effective_height())) self.trajectory_points = points logger.info(f"圆形轨迹生成完成: {num_points} 点") return points def generate_spiral_trajectory(self, inner_radius: float, outer_radius: float, center_x: float = 0, center_y: float = 0, turns: int = 3, points_per_turn: int = 36) -> List[Tuple[float, float, float]]: """ 生成螺旋轨迹 Args: inner_radius: 内圈半径 (mm) outer_radius: 外圈半径 (mm) center_x: 圆心X坐标 center_y: 圆心Y坐标 turns: 螺旋圈数 points_per_turn: 每圈点数 Returns: 3D轨迹点列表 """ points = [] total_points = turns * points_per_turn for i in range(total_points): t = i / total_points angle = 2 * 3.14159 * i / points_per_turn radius = inner_radius + (outer_radius - inner_radius) * t x = center_x + radius * (angle - 0.5) y = center_y + radius * (angle - 0.5) points.append((x, y, self.get_effective_height())) self.trajectory_points = points logger.info(f"螺旋轨迹生成完成: {total_points} 点") return points def execute_coating(self, mode: CoatingMode = CoatingMode.CONTINUOUS, pause_duration: float = 0.1) -> bool: """ 执行涂布 Args: mode: 涂布模式 pause_duration: 间歇涂布暂停时间 (秒) Returns: 涂布是否成功 """ if not self.trajectory_points: logger.error("没有可执行的轨迹") return False if not self.connected: logger.error("设备未连接") return False logger.info(f"开始执行涂布,模式: {mode.value}, 点数: {len(self.trajectory_points)}") height = self.get_effective_height() if mode == CoatingMode.CONTINUOUS: return self._execute_continuous_coating(height) elif mode == CoatingMode.INTERMITTENT: return self._execute_intermittent_coating(height, pause_duration) elif mode == CoatingMode.SPIRAL: return self._execute_continuous_coating(height) return False def _execute_continuous_coating(self, height: float) -> bool: """执行连续涂布""" logger.info("执行连续涂布...") for i, (x, y, _) in enumerate(self.trajectory_points): adjusted_point = (x, y, height) success = self.robot.move_linear( adjusted_point[0], adjusted_point[1], adjusted_point[2], self.coating_speed ) if not success: logger.error(f"涂布在点 {i+1} 失败") return False if i % 10 == 0: logger.info(f"涂布进度: {i+1}/{len(self.trajectory_points)}") time.sleep(0.05) logger.info("连续涂布完成") return True def _execute_intermittent_coating(self, height: float, pause_duration: float) -> bool: """执行间歇涂布""" logger.info("执行间歇涂布...") for i, (x, y, _) in enumerate(self.trajectory_points): adjusted_point = (x, y, height) if i % 5 == 0: self.robot.move_to( adjusted_point[0], adjusted_point[1], height + 10, self.coating_speed ) time.sleep(pause_duration) success = self.robot.move_linear( adjusted_point[0], adjusted_point[1], adjusted_point[2], self.coating_speed ) if not success: logger.error(f"涂布在点 {i+1} 失败") return False if i % 10 == 0: logger.info(f"涂布进度: {i+1}/{len(self.trajectory_points)}") time.sleep(pause_duration) self.robot.move_to( self.trajectory_points[-1][0], self.trajectory_points[-1][1], height + 10, self.coating_speed ) logger.info("间歇涂布完成") return True def dynamic_height_adjust(self, height_map: List[Tuple[float, float, float]]): """ 根据高度图动态调整涂布高度 Args: height_map: 高度图数据 [(x, y, height_adjustment), ...] """ for i, (x, y, z) in enumerate(self.trajectory_points): adjustment = 0 for hx, hy, ha in height_map: if abs(x - hx) < 10 and abs(y - hy) < 10: adjustment = ha break self.trajectory_points[i] = (x, y, z + adjustment) logger.info("高度图调整完成") def run_coating_cycle(self, width: float = 100, height_val: float = 100, base_height: float = 100, height_offset: float = 0, speed: int = 30, mode: CoatingMode = CoatingMode.CONTINUOUS) -> bool: """ 运行完整的涂布周期 Args: width: 涂布宽度 (mm) height_val: 涂布高度 (mm) base_height: 基础高度 (mm) height_offset: 高度偏移 (mm) speed: 涂布速度 (1-100) mode: 涂布模式 Returns: 涂布周期是否成功 """ logger.info("=" * 50) logger.info("开始涂布周期") logger.info("=" * 50) if not self.connect_all(): logger.error("连接失败") return False try: self.set_base_height(base_height) self.adjust_height(height_offset) self.set_coating_speed(speed) self.generate_rectangle_trajectory(width, height_val) logger.info(f"基础高度: {self.get_effective_height()} mm") logger.info(f"涂布速度: {speed}") logger.info(f"轨迹点数: {len(self.trajectory_points)}") success = self.execute_coating(mode) if success: logger.info("涂布周期成功完成") else: logger.error("涂布周期执行失败") return success except Exception as e: logger.error(f"涂布周期异常: {e}") return False finally: self.disconnect_all() logger.info("设备已断开连接")