Stop Processing Data by Hand! Batch-Convert YOLOv5 TXT Predictions into Structured JSON with a Python Script

Building an Automated YOLOv5 Prediction Post-Processing Pipeline from Scratch: A Practical Guide to TXT-to-JSON Conversion

In real-world computer vision projects, a common scenario looks like this: after batch detection with a YOLOv5 model, you end up with a mountain of TXT-format prediction files, one per image. These scattered text files hold valuable detection data, but using them directly for downstream analysis or visualization is painful. This article walks through building a complete automated processing pipeline in Python that refines this raw material into clean, structured, easy-to-use JSON.

1. Understanding YOLOv5's TXT Output Format and the Processing Requirements

TXT files generated with YOLOv5's --save-txt flag follow a fixed format: each detected object occupies one line. When --save-conf is also enabled, each line contains six values:

<class_id> <x_center> <y_center> <width> <height> <confidence>
  • class_id: index of the detected object's class (integer)
  • x_center, y_center: normalized coordinates of the bounding-box center (floats in 0-1)
  • width, height: normalized width and height of the bounding box (floats in 0-1)
  • confidence: detection confidence (float in 0-1)
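As a concrete illustration, here is how a single line (with made-up values) splits into those six typed fields:

```python
# One hypothetical line of YOLOv5 TXT output (with --save-conf enabled)
line = "0 0.5 0.4 0.2 0.3 0.91"

fields = line.split()
detection = {
    "class_id": int(fields[0]),      # integer class index
    "x_center": float(fields[1]),    # normalized, 0-1
    "y_center": float(fields[2]),
    "width": float(fields[3]),
    "height": float(fields[4]),
    "confidence": float(fields[5]),
}
```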

The format is compact and efficient, but in practical engineering it has several clear pain points:

  1. Scattered data: one TXT file per image makes whole-dataset analysis awkward
  2. Incomplete information: no context such as the original image dimensions
  3. Poor readability: classes are indices rather than names, requiring an extra mapping

Our processing script needs to solve these problems while staying fast enough to convert thousands of files in one batch.

2. Building the Basic Framework: File Traversal and Data Reading

First, we need a framework that automatically discovers and processes every TXT file. We use Python's pathlib module, which offers a more modern, safer way to work with file paths.

from pathlib import Path
import json

def process_yolo_txt_to_json(txt_dir, output_json, class_names):
    txt_dir = Path(txt_dir)
    results = []

    # Iterate over every .txt file in the directory
    for txt_file in txt_dir.glob('*.txt'):
        # Derive the matching image file name
        # (assumes the image shares the stem, with a different extension)
        image_name = txt_file.stem + '.jpg'  # adjust the extension to your dataset

        image_results = {
            'image_name': image_name,
            'detections': []
        }

        # Read and parse each TXT file
        with open(txt_file, 'r') as f:
            for line in f:
                # Parse one line of detection data
                parts = line.strip().split()
                if len(parts) == 6:
                    # Convert the raw fields into a friendlier structure
                    detection = {
                        'class_id': int(parts[0]),
                        'class_name': class_names[int(parts[0])],
                        'x_center': float(parts[1]),
                        'y_center': float(parts[2]),
                        'width': float(parts[3]),
                        'height': float(parts[4]),
                        'confidence': float(parts[5])
                    }
                    image_results['detections'].append(detection)

        results.append(image_results)

    # Save everything as a single JSON file
    with open(output_json, 'w') as f:
        json.dump(results, f, indent=2)

This basic version already performs the conversion, but several key points still need work:

  1. Error handling: add handling for file-read exceptions
  2. Performance: consider parallel processing for large file counts
  3. Coordinate conversion: add conversion from normalized to absolute pixel coordinates

3. Enhanced Processing: Absolute Coordinates and Metadata Integration

In practice, we often need the exact pixel position of each detection box in the original image, which requires the image's dimensions. There are two ways to obtain them:

Option 1: read each image with OpenCV and take its shape

import cv2

def get_image_size(image_path):
    try:
        img = cv2.imread(str(image_path))
        if img is not None:
            return img.shape[1], img.shape[0]  # (width, height)
    except Exception as e:
        print(f"Error reading {image_path}: {e}")
    return None, None

Option 2: assume the image dimensions are already known or obtained some other way
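One lightweight way to realize option 2 is to read only the image header with Pillow, which avoids decoding the full pixel data the way cv2.imread does. A minimal sketch, assuming Pillow is installed (the function name is illustrative):

```python
from PIL import Image

def get_image_size_fast(image_path):
    """Read (width, height) from the image header without decoding pixels.

    Image.open is lazy: pixel data is only loaded on access, so this is
    much cheaper than cv2.imread for size lookups alone.
    """
    try:
        with Image.open(image_path) as img:
            return img.size  # (width, height)
    except OSError:  # covers missing files and unreadable images
        return None, None
```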

Adding absolute coordinate information to the JSON output:

def convert_to_absolute(detection, img_width, img_height):
    return {
        'x_min': int((detection['x_center'] - detection['width'] / 2) * img_width),
        'y_min': int((detection['y_center'] - detection['height'] / 2) * img_height),
        'x_max': int((detection['x_center'] + detection['width'] / 2) * img_width),
        'y_max': int((detection['y_center'] + detection['height'] / 2) * img_height),
        'width': int(detection['width'] * img_width),
        'height': int(detection['height'] * img_height)
    }

Integrating these pieces into the main processing function:

def enhanced_process(txt_dir, image_dir, output_json, class_names):
    txt_dir = Path(txt_dir)
    image_dir = Path(image_dir)
    results = []

    for txt_file in txt_dir.glob('*.txt'):
        image_name = txt_file.stem + '.jpg'
        image_path = image_dir / image_name
        img_width, img_height = get_image_size(image_path)

        image_results = {
            'image_name': image_name,
            'image_size': {'width': img_width, 'height': img_height} if img_width else None,
            'detections': []
        }

        with open(txt_file, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) == 6:
                    detection = {
                        'class_id': int(parts[0]),
                        'class_name': class_names[int(parts[0])],
                        'confidence': float(parts[5]),
                        'relative_coords': {
                            'x_center': float(parts[1]),
                            'y_center': float(parts[2]),
                            'width': float(parts[3]),
                            'height': float(parts[4])
                        }
                    }
                    if img_width:
                        detection['absolute_coords'] = convert_to_absolute(
                            detection['relative_coords'], img_width, img_height
                        )
                    image_results['detections'].append(detection)

        results.append(image_results)

    with open(output_json, 'w') as f:
        json.dump(results, f, indent=2)

4. Performance Optimization and Error Handling

When processing thousands of files, performance becomes a key concern. Several optimization strategies help:

Parallel processing: use concurrent.futures for multi-threaded processing

from concurrent.futures import ThreadPoolExecutor

def process_single_file(txt_file, image_dir, class_names):
    # Per-file processing logic goes here
    pass

def parallel_process(txt_dir, image_dir, output_json, class_names, workers=4):
    txt_files = list(Path(txt_dir).glob('*.txt'))
    results = []

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for txt_file in txt_files:
            futures.append(
                executor.submit(
                    process_single_file, txt_file, image_dir, class_names
                )
            )
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Error processing file: {e}")

    with open(output_json, 'w') as f:
        json.dump(results, f, indent=2)

Stronger error handling: ensure that one failed file does not abort the whole run

def safe_process_single_file(txt_file, image_dir, class_names):
    try:
        # Processing logic that builds `result` goes here
        return result
    except FileNotFoundError:
        print(f"File not found: {txt_file}")
        return None
    except Exception as e:
        print(f"Error processing {txt_file}: {e}")
        return None

Progress feedback: show progress for long-running jobs

from tqdm import tqdm

def process_with_progress(txt_dir, image_dir, output_json, class_names):
    txt_files = list(Path(txt_dir).glob('*.txt'))
    results = []

    for txt_file in tqdm(txt_files, desc="Processing files"):
        result = safe_process_single_file(txt_file, image_dir, class_names)
        if result:
            results.append(result)

    with open(output_json, 'w') as f:
        json.dump(results, f, indent=2)

5. The Complete Solution with a Usage Example

Putting all of the above together into a single converter class:

import json
from pathlib import Path
import cv2
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

class YOLOTxtToJsonConverter:
    def __init__(self, class_names):
        self.class_names = class_names

    @staticmethod
    def get_image_size(image_path):
        try:
            img = cv2.imread(str(image_path))
            if img is not None:
                return img.shape[1], img.shape[0]
        except Exception:
            pass
        return None, None

    def convert_to_absolute(self, relative_coords, img_width, img_height):
        return {
            'x_min': int((relative_coords['x_center'] - relative_coords['width'] / 2) * img_width),
            'y_min': int((relative_coords['y_center'] - relative_coords['height'] / 2) * img_height),
            'x_max': int((relative_coords['x_center'] + relative_coords['width'] / 2) * img_width),
            'y_max': int((relative_coords['y_center'] + relative_coords['height'] / 2) * img_height),
            'width': int(relative_coords['width'] * img_width),
            'height': int(relative_coords['height'] * img_height)
        }

    def process_single_file(self, txt_file, image_dir):
        try:
            image_name = txt_file.stem + '.jpg'
            image_path = image_dir / image_name
            img_width, img_height = self.get_image_size(image_path)

            result = {
                'image_name': image_name,
                'image_size': {'width': img_width, 'height': img_height} if img_width else None,
                'detections': []
            }

            with open(txt_file, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) == 6:
                        detection = {
                            'class_id': int(parts[0]),
                            'class_name': self.class_names[int(parts[0])],
                            'confidence': float(parts[5]),
                            'relative_coords': {
                                'x_center': float(parts[1]),
                                'y_center': float(parts[2]),
                                'width': float(parts[3]),
                                'height': float(parts[4])
                            }
                        }
                        if img_width:
                            detection['absolute_coords'] = self.convert_to_absolute(
                                detection['relative_coords'], img_width, img_height
                            )
                        result['detections'].append(detection)
            return result
        except Exception as e:
            print(f"Error processing {txt_file}: {e}")
            return None

    def convert(self, txt_dir, image_dir, output_json, workers=4):
        txt_files = list(Path(txt_dir).glob('*.txt'))
        results = []

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(
                    self.process_single_file, txt_file, Path(image_dir)
                )
                for txt_file in txt_files
            ]
            for future in tqdm(futures, desc="Processing files"):
                result = future.result()
                if result:
                    results.append(result)

        with open(output_json, 'w') as f:
            json.dump(results, f, indent=2)

Usage example:

# YOLOv5's default COCO class names
COCO_CLASSES = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train',
    'truck', 'boat', 'traffic light', 'fire hydrant', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag',
    'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite',
    'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
    'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon',
    'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
    'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant',
    'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote',
    'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
    'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
    'hair drier', 'toothbrush'
]

converter = YOLOTxtToJsonConverter(COCO_CLASSES)
converter.convert(
    txt_dir='path/to/txt_files',
    image_dir='path/to/images',
    output_json='output.json',
    workers=8
)

6. Advanced Extensions

Filtering and sorting results

Often we only care about high-confidence detections or specific object classes. Filtering logic can be added during processing:

def process_single_file(self, txt_file, image_dir, min_confidence=0.5, target_classes=None):
    # ...existing code...
    for line in f:
        parts = line.strip().split()
        if len(parts) == 6:
            confidence = float(parts[5])
            class_id = int(parts[0])

            # Confidence filter
            if confidence < min_confidence:
                continue

            # Class filter
            if target_classes and self.class_names[class_id] not in target_classes:
                continue

            # ...rest of the processing logic...

Aggregate statistics

Adding summary statistics over the full set of detection results:

def add_statistics(results):
    stats = {
        'total_images': len(results),
        'total_detections': sum(len(img['detections']) for img in results),
        'class_distribution': {},
        'confidence_distribution': {
            '0-0.2': 0, '0.2-0.4': 0, '0.4-0.6': 0, '0.6-0.8': 0, '0.8-1.0': 0
        }
    }

    for img in results:
        for det in img['detections']:
            # Update the class distribution
            class_name = det['class_name']
            stats['class_distribution'][class_name] = \
                stats['class_distribution'].get(class_name, 0) + 1

            # Update the confidence distribution
            conf = det['confidence']
            if conf < 0.2:
                stats['confidence_distribution']['0-0.2'] += 1
            elif conf < 0.4:
                stats['confidence_distribution']['0.2-0.4'] += 1
            elif conf < 0.6:
                stats['confidence_distribution']['0.4-0.6'] += 1
            elif conf < 0.8:
                stats['confidence_distribution']['0.6-0.8'] += 1
            else:
                stats['confidence_distribution']['0.8-1.0'] += 1

    return stats

Visual verification

To verify that the conversion is correct, add a visualization helper:

def visualize_detections(image_path, detections, output_path=None):
    img = cv2.imread(str(image_path))
    if img is None:
        return

    for det in detections:
        if 'absolute_coords' not in det:
            continue

        coords = det['absolute_coords']
        color = (0, 255, 0)  # green (BGR)
        thickness = 2

        # Draw the bounding box
        cv2.rectangle(
            img,
            (coords['x_min'], coords['y_min']),
            (coords['x_max'], coords['y_max']),
            color, thickness
        )

        # Add the class name and confidence label
        label = f"{det['class_name']}: {det['confidence']:.2f}"
        cv2.putText(
            img, label,
            (coords['x_min'], coords['y_min'] - 5),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, thickness
        )

    if output_path:
        cv2.imwrite(str(output_path), img)
    else:
        cv2.imshow('Detections', img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

7. Practical Notes for Production Use

A few key points deserve special attention when using this conversion pipeline in real projects:

  1. Consistent file naming: TXT files and image files must share names exactly (only the extension differs)
  2. Accurate class mapping: the class_names list must match the class order used when training the model
  3. Memory management: for very large runs, process in batches instead of holding all results in memory at once
  4. Result validation: spot-check the converted JSON to confirm the coordinate conversion is correct
  5. Performance tuning: match the number of parallel workers to your hardware to avoid resource exhaustion

A typical troubleshooting workflow looks like this:

  1. Check the conversion results for a small number of files
  2. Verify that the coordinate conversion is correct (especially edge cases)
  3. Confirm the class-name mapping is accurate
  4. Check whether parallel processing introduces race conditions
  5. Verify that the final JSON file's structure matches expectations
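Parts of this workflow can be automated with a small sanity check on the generated JSON. A sketch that assumes the output schema used in this article (adjust the field names if yours differ):

```python
import json

def validate_output(json_path):
    """Spot-check structural invariants of the converted JSON; returns image count."""
    with open(json_path) as f:
        results = json.load(f)
    for img in results:
        for det in img["detections"]:
            assert 0.0 <= det["confidence"] <= 1.0, "confidence out of range"
            rc = det["relative_coords"]
            assert all(0.0 <= rc[k] <= 1.0 for k in rc), "relative coords out of range"
            if "absolute_coords" in det:
                ac = det["absolute_coords"]
                assert ac["x_min"] <= ac["x_max"] and ac["y_min"] <= ac["y_max"]
    return len(results)
```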

For very large projects, consider splitting the work into stages:

raw TXT → intermediate processing (sharding) → merge results → final JSON

This architecture makes resumable and distributed processing much easier, which suits very large datasets.
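The staged pipeline can be sketched as intermediate shard files plus a merge step. A minimal illustration; the helper names are hypothetical, not part of the converter class above:

```python
import json
from pathlib import Path

def write_shard(results, shard_dir, shard_id):
    """Persist one batch of per-image results as an intermediate shard file."""
    shard_dir = Path(shard_dir)
    shard_dir.mkdir(parents=True, exist_ok=True)
    shard_path = shard_dir / f"shard_{shard_id:04d}.json"
    with open(shard_path, "w") as f:
        json.dump(results, f)
    return shard_path

def merge_shards(shard_dir, output_json):
    """Concatenate all shards, in shard order, into the final JSON file."""
    merged = []
    for shard_path in sorted(Path(shard_dir).glob("shard_*.json")):
        with open(shard_path) as f:
            merged.extend(json.load(f))
    with open(output_json, "w") as f:
        json.dump(merged, f, indent=2)
    return len(merged)
```

Because each shard is written as soon as its batch finishes, a crashed run can resume by skipping shard IDs that already exist on disk.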
