Python 爬虫实战:Scrapy 管道(Pipeline)数据持久化
2026/5/12 6:22:23 网站建设 项目流程

前言

Scrapy 框架中,爬虫解析出的 Item 数据最终需落地存储,而管道(Pipeline)是实现数据持久化的核心组件。相较于直接在爬虫文件中处理数据存储,Pipeline 具备模块化、可扩展、支持多管道协同处理的优势,可灵活实现数据清洗、去重、多源存储(如文件、数据库、消息队列)等需求。本文从 Pipeline 核心原理入手,系统讲解不同场景下的数据持久化方案,结合实战案例实现 JSON/CSV 文件存储、MySQL 数据库存储、MongoDB 文档存储及数据去重与校验,帮助开发者掌握企业级爬虫的数据落地能力。

摘要

本文聚焦 Scrapy 管道(Pipeline)的数据持久化实战,首先剖析 Pipeline 的执行流程与优先级规则,明确其核心作用与开发规范;其次以 知乎热榜 为爬取目标,依次实现基础文件存储(JSON/CSV)、关系型数据库存储(MySQL)、非关系型数据库存储(MongoDB)三类核心持久化方案;最后补充数据去重、数据校验、管道异常处理等进阶功能。通过本文,读者可掌握 Scrapy Pipeline 的全场景开发能力,实现爬虫数据的规范化、高可用持久化存储。

一、Scrapy Pipeline 核心原理

1.1 Pipeline 执行流程

  1. 爬虫解析出 Item 后,将其传递给引擎(Engine);
  2. 引擎将 Item 依次发送至settings.py中配置的 Pipeline(按优先级执行);
  3. 每个 Pipeline 可通过process_item方法处理 Item(如清洗、存储),处理完成后可选择将 Item 传递给下一个 Pipeline 或终止传递;
  4. 所有 Pipeline 执行完成后,Item 数据完成持久化。

1.2 核心方法与优先级规则

方法名触发时机返回值规则
process_item处理每个 Item 时返回 Item:继续传递给下一个 Pipeline;抛出 DropItem 异常:终止传递并丢弃 Item
open_spider爬虫启动时无返回值,常用于初始化资源(如数据库连接、文件句柄)
close_spider爬虫关闭时无返回值,常用于释放资源(如关闭数据库连接、文件句柄)
from_crawler管道初始化时(可选)返回管道实例,常用于从配置文件读取参数(如数据库地址、端口)

Pipeline 优先级通过settings.pyITEM_PIPELINES的数字值控制:数字越小,优先级越高,越先执行。例如:

python

运行

