检测前方障碍物距离,当距离过近时语音报警,避免视障用户碰撞受伤。
2026/4/23 3:10:40 网站建设 项目流程

视障人士防撞预警系统

一、实际应用场景与痛点

应用场景

视障用户李先生在户外使用导盲杖行走。虽然导盲杖能探测地面障碍,但无法检测空中障碍物(如低垂树枝、广告牌、打开的窗户)和前方较远障碍。在一次行走中,他不慎撞到低矮的伸缩门,导致头部受伤。他急需一款能实时探测前方障碍物并语音预警的智能辅助设备。

核心痛点

1. 探测盲区:导盲杖主要探测地面,对空中和前方障碍探测不足

2. 反应延迟:发现障碍时已距离过近,来不及避让

3. 预警不直观:传统振动反馈无法表达障碍物的距离和方位

4. 环境适应性差:不同光照、天气条件下探测准确性下降

5. 操作复杂:多设备配合使用对视障用户不友好

6. 续航问题:长时间户外使用需要持续供电

二、核心逻辑设计

1. 初始化传感器(超声波/雷达/深度摄像头)

2. 实时扫描前方区域,获取距离数据

3. 处理原始数据,滤除噪声干扰

4. 划分安全区域(近、中、远距离警告区)

5. 根据距离计算危险等级

6. 生成多级别语音警报

7. 通过骨传导耳机输出警报

8. 记录行走路径和障碍信息

三、模块化代码实现

主程序文件:obstacle_detection_system.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

视障人士防撞预警系统

实时检测前方障碍物,语音预警避免碰撞

版本:2.0.0

作者:无障碍智能助手

"""

import os

import sys

import time

import threading

import queue

import json

import math

from datetime import datetime

from typing import Dict, List, Tuple, Optional, Any

from dataclasses import dataclass, asdict

from enum import Enum

import warnings

warnings.filterwarnings('ignore')

# 硬件接口

try:

import RPi.GPIO as GPIO

RASPBERRY_PI_AVAILABLE = True

except ImportError:

RASPBERRY_PI_AVAILABLE = False

print("提示: 非树莓派平台,使用模拟传感器")

try:

import numpy as np

NUMPY_AVAILABLE = True

except ImportError:

NUMPY_AVAILABLE = False

print("警告: numpy未安装,部分功能受限")

# 音频处理

try:

import pyaudio

import wave

import simpleaudio as sa

AUDIO_AVAILABLE = True

except ImportError:

AUDIO_AVAILABLE = False

print("警告: 音频模块未安装,语音提示功能受限")

# 语音合成

try:

import pyttsx3

TTS_AVAILABLE = True

except ImportError:

TTS_AVAILABLE = False

print("警告: 语音合成未安装,使用蜂鸣器提示")

class AlertLevel(Enum):

"""警报级别枚举"""

SAFE = 0 # 安全

CAUTION = 1 # 注意

WARNING = 2 # 警告

DANGER = 3 # 危险

CRITICAL = 4 # 紧急

@dataclass

class DetectionZone:

"""探测区域定义"""

name: str

angle_start: float # 起始角度(度)

angle_end: float # 结束角度(度)

distance_min: float # 最小探测距离(米)

distance_max: float # 最大探测距离(米)

alert_thresholds: Dict[AlertLevel, float] # 各级别警报阈值

@dataclass

class Obstacle:

"""障碍物信息"""

timestamp: float

distance: float # 距离(米)

angle: float # 角度(度)

width: float # 宽度(米,估计值)

height: float # 高度(米,估计值)

confidence: float # 置信度

alert_level: AlertLevel

position_x: float # 相对位置X

position_y: float # 相对位置Y

class SensorInterface:

"""传感器接口基类"""

def __init__(self, config: Dict):

"""

初始化传感器

Args:

config: 传感器配置

