从0到1搭建企业级RAG系统(三):文档向量化、AI爬虫实战与知识库规模化
2026/4/18 8:22:16 网站建设 项目流程

前言

在前两篇文章中,我们完成了基础设施的搭建和 Embedding 模型的封装。现在,系统有了向量数据库,具备了将文本转换为高维向量的能力。接下来要做的,就是把知识真正“喂”给系统。

数据是 RAG 系统的“底座”,后续所有的检索和问答质量都直接依赖于此。本文将完整记录 Milvus Collection 设计、文档分块模块开发、数据灌入脚本编写、AI 爬虫实战,以及知识库从 0 到近千条向量的规模化演进过程。但本文不止于此——我们还将深入实现混合检索、集成 Reranker 精排,并通过量化对比验证优化效果。


一、设计 Milvus Collection Schema

在灌入数据之前,我们需要先设计好 Collection 的结构。Collection 相当于关系型数据库中的“表”,我们需要预先定义好字段类型和向量索引。

1.1 Schema 设计
字段名类型说明约束
idVARCHAR文档块的唯一标识主键,最大长度 100
textVARCHAR原始文本内容最大长度 65535
vectorFLOAT_VECTOREmbedding 向量维度 384(需与 BGE-small 模型一致)
sourceVARCHAR来源文件路径最大长度 500
1.2 索引配置
  • 索引类型IVF_FLAT(后续可换 HNSW 或 IVF_SQ8 做对比实验)

  • 度量类型COSINE(因为 Embedding 已归一化,余弦相似度最合适)

  • nlist:128(聚类中心数,平衡精度与速度)

1.3 代码实现

创建app/core/retrieval/milvus_client.py,封装数据库连接与管理逻辑:

Python

from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility COLLECTION_NAME = "lite_rag_docs" VECTOR_DIM = 384 # bge-small-en-v1.5 的维度 def connect_milvus(host: str = "localhost", port: str = "19530"): """建立与 Milvus 的连接""" connections.connect(alias="default", host=host, port=port) print(f"✅ 已连接到 Milvus ({host}:{port})") def create_collection() -> Collection: """创建 Collection(如果已存在则先清空重建)""" if utility.has_collection(COLLECTION_NAME): utility.drop_collection(COLLECTION_NAME) print(f"⚠️ 已删除旧 Collection: {COLLECTION_NAME}") fields = [ FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=100), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=VECTOR_DIM), FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=500), ] schema = CollectionSchema(fields, description="LiteRAG 知识库") collection = Collection(COLLECTION_NAME, schema) index_params = { "metric_type": "COSINE", "index_type": "IVF_FLAT", "params": {"nlist": 128} } collection.create_index("vector", index_params) print(f"✅ Collection '{COLLECTION_NAME}' 创建成功") return collection def get_collection() -> Collection: """获取当前可用的 Collection""" if not utility.has_collection(COLLECTION_NAME): raise RuntimeError(f"Collection '{COLLECTION_NAME}' 不存在") return Collection(COLLECTION_NAME) if __name__ == "__main__": connect_milvus() create_collection()

运行测试:

bash

python -m app.core.retrieval.milvus_client

预期输出:

text

✅ 已连接到 Milvus (localhost:19530) ⚠️ 已删除旧 Collection: lite_rag_docs ✅ Collection 'lite_rag_docs' 创建成功

二、文档分块(Chunking)模块开发

在大模型 RAG 系统中,受限于 LLM 的上下文窗口,长文档必须切分成小块(Chunks)才能进行有效的检索。我们需要一个支持多格式、可配置的分块模块。

2.1 安装依赖

bash

pip install pypdf langchain langchain-community langchain-text-splitters
2.2 代码实现

创建app/core/splitter.py

python

