DAMOYOLO-S模型批量推理与结果导出教程:处理海量图像数据
2026/4/14 12:44:10 网站建设 项目流程

DAMOYOLO-S模型批量推理与结果导出教程:处理海量图像数据

你是不是也遇到过这样的烦恼?手头有成百上千张图片需要做目标检测,一张张上传到网页或者调用接口,不仅慢,还容易出错。或者,模型推理完了,结果散落在各处,想做个统计分析还得手动整理半天。

今天,咱们就来彻底解决这个问题。我将手把手带你写一个Python脚本,它能自动扫描你指定文件夹里的所有图片,调用DAMOYOLO-S模型进行批量推理,最后把检测结果——包括每个目标的边界框位置、类别和置信度——整整齐齐地保存到CSV或JSON文件里。整个过程全自动,你只需要泡杯咖啡,回来数据就处理好了。

这个教程特别适合需要处理大量图像数据的研究员、数据分析师或者任何有批量自动化需求的朋友。即使你Python刚入门,跟着步骤走也能轻松搞定。

1. 准备工作:理清思路与搭建环境

在开始写代码之前,我们先花两分钟把整个流程想明白。我们的目标是实现一个“输入-处理-输出”的自动化流水线:

  1. 输入:一个装满图片的文件夹。
  2. 处理:脚本自动读取每张图片,调用DAMOYOLO-S服务进行推理。
  3. 输出:一个结构化的文件(CSV或JSON),清晰记录每张图片、每个目标的详细信息。

想清楚这个,写代码就不会迷路了。

接下来,确保你的电脑环境已经就绪。你需要有Python(建议3.7或以上版本)。我们主要会用到几个非常常用的库:

  • requests:用于向DAMOYOLO-S的API服务发送图片数据。
  • PIL(Pillow) 或opencv-python:用于读取和处理图片。
  • pandas:用于将结果整理并导出为CSV文件(如果你选JSON,用标准库json就行)。

打开你的终端或命令行,用下面这行命令一次性安装它们:

pip install requests pillow pandas opencv-python

通常,DAMOYOLO-S模型会通过一个HTTP API服务来提供。你需要知道这个服务的地址(URL)。比如,它可能运行在你本机的某个端口(如http://localhost:8000),或者在某个远程服务器上。请根据你的实际部署情况,准备好这个API的基础URL,我们稍后会用到。

2. 核心步骤一:如何与DAMOYOLO-S API对话

DAMOYOLO-S模型通常提供一个简单的HTTP接口。我们只需要把图片数据“POST”到这个接口,它就会返回检测结果。我们来写一个函数专门负责这件事。

这个函数要做三件事:1) 读取图片文件;2) 把它发送到API;3) 解析返回的JSON结果。

import requests import json from PIL import Image import io def infer_image(image_path, api_url): """ 单张图片推理函数 :param image_path: 图片文件的路径 :param api_url: DAMOYOLO-S API的完整地址(例如 http://localhost:8000/predict) :return: 包含检测结果的字典,如果失败返回None """ try: # 1. 打开并准备图片数据 with open(image_path, 'rb') as f: image_bytes = f.read() # 2. 构建请求,通常以‘multipart/form-data’形式上传文件 files = {'image': (image_path, image_bytes)} # 3. 发送POST请求到API response = requests.post(api_url, files=files) # 4. 检查请求是否成功 response.raise_for_status() # 如果状态码不是200,会抛出异常 # 5. 解析返回的JSON数据 result = response.json() return result except FileNotFoundError: print(f"错误:找不到图片文件 {image_path}") return None except requests.exceptions.RequestException as e: print(f"请求API时出错 ({image_path}): {e}") return None except json.JSONDecodeError: print(f"解析API返回的JSON时出错 ({image_path})") return None # 使用示例(假设你的服务跑在本机8000端口) api_base_url = "http://localhost:8000/predict" # 请替换为你的实际API地址 test_result = infer_image("./test_image.jpg", api_base_url) if test_result: print("单张图片测试成功,返回数据结构示例:") print(json.dumps(test_result, indent=2, ensure_ascii=False))

运行这个测试,如果成功,你会看到API返回的数据。它的结构通常是一个列表,里面包含多个检测目标,每个目标是一个字典,可能有bbox(边界框),label(类别),score(置信度) 这些字段。请务必根据你实际API返回的数据结构来调整后续的解析代码。

3. 核心步骤二:遍历文件夹,实现批量推理

单张图片搞定后,批量处理就简单了。我们利用Python的osglob模块来遍历文件夹。

