python: Bulkheads Pattern
2026/6/26 23:52:42 网站建设 项目流程

项目结构:

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:15 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py # 舱壁资源配置:各业务域独立线程池参数 BULKHEAD_CONFIG = { "inventory": { "max_workers": 2, "timeout": 3.0, "thread_prefix": "bulkhead-inventory" }, "order_pay": { "max_workers": 3, "timeout": 3.0, "thread_prefix": "bulkhead-order-pay" }, "logistics": { "max_workers": 2, "timeout": 3.0, "thread_prefix": "bulkhead-logistics" } } # 业务常量 BUSINESS_TIMEOUT_MSG = "请求超时,舱壁隔离熔断" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:14 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : exceptions.py class BulkheadTimeoutException(Exception): """ 舱壁执行超时异常 """ pass class BulkheadPoolFullException(Exception): """ 舱壁线程池耗尽异常 """ pass # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:15 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : bulkhead_executor.py import time from concurrent.futures import ThreadPoolExecutor, TimeoutError from functools import wraps from typing import Dict, Any from .exceptions import BulkheadTimeoutException class BulkheadExecutor: """ 舱壁执行器:每个业务域独立实例,隔离线程资源 """ def __init__(self, config: Dict[str, Any]): self.max_workers = config["max_workers"] self.timeout = config["timeout"] self.pool = ThreadPoolExecutor( max_workers=self.max_workers, thread_name_prefix=config["thread_prefix"] ) def execute(self, func, *args, **kwargs): """ 提交任务至独立舱壁线程池执行 :param func: :param args: :param kwargs: :return: """ try: future = self.pool.submit(func, *args, **kwargs) return future.result(timeout=self.timeout) except TimeoutError: raise BulkheadTimeoutException(f"执行超时 {self.timeout}s") except Exception as e: raise e def bulkhead_decorator(self): """ 装饰器:绑定当前舱壁实例执行能力 :return: """ def decorator(target_func): @wraps(target_func) def wrapper(*args, **kwargs): try: return self.execute(target_func, *args, **kwargs) except BulkheadTimeoutException as te: return {"success": False, "code": 504, "msg": str(te)} except Exception as e: return {"success": False, "code": 500, "msg": f"业务异常: {str(e)}"} return wrapper return decorator def shutdown(self, wait: bool = True): """ 程序退出释放线程池资源 :param wait: :return: """ self.pool.shutdown(wait=wait) # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:17 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logger.py import logging import sys # 全局日志配置 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(threadName)s] %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", stream=sys.stdout ) logger = logging.getLogger("jewelry-bulkhead") # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:17 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : inventory_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 初始化独立库存舱壁 inventory_bulkhead = BulkheadExecutor(BULKHEAD_CONFIG["inventory"]) bulkhead = inventory_bulkhead.bulkhead_decorator() class InventoryService: """ 珠宝库存服务:钻石、黄金、铂金库存查询,独立舱壁隔离 """ @bulkhead def query_gem_stock(self, jewelry_id: str): """ 查询宝石类珠宝库存(模拟阻塞故障场景) :param jewelry_id: :return: """ logger.info(f"查询珠宝库存 ID: {jewelry_id}") # 模拟数据库慢查询/第三方接口卡死,超过超时阈值 time.sleep(5) return { "success": True, "code": 200, "data": { "jewelry_id": jewelry_id, "stock_num": 15, "material": "钻石", "store": "深圳总店" } } @bulkhead def query_gold_stock(self, jewelry_id: str): """ 黄金素圈库存查询(正常逻辑) :param jewelry_id: :return: """ logger.info(f"查询黄金库存 ID: {jewelry_id}") time.sleep(0.3) return { "success": True, "code": 200, "data": { "jewelry_id": jewelry_id, "stock_num": 32, "material": "足金999", "store": "全国门店总仓" } } # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : order_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 订单支付独立舱壁 order_bulkhead = BulkheadExecutor(BULKHEAD_CONFIG["order_pay"]) bulkhead = order_bulkhead.bulkhead_decorator() class OrderPayService: """ 珠宝订单与支付服务:下单、收银、结算 """ @bulkhead def create_order(self, order_no: str, amount: float, user_id: str): """创建珠宝订单""" logger.info(f"创建订单 {order_no} 用户:{user_id} 金额:{amount}") time.sleep(0.5) return { "success": True, "code": 200, "data": { "order_no": order_no, "pay_amount": amount, "status": "待支付" } } @bulkhead def order_settle(self, order_no: str, pay_channel: str): """ 订单支付结算 :param order_no: :param pay_channel: :return: """ logger.info(f"订单结算 {order_no} 渠道:{pay_channel}") time.sleep(0.4) return { "success": True, "code": 200, "data": { "order_no": order_no, "pay_status": "已结清", "transaction_id": f"TX{hash(order_no) % 9999999}" } } # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : logistics_service.py import time from BulkheadsPattern.config.settings import BULKHEAD_CONFIG from BulkheadsPattern.core.bulkhead_executor import BulkheadExecutor from BulkheadsPattern.common.logger import logger # 物流独立舱壁 logistics_bulkhead = BulkheadExecutor(BULKHEAD_CONFIG["logistics"]) bulkhead = logistics_bulkhead.bulkhead_decorator() class LogisticsService: """ 珠宝物流服务:保价出库、快递揽收、物流跟踪 """ @bulkhead def ship_jewelry(self, order_no: str, receiver_addr: str): """ 珠宝保价发货 :param order_no: :param receiver_addr: :return: """ logger.info(f"订单{order_no}安排发货,收件地址:{receiver_addr}") time.sleep(0.4) return { "success": True, "code": 200, "data": { "order_no": order_no, "express_no": f"SF{hash(order_no) % 1000000}", "insured_value": 20000, "status": "已揽收" } } @bulkhead def track_express(self, express_no: str): """ 物流轨迹查询 :param express_no: :return: """ logger.info(f"查询物流单号 {express_no}") time.sleep(0.2) return { "success": True, "code": 200, "data": {"express_no": express_no, "location": "深圳转运中心"} }