import os from typing import List, Dict, Any from langchain_community.document_loaders import PyPDFLoader, TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from loguru import logger class DocumentSplitter: """文档加载与分块服务""" def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap # 使用递归字符分割器,按自然段落和标点进行切分 self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""], length_function=len, ) def load_and_split(self, file_path: str) -> List[Dict[str, Any]]: if not os.path.exists(file_path): logger.error(f"文件不存在: {file_path}") return [] ext = os.path.splitext(file_path)[1].lower() try: if ext == ".pdf": loader = PyPDFLoader(file_path) elif ext in [".md", ".markdown", ".txt", ".log", ".json", ".yaml", ".yml"]: loader = TextLoader(file_path, encoding="utf-8") else: logger.warning(f"不支持的文件类型: {ext}") return [] documents = loader.load() except Exception as e: logger.error(f"加载文件失败 {file_path}: {e}") return [] chunks = self.text_splitter.split_documents(documents) result = [] for i, chunk in enumerate(chunks): result.append({ "text": chunk.page_content, "metadata": { "source": file_path, "chunk_index": i, **chunk.metadata } }) logger.info(f"文件 {file_path} 切分为 {len(result)} 个文本块") return result def load_directory(self, directory: str) -> List[Dict[str, Any]]: all_chunks = [] supported_exts = {".pdf", ".md", ".markdown", ".txt"} for root, _, files in os.walk(directory): for file in files: ext = os.path.splitext(file)[1].lower() if ext in supported_exts: file_path = os.path.join(root, file) chunks = self.load_and_split(file_path) all_chunks.extend(chunks) logger.info(f"目录 {directory} 共生成 {len(all_chunks)} 个文本块") return all_chunks

设计决策:我们刻意避开了UnstructuredMarkdownLoader,因为它依赖庞大的unstructured库,安装时常因系统 C++ 依赖缺失而失败。改用TextLoader统一处理所有文本类文件,更轻量、更稳定。


三、数据灌入脚本

万事俱备,我们需要一个核心脚本将全流程串联起来:遍历目录 → 文档分块 → 向量化编码 → 插入 Milvus。

创建scripts/ingest.py

python

#!/usr/bin/env python import hashlib import sys from pathlib import Path # 确保能导入 app 模块 sys.path.insert(0, str(Path(__file__).parent.parent)) from loguru import logger from app.core.splitter import DocumentSplitter from app.core.embedding import get_embedding_service from app.core.retrieval.milvus_client import connect_milvus, get_collection, COLLECTION_NAME def generate_doc_id(source: str, chunk_index: int) -> str: """根据文件路径和块索引生成唯一的短 ID""" raw = f"{source}_{chunk_index}" return hashlib.md5(raw.encode()).hexdigest()[:16] def ingest_docs(directory: str = "docs"): # 1. 连接 Milvus connect_milvus() collection = get_collection() logger.info(f"📦 已连接 Collection: {COLLECTION_NAME}") # 2. 加载并分块 splitter = DocumentSplitter(chunk_size=512, chunk_overlap=50) chunks = splitter.load_directory(directory) logger.info(f"📄 共加载 {len(chunks)} 个文本块") if not chunks: logger.warning("没有找到任何文档块,退出") return # 3. 向量化 embedder = get_embedding_service() texts = [chunk["text"] for chunk in chunks] sources = [chunk["metadata"]["source"] for chunk in chunks] chunk_indices = [chunk["metadata"]["chunk_index"] for chunk in chunks] logger.info("🧠 正在调用模型生成向量...") vectors = embedder.encode(texts) # 4. 插入数据库 ids = [generate_doc_id(src, idx) for src, idx in zip(sources, chunk_indices)] data = [ids, texts, vectors.tolist(), sources] logger.info(f"💾 正在插入 {len(ids)} 条数据到 Milvus...") collection.insert(data) collection.flush() logger.success(f"✅ 成功,当前 Collection 共有 {collection.num_entities} 条向量") if __name__ == "__main__": ingest_docs()

四、知识库规模化:从玩具数据到生产级语料

仅有几条测试数据的知识库无法验证系统的真实性能。为了让项目具备面试展示价值,我们需要灌入大规模、高质量的 AI 领域专业语料。

4.1 AI 爬虫环境搭建:Crawl4AI + Playwright

传统的requests难以应对现代动态渲染网页。我们选择Crawl4AI——一款专为大模型时代设计的开源爬虫,它能直接解析网页并输出极度干净的 Markdown。

🔧 踩坑与修复:

  1. lxml 依赖冲突:Python 3.13 环境下解析失败。解决办法:升级 pip 并强制使用清华源拉取预编译包。

  2. 浏览器内核下载超时:默认从国外 CDN 下载 Chromium 频繁中断。解决办法:降级 Playwright 至 1.48.0 版本,并手动通过apt补齐系统底层依赖。

4.2 批量抓取 AI 技术文章

编写带重试机制的爬虫脚本,定点抓取涵盖 RAG、Agent、向量数据库等前沿方向的高质量文章。

抓取结果统计:

来源平台成功数典型内容
arXiv5 篇ACL'26 RAG 前沿研究、Agent 综述论文
阿里云开发者社区4 篇RAG 企业级实战、AI 应用开发指南
Machine Learning Mastery1 篇Graph-RAG(图检索)深度教程
经济观察网1 篇兆瓦级算力系统新闻
知乎 / 百度0 篇反爬严重,直接放弃