import os import glob from concurrent.futures import ThreadPoolExecutor, as_completed import time def batch_inference_folder(folder_path, api_url, output_dir='./results', max_workers=4): """ 批量处理整个文件夹的图片 :param folder_path: 包含图片的文件夹路径 :param api_url: API地址 :param output_dir: 原始结果JSON的保存目录 :param max_workers: 并发线程数,用于加速处理(根据你的机器和API负载调整) :return: 所有成功推理的结果列表 """ # 支持常见的图片格式 image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff'] image_paths = [] # 递归查找所有匹配的图片文件 for ext in image_extensions: # glob.glob支持递归搜索 ‘**/’ image_paths.extend(glob.glob(os.path.join(folder_path, '**', ext), recursive=True)) print(f"在文件夹 '{folder_path}' 中共找到 {len(image_paths)} 张图片。") if not image_paths: print("未找到任何图片文件,请检查路径和格式。") return [] # 创建输出目录 os.makedirs(output_dir, exist_ok=True) all_results = [] successful = 0 failed = 0 # 使用线程池并发处理,显著提升海量图片处理速度 print(f"开始批量推理,使用 {max_workers} 个线程...") start_time = time.time() with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_path = {executor.submit(infer_image, path, api_url): path for path in image_paths} # 处理完成的任务 for future in as_completed(future_to_path): img_path = future_to_path[future] try: result = future.result() if result: # 为每个结果附加图片路径信息 result['image_file'] = os.path.basename(img_path) result['image_path'] = img_path all_results.append(result) successful += 1 # (可选)实时保存每张图片的原始结果到独立JSON文件,防止中途出错丢失 img_name = os.path.splitext(os.path.basename(img_path))[0] result_file = os.path.join(output_dir, f"{img_name}_result.json") with open(result_file, 'w', encoding='utf-8') as f: json.dump(result, f, indent=2, ensure_ascii=False) else: failed += 1 print(f"推理失败: {img_path}") except Exception as e: failed += 1 print(f"处理图片 {img_path} 时发生异常: {e}") end_time = time.time() print(f"\n批量推理完成!") print(f" 成功: {successful} 张") print(f" 失败: {failed} 张") print(f" 总耗时: {end_time - start_time:.2f} 秒") print(f" 平均每张: {(end_time - start_time) / len(image_paths):.2f} 秒") return all_results # 使用示例 your_image_folder = "./your_image_dataset" # 替换为你的图片文件夹路径 raw_results = batch_inference_folder(your_image_folder, api_base_url)

这段代码的核心是ThreadPoolExecutor,它允许我们同时发送多张图片的推理请求,而不是一张接一张地等,这对于处理海量数据来说速度提升是巨大的。max_workers参数需要根据你的API服务器的处理能力和网络状况适当调整,不是越大越好。

4. 核心步骤三:将结果结构化导出为CSV或JSON

拿到原始的推理结果列表后,它们可能嵌套比较深。我们需要将其“扁平化”,转换成一张规整的表格,每一行代表一个检测到的目标。

import pandas as pd def parse_and_export_to_csv(all_results, output_csv_path='./detection_results.csv'): """ 解析批量推理结果,并导出为CSV文件 :param all_results: batch_inference_folder 返回的结果列表 :param output_csv_path: 输出的CSV文件路径 """ rows = [] # 用来存储所有表格行数据 for img_result in all_results: image_file = img_result.get('image_file', 'unknown') image_path = img_result.get('image_path', '') # 注意:这里需要根据你的API实际返回结构来解析,例如 ‘detections’ 或 ‘predictions’ detections = img_result.get('detections', []) # 请将 ‘detections’ 替换为实际的键名 for i, det in enumerate(detections): # 解析每个检测框的信息,字段名请根据实际情况调整 bbox = det.get('bbox', []) # 可能是 [x1, y1, x2, y2] 或 [x, y, w, h] label = det.get('label', 'unknown') score = det.get('score', 0.0) # 构建一行数据 row = { 'image_filename': image_file, 'image_path': image_path, 'detection_id': i, 'label': label, 'confidence': score, 'bbox_x1': bbox[0] if len(bbox) > 0 else None, 'bbox_y1': bbox[1] if len(bbox) > 1 else None, 'bbox_x2': bbox[2] if len(bbox) > 2 else None, 'bbox_y2': bbox[3] if len(bbox) > 3 else None, # 如果bbox是 [x, y, width, height] 格式,可以计算x2,y2 # 'bbox_width': bbox[2], # 'bbox_height': bbox[3], } rows.append(row) # 创建DataFrame并保存为CSV if rows: df = pd.DataFrame(rows) df.to_csv(output_csv_path, index=False, encoding='utf-8-sig') # utf-8-sig支持Excel中文 print(f"结果已成功导出到 CSV 文件: {output_csv_path}") print(f"共导出 {len(df)} 条检测记录。") # 打印前几行预览 print("\n数据预览:") print(df.head()) else: print("没有检测结果可以导出。") # 使用示例:将之前批量推理的结果导出 parse_and_export_to_csv(raw_results, './my_detection_results.csv')

为什么选择CSV?CSV文件可以用Excel、WPS直接打开,也方便导入到数据库(如MySQL、Pandas)进行进一步分析、筛选和统计,比如统计每个类别出现了多少次,计算平均置信度等。