ITEM_PIPELINES = { 'zhihu_hot.pipelines.DataValidatePipeline': 100, # 优先级最高(数据校验) 'zhihu_hot.pipelines.MysqlPipeline': 300, # 次之(MySQL 存储) 'zhihu_hot.pipelines.MongoDBPipeline': 400, # 优先级最低(MongoDB 存储) }

1.3 核心优势

优势点具体说明
模块化开发不同存储方式拆分至不同 Pipeline,便于维护与扩展
多管道协同可先校验数据,再存储至多个数据源(如同时存 MySQL 和 MongoDB)
资源统一管理通过open_spider/close_spider统一管理数据库连接、文件句柄等资源
异常隔离单个 Pipeline 异常不影响其他 Pipeline 执行

二、环境搭建

2.1 基础环境要求

软件 / 库版本要求作用
Python≥3.8基础开发环境
Scrapy≥2.6爬虫框架
pymysql≥1.0.2Python 操作 MySQL 客户端
pymongo≥4.3.3Python 操作 MongoDB 客户端
pandas≥1.5.3生成 CSV 文件(可选)

2.2 环境安装

bash

运行

pip install scrapy==2.6.2 pymysql==1.0.2 pymongo==4.3.3 pandas==1.5.3

2.3 数据库环境准备

(1)MySQL 环境
  • 安装 MySQL 并启动服务;
  • 创建数据库与数据表:

sql

CREATE DATABASE IF NOT EXISTS scrapy_db DEFAULT CHARACTER SET utf8mb4; USE scrapy_db; CREATE TABLE IF NOT EXISTS zhihu_hot ( id INT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(500) NOT NULL COMMENT '热榜标题', hot_score INT COMMENT '热度值', url VARCHAR(500) NOT NULL COMMENT '热榜链接', create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', UNIQUE KEY uk_url (url) COMMENT 'URL 唯一索引,避免重复' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
(2)MongoDB 环境
  • 安装 MongoDB 并启动服务;
  • 无需提前创建集合(MongoDB 自动创建)。

三、Pipeline 数据持久化实战开发

3.1 创建基础爬虫项目

bash

运行

# 创建项目 scrapy startproject zhihu_hot # 进入项目目录 cd zhihu_hot # 创建爬虫文件 scrapy genspider zhihu_hot_spider zhihu.com

3.2 定义 Item 结构(items.py)

python

运行

import scrapy class ZhihuHotItem(scrapy.Item): # 热榜标题 title = scrapy.Field() # 热度值 hot_score = scrapy.Field() # 热榜链接 url = scrapy.Field()

3.3 开发爬虫文件(zhihu_hot_spider.py)

python

运行

import scrapy from zhihu_hot.items import ZhihuHotItem class ZhihuHotSpiderSpider(scrapy.Spider): name = 'zhihu_hot_spider' allowed_domains = ['zhihu.com'] start_urls = ['https://www.zhihu.com/hot'] def parse(self, response): """解析知乎热榜数据""" # 定位热榜列表 hot_list = response.xpath('//div[@class="HotItem-content"]') for hot in hot_list: item = ZhihuHotItem() # 提取标题 item['title'] = hot.xpath('.//h2[@class="HotItem-title"]/a/text()').extract_first() # 提取热度值(处理格式:如 "100万+" 转为 1000000) hot_score_str = hot.xpath('.//div[@class="HotItem-metrics"]/text()').extract_first() or '0' if '万' in hot_score_str: item['hot_score'] = int(float(hot_score_str.replace('万', '').replace('+', '')) * 10000) else: item['hot_score'] = int(hot_score_str.replace('+', '')) if hot_score_str.replace('+', '').isdigit() else 0 # 提取链接(拼接完整 URL) relative_url = hot.xpath('.//h2[@class="HotItem-title"]/a/@href').extract_first() item['url'] = f"https://www.zhihu.com{relative_url}" if relative_url else '' yield item

3.4 实战 1:基础文件存储(JSON/CSV)

3.4.1 JSON 存储 Pipeline(pipelines.py)

python

运行

import json import os from scrapy.exceptions import DropItem class JsonPipeline: """将 Item 存储为 JSON 文件""" def open_spider(self, spider): """爬虫启动时创建 JSON 文件""" # 确保存储目录存在 if not os.path.exists('output'): os.makedirs('output') # 打开文件句柄 self.file = open('output/zhihu_hot.json', 'w', encoding='utf-8') # 写入 JSON 数组开头 self.file.write('[') self.first_item = True def process_item(self, item, spider): """处理每个 Item,写入文件""" # 过滤空标题的 Item if not item.get('title'): raise DropItem(f"丢弃无效 Item:标题为空 {item}") # 转换 Item 为字典 item_dict = dict(item) # 处理第一个 Item 无需添加逗号 if not self.first_item: self.file.write(',') else: self.first_item = False # 序列化并写入(ensure_ascii=False 保留中文) self.file.write(json.dumps(item_dict, ensure_ascii=False, indent=2)) spider.logger.info(f"JSON 存储成功:{item['title']}") return item def close_spider(self, spider): """爬虫关闭时关闭文件""" # 写入 JSON 数组结尾 self.file.write(']') self.file.close() spider.logger.info("JSON 文件存储完成,路径:output/zhihu_hot.json")
3.4.2 CSV 存储 Pipeline(pipelines.py)

python

运行

import pandas as pd import os from scrapy.exceptions import DropItem class CsvPipeline: """将 Item 存储为 CSV 文件""" def open_spider(self, spider): """初始化数据列表""" self.data_list = [] if not os.path.exists('output'): os.makedirs('output') def process_item(self, item, spider): """收集 Item 数据""" if not item.get('url'): raise DropItem(f"丢弃无效 Item:URL 为空 {item}") self.data_list.append(dict(item)) spider.logger.info(f"CSV 数据收集成功:{item['title']}") return item def close_spider(self, spider): """将收集的数据写入 CSV 文件""" if self.data_list: df = pd.DataFrame(self.data_list) # 按热度值降序排列 df = df.sort_values(by='hot_score', ascending=False) df.to_csv('output/zhihu_hot.csv', index=False, encoding='utf-8-sig') spider.logger.info(f"CSV 文件存储完成,共 {len(self.data_list)} 条数据,路径:output/zhihu_hot.csv") else: spider.logger.warning("无有效数据,未生成 CSV 文件")
3.4.3 启用文件存储 Pipeline(settings.py)

python

运行

ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, }
3.4.4 输出结果与原理

JSON 文件输出示例(output/zhihu_hot.json)

json

[ { "title": "为什么现在的年轻人越来越不愿意结婚了?", "hot_score": 1258000, "url": "https://www.zhihu.com/question/6328XXXX" }, { "title": "2025 年养老金调整方案公布,哪些人受益最多?", "hot_score": 985000, "url": "https://www.zhihu.com/question/6329XXXX" } ]

CSV 文件输出示例(output/zhihu_hot.csv)

titlehot_scoreurl
为什么现在的年轻人越来越不愿意结婚了?1258000https://www.zhihu.com/question/6328XXXX
2025 年养老金调整方案公布,哪些人受益最多?985000https://www.zhihu.com/question/6329XXXX

核心原理

  • open_spider初始化资源(文件句柄、数据列表),避免频繁创建 / 关闭文件;
  • process_item过滤无效 Item(抛出DropItem异常),有效 Item 写入文件或收集到列表;
  • close_spider释放资源(关闭文件句柄),并对 CSV 数据进行排序后写入;
  • ensure_ascii=Falseutf-8-sig确保中文正常显示。

3.5 实战 2:MySQL 数据库存储

3.5.1 MySQL 存储 Pipeline(pipelines.py)

python

运行

import pymysql from scrapy.exceptions import DropItem from twisted.enterprise import adbapi # 异步数据库操作 class MysqlPipeline: """异步将 Item 存储至 MySQL 数据库""" def __init__(self, db_pool): self.db_pool = db_pool @classmethod def from_crawler(cls, crawler): """从配置文件读取数据库参数,创建数据库连接池""" db_params = { 'host': crawler.settings.get('MYSQL_HOST', '127.0.0.1'), 'port': crawler.settings.get('MYSQL_PORT', 3306), 'user': crawler.settings.get('MYSQL_USER', 'root'), 'password': crawler.settings.get('MYSQL_PASSWORD', '123456'), 'db': crawler.settings.get('MYSQL_DB', 'scrapy_db'), 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 创建异步数据库连接池 db_pool = adbapi.ConnectionPool('pymysql', **db_params) return cls(db_pool) def process_item(self, item, spider): """异步执行插入操作""" # 过滤热度值为 0 的 Item if item.get('hot_score', 0) == 0: raise DropItem(f"丢弃无效 Item:热度值为 0 {item}") # 异步调用插入方法 query = self.db_pool.runInteraction(self.insert_item, item) # 处理插入异常 query.addErrback(self.handle_error, item, spider) return item def insert_item(self, cursor, item): """执行 SQL 插入""" sql = """ INSERT INTO zhihu_hot (title, hot_score, url) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE title = VALUES(title), hot_score = VALUES(hot_score) """ cursor.execute(sql, (item['title'], item['hot_score'], item['url'])) # ON DUPLICATE KEY UPDATE 实现重复 URL 时更新数据 def handle_error(self, failure, item, spider): """处理数据库操作异常""" spider.logger.error(f"MySQL 插入失败:{failure},Item:{item}") def close_spider(self, spider): """关闭数据库连接池""" self.db_pool.close() spider.logger.info("MySQL 连接池已关闭")
3.5.2 配置 MySQL 参数(settings.py)

python

运行

# MySQL 数据库配置 MYSQL_HOST = '127.0.0.1' MYSQL_PORT = 3306 MYSQL_USER = 'root' MYSQL_PASSWORD = '123456' # 替换为你的 MySQL 密码 MYSQL_DB = 'scrapy_db' # 启用 MySQL Pipeline ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, }
3.5.3 输出结果与原理

MySQL 数据表查询结果

sql

SELECT title, hot_score FROM zhihu_hot LIMIT 2;
titlehot_score
为什么现在的年轻人越来越不愿意结婚了?1258000
2025 年养老金调整方案公布,哪些人受益最多?985000

核心原理

  • from_crawler从配置文件读取参数,解耦配置与代码;
  • adbapi.ConnectionPool创建异步连接池,避免同步操作阻塞爬虫;
  • runInteraction异步执行 SQL 插入,提升爬虫效率;
  • ON DUPLICATE KEY UPDATE利用 URL 唯一索引实现数据去重,重复 URL 时更新标题和热度值;
  • addErrback捕获数据库操作异常,避免单个 Item 插入失败导致爬虫中断。

3.6 实战 3:MongoDB 文档存储

3.6.1 MongoDB 存储 Pipeline(pipelines.py)

python

运行

import pymongo from scrapy.exceptions import DropItem class MongoDBPipeline: """将 Item 存储至 MongoDB 数据库""" def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db self.client = None self.db = None @classmethod def from_crawler(cls, crawler): """从配置文件读取 MongoDB 参数""" return cls( mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://127.0.0.1:27017/'), mongo_db=crawler.settings.get('MONGO_DB', 'scrapy_db') ) def open_spider(self, spider): """创建 MongoDB 连接""" self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] # 创建索引,确保 URL 唯一 self.db.zhihu_hot.create_index('url', unique=True) spider.logger.info("MongoDB 连接成功,索引创建完成") def process_item(self, item, spider): """插入 Item 至 MongoDB""" try: # 插入数据,重复 URL 时更新 self.db.zhihu_hot.update_one( {'url': item['url']}, {'$set': dict(item)}, upsert=True # 不存在则插入,存在则更新 ) spider.logger.info(f"MongoDB 存储成功:{item['title']}") except pymongo.errors.DuplicateKeyError: spider.logger.warning(f"MongoDB 重复数据:{item['url']}") except Exception as e: spider.logger.error(f"MongoDB 插入失败:{e},Item:{item}") raise DropItem(f"MongoDB 插入失败,丢弃 Item:{item}") return item def close_spider(self, spider): """关闭 MongoDB 连接""" self.client.close() spider.logger.info("MongoDB 连接已关闭")
3.6.2 配置 MongoDB 参数(settings.py)

python

运行

# MongoDB 配置 MONGO_URI = 'mongodb://127.0.0.1:27017/' MONGO_DB = 'scrapy_db' # 启用 MongoDB Pipeline ITEM_PIPELINES = { 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, 'zhihu_hot.pipelines.MongoDBPipeline': 400, }
3.6.3 输出结果与原理

MongoDB 集合查询结果

bash

运行

mongo use scrapy_db db.zhihu_hot.find().limit(2)

json

[ { "_id": ObjectId("67658xxxxxx"), "title": "为什么现在的年轻人越来越不愿意结婚了?", "hot_score": 1258000, "url": "https://www.zhihu.com/question/6328XXXX" }, { "_id": ObjectId("67658xxxxxx"), "title": "2025 年养老金调整方案公布,哪些人受益最多?", "hot_score": 985000, "url": "https://www.zhihu.com/question/6329XXXX" } ]

核心原理

  • create_index('url', unique=True)创建 URL 唯一索引,避免重复数据;
  • update_one结合upsert=True实现 “存在则更新,不存在则插入”;
  • 捕获DuplicateKeyError异常,优雅处理重复数据,避免爬虫中断;
  • MongoDB 无需提前定义表结构,适配动态字段扩展(如后续新增字段无需修改表结构)。

3.7 实战 4:数据去重与校验 Pipeline(进阶)

3.7.1 全局去重 Pipeline(pipelines.py)

python

运行

class DuplicateFilterPipeline: """基于 Redis 实现全局数据去重(跨爬虫/跨节点)""" def __init__(self, redis_uri): import redis self.redis_client = redis.Redis.from_url(redis_uri) self.redis_key = 'zhihu_hot:urls' # 存储已爬取 URL 的 Redis Key @classmethod def from_crawler(cls, crawler): return cls(redis_uri=crawler.settings.get('REDIS_URI', 'redis://127.0.0.1:6379/0')) def process_item(self, item, spider): url = item.get('url') if not url: raise DropItem(f"URL 为空,丢弃 Item:{item}") # Redis SET 实现去重 if self.redis_client.sismember(self.redis_key, url): raise DropItem(f"全局重复 URL,丢弃 Item:{url}") else: self.redis_client.sadd(self.redis_key, url) spider.logger.info(f"全局去重校验通过:{url}") return item def close_spider(self, spider): self.redis_client.close()
3.7.2 启用去重 Pipeline(settings.py)

python

运行

# Redis 配置 REDIS_URI = 'redis://127.0.0.1:6379/0' # 调整 Pipeline 优先级(去重优先执行) ITEM_PIPELINES = { 'zhihu_hot.pipelines.DuplicateFilterPipeline': 100, # 最高优先级 'zhihu_hot.pipelines.JsonPipeline': 200, 'zhihu_hot.pipelines.CsvPipeline': 250, 'zhihu_hot.pipelines.MysqlPipeline': 300, 'zhihu_hot.pipelines.MongoDBPipeline': 400, }

核心原理

  • 利用 Redis Set 的sismember方法判断 URL 是否已存在,实现全局去重;
  • 优先级 100 确保去重在校验、存储前执行,减少无效数据处理;
  • 跨节点 / 跨爬虫运行时,Redis 可共享去重数据,避免分布式环境下的重复爬取。

四、Pipeline 性能调优与最佳实践

4.1 性能调优策略

优化点具体方案
数据库操作优化使用异步连接池(adbapi),避免同步阻塞;批量插入(每 100 条批量提交)
文件操作优化减少文件写入次数(先收集数据,爬虫关闭时一次性写入)
去重优化优先使用内存 / Redis 去重,再使用数据库唯一索引
异常处理优化捕获特定异常,避免通用异常导致所有 Item 处理失败

4.2 最佳实践

  1. 单一职责原则:每个 Pipeline 只负责一个功能(如校验、JSON 存储、MySQL 存储);
  2. 配置解耦:通过from_crawler读取配置,避免硬编码数据库地址、密码;
  3. 资源管理:通过open_spider/close_spider统一管理连接 / 文件句柄,避免资源泄露;
  4. 异常日志:详细记录异常信息(如 Item 内容、错误类型),便于问题定位;
  5. 数据校验:在前置 Pipeline 完成数据校验,避免无效数据进入存储环节。

五、常见问题与解决方案

问题现象原因分析解决方案
JSON 文件中文乱码未设置 ensure_ascii=False序列化时添加ensure_ascii=False
CSV 文件中文乱码编码格式错误使用utf-8-sig编码写入 CSV
MySQL 插入速度慢同步插入 / 单条插入使用 adbapi 异步插入,批量提交 SQL
MongoDB 重复插入未创建唯一索引为 URL 字段创建唯一索引,使用 upsert=True
Pipeline 不执行未在 settings.py 中启用 / 优先级配置错误检查ITEM_PIPELINES配置,确保数字优先级正确
数据库连接超时数据库地址错误 / 防火墙拦截验证数据库地址,开放数据库端口(MySQL 3306、MongoDB 27017)

六、总结

本文系统讲解了 Scrapy Pipeline 的数据持久化开发,从核心原理出发,实现了文件存储(JSON/CSV)、关系型数据库存储(MySQL)、非关系型数据库存储(MongoDB)三类核心方案,并补充了全局数据去重、异常处理等进阶功能。Pipeline 作为 Scrapy 数据持久化的核心组件,其模块化、可扩展的特性使其适配各类存储场景,是企业级爬虫开发的必备技能。

在实际开发中,可根据业务需求扩展更多 Pipeline 功能:如数据加密存储、数据同步至 Elasticsearch、数据推送至消息队列(Kafka/RabbitMQ)等。掌握 Pipeline 的开发规范与性能调优策略,可大幅提升爬虫数据存储的稳定性、效率与可维护性。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询