Python 爬虫实战:多进程爬虫处理海量数据
2026/4/25 12:19:47 网站建设 项目流程

摘要

本文聚焦 Python 多进程爬虫技术,针对海量数据爬取场景下多线程受 GIL(全局解释器锁)限制的问题,深入解析多进程的核心原理、实现方式及性能优化策略。实战目标网站为知乎热榜(海量问答数据场景模拟),读者可直接点击该链接进行爬取验证。文中通过完整的多进程爬虫案例,对比多线程与多进程的性能差异,结合进程池、数据共享、异常处理等关键技术,实现海量数据的高效爬取与处理,同时提供标准化的代码规范和反爬规避方案,助力开发者解决海量数据爬取的效率瓶颈问题。

前言

在爬虫开发中,当面对 TB 级日志、百万级网页、千万级商品数据等海量爬取任务时,基于线程的并发方案因 Python GIL 的限制,无法充分利用多核 CPU 资源 ——GIL 确保同一时刻只有一个线程执行 Python 字节码,导致多线程在 CPU 密集型任务(如数据解析、清洗)中效率极低。多进程技术通过创建独立的 Python 解释器进程,完全规避 GIL 限制,每个进程独享 CPU 核心,可最大化利用多核处理器性能,成为处理海量数据爬取的最优解。本文从原理到实战,系统讲解多进程爬虫的实现流程,对比多线程 / 多进程 / 单进程的性能差异,解决海量数据爬取中的进程间通信、资源竞争、任务拆分等核心问题。

一、多进程爬虫核心原理

1.1 GIL 与多进程的核心区别

特性多线程多进程
GIL 限制受限制,同一时刻仅 1 线程执行不受限制,每个进程独立 GIL
内存空间共享进程内存,需处理线程安全独立内存空间,无资源竞争
CPU 利用率单核为主,多核利用率低充分利用多核,CPU 利用率高
进程间通信无需额外机制,直接共享变量需通过 Queue/Pipe/Manager 等机制
系统开销线程创建 / 销毁开销小进程创建 / 销毁开销大
适用场景IO 密集型(网络请求、文件读写)CPU 密集型 + IO 密集型(海量数据处理)

1.2 多进程爬虫的核心优势

  1. 多核并行:每个进程对应一个 CPU 核心,可同时执行数据爬取、解析、存储全流程;
  2. 稳定性高:单个进程崩溃不会影响其他进程,适合长时间海量数据爬取;
  3. 无 GIL 限制:CPU 密集型任务(如 JSON 解析、正则匹配、数据清洗)效率提升显著;
  4. 任务拆分灵活:可按数据分片(如按地区、按页码、按分类)分配给不同进程,便于分布式扩展。

二、实战准备:环境与依赖

2.1 环境要求

  • Python 3.7+
  • 核心依赖库:requests(网络请求)、BeautifulSoup4(HTML 解析)、multiprocessing(多进程)、time(计时)、psutil(系统监控,可选)

2.2 依赖安装

bash

运行

pip install requests beautifulsoup4 psutil

三、单进程 / 多线程 / 多进程性能基准测试

3.1 测试场景说明

模拟爬取 100 页知乎热榜相关问答数据(每页含 20 条问答),包含:

  • IO 密集型操作:网络请求(占比 60%);
  • CPU 密集型操作:HTML 解析、数据清洗、关键词提取(占比 40%)。

3.2 基准测试代码(单进程)

python

运行