经验总结:对于强反爬平台(如知乎、百度开发者社区),即使使用 Playwright 模拟浏览器也难以稳定抓取。实践中应优先选择提供 RSS、API 或反爬较弱的平台(如 arXiv、各大技术社区)。

4.3 规模化向量入库

清理掉抓取失败的空文件后,执行一键入库脚本python scripts/ingest.py。最终剔除掉早期测试文件后,知识库内沉淀了14 篇高质量长文,共计切分生成了718 条高维向量

注意:初始入库时包含测试文件test.txt,后续已彻底清理,最终知识库规模为718 条向量


五、混合检索模块:让召回更精准

仅有向量检索是不够的。向量擅长捕捉语义相似性,但对精确术语(如"HNSW"、"IVF_FLAT")的匹配能力较弱。为此,我们实现了混合检索——融合向量检索和 BM25 关键词检索,取长补短。

5.1 核心设计
组件实现方式说明
向量检索Milvussearch()+ BGE-small384 维 COSINE 相似度
BM25 关键词检索rank_bm25库,全量文档构建索引中文分词使用jieba
融合算法RRF(Reciprocal Rank Fusion)k=60,排名加权融合
索引缓存pickle序列化到bm25_index.pkl首次构建后秒级加载
5.2 中文分词适配

BM25 默认的空格分词对中文完全失效。我们引入jieba并实现智能分词:

python