"""

self.config = config

self.is_connected = False

self.calibration_data = {}

def connect(self) -> bool:

"""连接传感器"""

raise NotImplementedError

def disconnect(self) -> None:

"""断开连接"""

raise NotImplementedError

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度(度)

Returns:

距离(米),None表示无效

"""

raise NotImplementedError

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

raise NotImplementedError

def calibrate(self) -> Dict:

"""校准传感器"""

raise NotImplementedError

class UltrasonicSensor(SensorInterface):

"""超声波传感器实现"""

def __init__(self, config: Dict):

"""

初始化超声波传感器

Args:

config: 配置字典

trigger_pin: 触发引脚

echo_pin: 回声引脚

max_distance: 最大探测距离

temperature: 环境温度(用于声速校准)

"""

super().__init__(config)

self.trigger_pin = config.get('trigger_pin', 23)

self.echo_pin = config.get('echo_pin', 24)

self.max_distance = config.get('max_distance', 4.0) # 米

self.temperature = config.get('temperature', 25.0) # 摄氏度

# 计算声速(米/秒)

self.sound_speed = 331.3 + 0.606 * self.temperature

# 伺服控制(如果支持扫描)

self.servo_pin = config.get('servo_pin')

self.current_angle = 0.0

self.setup_pins()

def setup_pins(self):

"""设置GPIO引脚"""

if RASPBERRY_PI_AVAILABLE:

GPIO.setmode(GPIO.BCM)

GPIO.setup(self.trigger_pin, GPIO.OUT)

GPIO.setup(self.echo_pin, GPIO.IN)

GPIO.output(self.trigger_pin, GPIO.LOW)

if self.servo_pin:

GPIO.setup(self.servo_pin, GPIO.OUT)

self.servo = GPIO.PWM(self.servo_pin, 50) # 50Hz PWM

self.servo.start(0)

def connect(self) -> bool:

"""连接传感器"""

try:

# 测试传感器

distance = self.get_distance()

if distance and 0 < distance <= self.max_distance:

self.is_connected = True

print("超声波传感器连接成功")

return True

except Exception as e:

print(f"超声波传感器连接失败: {e}")

self.is_connected = False

return False

def disconnect(self) -> None:

"""断开连接"""

if RASPBERRY_PI_AVAILABLE:

if hasattr(self, 'servo'):

self.servo.stop()

GPIO.cleanup()

self.is_connected = False

def set_angle(self, angle: float) -> bool:

"""

设置伺服角度

Args:

angle: 目标角度(-90到90度)

Returns:

是否成功

"""

if not self.servo_pin or not RASPBERRY_PI_AVAILABLE:

return False

# 限制角度范围

angle = max(-90, min(90, angle))

# 转换为PWM占空比(假设伺服角度范围:0.5ms-2.5ms对应0-180度)

duty_cycle = 2.5 + (angle + 90) * (10.0 / 180.0) # 映射到2.5-12.5%

self.servo.ChangeDutyCycle(duty_cycle)

time.sleep(0.1) # 给伺服时间转动

self.servo.ChangeDutyCycle(0) # 停止PWM信号防止抖动

self.current_angle = angle

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度

Returns:

距离(米),None表示无效

