分布式系统架构:分布式锁与并发控制的设计模式
2026/6/12 1:54:55 网站建设 项目流程

分布式系统架构:分布式锁与并发控制的设计模式

一、单机锁的失效:分布式环境下的并发困境

在单机应用中,使用sync.Mutexsynchronized就能解决并发问题。但当服务部署到多个节点时,单机锁只能保护本进程内的资源,无法阻止其他节点同时操作共享资源。典型的场景包括:防止重复下单、库存扣减的超卖、定时任务的重复执行。

分布式锁看似简单——在 Redis 里设一个 key 就行,但生产级的分布式锁远比想象中复杂。锁的获取、续期、释放、异常恢复,每一个环节都有边界条件需要处理。更关键的是,不同业务场景对锁的要求不同——有的要求强一致性,有的允许偶尔失效,选错方案会导致系统级故障。

flowchart TB subgraph 单机锁失效场景 N1[节点A] -->|本地锁保护| DB[(数据库)] N2[节点B] -->|本地锁保护| DB N3[节点C] -->|本地锁保护| DB Note1[三个节点的本地锁互不感知<br/>无法防止并发冲突] -.-> DB end subgraph 分布式锁方案 N4[节点A] -->|获取锁| Redis[(Redis<br/>分布式锁)] N5[节点B] -->|获取锁| Redis N6[节点C] -->|获取锁| Redis Redis -->|仅一个节点获得锁| DB2[(数据库)] end

二、分布式锁的核心机制

2.1 锁的四个基本操作

分布式锁需要四个基本操作:获取(Acquire)、续期(Renew)、释放(Release)和强制释放(Force Release)。获取操作需要保证原子性——检查 key 是否存在和设置 key 必须在同一个命令中完成。续期操作用于长时间任务,防止锁因超时而被其他节点抢占。释放操作必须验证持有者身份,防止误删其他节点的锁。

2.2 Redlock 算法与单节点锁的取舍

Redis 官方推荐的 Redlock 算法在多个独立 Redis 实例上获取锁,只有大多数实例获取成功才算锁获取成功。这种方案提供了更强的安全性保证,但延迟更高(需要与多个实例通信),且在时钟漂移场景下仍有极小概率失效。对于大多数业务场景,单节点 Redis 锁 + 合理的超时时间已经足够。

sequenceDiagram participant Client as 客户端 participant Redis as Redis单节点 Client->>Redis: SET lock_key unique_value NX PX 30000 Note over Client,Redis: NX=不存在时才设置<br/>PX=过期时间30秒 Redis-->>Client: OK(获取锁成功) Note over Client: 执行业务逻辑(耗时较长) Client->>Redis: EXPIRE lock_key 30000 Note over Client,Redis: 续期:重置过期时间 Client->>Redis: GET lock_key Redis-->>Client: unique_value(确认是自己的锁) Client->>Redis: DEL lock_key Note over Client,Redis: 释放:先验证再删除

三、生产级代码实现

3.1 Redis 分布式锁

