从gather到TaskGroup:写给 Python 开发者的结构化并发与可取消业务设计指南
很多 Python 新人第一次写异步代码时,喜欢这样启动一组任务:
results=awaitasyncio.gather(fetch_user(),fetch_order(),fetch_payment(),)代码看起来优雅,三个任务并发执行,最后一次性拿到结果。但当业务进入真实世界,问题马上出现:如果fetch_payment()失败了,另外两个任务还要继续跑吗?如果接口超时了,后台扣费任务是否已经停下?如果任务吞掉了CancelledError,系统会不会表面超时、背后继续扣费?
这篇文章要讲的不是“如何把代码写成 async”,而是更关键的工程能力:
如何让一组并发任务有边界、有归属、能失败、能取消、能清理。
Python 的简洁语法让它从早期脚本语言成长为 Web 开发、自动化、数据科学和 AI 工程中的通用工具。Python 官方文档记录,Python 由 Guido van Rossum 于 20 世纪 90 年代初在 CWI 创建,继承了 ABC 语言的一些思想;而 Stack Overflow 2025 调查也提到,Python 从 2024 到 2025 的采用率继续明显增长,尤其受 AI、数据科学和后端开发推动。(Python documentation)
也正因为 Python 已经深入生产系统,我们不能只会写“能跑”的异步代码,还要写“出错时也可靠”的异步代码。
一、先给结论:我会优先选TaskGroup
在 Python 3.11 及以上版本中,如果场景是:
一组并发子任务中,任何一个失败都应取消其他任务。
我会优先选择:
asyncio.TaskGroup而不是:
asyncio.gather原因很简单:
TaskGroup默认就是为“相关任务一起成功、一起失败、一起收尾”设计的。
Python 官方文档明确说明,TaskGroup是一个异步上下文管理器,组内任务会在退出上下文时全部等待完成;第一次有任务抛出非CancelledError异常时,组内剩余任务会被取消,之后异常会以ExceptionGroup或BaseExceptionGroup的形式抛出。(Python documentation)
而gather()的默认语义不同:如果return_exceptions=False,第一个异常会立即传播给等待gather()的调用方,但其他 awaitable 默认不会被取消,而是继续运行。Python 文档也特别指出,相比gather,TaskGroup对嵌套子任务提供更强的安全保证:子任务出错时,TaskGroup会取消剩余任务,而gather不会。(Python documentation)
这句话在生产环境里价值很大:失败不是一个局部事件,它会影响整组任务的生命周期。
二、asyncio.gather:适合“收集结果”,但不是天然失败边界
gather的优势是简单。它适合一组互相独立的任务,并且你想按输入顺序拿到结果。
importasyncioasyncdeffetch_user():awaitasyncio.sleep(0.2)return{"user":"Tina"}asyncdeffetch_orders():awaitasyncio.sleep(0.3)return["order-1","order-2"]asyncdefmain():user,orders=awaitasyncio.gather(fetch_user(),fetch_orders(),)print(user,orders)asyncio.run(main())如果两个任务都成功,gather会返回一个结果列表,顺序与传入 awaitable 的顺序一致。Python 文档也说明,gather会并发运行传入的 awaitable;如果传入的是协程,会自动调度为 Task。(Python documentation)
但问题出现在异常场景。
importasyncioasyncdefcharge():awaitasyncio.sleep(0.2)raiseRuntimeError("payment failed")asyncdefsend_coupon():try:awaitasyncio.sleep(2)print("coupon sent")finally:print("send_coupon cleanup")asyncdefmain():try:awaitasyncio.gather(charge(),send_coupon(),)exceptRuntimeErrorasexc:print("caught:",exc)asyncio.run(main())很多人以为charge()抛异常后,send_coupon()会自动停掉。实际上,默认情况下并不会。gather()会把第一个异常抛给调用方,但其他任务可能继续运行。
在支付、库存、优惠券、消息推送这类业务中,这可能非常危险。比如扣费失败了,但发券任务还在继续;订单取消了,但发货通知还在后台偷偷跑。
三、TaskGroup:让并发任务拥有清晰边界
同样的场景,用TaskGroup改写:
importasyncioasyncdefcharge():awaitasyncio.sleep(0.2)raiseRuntimeError("payment failed")asyncdefsend_coupon():try:awaitasyncio.sleep(2)print("coupon sent")finally:print("send_coupon cleanup")asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:tg.create_task(charge())tg.create_task(send_coupon())except*RuntimeErroraseg:print("caught errors:",eg.exceptions)asyncio.run(main())这段代码的关键不是语法变化,而是语义变化:
进入 TaskGroup ↓ 启动 charge 和 send_coupon ↓ charge 失败 ↓ TaskGroup 取消 send_coupon ↓ 等待 send_coupon 清理完成 ↓ 统一抛出 ExceptionGroupTaskGroup让并发任务像普通函数调用一样有作用域。进入async with,任务开始;退出async with,任务必须已经完成、失败或被取消。不会留下没人管的“孤儿任务”。
这就是结构化并发。
四、结构化并发为什么重要?
所谓结构化并发,可以用一句话解释:
谁创建任务,谁负责等待、取消、收尾和处理异常。
它解决的是异步系统里最难排查的一类问题:任务生命周期失控。
没有结构化并发时,代码常常变成这样:
asyncio.create_task(send_email())asyncio.create_task(sync_crm())asyncio.create_task(update_metrics())return{"ok":True}看起来很快,接口马上返回。但这些后台任务谁负责?失败了谁知道?请求超时了它们还要不要继续?进程退出时数据是否写完?
结构化并发强制我们把任务放进一个可见的作用域:
asyncwithasyncio.TaskGroup()astg:tg.create_task(send_email())tg.create_task(sync_crm())tg.create_task(update_metrics())它带来的工程收益包括:
| 维度 | 非结构化任务 | 结构化并发 |
|---|---|---|
| 生命周期 | 分散、难追踪 | 由作用域管理 |
| 失败处理 | 容易遗漏 | 集中传播 |
| 取消语义 | 手动维护 | 自动级联 |
| 资源清理 | 容易泄漏 | 退出上下文前完成 |
| 可读性 | 越写越乱 | 像同步代码一样有边界 |
对初学者来说,结构化并发像“给异步代码加括号”;对资深开发者来说,它是可靠系统设计的基础。
五、CancelledError:取消不是普通失败,而是一种控制信号
理解TaskGroup,必须理解 cancellation。
在asyncio中,取消一个 Task,并不是粗暴杀线程,而是在协程的下一个可取消点注入asyncio.CancelledError。Python 文档说明,当任务被取消时,CancelledError会在任务中“下一次机会”被抛出;文档也建议协程使用try/finally做清理,如果显式捕获CancelledError,通常应在清理完成后继续传播。(Python documentation)
正确写法:
importasyncioasyncdefworker():resource=awaitopen_resource()try:awaitdo_work(resource)exceptasyncio.CancelledError:print("worker cancelled, cleaning up")raisefinally:awaitresource.close()注意这里最重要的一行:
raise它表示:我知道自己被取消了,我做完清理后,会把取消信号继续传出去。
错误写法:
asyncdefworker():try:awaitdo_work()exceptasyncio.CancelledError:print("cancelled, but I swallowed it")return"ok"这段代码非常危险。它把“取消”伪装成“成功返回”。调用方以为任务已经按取消流程结束,但实际上协程可能掩盖了系统想中断它的意图。
Python 文档特别提醒,TaskGroup和asyncio.timeout()这类实现结构化并发的组件内部依赖 cancellation;如果协程吞掉CancelledError,这些组件可能行为异常。(Python documentation)
六、真实事故:接口超时后,后台任务仍偷偷扣费
设想一个支付接口:
asyncdefpay(order_id:str):asyncio.create_task(charge_card(order_id))return{"status":"processing"}这段代码看起来响应很快,但有巨大隐患:charge_card()成了游离任务。接口超时、客户端重试、上游取消请求,都不一定能影响它。
更糟的是,如果请求超时后用户再次发起支付,第一次的后台扣费任务还在跑,第二次请求又启动新的扣费任务,就可能出现重复扣费。
再看一个更隐蔽的坏例子:
asyncdefcharge_card(order_id:str):try:awaitpayment_client.charge(order_id)exceptasyncio.CancelledError:# 错误:吞掉取消,继续做“补偿”awaitpayment_client.charge(order_id)return这段代码几乎是事故制造机:系统要求取消,它却在取消分支里继续扣费。
七、如何设计“可取消”的业务流程?
可取消不是简单地给代码加task.cancel()。业务上真正可取消,通常需要状态机、幂等、超时、补偿和审计一起设计。
1. 给每次业务操作分配幂等键
扣费、发券、创建订单这类外部副作用操作必须有 idempotency key。
defbuild_idempotency_key(order_id:str,action:str)->str:returnf"{order_id}:{action}:v1"调用支付系统时传入:
awaitpayment_client.charge(order_id=order_id,amount=amount,idempotency_key=build_idempotency_key(order_id,"charge"),)这样即使上游重试,也不会因为重复请求导致重复扣费。
2. 用TaskGroup管住相关子任务
importasyncioasyncdefprocess_payment(order_id:str,amount:int):asyncwithasyncio.TaskGroup()astg:charge_task=tg.create_task(charge(order_id,amount))audit_task=tg.create_task(write_audit_log(order_id))notify_task=tg.create_task(prepare_notification(order_id))return{"charge":charge_task.result(),"audit":audit_task.result(),"notify":notify_task.result(),}如果charge()失败,审计和通知准备任务会被取消。不会出现“扣费失败但通知成功”的脏状态。
3. 用asyncio.timeout()管住整体耗时
Python 文档说明,asyncio.timeout()是一个异步上下文管理器,可以限制等待某个操作的时间;超时后会取消当前任务,并把内部的CancelledError转换为可在上下文外捕获的TimeoutError。(Python documentation)
asyncdefhandle_request(order_id:str,amount:int):try:asyncwithasyncio.timeout(3):returnawaitprocess_payment(order_id,amount)exceptTimeoutError:awaitmark_order_pending(order_id)return{"status":"pending"}注意:超时后不要简单返回“失败”。对于支付这种外部副作用,你可能不知道第三方系统是否已经收到请求。更稳妥的做法是把订单标记为pending,由对账或查询任务确认最终状态。
4. 在取消点做清理,但不要吞掉取消
asyncdefcharge(order_id:str,amount:int):try:awaitmark_order_charging(order_id)result=awaitpayment_client.charge(order_id=order_id,amount=amount,idempotency_key=build_idempotency_key(order_id,"charge"),)awaitmark_order_paid(order_id,result.transaction_id)returnresultexceptasyncio.CancelledError:awaitmark_order_pending(order_id)raise这里的raise必不可少。它告诉外层TaskGroup或timeout:任务确实被取消了。
5. 对不可逆步骤保持敬畏
有些操作一旦发出,就不能假装取消。例如支付请求已经发送到第三方,网络断了,并不代表扣费没发生。
所以可取消流程应该区分:
可安全取消阶段: 参数校验、库存预检查、风控预检查、准备日志 不可确定阶段: 第三方支付请求已发出,但响应未知 已完成阶段: 明确成功或明确失败对于“不可确定阶段”,不要盲目重试扣费,而要查询、对账、使用幂等键和状态机收敛结果。
八、gather什么时候仍然有价值?
虽然我在“失败取消其他任务”的场景优先选TaskGroup,但gather并没有过时。它适合这些场景:
第一,任务彼此独立,某个失败不影响其他任务。
results=awaitasyncio.gather(fetch_banner(),fetch_recommendations(),fetch_profile(),return_exceptions=True,)第二,你需要兼容 Python 3.10 或更早版本。
第三,你明确想把异常当作结果收集,再逐个处理。
forresultinresults:ifisinstance(result,Exception):log_error(result)else:use(result)但如果业务语义是“一荣俱荣,一损俱损”,例如支付流水、批量写一致性数据、同一请求下的多个强相关子任务,请优先用TaskGroup。
九、最佳实践清单:生产级异步代码怎么写
相关任务放进同一个
TaskGroup。
不要随手create_task()后就不管。失败需要级联取消时,不要用裸
gather()。gather()默认不会因为一个任务失败而取消其他任务。捕获
CancelledError后通常必须重新抛出。
清理可以做,吞掉不应该。超时不是业务最终状态。
超时只代表调用方等不到结果,不代表外部操作没发生。所有外部副作用操作都要幂等。
支付、发券、发货、创建账户、写消息队列都应如此。用状态机表达业务阶段。
created -> charging -> paid / failed / pending比一个布尔值可靠得多。把不可逆操作和可取消操作分开。
先做可取消准备,再进入受控的外部副作用阶段。保留审计日志和对账任务。
异步系统不是不出错,而是出错后能收敛。
十、总结:结构化并发是异步代码的秩序感
asyncio.gather和TaskGroup的差异,不只是 API 写法不同,而是工程哲学不同。
gather像是“把几个任务放在一起等结果”;TaskGroup像是“创建一个有边界的并发作用域”。
当任务彼此独立时,gather很方便。
当任务彼此相关,尤其是“任何一个失败都应取消其他任务”时,TaskGroup更安全、更清晰,也更符合现代 Python 的结构化并发方向。
最后,把本文浓缩成几句话:
gather适合收集独立结果,TaskGroup适合管理相关任务。
结构化并发的核心是:任务不能成为孤儿。
取消不是异常噪声,而是系统传递控制权的严肃信号。
吞掉CancelledError,等于剪断了调用方对任务生命周期的控制线。
支付、扣费、发券这类业务必须用幂等键、状态机和可恢复流程设计,而不是只靠超时和取消。
写 Python 异步代码,真正的成熟不是“我会并发”,而是“我知道失败时谁该停下,谁该清理,谁该负责最终一致”。这也是从 Python教程 走向 Python实战、从语法使用走向 Python最佳实践 的关键一步。