"""

if not self.is_connected and not self.connect():

return None

# 设置角度

if angle != self.current_angle:

self.set_angle(angle)

if RASPBERRY_PI_AVAILABLE:

try:

# 发送触发脉冲

GPIO.output(self.trigger_pin, GPIO.HIGH)

time.sleep(0.00001) # 10微秒

GPIO.output(self.trigger_pin, GPIO.LOW)

# 等待回声引脚变高

start_time = time.time()

while GPIO.input(self.echo_pin) == GPIO.LOW:

if time.time() - start_time > 0.1: # 超时100ms

return None

start_time = time.time()

pulse_start = time.time()

# 等待回声引脚变低

while GPIO.input(self.echo_pin) == GPIO.HIGH:

pulse_end = time.time()

if pulse_end - pulse_start > 0.1: # 超时100ms

return None

# 计算距离

pulse_duration = pulse_end - pulse_start

distance = (pulse_duration * self.sound_speed) / 2

# 有效性检查

if 0.02 <= distance <= self.max_distance: # 2cm到最大距离

return distance

except Exception as e:

print(f"超声波测距错误: {e}")

else:

# 模拟模式,用于测试

import random

time.sleep(0.05) # 模拟测量延迟

# 模拟障碍物:距离随角度变化

base_distance = 2.0

angle_factor = 1.0 + abs(angle) / 90.0 # 角度越大,距离越小

noise = random.uniform(-0.1, 0.1)

distance = base_distance / angle_factor + noise

distance = max(0.1, min(self.max_distance, distance))

return distance

return None

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

time.sleep(0.05) # 防止扫描过快

return measurements

def calibrate(self) -> Dict:

"""校准传感器"""

print("开始超声波传感器校准...")

calibration_results = {

'temperature': self.temperature,

'sound_speed': self.sound_speed,

'measurements': []

}

# 测量已知距离

test_distances = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]

for known_distance in test_distances:

print(f"请将障碍物放置在 {known_distance} 米处,按Enter继续...")

input()

measurements = []

for _ in range(5):

distance = self.get_distance(0)

if distance:

measurements.append(distance)

time.sleep(0.1)

if measurements:

avg_measured = sum(measurements) / len(measurements)

error = avg_measured - known_distance

calibration_results['measurements'].append({

'known': known_distance,

'measured': avg_measured,

'error': error,

'error_percent': (error / known_distance) * 100

})

print(f"已知距离: {known_distance:.2f}m, 测量: {avg_measured:.2f}m, "

f"误差: {error:.3f}m ({error/known_distance*100:.1f}%)")

# 计算校准系数

if calibration_results['measurements']:

total_error = sum(m['error'] for m in calibration_results['measurements'])

avg_error = total_error / len(calibration_results['measurements'])

calibration_results['calibration_offset'] = -avg_error

calibration_results['calibration_factor'] = 1.0

print(f"校准完成,建议偏移量: {-avg_error:.3f}m")

return calibration_results

class RadarSensor(SensorInterface):

"""毫米波雷达传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

# 雷达特定初始化

self.resolution = config.get('resolution', 0.05) # 距离分辨率

self.fov = config.get('fov', 120) # 视场角

self.connected = False

def connect(self) -> bool:

"""连接雷达"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("雷达传感器连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""雷达测距"""

if not self.connected:

return None

# 模拟雷达数据

time.sleep(0.01)

return 1.5 + 0.5 * math.sin(time.time()) # 模拟动态距离

class DepthCameraSensor(SensorInterface):

"""深度摄像头传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

self.width = config.get('width', 640)

self.height = config.get('height', 480)

self.connected = False

def connect(self) -> bool:

"""连接深度摄像头"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("深度摄像头连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""深度摄像头测距"""

if not self.connected:

return None

# 模拟深度数据

time.sleep(0.02)

return 2.0 + 0.3 * math.cos(time.time() * 2) # 模拟动态距离

class SensorManager:

"""传感器管理器(支持多传感器融合)"""

def __init__(self, config: Dict):

"""

初始化传感器管理器

Args:

config: 传感器配置