调用:

# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Bulkheads Pattern 舱壁模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/26 23:31 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : BulkheadsBll.py import threading import time from BulkheadsPattern.common.logger import logger from BulkheadsPattern.business.inventory.inventory_service import InventoryService, inventory_bulkhead from BulkheadsPattern.business.order_pay.order_service import OrderPayService, order_bulkhead from BulkheadsPattern.business.logistics.logistics_service import LogisticsService, logistics_bulkhead class BulkheadsBll(object): """ """ # 实例化各业务服务 inv_svc = InventoryService() ord_svc = OrderPayService() log_svc = LogisticsService() def run_scene(self): """模拟珠宝门店高并发流量场景""" logger.info("===== 珠宝行业舱壁模式压测开始 =====") start = time.time() task_list = [ # 库存域(会超时故障) threading.Thread(target=lambda: logger.info(self.inv_svc.query_gem_stock("D001"))), threading.Thread(target=lambda: logger.info(self.inv_svc.query_gem_stock("D002"))), # 库存正常接口 threading.Thread(target=lambda: logger.info(self.inv_svc.query_gold_stock("G001"))), # 订单支付域 threading.Thread(target=lambda: logger.info(self.ord_svc.create_order("O2026062601", 16888, "U10086"))), threading.Thread(target=lambda: logger.info(self.ord_svc.create_order("O2026062602", 9999, "U10087"))), threading.Thread(target=lambda: logger.info(self.ord_svc.order_settle("O2026062601", "微信支付"))), # 物流域 threading.Thread(target=lambda: logger.info(self.log_svc.ship_jewelry("O2026062601", "深圳福田珠宝大厦"))), threading.Thread(target=lambda: logger.info(self.log_svc.track_express("SF12345678"))), ] # 启动所有并发任务 for t in task_list: t.start() for t in task_list: t.join() cost = round(time.time() - start, 2) logger.info(f"===== 压测结束,总耗时 {cost}s =====") def run_example(self): """ :return: """ try: self.run_scene() finally: # 优雅释放所有舱壁线程池资源 inventory_bulkhead.shutdown() order_bulkhead.shutdown() logistics_bulkhead.shutdown() logger.info("所有舱壁线程池已安全关闭") @staticmethod def demo(): bll = BulkheadsBll() bll.run_example()

输出:

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询