小李今天差点把电脑砸了。
他写了一个爬虫,要从一万个网站上抓数据。代码很简单:请求网址、解析内容、存进数据库。跑了十分钟,才抓了三百个。他打开任务管理器一看,CPU占用率才5%,网络流量几乎为零。
“我这电脑是i9啊,怎么就这水平?”
问题出在哪?他的代码老老实实一个一个等:请求发出去,等服务器响应,等数据传回来,然后再发下一个。每个请求耗时0.5秒,一万个就是5000秒,一个多小时。但CPU大部分时间都在闲着,因为它在等网络。
这就是典型的I/O密集型任务。代码在等,但CPU不干活。
小李心想:能不能让它不等?发出去十个请求,谁先回来就处理谁?
当然能。这就是“异步”。
但问题是,他的代码是同步写的。改写成异步?几十个函数都得动,一堆库要换,想想就头大。
于是他想知道:有没有办法,让同步代码“假装”在异步执行?
先搞清楚:同步和异步到底差在哪
举个例子。
你点了三份外卖。同步的做法是:站在第一家店门口等,拿到第一份,再去第二家等,拿到第二份,再去第三家等。第二家店如果忙,你就干等着。全程啥也干不了。
异步的做法是:三家店都下单,然后回家坐着。谁做好了给你打电话,你去拿。中间你可以看电视、打游戏、甚至再点一份。
在代码里,“等”通常就是I/O操作——读文件、发HTTP请求、查数据库、等用户输入。这些操作的特点是:慢,但不太占CPU。
同步代码遇到I/O就卡住。异步代码遇到I/O就去干别的,等I/O完成了再回来继续。
那么问题来了:同步代码怎么异步执行?
Python里有个很直接的办法:扔进线程池。
说白了就是开几个“小弟”,每个小弟跑一个同步任务。主程序不用等,继续干自己的。
看个例子:
import time import requests from concurrent.futures import ThreadPoolExecutor # 这是一个同步函数,请求一个网址 def fetch(url): print(f"开始抓取 {url}") response = requests.get(url) # 这里会等 print(f"抓取完成 {url}") return response.status_code # 十个网址 urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 11)] # 同步执行:一个一个等 start = time.time() for url in urls: fetch(url) print(f"同步耗时: {time.time() - start:.2f}秒") # 异步执行:用线程池 start = time.time() with ThreadPoolExecutor(max_workers=5) as executor: results = executor.map(fetch, urls) print(f"线程池耗时: {time.time() - start:.2f}秒")跑一下你会发现:同步版本大概20秒(每个请求等2秒),线程池版本只要4秒左右。
神奇吗?不神奇。就是开了5个线程,每个线程处理两个请求,同时等。
但这里有个坑:线程池适合I/O任务,不适合CPU密集任务。你如果开10个线程做计算(比如循环一亿次),反而会因为线程切换开销变慢。
有没有更轻量的办法?有,asyncio
线程池虽然好用,但每个线程要占内存(大概8MB),开多了扛不住。而且线程切换有开销。
Python 3.4之后引入了asyncio,它是真正的异步,不靠线程,靠一个叫“事件循环”的东西。
但问题是,asyncio要求你的函数必须是异步的——也就是说,你要把requests换成aiohttp,把time.sleep换成asyncio.sleep,代码几乎要重写。
那有没有办法让同步代码跑在asyncio里?
有。asyncio.to_thread。
import asyncio import requests def sync_fetch(url): # 这是一个同步函数,没法直接await return requests.get(url).status_code async def main(): urls = [...] # 十个网址 # 把同步函数扔到线程池里跑,但用异步的方式等待 tasks = [asyncio.to_thread(sync_fetch, url) for url in urls] results = await asyncio.gather(*tasks) print(results) asyncio.run(main())这个方法的本质还是线程池,但写法更优雅,可以和真正的异步代码混用。
再深一层:事件循环是怎么骗过你的
如果你想知道“异步到底是怎么做到的”,我们得聊聊事件循环。
事件循环就像一个调度中心。它手里维护一个任务列表。每个任务要么在运行,要么在等某个事情(比如网络数据)。当一个任务说“我在等”,事件循环就把它挂起,去执行下一个任务。
等那个网络数据到了,事件循环再把任务唤醒,从刚才停下的地方继续。
听起来复杂,但Python的asyncio已经帮你封装好了。你只需要把函数写成async def,里面用await表示“这里要等”。
但问题是,我们手头有大量同步代码,不可能全改写成async def。
有没有一个黑科技,能把同步函数直接变成异步的?
有,但不太完美。asyncio提供了一个loop.run_in_executor,本质上还是线程池。真正的“把同步代码变成纯异步”是不可能的,因为同步代码里如果有time.sleep(10),那就是实打实地阻塞线程,谁也救不了你。
实战:给一个同步爬虫提速
假设你写了一个爬虫,大概是这样的:
def crawl_one(url): # 发请求 r = requests.get(url) # 解析 soup = BeautifulSoup(r.text, 'html.parser') # 提取数据 title = soup.find('title').text # 存数据库 db.insert({'url': url, 'title': title}) return title def crawl_all(urls): results = [] for url in urls: results.append(crawl_one(url)) return results要提速,最简单的改动:
from concurrent.futures import ThreadPoolExecutor, as_completed def crawl_all_parallel(urls, workers=10): results = [] with ThreadPoolExecutor(max_workers=workers) as executor: # 提交所有任务 future_to_url = {executor.submit(crawl_one, url): url for url in urls} # 谁先完成就处理谁 for future in as_completed(future_to_url): url = future_to_url[future] try: result = future.result() results.append(result) print(f"完成: {url}") except Exception as e: print(f"失败: {url}, 错误: {e}") return results就这么几行改动,速度提升接近workers倍(受限于网络带宽和对方服务器的承受能力)。
但要注意:如果你的crawl_one里用了数据库连接,得确保数据库连接是线程安全的。很多数据库驱动不是,这时候你可能需要每个线程单独创建连接。
真正的异步:怎么把同步库改成异步?
有时候你不得不面对一个现实:你想用的库只有同步版本,比如requests、pymysql、redis-py(老版本)。
三个办法:
方法一:线程池包装
async def async_get(url): loop = asyncio.get_running_loop() return await loop.run_in_executor(None, requests.get, url)这个None表示使用默认的线程池。简单,但每个调用都会占用一个线程。
方法二:找异步替代品
requests→aiohttp或httpx(支持异步)pymysql→aiomysqlredis-py→aredis或redis.asyncio(新版自带)open(文件读写) →aiofiles
改代码是麻烦,但一旦改完,性能提升显著,而且不占线程。
方法三:用anyio或trio
这两个库提供了更高级的抽象,可以让同步代码在异步环境中运行得更自然。但学习曲线比较陡,不推荐新手尝试。
一个容易踩的坑:假异步
很多人写异步代码,写着写着就变成这样了:
async def fetch(url): response = requests.get(url) # 同步操作! return response.text async def main(): tasks = [fetch(url) for url in urls] await asyncio.gather(*tasks)你猜怎么着?完全没有提速。
因为requests.get是同步阻塞的。当你await一个任务时,这个任务如果内部阻塞了,整个事件循环都会被卡住。
记住一句话:异步的传染性。一旦你用了async,从调用链的根到叶子,所有涉及I/O的地方都必须是异步的。中间混了一个同步阻塞调用,整个异步就废了。
检查方法很简单:在代码里搜requests、time.sleep、open这些同步操作,看它们是否出现在async函数里。
有没有更激进的方案?有,但不太推荐
方案一:gevent
gevent是一个第三方库,它通过“打补丁”的方式,把Python标准库里的同步I/O操作(比如socket、time.sleep)偷偷替换成异步版本。你不需要写async/await,代码看起来完全是同步的,但实际是异步执行的。
from gevent import monkey monkey.patch_all() # 这行会替换标准库 import requests # 现在requests是异步的了 from gevent.pool import Pool def fetch(url): return requests.get(url).status_code pool = Pool(10) urls = [...] results = pool.map(fetch, urls)看起来很美好,但问题也不少:
调试困难(堆栈信息乱七八糟)
很多C扩展库不兼容
已经慢慢过时了,社区活跃度下降
方案二:curio或trio
这两个是比asyncio更现代、更易用的异步库。但生态不如asyncio,第三方支持少。
实际项目里怎么选?
我见过很多团队纠结这个问题。给你一个决策树:
你的任务主要是I/O密集(网络请求、文件读写、数据库查询)→ 考虑异步或并发
代码量小,愿意重写→ 直接用
aiohttp+asyncio,性能最好代码量大,不想大改→
ThreadPoolExecutor,简单粗暴有效既要又要:部分异步部分同步→
asyncio.to_thread混用CPU密集型任务→ 别折腾异步了,用
multiprocessing(多进程)
一个完整的例子:混合方案
假设你有这样一个需求:从1000个API抓数据,然后对每个数据做一次CPU密集计算(比如图像处理)。
混合方案最合适:
import asyncio from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import requests import time # CPU密集函数 def process_data(data): # 假设这里有复杂的计算 time.sleep(0.1) # 模拟计算 return data * 2 # I/O密集函数 def fetch_data(url): return requests.get(url).json() async def main(): urls = [...] # 1000个URL # 用线程池处理I/O with ThreadPoolExecutor(max_workers=50) as io_executor: loop = asyncio.get_running_loop() fetch_tasks = [ loop.run_in_executor(io_executor, fetch_data, url) for url in urls ] raw_data = await asyncio.gather(*fetch_tasks) # 用进程池处理CPU密集任务 with ProcessPoolExecutor(max_workers=8) as cpu_executor: process_tasks = [ loop.run_in_executor(cpu_executor, process_data, data) for data in raw_data ] results = await asyncio.gather(*process_tasks) return results asyncio.run(main())这个方案里:
网络请求并发50个,不浪费带宽
计算部分用多进程,避开GIL
整体用异步协调,代码清晰
总结一句话
同步代码想异步执行,最简单的就是线程池。想要更高效更优雅,就用asyncio配合to_thread。但记住:没有银弹,真正的异步需要你从底层改起。
回到小李的爬虫。他最后选择了ThreadPoolExecutor,改了10行代码,速度提升了8倍。虽然不完美,但够用了。
“够用就好”这四个字,在工程里往往比“最优”更重要。