"""

self.config = config

self.sensors = {}

self.data_history = []

self.history_size = 100

# 初始化传感器

self.init_sensors()

def init_sensors(self):

"""初始化所有传感器"""

sensor_configs = self.config.get('sensors', {})

# 超声波传感器

if 'ultrasonic' in sensor_configs and sensor_configs['ultrasonic'].get('enabled', True):

try:

self.sensors['ultrasonic'] = UltrasonicSensor(

sensor_configs['ultrasonic']

)

print("超声波传感器初始化完成")

except Exception as e:

print(f"超声波传感器初始化失败: {e}")

# 雷达传感器

if 'radar' in sensor_configs and sensor_configs['radar'].get('enabled', False):

try:

self.sensors['radar'] = RadarSensor(sensor_configs['radar'])

print("雷达传感器初始化完成")

except Exception as e:

print(f"雷达传感器初始化失败: {e}")

# 深度摄像头

if 'depth_camera' in sensor_configs and sensor_configs['depth_camera'].get('enabled', False):

try:

self.sensors['depth_camera'] = DepthCameraSensor(sensor_configs['depth_camera'])

print("深度摄像头初始化完成")

except Exception as e:

print(f"深度摄像头初始化失败: {e}")

if not self.sensors:

print("警告: 没有可用的传感器,使用模拟传感器")

self.sensors['simulated'] = UltrasonicSensor({

'trigger_pin': None,

'echo_pin': None,

'max_distance': 4.0

})

def connect_all(self) -> bool:

"""连接所有传感器"""

connected = False

for name, sensor in self.sensors.items():

if sensor.connect():

connected = True

print(f"{name}传感器连接成功")

else:

print(f"{name}传感器连接失败")

return connected

def get_fused_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取融合后的距离数据

Args:

angle: 探测角度

Returns:

融合后的距离

"""

distances = []

weights = []

for name, sensor in self.sensors.items():

if sensor.is_connected:

distance = sensor.get_distance(angle)

if distance:

distances.append(distance)

# 根据传感器类型设置权重

if name == 'ultrasonic':

weight = 0.6

elif name == 'radar':

weight = 0.8

elif name == 'depth_camera':

weight = 1.0

else:

weight = 0.5

weights.append(weight)

if not distances:

return None

# 加权平均

weighted_sum = sum(d * w for d, w in zip(distances, weights))

total_weight = sum(weights)

fused_distance = weighted_sum / total_weight

# 记录到历史数据

self.data_history.append({

'timestamp': time.time(),

'angle': angle,

'distance': fused_distance,

'sensor_readings': distances

})

# 限制历史数据大小

if len(self.data_history) > self.history_size:

self.data_history.pop(0)

return fused_distance

def scan_fused_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描融合扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

# 使用主传感器扫描

main_sensor = next(iter(self.sensors.values()))

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_fused_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

return measurements

def disconnect_all(self):

"""断开所有传感器"""

for sensor in self.sensors.values():

sensor.disconnect()

class ObstacleDetector:

"""障碍物检测器"""

def __init__(self, config: Dict):

"""

初始化障碍物检测器

Args:

config: 检测器配置

"""

self.config = config

self.zones = self.setup_detection_zones()

self.obstacle_history = []

self.history_size = 50

# 滤波参数

self.filter_window = config.get('filter_window', 5)

self.distance_history = {}

def setup_detection_zones(self) -> List[DetectionZone]:

"""设置探测区域"""

zones_config = self.config.get('zones', [

{

'name': 'center',

'angle_start': -15,

'angle_end': 15,

'distance_min': 0.1,

'distance_max': 4.0,

'alert_thresholds': {

AlertLevel.SAFE: 2.0,

AlertLevel.CAUTION: 1.5,

AlertLevel.WARNING: 1.0,

AlertLevel.DANGER: 0.5,

AlertLevel.CRITICAL: 0.3

}

},

{

'name': 'left',

'angle_start': -60,

'angle_end': -15,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

},

{

'name': 'right',

'angle_start': 15,

'angle_end': 60,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

}

])

zones = []

for zone_config in zones_config:

thresholds = {

AlertLevel[key.upper()]: value

for key, value in zone_config['alert_thresholds'].items()

}

zone = DetectionZone(

name=zone_config['name'],

angle_start=zone_config['angle_start'],

angle_end=zone_config['angle_end'],

distance_min=zone_config['distance_min'],

distance_max=zone_config['distance_max'],

alert_thresholds=thresholds

)

zones.append(zone)

return zones

def apply_filter(self, angle: float, distance: float) -> float:

"""

应用滤波器去除噪声

Args:

如果你觉得这个工具好用,欢迎关注我!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询