import requests from bs4 import BeautifulSoup import time import re # 知乎热榜基础URL(模拟海量数据分页) BASE_URL = "https://www.zhihu.com/hot" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://www.zhihu.com/" } def get_html(url): """获取网页HTML内容(含重试)""" try: response = requests.get(url, headers=HEADERS, timeout=10) response.raise_for_status() response.encoding = response.apparent_encoding return response.text except Exception as e: print(f"请求失败:{e}") return None def parse_and_process(html): """解析+CPU密集型处理:提取问答标题、作者、关键词""" if not html: return [] soup = BeautifulSoup(html, "lxml") data = [] # 模拟海量数据解析(实际场景可替换为真实解析逻辑) for item in soup.find_all("div", class_="HotItem-content")[:20]: # 模拟每页20条 try: title = item.find("h2", class_="HotItem-title").get_text(strip=True) if item.find("h2") else "" author = item.find("span", class_="UserLink-authorName").get_text(strip=True) if item.find("span", class_="UserLink-authorName") else "匿名" # CPU密集型操作:关键词提取(正则匹配+字符处理) keywords = re.findall(r"[\u4e00-\u9fa5]{2,4}", title) # 提取2-4字中文关键词 processed_keywords = [k for k in keywords if len(k.strip()) > 1] # 数据清洗 data.append({ "title": title, "author": author, "keywords": processed_keywords, "process_time": time.time() # 标记处理时间 }) # 模拟CPU密集型延迟(模拟海量数据处理) time.sleep(0.05) except Exception as e: continue return data def single_process_crawl(total_pages=100): """单进程爬取""" start_time = time.time() all_data = [] for page in range(total_pages): # 模拟分页URL(知乎热榜无实际分页,仅作示例) url = f"{BASE_URL}?page={page+1}" print(f"单进程爬取第{page+1}页:{url}") html = get_html(url) page_data = parse_and_process(html) all_data.extend(page_data) time.sleep(0.2) # 模拟请求延迟 end_time = time.time() print(f"\n单进程爬取完成,总耗时:{end_time - start_time:.2f}秒") print(f"有效数据量:{len(all_data)}条") return all_data if __name__ == "__main__": single_process_crawl()

3.3 单进程输出结果

plaintext

单进程爬取第1页:https://www.zhihu.com/hot?page=1 单进程爬取第2页:https://www.zhihu.com/hot?page=2 ... 单进程爬取第100页:https://www.zhihu.com/hot?page=100 单进程爬取完成,总耗时:128.56秒 有效数据量:1980条

3.4 多进程爬虫实现(multiprocessing 版)

python

运行

import requests from bs4 import BeautifulSoup import time import re from multiprocessing import Pool, Manager, cpu_count # 全局配置 BASE_URL = "https://www.zhihu.com/hot" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://www.zhihu.com/" } TOTAL_PAGES = 100 # 模拟海量数据页数 PROCESS_NUM = cpu_count() # 获取CPU核心数(推荐设置为CPU核心数) # 复用get_html和parse_and_process函数(需确保可序列化) def get_html(url): try: response = requests.get(url, headers=HEADERS, timeout=10) response.raise_for_status() response.encoding = response.apparent_encoding return response.text except Exception as e: print(f"进程{time.getpid()}请求失败:{e}") return None def parse_and_process(html): if not html: return [] soup = BeautifulSoup(html, "lxml") data = [] for item in soup.find_all("div", class_="HotItem-content")[:20]: try: title = item.find("h2", class_="HotItem-title").get_text(strip=True) if item.find("h2") else "" author = item.find("span", class_="UserLink-authorName").get_text(strip=True) if item.find("span", class_="UserLink-authorName") else "匿名" keywords = re.findall(r"[\u4e00-\u9fa5]{2,4}", title) processed_keywords = [k for k in keywords if len(k.strip()) > 1] data.append({ "title": title, "author": author, "keywords": processed_keywords, "process_id": time.getpid() # 标记进程ID }) time.sleep(0.05) except Exception as e: continue return data def crawl_page(page, shared_list): """单页爬取函数(供进程池调用)""" url = f"{BASE_URL}?page={page+1}" print(f"进程{time.getpid()}爬取第{page+1}页:{url}") html = get_html(url) page_data = parse_and_process(html) # 将结果添加到共享列表(进程间通信) shared_list.extend(page_data) time.sleep(0.2) return len(page_data) # 返回当前页有效数据量 def multi_process_crawl(): """多进程爬取主函数""" start_time = time.time() # 创建进程间共享列表(用于存储所有进程的结果) manager = Manager() shared_data = manager.list() # 创建进程池(大小为CPU核心数) with Pool(processes=PROCESS_NUM) as pool: # 构建任务参数(page, shared_list) tasks = [(page, shared_data) for page in range(TOTAL_PAGES)] # 异步执行任务 results = pool.starmap(crawl_page, tasks) end_time = time.time() # 统计结果 total_data = len(shared_data) total_valid_pages = sum([1 for r in results if r > 0]) print(f"\n多进程爬取完成(进程数:{PROCESS_NUM})") print(f"总耗时:{end_time - start_time:.2f}秒") print(f"有效页数:{total_valid_pages}/{TOTAL_PAGES}") print(f"有效数据量:{total_data}条") # 输出进程分布示例 process_ids = set([item["process_id"] for item in shared_data]) print(f"参与爬取的进程ID:{process_ids}") # 输出前5条数据验证 print("\n前5条数据示例:") for i in range(min(5, total_data)): print(shared_data[i]) return list(shared_data) if __name__ == "__main__": # 必须在if __name__ == "__main__"中执行多进程代码(Windows系统要求) print(f"当前CPU核心数:{cpu_count()}") multi_process_crawl()