import time import uuid import logging import asyncio from typing import Optional logger = logging.getLogger(__name__) class DistributedLock: """基于 Redis 的分布式锁 设计考量: - 使用 SET NX PX 保证获取操作的原子性 - 每个锁持有者使用唯一标识,防止误删 - 内置续期守护线程,防止长任务锁超时 - 释放时使用 Lua 脚本保证检查+删除的原子性 """ # Lua 脚本:原子性地检查锁持有者并删除 RELEASE_SCRIPT = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ # Lua 脚本:原子性地检查锁持有者并续期 RENEW_SCRIPT = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end """ def __init__( self, redis_client, lock_key: str, timeout_ms: int = 30000, retry_count: int = 3, retry_delay_ms: int = 200, ): self.redis = redis_client self.lock_key = lock_key self.timeout_ms = timeout_ms self.retry_count = retry_count self.retry_delay_ms = retry_delay_ms self.lock_value = str(uuid.uuid4()) self._renew_task: Optional[asyncio.Task] = None async def acquire(self) -> bool: """获取分布式锁,支持重试""" for attempt in range(self.retry_count): result = await self.redis.set( self.lock_key, self.lock_value, nx=True, # 仅在 key 不存在时设置 px=self.timeout_ms, # 过期时间(毫秒) ) if result: logger.info(f"获取锁成功: key={self.lock_key}, value={self.lock_value}") # 启动续期守护 self._start_renew_daemon() return True if attempt < self.retry_count - 1: jitter = int(self.retry_delay_ms * (0.5 + uuid.uuid4().int % 1000 / 2000)) await asyncio.sleep(jitter / 1000) logger.warning(f"获取锁失败: key={self.lock_key}, 尝试 {self.retry_count} 次") return False async def release(self) -> bool: """释放分布式锁,使用 Lua 脚本保证原子性""" # 停止续期守护 self._stop_renew_daemon() result = await self.redis.eval( self.RELEASE_SCRIPT, 1, # KEYS 数量 self.lock_key, # KEYS[1] self.lock_value, # ARGV[1] ) if result: logger.info(f"释放锁成功: key={self.lock_key}") else: logger.warning(f"释放锁失败(锁已过期或被其他持有者获取): key={self.lock_key}") return bool(result) def _start_renew_daemon(self) -> None: """启动续期守护协程,在锁过期前自动续期""" self._renew_task = asyncio.create_task(self._renew_loop()) def _stop_renew_daemon(self) -> None: """停止续期守护""" if self._renew_task: self._renew_task.cancel() self._renew_task = None async def _renew_loop(self) -> None: """续期循环:每过 timeout_ms/3 续期一次""" interval = self.timeout_ms / 3000 # 转换为秒,取 1/3 try: while True: await asyncio.sleep(interval) result = await self.redis.eval( self.RENEW_SCRIPT, 1, self.lock_key, self.lock_value, self.timeout_ms, ) if not result: logger.error(f"续期失败: key={self.lock_key},锁可能已被其他节点获取") break logger.debug(f"续期成功: key={self.lock_key}") except asyncio.CancelledError: pass # 正常取消,不需要处理 async def __aenter__(self): if not await self.acquire(): raise RuntimeError(f"无法获取分布式锁: {self.lock_key}") return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.release() return False

3.2 业务使用示例

async def deduct_inventory(product_id: str, quantity: int) -> bool: """库存扣减:使用分布式锁防止超卖""" lock_key = f"inventory:lock:{product_id}" async with DistributedLock(redis_client, lock_key, timeout_ms=10000) as lock: # 在锁保护下读取当前库存 current = await redis_client.get(f"inventory:{product_id}") if current is None: return False current_qty = int(current) if current_qty < quantity: return False # 扣减库存 new_qty = current_qty - quantity await redis_client.set(f"inventory:{product_id}", new_qty) return True

四、边界分析与架构权衡

4.1 Redis 锁的安全性边界

Redis 分布式锁不是绝对安全的。在 Redis 主从切换时,可能出现两个客户端同时持有锁的情况:客户端 A 从主节点获取锁后,主节点宕机,锁数据尚未同步到从节点;从节点升为主节点后,客户端 B 也能获取同一把锁。如果业务要求绝对安全,应使用 Redlock 算法或基于 etcd/ZooKeeper 的共识锁。

4.2 锁超时与任务时长的矛盾

锁超时时间设置过短,长任务未完成锁就过期了;设置过长,持有者宕机后其他节点等待时间太久。续期守护机制缓解了这个问题,但引入了新的风险——如果续期守护线程本身卡住(如 GC 停顿),锁仍可能过期。对于关键业务,应在业务层做幂等校验,不完全依赖锁的正确性。

4.3 锁粒度的权衡

粗粒度锁(如按商品 ID 加锁)实现简单,但并发度低——所有对同一商品的请求串行执行。细粒度锁(如按 SKU + 仓库加锁)并发度高,但锁管理复杂,且容易出现死锁。选择粒度的原则是:锁的范围应恰好覆盖需要保护的资源,不扩大也不缩小。

五、总结

分布式锁是分布式并发控制的基础设施,但其安全性有边界。Redis 单节点锁适合大多数业务场景,实现简单、性能优秀;Redlock 和共识锁提供更强的安全保证,但代价是更高的延迟和更复杂的运维。无论选择哪种方案,都应在业务层做幂等校验,不完全依赖锁的正确性。

落地路线建议:第一步,识别系统中需要分布式保护的共享资源,评估并发冲突的风险等级;第二步,对高风险资源(如库存、余额)接入 Redis 分布式锁;第三步,为长任务添加续期机制,为关键操作添加幂等校验;第四步,对安全性要求极高的场景,评估是否需要迁移到 Redlock 或 etcd 锁。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询