前言
Scrapy 框架中,爬虫解析出的 Item 数据最终需落地存储,而管道(Pipeline)是实现数据持久化的核心组件。相较于直接在爬虫文件中处理数据存储,Pipeline 具备模块化、可扩展、支持多管道协同处理的优势,可灵活实现数据清洗、去重、多源存储(如文件、数据库、消息队列)等需求。本文从 Pipeline 核心原理入手,系统讲解不同场景下的数据持久化方案,结合实战案例实现 JSON/CSV 文件存储、MySQL 数据库存储、MongoDB 文档存储及数据去重与校验,帮助开发者掌握企业级爬虫的数据落地能力。
摘要
本文聚焦 Scrapy 管道(Pipeline)的数据持久化实战,首先剖析 Pipeline 的执行流程与优先级规则,明确其核心作用与开发规范;其次以 知乎热榜 为爬取目标,依次实现基础文件存储(JSON/CSV)、关系型数据库存储(MySQL)、非关系型数据库存储(MongoDB)三类核心持久化方案;最后补充数据去重、数据校验、管道异常处理等进阶功能。通过本文,读者可掌握 Scrapy Pipeline 的全场景开发能力,实现爬虫数据的规范化、高可用持久化存储。
一、Scrapy Pipeline 核心原理
1.1 Pipeline 执行流程
- 爬虫解析出 Item 后,将其传递给引擎(Engine);
- 引擎将 Item 依次发送至
settings.py中配置的 Pipeline(按优先级执行); - 每个 Pipeline 可通过
process_item方法处理 Item(如清洗、存储),处理完成后可选择将 Item 传递给下一个 Pipeline 或终止传递; - 所有 Pipeline 执行完成后,Item 数据完成持久化。
1.2 核心方法与优先级规则
| 方法名 | 触发时机 | 返回值规则 |
|---|---|---|
| process_item | 处理每个 Item 时 | 返回 Item:继续传递给下一个 Pipeline;抛出 DropItem 异常:终止传递并丢弃 Item |
| open_spider | 爬虫启动时 | 无返回值,常用于初始化资源(如数据库连接、文件句柄) |
| close_spider | 爬虫关闭时 | 无返回值,常用于释放资源(如关闭数据库连接、文件句柄) |
| from_crawler | 管道初始化时(可选) | 返回管道实例,常用于从配置文件读取参数(如数据库地址、端口) |
Pipeline 优先级通过settings.py中ITEM_PIPELINES的数字值控制:数字越小,优先级越高,越先执行。例如:
python
运行
ITEM_PIPELINES = { 'zhihu_hot.pipelines.DataValidatePipeline': 100, # 优先级最高(数据校验) 'zhihu_hot.pipelines.MysqlPipeline': 300, # 次之(MySQL 存储) 'zhihu_hot.pipelines.MongoDBPipeline': 400, # 优先级最低(MongoDB 存储) }1.3 核心优势
| 优势点 | 具体说明 |
|---|---|
| 模块化开发 | 不同存储方式拆分至不同 Pipeline,便于维护与扩展 |
| 多管道协同 | 可先校验数据,再存储至多个数据源(如同时存 MySQL 和 MongoDB) |
| 资源统一管理 | 通过open_spider/close_spider统一管理数据库连接、文件句柄等资源 |
| 异常隔离 | 单个 Pipeline 异常不影响其他 Pipeline 执行 |
二、环境搭建
2.1 基础环境要求
| 软件 / 库 | 版本要求 | 作用 |
|---|---|---|
| Python | ≥3.8 | 基础开发环境 |
| Scrapy | ≥2.6 | 爬虫框架 |
| pymysql | ≥1.0.2 | Python 操作 MySQL 客户端 |
| pymongo | ≥4.3.3 | Python 操作 MongoDB 客户端 |
| pandas | ≥1.5.3 | 生成 CSV 文件(可选) |
2.2 环境安装
bash
运行
pip install scrapy==2.6.2 pymysql==1.0.2 pymongo==4.3.3 pandas==1.5.32.3 数据库环境准备
(1)MySQL 环境
- 安装 MySQL 并启动服务;
- 创建数据库与数据表:
sql
CREATE DATABASE IF NOT EXISTS scrapy_db DEFAULT CHARACTER SET utf8mb4; USE scrapy_db; CREATE TABLE IF NOT EXISTS zhihu_hot ( id INT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(500) NOT NULL COMMENT '热榜标题', hot_score INT COMMENT '热度值', url VARCHAR(500) NOT NULL COMMENT '热榜链接', create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', UNIQUE KEY uk_url (url) COMMENT 'URL 唯一索引,避免重复' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;(2)MongoDB 环境
- 安装 MongoDB 并启动服务;
- 无需提前创建集合(MongoDB 自动创建)。
三、Pipeline 数据持久化实战开发
3.1 创建基础爬虫项目
bash
运行
# 创建项目 scrapy startproject zhihu_hot # 进入项目目录 cd zhihu_hot # 创建爬虫文件 scrapy genspider zhihu_hot_spider zhihu.com3.2 定义 Item 结构(items.py)
python
运行
import scrapy class ZhihuHotItem(scrapy.Item): # 热榜标题 title = scrapy.Field() # 热度值 hot_score = scrapy.Field() # 热榜链接 url = scrapy.Field()3.3 开发爬虫文件(zhihu_hot_spider.py)
python
运行
import scrapy from zhihu_hot.items import ZhihuHotItem class ZhihuHotSpiderSpider(scrapy.Spider): name = 'zhihu_hot_spider' allowed_domains = ['zhihu.com'] start_urls = ['https://www.zhihu.com/hot'] def parse(self, response): """解析知乎热榜数据""" # 定位热榜列表 hot_list = response.xpath('//div[@class="HotItem-content"]') for hot in hot_list: item = ZhihuHotItem() # 提取标题 item['title'] = hot.xpath('.//h2[@class="HotItem-title"]/a/text()').extract_first() # 提取热度值(处理格式:如 "100万+" 转为 1000000) hot_score_str = hot.xpath('.//div[@class="HotItem-metrics"]/text()').extract_first() or '0' if '万' in hot_score_str: item['hot_score'] = int(float(hot_score_str.replace('万', '').replace('+', '')) * 10000) else: item['hot_score'] = int(hot_score_str.replace('+', '')) if hot_score_str.replace('+', '').isdigit() else 0 # 提取链接(拼接完整 URL) relative_url = hot.xpath('.//h2[@class="HotItem-title"]/a/@href').extract_first() item['url'] = f"https://www.zhihu.com{relative_url}" if relative_url else '' yield item3.4 实战 1:基础文件存储(JSON/CSV)
3.4.1 JSON 存储 Pipeline(pipelines.py)
python
运行
import json import os from scrapy.exceptions import DropItem class JsonPipeline: """将 Item 存储为 JSON 文件""" def open_spider(self, spider): """爬虫启动时创建 JSON 文件""" # 确保存储目录存在 if not os.path.exists('output'): os.makedirs('output') # 打开文件句柄 self.file = open('output/zhihu_hot.json', 'w', encoding='utf-8') # 写入 JSON 数组开头 self.file.write('[') self.first_item = True def process_item(self, item, spider): """处理每个 Item,写入文件""" # 过滤空标题的 Item if not item.get('title'): raise DropItem(f"丢弃无效 Item:标题为空 {item}") # 转换 Item 为字典 item_dict = dict(item) # 处理第一个 Item 无需添加逗号 if not self.first_item: self.file.write(',') else: self.first_item = False # 序列化并写入(ensure_ascii=False 保留中文) self.file.write(json.dumps(item_dict, ensure_ascii=False, indent=2)) spider.logger.info(f"JSON 存储成功:{item['title']}") return item def close_spider(self, spider): """爬虫关闭时关闭文件""" # 写入 JSON 数组结尾 self.file.write(']') self.file.close() spider.logger.info("JSON 文件存储完成,路径:output/zhihu_hot.json")3.4.2 CSV 存储 Pipeline(pipelines.py)
python
运行
import pandas as pd import os from scrapy.exceptions import DropItem class CsvPipeline: """将 Item 存储为 CSV 文件""" def open_spider(self, spider): """初始化数据列表""" self.data_list = [] if not os.path.exists('output'): os.makedirs('output') def process_item(self, item, spider): """收集 Item 数据""" if not item.get('url'): raise DropItem(f"丢弃无效 Item:URL 为空 {item}") self.data_list.append(dict(item)) spider.logger.info(f"CSV 数据收集成功:{item['title']}") return item def close_spider(self, spider): """将收集的数据写入 CSV 文件""" if self.data_list: df = pd.DataFrame(self.data_list) # 按热度值降序排列 df = df.sort_values(by='hot_score', ascending=False) df.to_csv('output/zhihu_hot.csv', index=False, encoding='utf-8-sig') spider.logger.info(f"CSV 文件存储完成,共 {len(self.data_list)} 条数据,路径:output/zhihu_hot.csv") else: spider.logger.warning("无有效数据,未生成 CSV 文件")3.4.3 启用文件存储 Pipeline(settings.py)
python
运行
ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, }3.4.4 输出结果与原理
JSON 文件输出示例(output/zhihu_hot.json):
json
[ { "title": "为什么现在的年轻人越来越不愿意结婚了?", "hot_score": 1258000, "url": "https://www.zhihu.com/question/6328XXXX" }, { "title": "2025 年养老金调整方案公布,哪些人受益最多?", "hot_score": 985000, "url": "https://www.zhihu.com/question/6329XXXX" } ]CSV 文件输出示例(output/zhihu_hot.csv):
| title | hot_score | url |
|---|---|---|
| 为什么现在的年轻人越来越不愿意结婚了? | 1258000 | https://www.zhihu.com/question/6328XXXX |
| 2025 年养老金调整方案公布,哪些人受益最多? | 985000 | https://www.zhihu.com/question/6329XXXX |
核心原理:
open_spider初始化资源(文件句柄、数据列表),避免频繁创建 / 关闭文件;process_item过滤无效 Item(抛出DropItem异常),有效 Item 写入文件或收集到列表;close_spider释放资源(关闭文件句柄),并对 CSV 数据进行排序后写入;ensure_ascii=False和utf-8-sig确保中文正常显示。
3.5 实战 2:MySQL 数据库存储
3.5.1 MySQL 存储 Pipeline(pipelines.py)
python
运行
import pymysql from scrapy.exceptions import DropItem from twisted.enterprise import adbapi # 异步数据库操作 class MysqlPipeline: """异步将 Item 存储至 MySQL 数据库""" def __init__(self, db_pool): self.db_pool = db_pool @classmethod def from_crawler(cls, crawler): """从配置文件读取数据库参数,创建数据库连接池""" db_params = { 'host': crawler.settings.get('MYSQL_HOST', '127.0.0.1'), 'port': crawler.settings.get('MYSQL_PORT', 3306), 'user': crawler.settings.get('MYSQL_USER', 'root'), 'password': crawler.settings.get('MYSQL_PASSWORD', '123456'), 'db': crawler.settings.get('MYSQL_DB', 'scrapy_db'), 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 创建异步数据库连接池 db_pool = adbapi.ConnectionPool('pymysql', **db_params) return cls(db_pool) def process_item(self, item, spider): """异步执行插入操作""" # 过滤热度值为 0 的 Item if item.get('hot_score', 0) == 0: raise DropItem(f"丢弃无效 Item:热度值为 0 {item}") # 异步调用插入方法 query = self.db_pool.runInteraction(self.insert_item, item) # 处理插入异常 query.addErrback(self.handle_error, item, spider) return item def insert_item(self, cursor, item): """执行 SQL 插入""" sql = """ INSERT INTO zhihu_hot (title, hot_score, url) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE title = VALUES(title), hot_score = VALUES(hot_score) """ cursor.execute(sql, (item['title'], item['hot_score'], item['url'])) # ON DUPLICATE KEY UPDATE 实现重复 URL 时更新数据 def handle_error(self, failure, item, spider): """处理数据库操作异常""" spider.logger.error(f"MySQL 插入失败:{failure},Item:{item}") def close_spider(self, spider): """关闭数据库连接池""" self.db_pool.close() spider.logger.info("MySQL 连接池已关闭")3.5.2 配置 MySQL 参数(settings.py)
python
运行
# MySQL 数据库配置 MYSQL_HOST = '127.0.0.1' MYSQL_PORT = 3306 MYSQL_USER = 'root' MYSQL_PASSWORD = '123456' # 替换为你的 MySQL 密码 MYSQL_DB = 'scrapy_db' # 启用 MySQL Pipeline ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, }3.5.3 输出结果与原理
MySQL 数据表查询结果:
sql
SELECT title, hot_score FROM zhihu_hot LIMIT 2;| title | hot_score |
|---|---|
| 为什么现在的年轻人越来越不愿意结婚了? | 1258000 |
| 2025 年养老金调整方案公布,哪些人受益最多? | 985000 |
核心原理:
from_crawler从配置文件读取参数,解耦配置与代码;adbapi.ConnectionPool创建异步连接池,避免同步操作阻塞爬虫;runInteraction异步执行 SQL 插入,提升爬虫效率;ON DUPLICATE KEY UPDATE利用 URL 唯一索引实现数据去重,重复 URL 时更新标题和热度值;addErrback捕获数据库操作异常,避免单个 Item 插入失败导致爬虫中断。
3.6 实战 3:MongoDB 文档存储
3.6.1 MongoDB 存储 Pipeline(pipelines.py)
python
运行
import pymongo from scrapy.exceptions import DropItem class MongoDBPipeline: """将 Item 存储至 MongoDB 数据库""" def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db self.client = None self.db = None @classmethod def from_crawler(cls, crawler): """从配置文件读取 MongoDB 参数""" return cls( mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://127.0.0.1:27017/'), mongo_db=crawler.settings.get('MONGO_DB', 'scrapy_db') ) def open_spider(self, spider): """创建 MongoDB 连接""" self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] # 创建索引,确保 URL 唯一 self.db.zhihu_hot.create_index('url', unique=True) spider.logger.info("MongoDB 连接成功,索引创建完成") def process_item(self, item, spider): """插入 Item 至 MongoDB""" try: # 插入数据,重复 URL 时更新 self.db.zhihu_hot.update_one( {'url': item['url']}, {'$set': dict(item)}, upsert=True # 不存在则插入,存在则更新 ) spider.logger.info(f"MongoDB 存储成功:{item['title']}") except pymongo.errors.DuplicateKeyError: spider.logger.warning(f"MongoDB 重复数据:{item['url']}") except Exception as e: spider.logger.error(f"MongoDB 插入失败:{e},Item:{item}") raise DropItem(f"MongoDB 插入失败,丢弃 Item:{item}") return item def close_spider(self, spider): """关闭 MongoDB 连接""" self.client.close() spider.logger.info("MongoDB 连接已关闭")3.6.2 配置 MongoDB 参数(settings.py)
python
运行
# MongoDB 配置 MONGO_URI = 'mongodb://127.0.0.1:27017/' MONGO_DB = 'scrapy_db' # 启用 MongoDB Pipeline ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, 'zhihu_hot.pipelines.MongoDBPipeline': 400, }3.6.3 输出结果与原理
MongoDB 集合查询结果:
bash
运行
mongo use scrapy_db db.zhihu_hot.find().limit(2)json
[ { "_id": ObjectId("67658xxxxxx"), "title": "为什么现在的年轻人越来越不愿意结婚了?", "hot_score": 1258000, "url": "https://www.zhihu.com/question/6328XXXX" }, { "_id": ObjectId("67658xxxxxx"), "title": "2025 年养老金调整方案公布,哪些人受益最多?", "hot_score": 985000, "url": "https://www.zhihu.com/question/6329XXXX" } ]核心原理:
create_index('url', unique=True)创建 URL 唯一索引,避免重复数据;update_one结合upsert=True实现 “存在则更新,不存在则插入”;- 捕获
DuplicateKeyError异常,优雅处理重复数据,避免爬虫中断; - MongoDB 无需提前定义表结构,适配动态字段扩展(如后续新增字段无需修改表结构)。
3.7 实战 4:数据去重与校验 Pipeline(进阶)
3.7.1 全局去重 Pipeline(pipelines.py)
python
运行
class DuplicateFilterPipeline: """基于 Redis 实现全局数据去重(跨爬虫/跨节点)""" def __init__(self, redis_uri): import redis self.redis_client = redis.Redis.from_url(redis_uri) self.redis_key = 'zhihu_hot:urls' # 存储已爬取 URL 的 Redis Key @classmethod def from_crawler(cls, crawler): return cls(redis_uri=crawler.settings.get('REDIS_URI', 'redis://127.0.0.1:6379/0')) def process_item(self, item, spider): url = item.get('url') if not url: raise DropItem(f"URL 为空,丢弃 Item:{item}") # Redis SET 实现去重 if self.redis_client.sismember(self.redis_key, url): raise DropItem(f"全局重复 URL,丢弃 Item:{url}") else: self.redis_client.sadd(self.redis_key, url) spider.logger.info(f"全局去重校验通过:{url}") return item def close_spider(self, spider): self.redis_client.close()3.7.2 启用去重 Pipeline(settings.py)
python
运行
# Redis 配置 REDIS_URI = 'redis://127.0.0.1:6379/0' # 调整 Pipeline 优先级(去重优先执行) ITEM_PIPELINES = { 'zhihu_hot.pipelines.DuplicateFilterPipeline': 100, # 最高优先级 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, 'zhihu_hot.pipelines.MongoDBPipeline': 400, }核心原理:
- 利用 Redis Set 的
sismember方法判断 URL 是否已存在,实现全局去重; - 优先级 100 确保去重在校验、存储前执行,减少无效数据处理;
- 跨节点 / 跨爬虫运行时,Redis 可共享去重数据,避免分布式环境下的重复爬取。
四、Pipeline 性能调优与最佳实践
4.1 性能调优策略
| 优化点 | 具体方案 |
|---|---|
| 数据库操作优化 | 使用异步连接池(adbapi),避免同步阻塞;批量插入(每 100 条批量提交) |
| 文件操作优化 | 减少文件写入次数(先收集数据,爬虫关闭时一次性写入) |
| 去重优化 | 优先使用内存 / Redis 去重,再使用数据库唯一索引 |
| 异常处理优化 | 捕获特定异常,避免通用异常导致所有 Item 处理失败 |
4.2 最佳实践
- 单一职责原则:每个 Pipeline 只负责一个功能(如校验、JSON 存储、MySQL 存储);
- 配置解耦:通过
from_crawler读取配置,避免硬编码数据库地址、密码; - 资源管理:通过
open_spider/close_spider统一管理连接 / 文件句柄,避免资源泄露; - 异常日志:详细记录异常信息(如 Item 内容、错误类型),便于问题定位;
- 数据校验:在前置 Pipeline 完成数据校验,避免无效数据进入存储环节。
五、常见问题与解决方案
| 问题现象 | 原因分析 | 解决方案 |
|---|---|---|
| JSON 文件中文乱码 | 未设置 ensure_ascii=False | 序列化时添加ensure_ascii=False |
| CSV 文件中文乱码 | 编码格式错误 | 使用utf-8-sig编码写入 CSV |
| MySQL 插入速度慢 | 同步插入 / 单条插入 | 使用 adbapi 异步插入,批量提交 SQL |
| MongoDB 重复插入 | 未创建唯一索引 | 为 URL 字段创建唯一索引,使用 upsert=True |
| Pipeline 不执行 | 未在 settings.py 中启用 / 优先级配置错误 | 检查ITEM_PIPELINES配置,确保数字优先级正确 |
| 数据库连接超时 | 数据库地址错误 / 防火墙拦截 | 验证数据库地址,开放数据库端口(MySQL 3306、MongoDB 27017) |
六、总结
本文系统讲解了 Scrapy Pipeline 的数据持久化开发,从核心原理出发,实现了文件存储(JSON/CSV)、关系型数据库存储(MySQL)、非关系型数据库存储(MongoDB)三类核心方案,并补充了全局数据去重、异常处理等进阶功能。Pipeline 作为 Scrapy 数据持久化的核心组件,其模块化、可扩展的特性使其适配各类存储场景,是企业级爬虫开发的必备技能。
在实际开发中,可根据业务需求扩展更多 Pipeline 功能:如数据加密存储、数据同步至 Elasticsearch、数据推送至消息队列(Kafka/RabbitMQ)等。掌握 Pipeline 的开发规范与性能调优策略,可大幅提升爬虫数据存储的稳定性、效率与可维护性。