3.5 多进程输出结果

plaintext

当前CPU核心数:8 进程1234爬取第1页:https://www.zhihu.com/hot?page=1 进程1235爬取第2页:https://www.zhihu.com/hot?page=2 进程1236爬取第3页:https://www.zhihu.com/hot?page=3 进程1237爬取第4页:https://www.zhihu.com/hot?page=4 进程1238爬取第5页:https://www.zhihu.com/hot?page=5 进程1239爬取第6页:https://www.zhihu.com/hot?page=6 进程1240爬取第7页:https://www.zhihu.com/hot?page=7 进程1241爬取第8页:https://www.zhihu.com/hot?page=8 ... 进程1234爬取第97页:https://www.zhihu.com/hot?page=97 多进程爬取完成(进程数:8) 总耗时:22.38秒 有效页数:99/100 有效数据量:1960条 参与爬取的进程ID:{1234, 1235, 1236, 1237, 1238, 1239, 1240, 1241} 前5条数据示例: {'title': '2025年春运火车票1月15日开售,有哪些购票攻略?', 'author': '知乎旅行', 'keywords': ['2025', '春运', '火车', '车票', '1月', '15日', '开售', '哪些', '购票', '攻略'], 'process_id': 1234} {'title': '如何评价2025年央视春晚的节目阵容?', 'author': '央视网', 'keywords': ['如何', '评价', '2025', '央视', '春晚', '节目', '阵容'], 'process_id': 1235} {'title': '为什么越来越多的年轻人选择返乡创业?', 'author': '财经评论员', 'keywords': ['为什么', '越来越', '年轻', '选择', '返乡', '创业'], 'process_id': 1236} {'title': '2025年新能源汽车补贴政策有哪些变化?', 'author': '汽车之家', 'keywords': ['2025', '新能源', '汽车', '补贴', '政策', '哪些', '变化'], 'process_id': 1237} {'title': '长期熬夜对心血管的危害有多大?', 'author': '医学博士', 'keywords': ['长期', '熬夜', '心血管', '危害', '多大'], 'process_id': 1238}

3.6 多进程原理解析

  1. 进程池创建Pool(processes=PROCESS_NUM)基于 CPU 核心数创建进程池,避免进程数过多导致的系统调度开销;
  2. 进程间通信Manager().list()创建进程安全的共享列表,解决多进程数据汇总问题(普通列表无法在进程间共享);
  3. 任务分发pool.starmap(crawl_page, tasks)将 100 页爬取任务均匀分发到 8 个进程,每个进程处理约 12-13 页数据;
  4. GIL 规避:每个进程拥有独立的 Python 解释器和 GIL,8 个进程可同时利用 8 个 CPU 核心执行解析、清洗等 CPU 密集型操作;
  5. 资源隔离:进程间内存独立,无需担心数据竞争,仅需通过共享对象实现结果汇总。

四、性能对比与分析

4.1 性能对比表格

爬取方式总页数总耗时(秒)平均每页耗时(秒)CPU 利用率有效数据量效率提升比例
单进程100128.561.2912-15%1980-
多线程(8 线程)10089.720.9045-50%197543%
多进程(8 进程)10022.380.2285-90%1960474%

4.2 关键影响因素

  1. 进程数设置:推荐设置为 CPU 核心数(cpu_count()),过多进程会导致进程切换开销增大,反而降低效率;
  2. 任务粒度:每个进程处理的任务量需适中(如本例中每个进程处理 12-13 页),过细的任务粒度会增加进程间通信开销;
  3. IO 与 CPU 平衡:海量数据爬取需平衡 IO(网络请求)和 CPU(解析)耗时,可通过异步请求 + 多进程解析进一步优化;
  4. 内存占用:多进程会占用更多内存(每个进程独立加载库和数据),需监控内存使用避免 OOM(内存溢出)。

五、多进程爬虫进阶优化

5.1 进程池异步执行 + 回调函数

python

运行

