并发编程AQS之ReentrantLock/Semaphore/CountDownLatch/CyclicBarrier
2026/4/30 16:10:58 网站建设 项目流程

一、管程——Java线程同步的设计思想

  • 管程:指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。
  • 互斥:同一时刻只允许一个线程访问共享资源;
  • 同步:线程之间如何通信、协作。

MESA模型

在管程的发展史上,先后出现过三种不同的管程模型,分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型

管程中引入了条件变量的概念,而且每个条件变量都对应有一个等待队列。条件变量等待队列的作用是解决线程之间的同步问题。

Java中针对管程有两种实现

  • 一种是基于Object的Monitor机制,用于synchronized内置锁的实现
  • 一种是抽象队列同步器AQS,用于JUC包下Lock锁机制的实现
@Slf4j public class ConditionDemo2 { private static final ReentrantLock lock = new ReentrantLock(); private static final Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { new Thread(() -> { log.debug("t1开始执行...."); lock.lock(); try { log.debug("t1获取锁...."); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug("t1执行完成...."); } }, "t1").start(); new Thread(() -> { log.debug("t2开始执行...."); lock.lock(); try { log.debug("t2获取锁...."); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug("t2执行完成...."); } }, "t2").start(); // 主线程两秒后执行 Thread.sleep(2000); log.debug("准备获取锁,去唤醒 condition上阻塞的线程"); lock.lock(); try { // 唤醒condition上所有阻塞的线程 condition.signalAll(); log.debug("唤醒condition上阻塞的线程"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }

二、AQS原理分析

1、什么是AQS

java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列条件队列独占获取共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。

JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的

  • 一般是通过一个内部类Sync继承 AQS
  • 将同步器所有调用都映射到Sync对应的方法

AQS具备的特性:

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

2、AQS核心结构

private volatile int state;//共享变量,使用volatile修饰保证线程可见性 //返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state = newState; } //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

2.1、AQS内部维护属性volatile int state

  • state表示资源的可用状态

2.2、State访问方式

  • getState()
  • setState()
  • compareAndSetState()

2.3、资源访问方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

2.4、AQS实现方法

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

2.5、AQS定义两种队列

  • 同步等待队列: 主要用于维护获取锁失败时入队的线程。
  • 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。

2.6、AQS定义了5个队列中节点状态

  • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

3、同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

AQS 依赖CLH同步队列来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
  • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
  • 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

4、条件等待队列

AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

