从 `gather` 到 `TaskGroup`:写给 Python 开发者的结构化并发与可取消业务设计指南
2026/4/25 8:27:03 网站建设 项目流程

gatherTaskGroup:写给 Python 开发者的结构化并发与可取消业务设计指南

很多 Python 新人第一次写异步代码时,喜欢这样启动一组任务:

results=awaitasyncio.gather(fetch_user(),fetch_order(),fetch_payment(),)

代码看起来优雅,三个任务并发执行,最后一次性拿到结果。但当业务进入真实世界,问题马上出现:如果fetch_payment()失败了,另外两个任务还要继续跑吗?如果接口超时了,后台扣费任务是否已经停下?如果任务吞掉了CancelledError,系统会不会表面超时、背后继续扣费?

这篇文章要讲的不是“如何把代码写成 async”,而是更关键的工程能力:

如何让一组并发任务有边界、有归属、能失败、能取消、能清理。

Python 的简洁语法让它从早期脚本语言成长为 Web 开发、自动化、数据科学和 AI 工程中的通用工具。Python 官方文档记录,Python 由 Guido van Rossum 于 20 世纪 90 年代初在 CWI 创建,继承了 ABC 语言的一些思想;而 Stack Overflow 2025 调查也提到,Python 从 2024 到 2025 的采用率继续明显增长,尤其受 AI、数据科学和后端开发推动。(Python documentation)

也正因为 Python 已经深入生产系统,我们不能只会写“能跑”的异步代码,还要写“出错时也可靠”的异步代码。


一、先给结论:我会优先选TaskGroup

在 Python 3.11 及以上版本中,如果场景是:

一组并发子任务中,任何一个失败都应取消其他任务。

我会优先选择:

asyncio.TaskGroup

而不是:

asyncio.gather

原因很简单:

TaskGroup默认就是为“相关任务一起成功、一起失败、一起收尾”设计的。

Python 官方文档明确说明,TaskGroup是一个异步上下文管理器,组内任务会在退出上下文时全部等待完成;第一次有任务抛出非CancelledError异常时,组内剩余任务会被取消,之后异常会以ExceptionGroupBaseExceptionGroup的形式抛出。(Python documentation)

gather()的默认语义不同:如果return_exceptions=False,第一个异常会立即传播给等待gather()的调用方,但其他 awaitable 默认不会被取消,而是继续运行。Python 文档也特别指出,相比gatherTaskGroup对嵌套子任务提供更强的安全保证:子任务出错时,TaskGroup会取消剩余任务,而gather不会。(Python documentation)

这句话在生产环境里价值很大:失败不是一个局部事件,它会影响整组任务的生命周期。


二、asyncio.gather:适合“收集结果”,但不是天然失败边界

gather的优势是简单。它适合一组互相独立的任务,并且你想按输入顺序拿到结果。

importasyncioasyncdeffetch_user():awaitasyncio.sleep(0.2)return{"user":"Tina"}asyncdeffetch_orders():awaitasyncio.sleep(0.3)return["order-1","order-2"]asyncdefmain():user,orders=awaitasyncio.gather(fetch_user(),fetch_orders(),)print(user,orders)asyncio.run(main())

如果两个任务都成功,gather会返回一个结果列表,顺序与传入 awaitable 的顺序一致。Python 文档也说明,gather会并发运行传入的 awaitable;如果传入的是协程,会自动调度为 Task。(Python documentation)

但问题出现在异常场景。

importasyncioasyncdefcharge():awaitasyncio.sleep(0.2)raiseRuntimeError("payment failed")asyncdefsend_coupon():try:awaitasyncio.sleep(2)print("coupon sent")finally:print("send_coupon cleanup")asyncdefmain():try:awaitasyncio.gather(charge(),send_coupon(),)exceptRuntimeErrorasexc:print("caught:",exc)asyncio.run(main())

很多人以为charge()抛异常后,send_coupon()会自动停掉。实际上,默认情况下并不会。gather()会把第一个异常抛给调用方,但其他任务可能继续运行。

在支付、库存、优惠券、消息推送这类业务中,这可能非常危险。比如扣费失败了,但发券任务还在继续;订单取消了,但发货通知还在后台偷偷跑。


三、TaskGroup:让并发任务拥有清晰边界

同样的场景,用TaskGroup改写:

importasyncioasyncdefcharge():awaitasyncio.sleep(0.2)raiseRuntimeError("payment failed")asyncdefsend_coupon():try:awaitasyncio.sleep(2)print("coupon sent")finally:print("send_coupon cleanup")asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:tg.create_task(charge())tg.create_task(send_coupon())except*RuntimeErroraseg:print("caught errors:",eg.exceptions)asyncio.run(main())

这段代码的关键不是语法变化,而是语义变化:

进入 TaskGroup ↓ 启动 charge 和 send_coupon ↓ charge 失败 ↓ TaskGroup 取消 send_coupon ↓ 等待 send_coupon 清理完成 ↓ 统一抛出 ExceptionGroup

TaskGroup让并发任务像普通函数调用一样有作用域。进入async with,任务开始;退出async with,任务必须已经完成、失败或被取消。不会留下没人管的“孤儿任务”。

这就是结构化并发。


四、结构化并发为什么重要?

所谓结构化并发,可以用一句话解释:

谁创建任务,谁负责等待、取消、收尾和处理异常。

它解决的是异步系统里最难排查的一类问题:任务生命周期失控。

没有结构化并发时,代码常常变成这样:

asyncio.create_task(send_email())asyncio.create_task(sync_crm())asyncio.create_task(update_metrics())return{"ok":True}

看起来很快,接口马上返回。但这些后台任务谁负责?失败了谁知道?请求超时了它们还要不要继续?进程退出时数据是否写完?

结构化并发强制我们把任务放进一个可见的作用域:

asyncwithasyncio.TaskGroup()astg:tg.create_task(send_email())tg.create_task(sync_crm())tg.create_task(update_metrics())

它带来的工程收益包括:

维度非结构化任务结构化并发
生命周期分散、难追踪由作用域管理
失败处理容易遗漏集中传播
取消语义手动维护自动级联
资源清理容易泄漏退出上下文前完成
可读性越写越乱像同步代码一样有边界

对初学者来说,结构化并发像“给异步代码加括号”;对资深开发者来说,它是可靠系统设计的基础。


五、CancelledError:取消不是普通失败,而是一种控制信号

理解TaskGroup,必须理解 cancellation。

asyncio中,取消一个 Task,并不是粗暴杀线程,而是在协程的下一个可取消点注入asyncio.CancelledError。Python 文档说明,当任务被取消时,CancelledError会在任务中“下一次机会”被抛出;文档也建议协程使用try/finally做清理,如果显式捕获CancelledError,通常应在清理完成后继续传播。(Python documentation)

正确写法:

importasyncioasyncdefworker():resource=awaitopen_resource()try:awaitdo_work(resource)exceptasyncio.CancelledError:print("worker cancelled, cleaning up")raisefinally:awaitresource.close()

注意这里最重要的一行:

raise

它表示:我知道自己被取消了,我做完清理后,会把取消信号继续传出去。

错误写法:

asyncdefworker():try:awaitdo_work()exceptasyncio.CancelledError:print("cancelled, but I swallowed it")return"ok"

这段代码非常危险。它把“取消”伪装成“成功返回”。调用方以为任务已经按取消流程结束,但实际上协程可能掩盖了系统想中断它的意图。

Python 文档特别提醒,TaskGroupasyncio.timeout()这类实现结构化并发的组件内部依赖 cancellation;如果协程吞掉CancelledError,这些组件可能行为异常。(Python documentation)


六、真实事故:接口超时后,后台任务仍偷偷扣费

设想一个支付接口:

asyncdefpay(order_id:str):asyncio.create_task(charge_card(order_id))return{"status":"processing"}

这段代码看起来响应很快,但有巨大隐患:charge_card()成了游离任务。接口超时、客户端重试、上游取消请求,都不一定能影响它。

更糟的是,如果请求超时后用户再次发起支付,第一次的后台扣费任务还在跑,第二次请求又启动新的扣费任务,就可能出现重复扣费。

再看一个更隐蔽的坏例子:

asyncdefcharge_card(order_id:str):try:awaitpayment_client.charge(order_id)exceptasyncio.CancelledError:# 错误:吞掉取消,继续做“补偿”awaitpayment_client.charge(order_id)return

这段代码几乎是事故制造机:系统要求取消,它却在取消分支里继续扣费。


七、如何设计“可取消”的业务流程?

可取消不是简单地给代码加task.cancel()。业务上真正可取消,通常需要状态机、幂等、超时、补偿和审计一起设计。

1. 给每次业务操作分配幂等键

扣费、发券、创建订单这类外部副作用操作必须有 idempotency key。

defbuild_idempotency_key(order_id:str,action:str)->str:returnf"{order_id}:{action}:v1"

调用支付系统时传入:

awaitpayment_client.charge(order_id=order_id,amount=amount,idempotency_key=build_idempotency_key(order_id,"charge"),)

这样即使上游重试,也不会因为重复请求导致重复扣费。

2. 用TaskGroup管住相关子任务

importasyncioasyncdefprocess_payment(order_id:str,amount:int):asyncwithasyncio.TaskGroup()astg:charge_task=tg.create_task(charge(order_id,amount))audit_task=tg.create_task(write_audit_log(order_id))notify_task=tg.create_task(prepare_notification(order_id))return{"charge":charge_task.result(),"audit":audit_task.result(),"notify":notify_task.result(),}