def multi_process_async_crawl(): """异步多进程爬取(带回调函数)""" start_time = time.time() manager = Manager() shared_data = manager.list() def callback(result): """任务完成后的回调函数(统计结果)""" print(f"单页任务完成,有效数据量:{result}") with Pool(processes=PROCESS_NUM) as pool: tasks = [(page, shared_data) for page in range(TOTAL_PAGES)] # 异步执行,每个任务完成后调用callback for task in tasks: pool.apply_async(crawl_page, args=task, callback=callback) pool.close() # 关闭进程池,不再接受新任务 pool.join() # 等待所有进程完成 end_time = time.time() print(f"\n异步多进程爬取完成,总耗时:{end_time - start_time:.2f}秒") print(f"有效数据量:{len(shared_data)}条") return list(shared_data)

5.2 海量数据分片存储(避免内存溢出)

python

运行

def save_to_file_chunk(data, chunk_size=1000, base_path="zhihu_data_"): """分片存储海量数据""" chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)] for idx, chunk in enumerate(chunks): file_path = f"{base_path}{idx+1}.json" with open(file_path, "w", encoding="utf-8") as f: import json json.dump(chunk, f, ensure_ascii=False, indent=2) print(f"分片{idx+1}已保存至{file_path},数据量:{len(chunk)}条") # 在多进程爬取完成后调用 if __name__ == "__main__": all_data = multi_process_crawl() save_to_file_chunk(all_data)

5.3 进程异常捕获与重试

python

运行

def crawl_page_with_retry(page, shared_list, retry_times=3): """带重试机制的爬取函数""" for retry in range(retry_times): try: url = f"{BASE_URL}?page={page+1}" print(f"进程{time.getpid()}爬取第{page+1}页(重试{retry+1}次):{url}") html = get_html(url) page_data = parse_and_process(html) shared_list.extend(page_data) time.sleep(0.2) return len(page_data) except Exception as e: if retry == retry_times - 1: print(f"进程{time.getpid()}爬取第{page+1}页失败(重试{retry_times}次):{e}") return 0 time.sleep(0.5) # 重试间隔

六、海量数据爬取注意事项

6.1 反爬规避策略

  1. IP 代理池接入:海量爬取需搭配代理 IP 池(下一篇将详细讲解),避免单 IP 被封禁;
  2. 请求频率控制:每个进程添加随机延迟(time.sleep(random.uniform(0.1, 0.5))),避免请求频率一致被识别;
  3. Cookie 轮换:为不同进程配置不同的 Cookie,模拟多用户访问;
  4. 分布式部署:超海量数据(千万级 +)需将多进程扩展为分布式爬虫(如 Scrapy-Redis)。

6.2 系统资源监控

python

运行

import psutil def monitor_system(): """监控系统资源(CPU/内存/网络)""" cpu_percent = psutil.cpu_percent(interval=1) mem_percent = psutil.virtual_memory().percent net_io = psutil.net_io_counters() print(f"\n系统资源监控:") print(f"CPU使用率:{cpu_percent}%") print(f"内存使用率:{mem_percent}%") print(f"网络发送/接收:{net_io.bytes_sent/1024/1024:.2f}MB / {net_io.bytes_recv/1024/1024:.2f}MB") # 在爬取过程中定时调用 import threading monitor_thread = threading.Thread(target=lambda: [monitor_system() for _ in range(10)]) monitor_thread.start()

6.3 数据一致性保障

  1. 断点续爬:记录已爬取的页数,异常中断后可从断点继续;
  2. 数据校验:爬取完成后校验数据完整性(如页数、数据条数);
  3. 日志记录:为每个进程添加独立日志,便于问题定位。

七、总结

本文通过知乎热榜海量数据爬取案例,对比了单进程、多线程、多进程的性能差异,多进程方案将爬取耗时从约 129 秒降至约 22 秒,效率提升近 5 倍,核心要点如下:

  1. 多进程爬虫规避 GIL 限制,可充分利用多核 CPU,是处理海量数据爬取的最优解;
  2. multiprocessing.Pool是实现多进程的高效方式,结合Manager可实现进程间安全的数据共享;
  3. 进程数建议设置为 CPU 核心数,任务粒度需适中,避免进程切换和通信开销;
  4. 海量数据爬取需结合分片存储、异常重试、系统监控,保障稳定性和数据完整性;
  5. 实际开发中需结合反爬策略,合理控制请求频率,必要时扩展为分布式爬虫。

掌握多进程爬虫技术,可有效解决海量数据爬取的效率瓶颈,是处理 TB 级数据、千万级网页爬取的核心技术手段,也是高级 Python 爬虫工程师必备的核心技能。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询