Python concurrent.futures: A Practical Guide to Concurrent Programming
2026/6/1 1:49:53

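Introduction

In backend development, concurrency is a key technique for improving system performance. Python's concurrent.futures module provides a concise, high-level API that makes multithreaded and multiprocess concurrency straightforward. As a backend developer who moved from Python to Rust, I have collected a set of best practices for concurrent.futures from real-world use. This article walks through the module in depth to help you write efficient concurrent code.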

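1. concurrent.futures Basics

1.1 Module Overview

The concurrent.futures module provides two main executors:

  • ThreadPoolExecutor: runs tasks in a pool of threads
  • ProcessPoolExecutor: runs tasks in a pool of processes

1.2 Basic Usage Pattern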

    from concurrent.futures import ThreadPoolExecutor

    def task(n):
        return n * n

    with ThreadPoolExecutor(max_workers=4) as executor:
        future = executor.submit(task, 5)
        result = future.result()
        print(result)

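1.3 Core Components

Component              Description
Executor               Abstract base class for executors
ThreadPoolExecutor     Executor backed by a thread pool
ProcessPoolExecutor    Executor backed by a process pool
Future                 Represents the result of an asynchronous computation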
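
Because both pool classes derive from Executor, code written against the base class can accept either kind of pool. The following is a minimal sketch of that idea; square and run_squares are illustrative names introduced here, not part of the library.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor


def square(n: int) -> int:
    return n * n


def run_squares(executor: Executor, numbers: list[int]) -> list[int]:
    """Submit one task per number and collect results in submission order."""
    futures: list[Future] = [executor.submit(square, n) for n in numbers]
    return [f.result() for f in futures]


# Any Executor subclass can be passed in; a thread pool is used here.
with ThreadPoolExecutor(max_workers=2) as pool:
    squares = run_squares(pool, [1, 2, 3])

print(squares)
```

Passing the executor in as a parameter keeps the choice between threads and processes at the call site.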

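2. ThreadPoolExecutor in Detail

2.1 Creating a Thread Pool

    from concurrent.futures import ThreadPoolExecutor

    # Create a thread pool directly; call executor.shutdown() when finished
    executor = ThreadPoolExecutor(
        max_workers=4,
        thread_name_prefix='worker-'
    )

    # Or use a context manager, which shuts the pool down on exit
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Submit tasks here
        pass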

2.2 Submitting a Task

    import time
    from concurrent.futures import ThreadPoolExecutor

    def download_file(url):
        # Simulate a download
        time.sleep(1)
        return f"Downloaded: {url}"

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit a single task
        future = executor.submit(download_file, "http://example.com/file1.txt")

        # Get the result (blocks for up to 5 seconds)
        result = future.result(timeout=5)
        print(result)

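2.3 Submitting Tasks in Bulk

    from concurrent.futures import ThreadPoolExecutor

    # download_file is the function defined in section 2.2
    urls = [
        "http://example.com/file1.txt",
        "http://example.com/file2.txt",
        "http://example.com/file3.txt",
    ]

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit all tasks
        futures = [executor.submit(download_file, url) for url in urls]

        # Collect the results in submission order
        for future in futures:
            print(future.result())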

3. ProcessPoolExecutor in Detail

3.1 Creating a Process Pool

    from concurrent.futures import ProcessPoolExecutor

    def compute_heavy(n):
        # CPU-bound task
        return sum(i * i for i in range(n))

    # The guard is required on platforms that start workers with "spawn"
    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as executor:
            future = executor.submit(compute_heavy, 1_000_000)
            print(future.result())

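3.2 Thread Pool vs. Process Pool

Feature             ThreadPoolExecutor     ProcessPoolExecutor
GIL                 Limited by the GIL     Not limited by the GIL
Best suited for     I/O-bound tasks        CPU-bound tasks
Startup overhead    Lower                  Higher
Memory overhead     Lower                  Higher
Data sharing        Easy                   Difficult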

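3.3 Choosing an Executor

    # I/O-bound tasks  -> ThreadPoolExecutor
    # CPU-bound tasks  -> ProcessPoolExecutor
    # Mixed workloads  -> combine both

    # fetch_from_api and compute are placeholders for your own functions
    def process_task(data):
        # I/O step
        raw_data = fetch_from_api(data)
        # CPU step
        result = compute(raw_data)
        return result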

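4. The Future Object in Detail

4.1 Future States

    from concurrent.futures import Future

    # Futures are normally created by an executor; one is built by hand
    # here only to show the state methods
    future = Future()

    # Inspect the state
    print(future.done())       # False
    print(future.running())    # False
    print(future.cancelled())  # False

    # Set the result
    future.set_result(42)
    print(future.done())       # True
    print(future.result())     # 42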
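
A future that is still waiting in the queue can be cancelled before it starts. The sketch below assumes a single-worker pool so that the second task is guaranteed to be pending while the first one blocks; blocker and release are names made up for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def blocker():
    # Occupies the only worker until the main thread sets the event.
    release.wait(timeout=5)
    return "first"


with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocker)
    queued = executor.submit(lambda: "second")  # waits behind blocker

    # cancel() succeeds only while the task has not started running.
    cancelled = queued.cancel()

    release.set()  # let the first task finish
    first_result = running.result()

print(cancelled, queued.cancelled(), first_result)
```

Calling result() on a cancelled future raises CancelledError, so check cancelled() first when cancellation is possible.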

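4.2 Adding a Callback

    from concurrent.futures import ThreadPoolExecutor

    def task(n):
        return n * n

    def callback(future):
        # Called with the finished future as its only argument
        print(f"Task completed: {future.result()}")

    with ThreadPoolExecutor() as executor:
        future = executor.submit(task, 5)
        future.add_done_callback(callback)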
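
A callback also runs when the task raised an exception, and calling future.result() inside it would re-raise that exception. One way to handle both outcomes is to check future.exception() first. This is a minimal sketch; divide, on_done and outcomes are hypothetical names used only here.

```python
from concurrent.futures import Future, ThreadPoolExecutor

outcomes = []


def divide(a, b):
    return a / b


def on_done(future: Future) -> None:
    # exception() returns None when the task finished normally.
    exc = future.exception()
    if exc is not None:
        outcomes.append(f"failed: {type(exc).__name__}")
    else:
        outcomes.append(f"ok: {future.result()}")


with ThreadPoolExecutor(max_workers=2) as executor:
    for a, b in [(6, 3), (1, 0)]:
        executor.submit(divide, a, b).add_done_callback(on_done)

# Completion order is not guaranteed, so sort before printing.
print(sorted(outcomes))
```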

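4.3 Handling Timeouts

    import concurrent.futures

    # `future` is assumed to come from an earlier executor.submit() call
    try:
        result = future.result(timeout=2)
    except concurrent.futures.TimeoutError:
        print("Task timed out")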
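
Note that a timeout only stops the wait; it does not stop the task. The self-contained sketch below illustrates this with a hypothetical slow_task: the first call to result() times out, and a later call with a longer timeout still receives the value.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_task():
    time.sleep(0.5)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)  # shorter than the task takes
        timed_out = False
    except FutureTimeoutError:
        timed_out = True

    # The task kept running in the background; waiting longer succeeds.
    late_result = future.result(timeout=5)

print(timed_out, late_result)
```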

5. Advanced Usage

5.1 as_completed

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def task(task_id):
        time.sleep(task_id)
        return f"Task {task_id} completed"

    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]

        # Yield results in completion order
        for future in as_completed(futures):
            print(future.result())
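
Since as_completed yields futures in completion order, it is often useful to remember which input each future belongs to and to handle failures per task. A common pattern is a dict keyed by future. The sketch below assumes a made-up fetch function that fails for one input.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(item: str) -> str:
    """Hypothetical task that fails for one specific input."""
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")
    return item.upper()


def run_all(items):
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Map each future back to the input that produced it.
        future_to_item = {executor.submit(fetch, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results[item] = future.result()
            except ValueError as exc:
                errors[item] = str(exc)
    return results, errors


results, errors = run_all(["a", "bad", "c"])
print(results, errors)
```

One failing task does not prevent the others from being collected.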

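5.2 The map Function

    from concurrent.futures import ThreadPoolExecutor

    # task is the function defined in section 5.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(task, [1, 2, 3, 4, 5])

        # Results come back in input order
        for result in results:
            print(result)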
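
Like the built-in map, executor.map accepts several iterables and passes one element from each to every call. The sketch below uses a hypothetical delayed_multiply whose later inputs finish first, to show that the output order still follows the input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def delayed_multiply(value, factor):
    # Smaller values sleep longer, so tasks finish in reverse order.
    time.sleep(0.05 * (3 - value))
    return value * factor


with ThreadPoolExecutor(max_workers=3) as executor:
    # One argument is taken from each iterable per call.
    products = list(executor.map(delayed_multiply, [0, 1, 2], [10, 10, 10]))

print(products)
```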

5.3 The wait Function

    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

    # task is the function defined in section 5.1
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(1, 4)]

        # Return as soon as the first task finishes
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)

        print(f"Completed: {len(done)}")
        print(f"Not completed: {len(not_done)}")
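
wait also takes a timeout. Unlike Future.result, it does not raise when the timeout expires; it simply returns whatever has and has not finished by then. A minimal sketch, with sleeper as an illustrative task and durations chosen so that only one task finishes in time:

```python
import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def sleeper(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(sleeper, s) for s in (0.05, 1.0)]

    # Give up waiting after 0.4 s even though one task is still running.
    done, not_done = wait(futures, timeout=0.4, return_when=ALL_COMPLETED)

    finished_early = sorted(f.result() for f in done)
    pending_count = len(not_done)

print(finished_early, pending_count)
```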

6. Practical Examples

6.1 Downloading Files in Parallel

    import os
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def download_file(url, save_path):
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # fail on HTTP errors instead of saving them
        with open(save_path, 'wb') as f:
            f.write(response.content)
        return save_path

    urls = [
        ("https://example.com/image1.jpg", "images/image1.jpg"),
        ("https://example.com/image2.jpg", "images/image2.jpg"),
        ("https://example.com/image3.jpg", "images/image3.jpg"),
    ]

    os.makedirs("images", exist_ok=True)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(download_file, url, path) for url, path in urls]

        for future in as_completed(futures):
            print(f"Downloaded: {future.result()}")

6.2 Running Database Queries in Parallel

    import psycopg2
    from concurrent.futures import ThreadPoolExecutor

    def query_user(user_id):
        # Each call opens its own connection, so none is shared across threads
        conn = psycopg2.connect("dbname=example user=postgres")
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cursor.fetchone()
        finally:
            conn.close()  # close even if the query fails

    user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    with ThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(query_user, user_ids)

        for user_id, user in zip(user_ids, results):
            print(f"User {user_id}: {user}")

6.3 Mixing I/O and CPU Work

    import requests
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def fetch_data(url):
        return requests.get(url, timeout=10).json()

    def process_data(data):
        # CPU-bound processing
        return sum(item['value'] for item in data)

    def pipeline(url):
        # Runs both stages in the calling thread (not used below)
        data = fetch_data(url)
        return process_data(data)

    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]

    # The guard is required on platforms that start workers with "spawn"
    if __name__ == "__main__":
        # I/O stage: thread pool
        with ThreadPoolExecutor(max_workers=4) as io_executor:
            futures = [io_executor.submit(fetch_data, url) for url in urls]
            raw_data = [f.result() for f in futures]

        # CPU stage: process pool
        with ProcessPoolExecutor(max_workers=4) as cpu_executor:
            results = list(cpu_executor.map(process_data, raw_data))

        print(results)

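7. Best Practices

7.1 Choose a Sensible Number of Workers

    import os

    # CPU-bound tasks: about one worker per core
    cpu_workers = os.cpu_count() or 4

    # I/O-bound tasks: more workers than cores, capped at 32
    io_workers = min(32, (os.cpu_count() or 4) * 5)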

7.2 Avoid Shared State

    # Bad: shared mutable state (counter += 1 is not atomic across threads)
    counter = 0

    def increment():
        global counter
        counter += 1

    # Good: use a thread-safe data structure or a lock
    from threading import Lock

    class ThreadSafeCounter:
        def __init__(self):
            self._count = 0
            self._lock = Lock()

        def increment(self):
            with self._lock:
                self._count += 1
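
To see the lock-based counter working under load, it can be driven from several worker threads at once. The sketch below repeats the class so that it runs on its own; the value property and the bump helper are additions made for this illustration and are not part of the original class.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class ThreadSafeCounter:
    def __init__(self):
        self._count = 0
        self._lock = Lock()

    def increment(self):
        with self._lock:
            self._count += 1

    @property
    def value(self):
        # Added for this sketch: read the count under the same lock.
        with self._lock:
            return self._count


def bump(counter, times):
    for _ in range(times):
        counter.increment()


counter = ThreadSafeCounter()
with ThreadPoolExecutor(max_workers=4) as executor:
    for _ in range(4):
        executor.submit(bump, counter, 10_000)

# The pool has shut down, so every increment has been applied.
total = counter.value
print(total)
```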

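7.3 Graceful Shutdown

    from concurrent.futures import ThreadPoolExecutor

    def task(n):
        return n * n

    executor = ThreadPoolExecutor(max_workers=4)

    try:
        # Submit tasks
        futures = [executor.submit(task, i) for i in range(10)]

        # Collect results
        for future in futures:
            print(future.result())
    finally:
        # Shut down the executor, waiting for running tasks to finish
        executor.shutdown(wait=True)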
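
When the remaining work should be abandoned rather than finished, shutdown accepts cancel_futures=True (available from Python 3.9), which cancels tasks that have not started yet. The sketch below assumes a single worker and a hypothetical wait_for_gate task. Whether the first task has already started when shutdown is called depends on timing, so either four or five futures end up cancelled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def wait_for_gate(i):
    # Blocks until the main thread opens the gate.
    gate.wait(timeout=5)
    return i


executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(wait_for_gate, i) for i in range(5)]

# Cancel everything still queued; a task that is already running is not interrupted.
executor.shutdown(wait=False, cancel_futures=True)
gate.set()

cancelled_count = sum(f.cancelled() for f in futures)
completed = [f.result(timeout=5) for f in futures if not f.cancelled()]
print(cancelled_count, completed)
```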

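8. Performance Comparison

8.1 Synchronous vs. Asynchronous

    import time
    from concurrent.futures import ThreadPoolExecutor

    def download_file(url):
        # Simulated download, as in section 2.2
        time.sleep(1)
        return f"Downloaded: {url}"

    def sync_download(urls):
        for url in urls:
            download_file(url)

    def async_download(urls):
        with ThreadPoolExecutor(max_workers=5) as executor:
            # list() consumes the results so any task exception surfaces here
            list(executor.map(download_file, urls))

    # Compare the two
    urls = ["https://example.com/file{}.txt".format(i) for i in range(10)]

    start = time.time()
    sync_download(urls)
    print(f"Sync time: {time.time() - start:.2f}s")

    start = time.time()
    async_download(urls)
    print(f"Async time: {time.time() - start:.2f}s")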

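8.2 Thread Pool vs. Process Pool

    import time
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

    def cpu_intensive(n):
        return sum(i * i for i in range(n))

    # The guard is required on platforms that start workers with "spawn"
    if __name__ == "__main__":
        # Thread pool (limited by the GIL)
        with ThreadPoolExecutor(max_workers=4) as executor:
            start = time.time()
            # map() returns a lazy iterator at once; list() waits for every result
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ThreadPool time: {time.time() - start:.2f}s")

        # Process pool (not limited by the GIL)
        with ProcessPoolExecutor(max_workers=4) as executor:
            start = time.time()
            list(executor.map(cpu_intensive, [10_000_000] * 4))
            print(f"ProcessPool time: {time.time() - start:.2f}s")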

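Summary

concurrent.futures is a powerful tool for concurrent programming in Python. After reading this article, you should have a grasp of the following key points:

  1. ThreadPoolExecutor: suited to I/O-bound tasks
  2. ProcessPoolExecutor: suited to CPU-bound tasks
  3. Future objects: manage the results of asynchronous computations
  4. Advanced APIs: as_completed, map, wait
  5. Practical examples: parallel downloads, database queries, mixed workloads
  6. Best practices: choose a sensible number of workers and avoid shared state
  7. Performance comparison: synchronous vs. asynchronous, thread pool vs. process pool

As a backend developer who moved from Python to Rust, I find that understanding concurrency patterns is essential for building high-performance systems. Rust's concurrency model offers stronger safety guarantees, but Python's concurrent.futures remains a convenient way to add concurrency quickly.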