如果charge()失败,审计和通知准备任务会被取消。不会出现“扣费失败但通知成功”的脏状态。

3. 用asyncio.timeout()管住整体耗时

Python 文档说明,asyncio.timeout()是一个异步上下文管理器,可以限制等待某个操作的时间;超时后会取消当前任务,并把内部的CancelledError转换为可在上下文外捕获的TimeoutError。(Python documentation)

asyncdefhandle_request(order_id:str,amount:int):try:asyncwithasyncio.timeout(3):returnawaitprocess_payment(order_id,amount)exceptTimeoutError:awaitmark_order_pending(order_id)return{"status":"pending"}

注意:超时后不要简单返回“失败”。对于支付这种外部副作用,你可能不知道第三方系统是否已经收到请求。更稳妥的做法是把订单标记为pending,由对账或查询任务确认最终状态。

4. 在取消点做清理,但不要吞掉取消

asyncdefcharge(order_id:str,amount:int):try:awaitmark_order_charging(order_id)result=awaitpayment_client.charge(order_id=order_id,amount=amount,idempotency_key=build_idempotency_key(order_id,"charge"),)awaitmark_order_paid(order_id,result.transaction_id)returnresultexceptasyncio.CancelledError:awaitmark_order_pending(order_id)raise

这里的raise必不可少。它告诉外层TaskGrouptimeout:任务确实被取消了。

5. 对不可逆步骤保持敬畏

有些操作一旦发出,就不能假装取消。例如支付请求已经发送到第三方,网络断了,并不代表扣费没发生。

所以可取消流程应该区分:

可安全取消阶段: 参数校验、库存预检查、风控预检查、准备日志 不可确定阶段: 第三方支付请求已发出,但响应未知 已完成阶段: 明确成功或明确失败

对于“不可确定阶段”,不要盲目重试扣费,而要查询、对账、使用幂等键和状态机收敛结果。


八、gather什么时候仍然有价值?

虽然我在“失败取消其他任务”的场景优先选TaskGroup,但gather并没有过时。它适合这些场景:

第一,任务彼此独立,某个失败不影响其他任务。

results=awaitasyncio.gather(fetch_banner(),fetch_recommendations(),fetch_profile(),return_exceptions=True,)

第二,你需要兼容 Python 3.10 或更早版本。

第三,你明确想把异常当作结果收集,再逐个处理。

forresultinresults:ifisinstance(result,Exception):log_error(result)else:use(result)

但如果业务语义是“一荣俱荣,一损俱损”,例如支付流水、批量写一致性数据、同一请求下的多个强相关子任务,请优先用TaskGroup


九、最佳实践清单:生产级异步代码怎么写

  1. 相关任务放进同一个TaskGroup
    不要随手create_task()后就不管。

  2. 失败需要级联取消时,不要用裸gather()
    gather()默认不会因为一个任务失败而取消其他任务。

  3. 捕获CancelledError后通常必须重新抛出。
    清理可以做,吞掉不应该。

  4. 超时不是业务最终状态。
    超时只代表调用方等不到结果,不代表外部操作没发生。

  5. 所有外部副作用操作都要幂等。
    支付、发券、发货、创建账户、写消息队列都应如此。

  6. 用状态机表达业务阶段。
    created -> charging -> paid / failed / pending比一个布尔值可靠得多。

  7. 把不可逆操作和可取消操作分开。
    先做可取消准备,再进入受控的外部副作用阶段。

  8. 保留审计日志和对账任务。
    异步系统不是不出错,而是出错后能收敛。


十、总结:结构化并发是异步代码的秩序感

asyncio.gatherTaskGroup的差异,不只是 API 写法不同,而是工程哲学不同。

gather像是“把几个任务放在一起等结果”;
TaskGroup像是“创建一个有边界的并发作用域”。

当任务彼此独立时,gather很方便。
当任务彼此相关,尤其是“任何一个失败都应取消其他任务”时,TaskGroup更安全、更清晰,也更符合现代 Python 的结构化并发方向。

最后,把本文浓缩成几句话:

gather适合收集独立结果,TaskGroup适合管理相关任务。
结构化并发的核心是:任务不能成为孤儿。
取消不是异常噪声,而是系统传递控制权的严肃信号。
吞掉CancelledError,等于剪断了调用方对任务生命周期的控制线。
支付、扣费、发券这类业务必须用幂等键、状态机和可恢复流程设计,而不是只靠超时和取消。

写 Python 异步代码,真正的成熟不是“我会并发”,而是“我知道失败时谁该停下,谁该清理,谁该负责最终一致”。这也是从 Python教程 走向 Python实战、从语法使用走向 Python最佳实践 的关键一步。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询