如果你更喜欢JSON格式,因为它能保留更完整的原始结构,也可以轻松导出:

import json def export_to_json(all_results, output_json_path='./detection_results.json'): """ 将批量推理结果导出为JSON文件 :param all_results: 批量推理结果列表 :param output_json_path: 输出的JSON文件路径 """ with open(output_json_path, 'w', encoding='utf-8') as f: json.dump(all_results, f, indent=2, ensure_ascii=False) print(f"结果已成功导出到 JSON 文件: {output_json_path}") # 使用示例 export_to_json(raw_results, './my_detection_results.json')

5. 把它们组装起来:完整的脚本与使用技巧

现在,我们把上面的所有函数整合到一个主程序里,并添加一些让脚本更好用的功能。

#!/usr/bin/env python3 """ DAMOYOLO-S 批量图像推理与结果导出脚本 作者:你的名字 功能:自动遍历文件夹,调用DAMOYOLO-S API进行目标检测,并将结果导出为CSV/JSON。 """ import os import glob import json import time import argparse import pandas as pd import requests from concurrent.futures import ThreadPoolExecutor, as_completed # 这里放入之前定义的 infer_image, batch_inference_folder, parse_and_export_to_csv 函数 # ... (将第2、3、4节的函数代码复制到这里) ... def main(): parser = argparse.ArgumentParser(description='DAMOYOLO-S批量推理与导出工具') parser.add_argument('--input', '-i', required=True, help='输入图片文件夹路径') parser.add_argument('--api', '-a', required=True, help='DAMOYOLO-S API地址 (e.g., http://localhost:8000/predict)') parser.add_argument('--output', '-o', default='./output', help='输出目录 (默认: ./output)') parser.add_argument('--format', '-f', choices=['csv', 'json', 'both'], default='both', help='输出格式 (默认: both)') parser.add_argument('--workers', '-w', type=int, default=4, help='并发线程数 (默认: 4)') args = parser.parse_args() print("="*50) print("DAMOYOLO-S 批量推理与导出工具启动") print("="*50) # 步骤1: 批量推理 print(f"\n[步骤1/3] 开始处理文件夹: {args.input}") all_results = batch_inference_folder( folder_path=args.input, api_url=args.api, output_dir=os.path.join(args.output, 'raw_json'), max_workers=args.workers ) if not all_results: print("未获得有效结果,程序退出。") return # 步骤2: 导出结果 print(f"\n[步骤2/3] 正在导出结构化结果...") os.makedirs(args.output, exist_ok=True) timestamp = time.strftime("%Y%m%d_%H%M%S") base_filename = f"detection_results_{timestamp}" if args.format in ['csv', 'both']: csv_path = os.path.join(args.output, f"{base_filename}.csv") parse_and_export_to_csv(all_results, csv_path) if args.format in ['json', 'both']: json_path = os.path.join(args.output, f"{base_filename}.json") export_to_json(all_results, json_path) print(f"\n[步骤3/3] 所有任务完成!") print(f"结果文件保存在: {os.path.abspath(args.output)}") if __name__ == '__main__': main()

如何使用这个完整脚本?

  1. 将上面的所有代码保存为一个文件,例如batch_infer_damoyolo.py
  2. 打开终端,切换到脚本所在目录。
  3. 使用命令行运行,非常灵活:
# 基本用法:处理图片,导出CSV和JSON python batch_infer_damoyolo.py -i ./my_photos -a http://your-api-server:port/predict # 指定输出目录和仅导出CSV python batch_infer_damoyolo.py -i ./dataset -a http://localhost:8000/predict -o ./my_results -f csv # 增加并发数到8,加速处理 python batch_infer_damoyolo.py -i ./large_dataset -a http://your-api-server:port/predict -w 8

6. 总结与后续建议

走完这一趟,你应该已经拥有了一个属于自己的、可以处理海量图片的DAMOYOLO-S批量推理工具。从遍历文件夹、并发请求API,到解析数据、结构化导出,整个流程已经打通。这个脚本的核心价值在于自动化结构化,把你从重复的机械劳动中解放出来,让数据直接为你所用。

在实际使用中,你可能还会遇到一些情况,比如API返回的字段名和教程里假设的不一样,这时候只需要调整解析函数(parse_and_export_to_csv)中的键名即可。又或者,图片太多导致内存占用大,你可以考虑在batch_inference_folder函数中分批处理,而不是一次性加载所有图片路径。

拿到CSV文件后,数据分析的大门就打开了。你可以用Pandas快速进行筛选(比如只保留置信度高于0.7的目标)、分组统计(统计每个类别出现的频率)、甚至可视化。这套方法不仅适用于DAMOYOLO-S,稍作修改也能套用到其他提供HTTP API的视觉模型上,希望它能成为你工具箱里的一件利器。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询