  • 调用await方法阻塞线程;
  • 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

5、基于AQS实现一把独占锁

/** * @author * 基于AQS实现一把独占锁 */ public class TulingLock extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int unused) { //cas 加锁 state=0 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int unused) { //释放锁 setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return getState() != 0; } }

三、ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。

1、ReentrantLock使用方式

public class ReentrantLockTest { private final ReentrantLock lock = new ReentrantLock(); // ... public void doSomething() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock(); } } }

2、ReentrantLock原理

ReentrantLock基于 AQS + CAS 实现

  • lock()流程图

ReentrantLock基于抽象队列同步器AQS + CAS 实现的加锁、释放锁。ReentrantLock实现了公平锁、非公平锁,公平锁与非公平锁唯一的区别在于,非公平锁不会判断等待队列中是否节点等待获取锁,而是直接尝试获取锁,获取不到,再将当前线程节点添加进等待队列的尾节点,判断当前线程节点是否挂起

  • unlock()流程图

ReentrantLock释放锁的流程较为简单,优先判断持有锁资源的线程是否为当前线程,若不为当前线程抛出异常;若为当前线程,AQS的state的属性值减1,再判断减1后的值是否为0,若为0表示当前线程彻底释放锁资源,唤醒等待队列中的挂起线程节点,开始抢占锁资源。

3、ReentrantLock源码分析

3.1 构造函数

private final Sync sync; // 默认使用非公平锁 public ReentrantLock() { sync = new NonfairSync(); } // fair=true,公平锁;否则,非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

Sync是ReentrantLock的抽象静态内部类,继承自AQS(AbstractQueuedSynchronizer) - 抽象队列同步器,AQS中定义了锁的基本行为,AQS中用volatile修饰的state表示当前锁重入的次数。

NonfairSync、FairSync是ReentrantLock的静态内部类,继承ReentrantLock$Sync,NonfairSync实现非公平锁,FairSync实现公平锁。

3.2 lock()加锁

private final Sync sync; // 加锁 public void lock() { sync.lock(); }

3.3 公平锁

调用AQS的acquire方法。ReentrantLock$FairSync#lock() 核心代码:

// 加锁 final void lock() { acquire(1); }

3.4 非公平锁

通过CAS尝试获取锁(将AQS的state由0修改为1),若成功,代表当前线程获取锁资源成功;若失败调用AQS的acquire方法。ReentrantLock$NonfairSync#lock() 核心代码:

// 加锁 final void lock() { // 获取锁资源,CAS 修改 AQS 的 state 属性值,,获取成功,设置当前线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // 获取失败,执行AQS的acquire else acquire(1); }

3.5 acquire()

acquire()方法是Sync父类AQS中的方法,AbstractQueuedSynchronizer#acquire() 核心代码:

// 获取锁资源 public final void acquire(int arg) { // 尝试获取锁资源 if (!tryAcquire(arg) && // 当前线程为获取到锁资源,加入等待队列,同时挂起线程,等待唤醒 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

3.6 tryAcquire()

tryAcquire()方法在FairSync、NonFairSync中均有实现,尝试获取锁资源,核心代码如下:

// 公平锁 FairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取AQS的 state int c = getState(); // state == 0 当前没有线程占用锁资源 if (c == 0) { // 判断是否有线程在排队,若有线程在排队,返回true if (!hasQueuedPredecessors() && // 尝试抢锁 compareAndSetState(0, acquires)) { // 无线程排队,将线程属性设置为当前线程 setExclusiveOwnerThread(current); return true; } } // state != 0 有线程占用锁资源 // 占用锁资源的线程是否为当前线程 else if (current == getExclusiveOwnerThread()) { // state + 1 int nextc = c + acquires; // 锁重入超出最大限制 (int的最大值),抛异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 将 state + 1 设置给 state setState(nextc); // 当前线程拿到锁资源,返回true return true; } return false; } // 非公平锁 NonFairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 非公平锁 Sync#nonfairTryAcquire() 方法 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取AQS的 state int c = getState(); // 无线程占用锁资源 if (c == 0) { // CAS 修改 state 的值,修改成功,设置线程属性为当前线程,返回占用锁资源标识 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 有线程占用锁资源 // 占用锁资源的线程是当前线程(重入) else if (current == getExclusiveOwnerThread()) { // AQS 的 state + acquires int nextc = c + acquires; // 超出锁重入的上限(int的最大值),抛异常 if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 将 state + acquires 设置到 state 属性 setState(nextc); return true; } return false; }

获取当前线程AQS的state:

  • AQS的state属性值为0,表示无线程占用锁资源,判断等待队列中是否有线程在排队,若有线程在排队,返回尝试抢锁失败标识,将线程添加进等待队列中。
  • 若state属性值不为0,判断持有锁资源的线程是否为当前线程,若为当前线程,AQS的state属性值 + 1,返回尝试抢锁成功标识。

公平锁与非公平锁的整体实现流程类似,唯一不同的是,AQS的state属性值为0,无线程占用锁资源时,非公平锁不会判断是否有线程在等待队列中排队,而是直接通过CAS抢锁。

3.7 addWaiter()

为当前线程创建入队节点AbstractQueuedSynchronizer$Node,入参mode表示锁类型,在AQS的静态内部类Node中有SHARE、EXCLUSIVE两个属性,SHARE代表共享锁、EXCLUSIVE代表排它锁。

AbstractQueuedSynchronizer#addWaiter() 核心代码:

// 等待队列的尾节点,懒加载,只能通过enq方法添加节点 private transient volatile Node tail; private Node addWaiter(Node mode) { // 当前线程、获取的锁类型封装为Node对象 Node node = new Node(Thread.currentThread(), mode); // 获取等待队列的尾节点 Node pred = tail; // 尾节点不为null if (pred != null) { // 将当前节点设置为等待队列的尾节点 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 等待队列为空,初始化等待队列节点信息 enq(node); // 返回当前线程节点 return node; }

等待队列不为空,将当前线程封装的Node节点添加进队列尾部;

若等待队列为空,先初始化等待队列,然后在将Node节点添加进队列尾部。

3.8 enq()

等待队列尾节点为空时,执行enq()方法初始化等待队列,并将Node节点添加进等待队列中。

private Node enq(final Node node) { for (;;) { // 获取等待队列的尾节点 Node t = tail; // 等待队列为空,初始化等待队列 if (t == null) { // 初始化等待队列头尾节点 if (compareAndSetHead(new Node())) tail = head; } else { // 当前线程的Node添加到等待队列中 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

3.9 acquireQueued()

当前线程是否挂起,AbstractQueuedSynchronizer#acquireQueued() 核心代码:

final boolean acquireQueued(final Node node, int arg) { // 获取锁资源标识 boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 当前节点的前驱节点为头节点,并获取锁资源成功 if (p == head && tryAcquire(arg)) { // 将当前节点设置到head - 头节点 setHead(node); // 原头节点的下一节点指向设置为null,GC回收 p.next = null; // 设置获取锁资源成功 failed = false; // 不管线程GC return interrupted; } // 如果当前节点不是head的下一节点,获取锁资源失败,尝试将线程挂起 if (shouldParkAfterFailedAcquire(p, node) && // 线程挂起, UNSAFE.park() parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

查看当前排队的Node是否是head的next, 如果是,尝试获取锁资源, 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())。

3.10 shouldParkAfterFailedAcquire

检查并更新未成功获取锁资源的状态,返回true表示线程被挂起。

AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() 核心代码:

static final class Node { // 线程被取消 static final int CANCELLED = 1; // 等待队列中存在待被唤醒的挂起线程 static final int SIGNAL = -1; // 当前线程在Condition队列中,未在AQS对列中 static final int CONDITION = -2; // 解决JDK1.5的BUG。共享锁在释放资源后,若头节点为0,无法确定真的没有后继节点 // 如果头节点为0,需要将头节点的状态改为 -3 ,当最新拿到锁资源的线程查看 // 是否有后继节点并且为当前锁为共享锁,需唤醒排队的线程。 static final int PROPAGATE = -3; } // 获取锁资源失败,挂起线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取当前节点的上一个节点的状态 int ws = pred.waitStatus; // 上一节点被挂起 if (ws == Node.SIGNAL) // 返回true,挂起当前线程 return true; if (ws > 0) { // 上一节点被取消,获取最近的线程挂起节点, // 并将当前节点的上一节点指向最近的线程挂起节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 最近线程挂起节点的下一节点指向当前节点 pred.next = node; } else { // 上一节点状态小于等于0,存在线程处于等待状态,但未被挂起的场景 // 通过CAS将处于等待的线程挂起,避免在挂起前节点获取到锁资源 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回true,不挂起当前线程 return false; }

在挂起线程前,确认当前节点的上一个节点的状态。

  • 若为1,代表是取消的节点,不能挂起;
  • 若为-1,代表后续节点中有挂起的线程;
  • 若为-2 (线程在等待队列 - Condition队列中)、-3 (避免线程无法唤醒的一个状态),需要将状态改为-1之后,才能挂起当前线程。

3.11 unlock()释放锁

释放锁,ReentrantLock#unlock() 核心代码:

// 释放锁 public void unlock() { sync.release(1); }

unlock方法实际调用的是AQS的release方法,AbstractQueuedSynchronizer#release() 核心代码:

// 等待队列的头节点,懒加载,通过setHead方法初始化 private transient volatile Node head; // 释放锁 public final boolean release(int arg) { // 当前线程释放锁资源的计数值 if (tryRelease(arg)) { // 当前线程玩去释放锁资源,获取等待队列头节点 Node h = head; if (h != null && h.waitStatus != 0) // 唤醒等待队列中待唤醒的节点 unparkSuccessor(h); // 完全释放锁资源 return true; } // 当前线程未完全释放锁资源 return false; }

3.12 tryRelease()

释放锁,Reenttrant$Sync#tryRelease()的核心代码:

// 释放锁 protected final boolean tryRelease(int releases) { // 修改 AQS 的 state int c = getState() - releases; // 当前线程不是持有锁的线程,抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否成功的将锁资源完全释放标识 (state == 0) boolean free = false; // 锁资源完全释放 if (c == 0) { // 修改标识 free = true; // 将占用锁资源的属性设置为null setExclusiveOwnerThread(null); } // state赋值 setState(c); // 返回true表示当前线程完全释放锁资源; // 返回false标识当前线程是由锁资源,持有计数值减少 return free; }

4、ReentrantLock非公平锁执行流程

四、Semaphore

Semaphore基于 AQS + CAS 实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore默认使用非公平锁。

Semaphore详情如下:

1、Semaphore构造函数

// AQS的实现 private final Sync sync; // 默认使用非公平锁 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 根据fair布尔值选择使用公平锁还是非公平锁 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

2、公平锁与非公平锁

Semaphore中公平锁与非公平锁的实现,可以在tryAcquireShared()方法中找到两种锁的区别。

3、NonfairSync

Semaphore#NonfairSync#tryAcquireShared() 详情如下

// 非公平锁 获取信号量 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }

Semaphore#Sync#nonfairTryAcquireShared() 详情如下

// 非公平锁 获取信号量 final int nonfairTryAcquireShared(int acquires) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int available = getState(); // 当前可用信号量数 - acquires int remaining = available - acquires; // 可用信号量数不足 或 CAS操作获取信号量失败,返回 当前可用信号量数 - acquires if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

4、FairSync

Semaphore#FairSync#tryAcquireShared() 详情如下

protected int tryAcquireShared(int acquires) { // 自旋 for (;;) { // 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中) if (hasQueuedPredecessors()) return -1; // 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

5、acquire()

Semaphore默认实现的是非公平锁,acquire()按非公平锁的实现进行源码分析。

Semaphore 中获取一个信号量,Semaphore#acquire() 详情如下:

// Semaphore 中无信号量,阻塞 public void acquire() throws InterruptedException { // 获取 Semaphore 信号量 sync.acquireSharedInterruptibly(1); }

AbstractQueuedSynchronizer#acquireSharedInterruptibly() 详情如下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取Semaphore的信号量 if (tryAcquireShared(arg) < 0) // 尝试获取信号量失败,再次获取Semaphore信号量 doAcquireSharedInterruptibly(arg); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 自旋 for (;;) { final Node p = node.predecessor(); // 当前节点的前驱节点为等待队列头节点 if (p == head) { // 尝试获取信号量 int r = tryAcquireShared(arg); // 获取信号量成功 if (r >= 0) { // 唤醒等待队列中的待唤醒线程 setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } // 获取信号量失败,挂起线程 ==> 线程阻塞,待唤醒进行下一轮自旋 if (shouldParkAfterFailedAcquire(p, node) && // 若当前线程被中断,抛出InterruptedException异常 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

AbstractQueuedSynchronizer#setHeadAndPropagate()

// node: 当前节点;propagate 剩余资源 private void setHeadAndPropagate(Node node, int propagate) { // 获取等待队列中的头节点 Node h = head; // 将当前Node节点设置为等待队列的头节点 setHead(node); // 剩余资源大于0 || 原等待队列中的头节点为null || 原等待队列中 Node 的 ws 为 -1 或者 -3(共享锁) if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取当前等待队列头节点的后继节点 Node s = node.next; // 当前节点的后继节点为null 或 当前节点的后继节点为共享锁 if (s == null || s.isShared()) doReleaseShared(); } }

6、release()

Semaphore默认实现的是非公平锁,release()按非公平锁的实现进行源码分析。

归还Semaphore的信号量,Semaphore#release() 详情如下:

// 归还Semaphore的信号量 public void release() { sync.releaseShared(1); }

Semaphore#Sync#releaseShared() 详情如下:

public final boolean releaseShared(int arg) { // 尝试归还信号量 if (tryReleaseShared(arg)) { // 归还信号量 doReleaseShared(); // 归还成功 return true; } // 归还失败 return false; }

Semaphore#Sync#releaseShared() 详情如下:

// 尝试归还信号量 protected final boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int current = getState(); // 当前可用信号量数 + 归还的信号量 releases int next = current + releases; // 超出了int的最大值,变成了负数 if (next < current) throw new Error("Maximum permit count exceeded"); // cas操作,将信号量归还给Semaphore if (compareAndSetState(current, next)) return true; } }

归还信号量成功,唤醒等待队列中的挂起线程,AbstractQueuedSynchronizer#doReleaseShared() :

private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列头节点 Node h = head; // 等待队列中有排队的线程 if (h != null && h != tail) { int ws = h.waitStatus; // 等待队列头节点ws = -1,说明其后继节点中有待唤醒的线程 if (ws == Node.SIGNAL) { // cas 操作,等待队列头节点的 ws 由 -1 更新为 0 ,cas失败,继续下一次自旋 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒头节点的后继节点中待唤醒线程 unparkSuccessor(h); } // 解决共享锁JDK1.5的bug,头节点的 ws 为0,将头节点的 ws 设置为 -3 ,代表后继节点中可能有待唤醒的线程 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }

7、总结

不难看出,公平锁与非公平锁的区别在于当线程尝试获取Semaphore中的信号量时:

  • 公平锁,优先判断等待队列中是否有挂起的线程,如果有,则将当前线程添加到等待队列中,等待唤醒后抢夺信号量;
  • 非公平锁,不管等待队列中是否有挂起线程,优先尝试获取信号量,获取失败,将当前线程添加到等待队列。

五、CountDownLatch

CountDownLatch让一个或多个线程等待其他线程执行完成后再执行。在创建CountDownLatch对象时,必须指定线程数count,每当一个线程执行完成调用countDown()方法,线程数count减1,当count减到0时,await()方法就不再阻塞。

CountDownLatch详情

1、构造函数

CountDownLatch没有无参构造函数,在有参构造函数中初始化了sync属性。

public CountDownLatch(int count) { // count 合法校验 if (count < 0) throw new IllegalArgumentException("count < 0"); // 初始化sync属性 this.sync = new Sync(count); }

2、Sync - 队列同步器

// 抽象队列同步器 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 将 count 赋值给 AQS 的 state 属性 Sync(int count) { setState(count); } // 获取 AQS 的 state 属性 int getCount() { return getState(); } // 判断所有线程是否都执行完成, 1 -> 全部执行完成;-1 -> 仍有线程在执行 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 释放锁 protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取 AQS 的 state int c = getState(); // 锁资源已经释放完毕,再次进入,直接返回false,什么也不做 if (c == 0) return false; // state - 1 int nextc = c-1; // CAS 赋值操作 if (compareAndSetState(c, nextc)) // 最后一个线程执行完,state = 0 ,返回true。 // countDown() 唤醒等待队列中的其他挂起线程 return nextc == 0; } } }

3、await() - 阻塞等待

// AQS的state属性不为0, 阻塞 public void await() throws InterruptedException { // 调用AQS提供的获取共享锁并允许中断的方法 sync.acquireSharedInterruptibly(1); }

AbstractQueuedSynchronizer#acquireSharedInterruptibly(),详情如下:

// 获取共享锁,并允许其中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断,抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 获取共享锁,由CountDownLatch实现 if (tryAcquireShared(arg) < 0) // state > 0,说明有线程在持有锁资源,将当前线程添加到AQS等待队列中 doAcquireSharedInterruptibly(arg); }

CountDownLatch#Sync#tryAcquireShared(),详情如下:

// 获取共享锁 protected int tryAcquireShared(int acquires) { // 线程全部执行完成,返回 1;未全部执行完成,返回-1 return (getState() == 0) ? 1 : -1; }

AbstractQueuedSynchronizer#acquireSharedInterruptibly(),详情如下:

// 将当前线程添加到AQS等待队列中 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 当前线程封装成Node,添加到AQS等待队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 自旋 for (;;) { // 获取当前线程节点的前驱节点 final Node p = node.predecessor(); // 前驱节点为等待队列头节点 if (p == head) { // 调用 CountDownLatch 实现的方法 int r = tryAcquireShared(arg); // 返回值为1,表示 state 为 0 ,所有线程都释放了锁,无其他线程持有锁资源 if (r >= 0) { // state = 0,将当前线程和后面所有排队的线程都唤醒。 setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } // *** 线程在此处被挂起,待所有线程释放锁资源后,即state = 0 ,线程被唤醒,再继续往下执行 // 挂起获取锁资源失败的线程,并且挂起的线程被中断,抛出InterruptedException异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

4、countDown() - 释放锁资源

// countDown方法, 实际上调用了AQS的释放共享锁操作 public void countDown() { sync.releaseShared(1); }

AbstractQueuedSynchronizer#releaseShared(),详情如下:

// AQS提供的释放共享锁方法,CountDownLatch实现了 tryReleaseShared 方法 public final boolean releaseShared(int arg) { // 尝试释放锁资源 if (tryReleaseShared(arg)) { // 没有线程持有锁资源,唤醒等待队列中的其他挂起线程 doReleaseShared(); return true; } return false; }

CountDownLatch#Sync#tryReleaseShared(),详情如下:

protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取当前持有锁资源的线程数 int c = getState(); // state已为0,返回false,那么再次执行countDown,什么事情也不做 if (c == 0) return false; // count - 1 int nextc = c-1; // CAS 完成赋值操作 if (compareAndSetState(c, nextc)) // 没有线程持有锁资源,返回true return nextc == 0; } }

AbstractQueuedSynchronizer#doReleaseShared(),详情如下:

// 没有线程持有锁资源的处理 private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列的头节点 Node h = head; // 等待队列中有挂起线程待唤醒 if (h != null && h != tail) { int ws = h.waitStatus; // 线程待唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒线程 unparkSuccessor(h); } // CAS失败 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 等待队列头节点被改变,结束循环 if (h == head) break; } }

5、总结

CountDownLatch基于 AQS + CAS 实现,CountDownLatch的构造函数中必须指定count,同时初始继承AQS的内部类Sync,通过Sync对象将count赋值给AQS的state属性,这样就可以基于AQS提供的方法完成CountDownLatch的功能。

  • 调用countDown()方法,实际上是将AQS中 state 减 1。所有线程执行完成,state 会被修改为 0 ,在countDown()中会唤醒等待队列中挂起的线程。
  • 调用await()方法,实际上是判断AQS中的 state 是否为 0。state > 0,表示有线程仍在执行,此时await()会阻塞线程。当最后一个线程执行结束,state 变为 0,countDown()唤醒线程后,await()正常执行结束,不再阻塞。

六、CyclicBarrier

与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。

1、CyclicBarrier内部结构

  • Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。
  • nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。
  • breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。
  • await() :CyclicBarrier的核心方法,计数器递减处理。

2、构造函数

构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:

public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { // 参数合法性校验 if (parties <= 0) throw new IllegalArgumentException(); // final修饰,所有线程执行完成归为或重置时 使用 this.parties = parties; // 在await方法中计数值,表示还有多少线程待执行await this.count = parties; // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程 this.barrierCommand = barrierAction; }

CyclicBarrier属性

3、await()

在CyclicBarrier中,await有重载方法。

  • await()表示会一直等待指定数量的线程未准备就绪(执行await方法)
  • await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常

CyclicBarrier#await 详情如下:

// 执行没有超时时间的await public int await() throws InterruptedException, BrokenBarrierException { try { // 执行dowait() return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } // 执行有超时时间的await public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }

await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取锁对象 final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 获取generation对象 final Generation g = generation; // 这组线程中在执行过程中是否异常、超时、中断、重置 if (g.broken) throw new BrokenBarrierException(); // 这组线程被中断,重置标识与计数值, // 将Waiter队列中的线程转移到AQS队列,抛出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 计数值 - 1 int index = --count; // 这组线程都已准备就绪 if (index == 0) { // 执行结果标识 boolean ranAction = false; try { // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务 // 此处设计与 FutureTask 构造参数设计类似 final Runnable command = barrierCommand; if (command != null) // 执行任务 command.run(); // 执行完成,设置为true ranAction = true; // CyclicBarrier属性归位 nextGeneration(); return 0; } finally { // 执行过程中出现问题 if (!ranAction) // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列 breakBarrier(); } } // -- 之后,count不为0,表示还有线程在等待 // 自旋 直到被中断、超时、异常、count = 0 for (;;) { try { // 未设置超时时间 if (!timed) // 挂起线程,将线程转移到 Condition 队列 trip.await(); // 未达到等待时间 else if (nanos > 0L) // 挂起线程,并返回剩余等待时间 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 中断异常 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 线程中断 Thread.currentThread().interrupt(); } } // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; // 超时,抛出异常TimeoutException if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁资源 lock.unlock(); } }

4、breakBarrier()

结束CyclicBarrier的执行

// 结束CyclicBarrier的执行 private void breakBarrier() { // 设置线程执行过程中是否异常、中断、重置标识 generation.broken = true; // 重置计数值 count = parties; // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒 // 有后继节点的,设置ws = -1; // 无后继节点的,设置ws = 0 trip.signalAll(); }

5、reset()

重置CyclicBarrier

// 重置CyclicBarrier public void reset() { // 获取锁对象 final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列 breakBarrier(); // 重置generation 属性、计数值 nextGeneration(); } finally { // 释放锁 lock.unlock(); } }

6、nextGeneration()

CyclicBarrier归位

private void nextGeneration() { // 将Waiter队列中线程转移到AQS队列 trip.signalAll(); // 计数值、generation 归位 count = parties; generation = new Generation(); }

7、总结

CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。

  • parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。
  • 当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;
  • barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询