def tokenize(text: str) -> List[str]: if any('\u4e00' <= char <= '\u9fff' for char in text): return list(jieba.cut(text)) # 中文用 jieba else: return text.lower().split() # 英文用空格
5.3 踩坑与修复
问题解决方案
BM25 对中文无效引入jieba智能分词
Milvus 连接未建立导致get_collection()失败__init__中显式调用connect_milvus()
向量格式错误(required argument is not a float显式转换为float32flatten().tolist()
collection.is_loaded属性不存在直接调用幂等的collection.load()

六、Reranker 精排:画龙点睛之笔

混合检索已能返回不错的结果,但在高级 RAG 系统中,Reranker(重排序)是提升精度的关键组件。它对召回的候选文档块进行二次精排,筛选出最相关的几条作为 LLM 的上下文。

概念澄清:这里精排的“候选文档”指的是文本块(Chunks),而非原始文章。我们的知识库包含 14 篇文章,但被切分成了718 个文本块。检索时是从这 718 个块中召回 Top-20 个块,Reranker 再从中精选 Top-3。

6.1 模型选型与显存管理

选用BAAI/bge-reranker-base(约 1.1GB 核心权重)。针对 3060 6GB 显存的限制,我们设计了动态加载/释放机制:

python

class RerankerService: def load_model(self): # 仅在需要时加载到 GPU if self.model is None: self.model = AutoModelForSequenceClassification.from_pretrained( model_name, torch_dtype=torch.float16 ).to(self.device) def unload_model(self): # 用完后立即释放显存 del self.model torch.cuda.empty_cache()
6.2 模型本地化下载

使用hf download预先下载模型,支持完全离线运行:

bash

export HF_ENDPOINT=https://hf-mirror.com hf download BAAI/bge-reranker-base --local-dir ./app/models/bge-reranker-base
6.3 集成到检索流程

修改检索服务的search()方法,添加use_rerank参数:

python

def search(self, query: str, top_k: int = 5, use_rerank: bool = False): # 1. 召回阶段(混合检索返回 Top-20 个文本块) candidates = self.reciprocal_rank_fusion(vector_hits, bm25_hits) # 2. 精排阶段(可选) if use_rerank: reranker = get_reranker_service() passages = [doc["text"] for doc in candidates] reranked = reranker.rerank(query, passages, top_k=top_k) return reranked return candidates[:top_k]

七、效果量化对比:Reranker 带来了什么?

我们设计了对比实验,对同一查询分别测试有/无 Reranker 的 Top-1 结果:

查询无 Reranker Top-1有 Reranker Top-1关键变化
什么是RAG?article_1709964.mdarticle_1709964.md来源一致,精排优化了片段选择
How does RAG work?...3-tiered-graph-rag-system.mdarticle_1709964.md来源改变,Reranker 纠正了 BM25 偏差
RAG系统中如何优化检索召回率?article_1709964.mdarticle_1709964.md来源一致,精排优化了片段选择

核心发现

  • Reranker 具备语义纠偏能力:对于英文查询,能将更通用、更适合回答基础概念的文章排到首位。

  • 分数尺度变化(RRF 0.03 → Reranker 5.05)是正常现象,两者算法不同,我们关注的是相对排序的改善

面试话术参考:“我通过对比实验验证了 Reranker 的有效性。对于英文查询 'How does RAG work?',原始混合检索由于 BM25 权重较高,返回了一篇深度 Graph-RAG 文章,而 Reranker 精准地将其纠正为更通用、更适合回答基础概念的中文 RAG 文章。”


八、工程规范化

一个优秀的项目不仅代码能跑,还需要具备可复现性和清晰的模块划分。

8.1 导出依赖文件

bash

pip freeze > requirements.txt
8.2 配置.gitignore

text

venv/ __pycache__/ app/models/ volumes/ .env *.pyc .DS_Store
8.3 项目目录结构

text

LiteRAG/ ├── app/ │ ├── core/ │ │ ├── embedding.py # Embedding 服务 │ │ ├── splitter.py # 文档分块 │ │ ├── reranker.py # Reranker 精排 │ │ └── retrieval/ │ │ ├── milvus_client.py # Milvus 连接与 Collection 管理 │ │ └── hybrid_search.py # 混合检索 + RRF │ └── models/ # 本地模型缓存 ├── scripts/ │ ├── ingest.py # 数据灌入脚本 │ ├── fetch_articles.py # 第一批爬虫 │ └── fetch_articles_batch2.py # 第二批爬虫 ├── docs/ # 原始文档 ├── docker-compose.yml # 6 个服务容器编排 ├── requirements.txt └── .gitignore

九、踩坑记录(完整版)

问题现象 / 报错根本原因解决方案
No module named 'langchain.text_splitter'新版 LangChain 将分块器拆分到了独立包中安装并修改导入:from langchain_text_splitters import ...
FileNotFoundError: Path ... not foundEmbedding 模块中模型路径使用了未解析的占位符去缓存目录使用ls查看真实哈希文件夹名,并更新路径
UnstructuredMarkdownLoader各种报错底层unstructured库体积大、系统 C++ 依赖复杂放弃该方案,改用更轻量的TextLoader统一处理文本
Crawl4AI 安装时lxml冲突Python 3.13 环境太新,部分轮子尚未提供预编译包升级 pip,并强制使用清华镜像源安装
Playwright 浏览器内核下载失败/超时默认的国外 CDN 节点在国内访问不稳定降级 Playwright 至 1.48.0,并使用apt手动安装系统底层依赖
BM25 对中文完全失效默认空格分词无法处理中文引入jieba智能分词,自动检测中英文
Milvus 向量格式错误传入的向量包含numpy.float32而非纯 Python 浮点数astype(np.float32).flatten().tolist()
collection.is_loaded不存在pymilvus版本差异直接调用幂等的collection.load()
Reranker 测试时测试文件仍为 Top-1Milvus 中残留旧数据彻底删除 Collection 并重新灌库
volumes/目录无法纳入版本控制宿主机的 Docker 数据卷目录归属 root,权限被拒绝volumes/加入.gitignore

十、当前成果速览

核心指标当前数值
知识库文档总数14 篇高质量长文(已剔除测试文件)
文本块 (Chunks) 总数718 条(纯净知识库)
向量维度384 (BGE-Small)
Milvus 索引结构IVF_FLAT + COSINE
检索方式向量检索 + BM25 + RRF 混合融合
精排组件BGE-Reranker-base(本地化,动态显存管理)
Reranker 纠偏效果英文查询 Top-1 来源被正确纠正
工程规范requirements.txt.gitignore、模块化设计
离线可用性所有模型本地化,无需联网

十一、下一步计划

现在,我们拥有了一个包含718 条高质量向量、混合检索与 Reranker 精排双引擎驱动的 RAG 检索系统。数据底座和检索链路已完全打通,下一篇文章我们将进入 RAG 系统的核心交互环节:

  1. LLM 集成:封装大模型 API(如 DeepSeek、Qwen),设计 Prompt 模板

  2. FastAPI 问答接口:实现POST /chat,接收用户问题,返回流式/非流式答案

  3. Redis 缓存加速:为高频查询添加缓存层

  4. 端到端测试:用真实 AI 面试题验证系统问答质量


本文是【从0到1搭建企业级RAG系统】系列的第三篇,如果你在数据处理、混合检索或 Reranker 集成过程中遇到任何问题,欢迎在评论区交流!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询