前言
在前两篇文章中,我们完成了基础设施的搭建和 Embedding 模型的封装。现在,系统有了向量数据库,具备了将文本转换为高维向量的能力。接下来要做的,就是把知识真正“喂”给系统。
数据是 RAG 系统的“底座”,后续所有的检索和问答质量都直接依赖于此。本文将完整记录 Milvus Collection 设计、文档分块模块开发、数据灌入脚本编写、AI 爬虫实战,以及知识库从 0 到近千条向量的规模化演进过程。但本文不止于此——我们还将深入实现混合检索、集成 Reranker 精排,并通过量化对比验证优化效果。
一、设计 Milvus Collection Schema
在灌入数据之前,我们需要先设计好 Collection 的结构。Collection 相当于关系型数据库中的“表”,我们需要预先定义好字段类型和向量索引。
1.1 Schema 设计
| 字段名 | 类型 | 说明 | 约束 |
|---|---|---|---|
id | VARCHAR | 文档块的唯一标识 | 主键,最大长度 100 |
text | VARCHAR | 原始文本内容 | 最大长度 65535 |
vector | FLOAT_VECTOR | Embedding 向量 | 维度 384(需与 BGE-small 模型一致) |
source | VARCHAR | 来源文件路径 | 最大长度 500 |
1.2 索引配置
索引类型:
IVF_FLAT(后续可换 HNSW 或 IVF_SQ8 做对比实验)度量类型:
COSINE(因为 Embedding 已归一化,余弦相似度最合适)nlist:128(聚类中心数,平衡精度与速度)
1.3 代码实现
创建app/core/retrieval/milvus_client.py,封装数据库连接与管理逻辑:
Python
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility COLLECTION_NAME = "lite_rag_docs" VECTOR_DIM = 384 # bge-small-en-v1.5 的维度 def connect_milvus(host: str = "localhost", port: str = "19530"): """建立与 Milvus 的连接""" connections.connect(alias="default", host=host, port=port) print(f"✅ 已连接到 Milvus ({host}:{port})") def create_collection() -> Collection: """创建 Collection(如果已存在则先清空重建)""" if utility.has_collection(COLLECTION_NAME): utility.drop_collection(COLLECTION_NAME) print(f"⚠️ 已删除旧 Collection: {COLLECTION_NAME}") fields = [ FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=100), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM), FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=500), ] schema = CollectionSchema(fields, description="LiteRAG 知识库") collection = Collection(COLLECTION_NAME, schema) index_params = { "metric_type": "COSINE", "index_type": "IVF_FLAT", "params": {"nlist": 128} } collection.create_index("vector", index_params) print(f"✅ Collection '{COLLECTION_NAME}' 创建成功") return collection def get_collection() -> Collection: """获取当前可用的 Collection""" if not utility.has_collection(COLLECTION_NAME): raise RuntimeError(f"Collection '{COLLECTION_NAME}' 不存在") return Collection(COLLECTION_NAME) if __name__ == "__main__": connect_milvus() create_collection()运行测试:
bash
python -m app.core.retrieval.milvus_client预期输出:
text
✅ 已连接到 Milvus (localhost:19530) ⚠️ 已删除旧 Collection: lite_rag_docs ✅ Collection 'lite_rag_docs' 创建成功二、文档分块(Chunking)模块开发
在大模型 RAG 系统中,受限于 LLM 的上下文窗口,长文档必须切分成小块(Chunks)才能进行有效的检索。我们需要一个支持多格式、可配置的分块模块。
2.1 安装依赖
bash
pip install pypdf langchain langchain-community langchain-text-splitters2.2 代码实现
创建app/core/splitter.py:
python
import os from typing import List, Dict, Any from langchain_community.document_loaders import PyPDFLoader, TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from loguru import logger class DocumentSplitter: """文档加载与分块服务""" def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap # 使用递归字符分割器,按自然段落和标点进行切分 self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""], length_function=len, ) def load_and_split(self, file_path: str) -> List[Dict[str, Any]]: if not os.path.exists(file_path): logger.error(f"文件不存在: {file_path}") return [] ext = os.path.splitext(file_path)[1].lower() try: if ext == ".pdf": loader = PyPDFLoader(file_path) elif ext in [".md", ".markdown", ".txt", ".log", ".json", ".yaml", ".yml"]: loader = TextLoader(file_path, encoding="utf-8") else: logger.warning(f"不支持的文件类型: {ext}") return [] documents = loader.load() except Exception as e: logger.error(f"加载文件失败 {file_path}: {e}") return [] chunks = self.text_splitter.split_documents(documents) result = [] for i, chunk in enumerate(chunks): result.append({ "text": chunk.page_content, "metadata": { "source": file_path, "chunk_index": i, **chunk.metadata } }) logger.info(f"文件 {file_path} 切分为 {len(result)} 个文本块") return result def load_directory(self, directory: str) -> List[Dict[str, Any]]: all_chunks = [] supported_exts = {".pdf", ".md", ".markdown", ".txt"} for root, _, files in os.walk(directory): for file in files: ext = os.path.splitext(file)[1].lower() if ext in supported_exts: file_path = os.path.join(root, file) chunks = self.load_and_split(file_path) all_chunks.extend(chunks) logger.info(f"目录 {directory} 共生成 {len(all_chunks)} 个文本块") return all_chunks设计决策:我们刻意避开了
UnstructuredMarkdownLoader,因为它依赖庞大的unstructured库,安装时常因系统 C++ 依赖缺失而失败。改用TextLoader统一处理所有文本类文件,更轻量、更稳定。
三、数据灌入脚本
万事俱备,我们需要一个核心脚本将全流程串联起来:遍历目录 → 文档分块 → 向量化编码 → 插入 Milvus。
创建scripts/ingest.py:
python
#!/usr/bin/env python import hashlib import sys from pathlib import Path # 确保能导入 app 模块 sys.path.insert(0, str(Path(__file__).parent.parent)) from loguru import logger from app.core.splitter import DocumentSplitter from app.core.embedding import get_embedding_service from app.core.retrieval.milvus_client import connect_milvus, get_collection, COLLECTION_NAME def generate_doc_id(source: str, chunk_index: int) -> str: """根据文件路径和块索引生成唯一的短 ID""" raw = f"{source}_{chunk_index}" return hashlib.md5(raw.encode()).hexdigest()[:16] def ingest_docs(directory: str = "docs"): # 1. 连接 Milvus connect_milvus() collection = get_collection() logger.info(f"📦 已连接 Collection: {COLLECTION_NAME}") # 2. 加载并分块 splitter = DocumentSplitter(chunk_size=512, chunk_overlap=50) chunks = splitter.load_directory(directory) logger.info(f"📄 共加载 {len(chunks)} 个文本块") if not chunks: logger.warning("没有找到任何文档块,退出") return # 3. 向量化 embedder = get_embedding_service() texts = [chunk["text"] for chunk in chunks] sources = [chunk["metadata"]["source"] for chunk in chunks] chunk_indices = [chunk["metadata"]["chunk_index"] for chunk in chunks] logger.info("🧠 正在调用模型生成向量...") vectors = embedder.encode(texts) # 4. 插入数据库 ids = [generate_doc_id(src, idx) for src, idx in zip(sources, chunk_indices)] data = [ids, texts, vectors.tolist(), sources] logger.info(f"💾 正在插入 {len(ids)} 条数据到 Milvus...") collection.insert(data) collection.flush() logger.success(f"✅ 成功,当前 Collection 共有 {collection.num_entities} 条向量") if __name__ == "__main__": ingest_docs()四、知识库规模化:从玩具数据到生产级语料
仅有几条测试数据的知识库无法验证系统的真实性能。为了让项目具备面试展示价值,我们需要灌入大规模、高质量的 AI 领域专业语料。
4.1 AI 爬虫环境搭建:Crawl4AI + Playwright
传统的requests难以应对现代动态渲染网页。我们选择Crawl4AI——一款专为大模型时代设计的开源爬虫,它能直接解析网页并输出极度干净的 Markdown。
🔧 踩坑与修复:
lxml 依赖冲突:Python 3.13 环境下解析失败。解决办法:升级 pip 并强制使用清华源拉取预编译包。
浏览器内核下载超时:默认从国外 CDN 下载 Chromium 频繁中断。解决办法:降级 Playwright 至 1.48.0 版本,并手动通过
apt补齐系统底层依赖。
4.2 批量抓取 AI 技术文章
编写带重试机制的爬虫脚本,定点抓取涵盖 RAG、Agent、向量数据库等前沿方向的高质量文章。
抓取结果统计:
| 来源平台 | 成功数 | 典型内容 |
|---|---|---|
| arXiv | 5 篇 | ACL'26 RAG 前沿研究、Agent 综述论文 |
| 阿里云开发者社区 | 4 篇 | RAG 企业级实战、AI 应用开发指南 |
| Machine Learning Mastery | 1 篇 | Graph-RAG(图检索)深度教程 |
| 经济观察网 | 1 篇 | 兆瓦级算力系统新闻 |
| 知乎 / 百度 | 0 篇 | 反爬严重,直接放弃 |
经验总结:对于强反爬平台(如知乎、百度开发者社区),即使使用 Playwright 模拟浏览器也难以稳定抓取。实践中应优先选择提供 RSS、API 或反爬较弱的平台(如 arXiv、各大技术社区)。
4.3 规模化向量入库
清理掉抓取失败的空文件后,执行一键入库脚本python scripts/ingest.py。最终剔除掉早期测试文件后,知识库内沉淀了14 篇高质量长文,共计切分生成了718 条高维向量
注意:初始入库时包含测试文件
test.txt,后续已彻底清理,最终知识库规模为718 条向量。
五、混合检索模块:让召回更精准
仅有向量检索是不够的。向量擅长捕捉语义相似性,但对精确术语(如"HNSW"、"IVF_FLAT")的匹配能力较弱。为此,我们实现了混合检索——融合向量检索和 BM25 关键词检索,取长补短。
5.1 核心设计
| 组件 | 实现方式 | 说明 |
|---|---|---|
| 向量检索 | Milvussearch()+ BGE-small | 384 维 COSINE 相似度 |
| BM25 关键词检索 | rank_bm25库,全量文档构建索引 | 中文分词使用jieba |
| 融合算法 | RRF(Reciprocal Rank Fusion) | k=60,排名加权融合 |
| 索引缓存 | pickle序列化到bm25_index.pkl | 首次构建后秒级加载 |
5.2 中文分词适配
BM25 默认的空格分词对中文完全失效。我们引入jieba并实现智能分词:
python
def tokenize(text: str) -> List[str]: if any('\u4e00' <= char <= '\u9fff' for char in text): return list(jieba.cut(text)) # 中文用 jieba else: return text.lower().split() # 英文用空格5.3 踩坑与修复
| 问题 | 解决方案 |
|---|---|
| BM25 对中文无效 | 引入jieba智能分词 |
Milvus 连接未建立导致get_collection()失败 | 在__init__中显式调用connect_milvus() |
向量格式错误(required argument is not a float) | 显式转换为float32并flatten().tolist() |
collection.is_loaded属性不存在 | 直接调用幂等的collection.load() |
六、Reranker 精排:画龙点睛之笔
混合检索已能返回不错的结果,但在高级 RAG 系统中,Reranker(重排序)是提升精度的关键组件。它对召回的候选文档块进行二次精排,筛选出最相关的几条作为 LLM 的上下文。
概念澄清:这里精排的“候选文档”指的是文本块(Chunks),而非原始文章。我们的知识库包含 14 篇文章,但被切分成了718 个文本块。检索时是从这 718 个块中召回 Top-20 个块,Reranker 再从中精选 Top-3。
6.1 模型选型与显存管理
选用BAAI/bge-reranker-base(约 1.1GB 核心权重)。针对 3060 6GB 显存的限制,我们设计了动态加载/释放机制:
python
class RerankerService: def load_model(self): # 仅在需要时加载到 GPU if self.model is None: self.model = AutoModelForSequenceClassification.from_pretrained( model_name, torch_dtype=torch.float16 ).to(self.device) def unload_model(self): # 用完后立即释放显存 del self.model torch.cuda.empty_cache()6.2 模型本地化下载
使用hf download预先下载模型,支持完全离线运行:
bash
export HF_ENDPOINT=https://hf-mirror.com hf download BAAI/bge-reranker-base --local-dir ./app/models/bge-reranker-base6.3 集成到检索流程
修改检索服务的search()方法,添加use_rerank参数:
python
def search(self, query: str, top_k: int = 5, use_rerank: bool = False): # 1. 召回阶段(混合检索返回 Top-20 个文本块) candidates = self.reciprocal_rank_fusion(vector_hits, bm25_hits) # 2. 精排阶段(可选) if use_rerank: reranker = get_reranker_service() passages = [doc["text"] for doc in candidates] reranked = reranker.rerank(query, passages, top_k=top_k) return reranked return candidates[:top_k]七、效果量化对比:Reranker 带来了什么?
我们设计了对比实验,对同一查询分别测试有/无 Reranker 的 Top-1 结果:
| 查询 | 无 Reranker Top-1 | 有 Reranker Top-1 | 关键变化 |
|---|---|---|---|
| 什么是RAG? | article_1709964.md | article_1709964.md | 来源一致,精排优化了片段选择 |
| How does RAG work? | ...3-tiered-graph-rag-system.md | article_1709964.md | ⭐来源改变,Reranker 纠正了 BM25 偏差 |
| RAG系统中如何优化检索召回率? | article_1709964.md | article_1709964.md | 来源一致,精排优化了片段选择 |
核心发现:
Reranker 具备语义纠偏能力:对于英文查询,能将更通用、更适合回答基础概念的文章排到首位。
分数尺度变化(RRF 0.03 → Reranker 5.05)是正常现象,两者算法不同,我们关注的是相对排序的改善。
面试话术参考:“我通过对比实验验证了 Reranker 的有效性。对于英文查询 'How does RAG work?',原始混合检索由于 BM25 权重较高,返回了一篇深度 Graph-RAG 文章,而 Reranker 精准地将其纠正为更通用、更适合回答基础概念的中文 RAG 文章。”
八、工程规范化
一个优秀的项目不仅代码能跑,还需要具备可复现性和清晰的模块划分。
8.1 导出依赖文件
bash
pip freeze > requirements.txt8.2 配置.gitignore
text
venv/ __pycache__/ app/models/ volumes/ .env *.pyc .DS_Store
8.3 项目目录结构
text
LiteRAG/ ├── app/ │ ├── core/ │ │ ├── embedding.py # Embedding 服务 │ │ ├── splitter.py # 文档分块 │ │ ├── reranker.py # Reranker 精排 │ │ └── retrieval/ │ │ ├── milvus_client.py # Milvus 连接与 Collection 管理 │ │ └── hybrid_search.py # 混合检索 + RRF │ └── models/ # 本地模型缓存 ├── scripts/ │ ├── ingest.py # 数据灌入脚本 │ ├── fetch_articles.py # 第一批爬虫 │ └── fetch_articles_batch2.py # 第二批爬虫 ├── docs/ # 原始文档 ├── docker-compose.yml # 6 个服务容器编排 ├── requirements.txt └── .gitignore
九、踩坑记录(完整版)
| 问题现象 / 报错 | 根本原因 | 解决方案 |
|---|---|---|
No module named 'langchain.text_splitter' | 新版 LangChain 将分块器拆分到了独立包中 | 安装并修改导入:from langchain_text_splitters import ... |
FileNotFoundError: Path ... not found | Embedding 模块中模型路径使用了未解析的占位符 | 去缓存目录使用ls查看真实哈希文件夹名,并更新路径 |
UnstructuredMarkdownLoader各种报错 | 底层unstructured库体积大、系统 C++ 依赖复杂 | 放弃该方案,改用更轻量的TextLoader统一处理文本 |
Crawl4AI 安装时lxml冲突 | Python 3.13 环境太新,部分轮子尚未提供预编译包 | 升级 pip,并强制使用清华镜像源安装 |
| Playwright 浏览器内核下载失败/超时 | 默认的国外 CDN 节点在国内访问不稳定 | 降级 Playwright 至 1.48.0,并使用apt手动安装系统底层依赖 |
| BM25 对中文完全失效 | 默认空格分词无法处理中文 | 引入jieba智能分词,自动检测中英文 |
| Milvus 向量格式错误 | 传入的向量包含numpy.float32而非纯 Python 浮点数 | astype(np.float32).flatten().tolist() |
collection.is_loaded不存在 | pymilvus版本差异 | 直接调用幂等的collection.load() |
| Reranker 测试时测试文件仍为 Top-1 | Milvus 中残留旧数据 | 彻底删除 Collection 并重新灌库 |
volumes/目录无法纳入版本控制 | 宿主机的 Docker 数据卷目录归属 root,权限被拒绝 | 将volumes/加入.gitignore |
十、当前成果速览
| 核心指标 | 当前数值 |
|---|---|
| 知识库文档总数 | 14 篇高质量长文(已剔除测试文件) |
| 文本块 (Chunks) 总数 | 718 条(纯净知识库) |
| 向量维度 | 384 (BGE-Small) |
| Milvus 索引结构 | IVF_FLAT + COSINE |
| 检索方式 | 向量检索 + BM25 + RRF 混合融合 |
| 精排组件 | BGE-Reranker-base(本地化,动态显存管理) |
| Reranker 纠偏效果 | 英文查询 Top-1 来源被正确纠正 |
| 工程规范 | requirements.txt、.gitignore、模块化设计 |
| 离线可用性 | 所有模型本地化,无需联网 |
十一、下一步计划
现在,我们拥有了一个包含718 条高质量向量、混合检索与 Reranker 精排双引擎驱动的 RAG 检索系统。数据底座和检索链路已完全打通,下一篇文章我们将进入 RAG 系统的核心交互环节:
LLM 集成:封装大模型 API(如 DeepSeek、Qwen),设计 Prompt 模板
FastAPI 问答接口:实现
POST /chat,接收用户问题,返回流式/非流式答案Redis 缓存加速:为高频查询添加缓存层
端到端测试:用真实 AI 面试题验证系统问答质量
本文是【从0到1搭建企业级RAG系统】系列的第三篇,如果你在数据处理、混合检索或 Reranker 集成过程中遇到任何问题,欢迎在评论区交流!