JAVA并发之AQS详解

1. 概念

在分析 Java 并发包java.util.concurrent源码的时候,少不了需要了解AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,因为它是Java并发包的基础工具类,是实现ReentrantLock、CountDownLatch、Semaphore、FutureTask等类的基础,许多实现都依赖其所提供的队列式同步器。

AQS的功能可以分为两类:独占锁和共享锁。它的所有子类中,要么实现并使用了它独占锁的API,要么使用了共享锁的API,而不会同时使用两套API,即便是它最有名的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的,到目前为止,我们只需要明白AQS在功能上有独占锁和共享锁两种功能即可。

1.1 如何使用AQS

AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore用它来表现剩余的许可数,ReentrantLock用它来表现拥有它的线程已经请求了多少次锁;FutureTask用它来表现任务的状态(尚未开始、运行、完成和取消)。

AQS有众多方法,大致可以分为两类:

1.1.1 需要子类实现的方法(模板方法)

  1. tryAcquire(int arg):独占式的获取锁,返回值是boolean类型的,true代表获取锁,false代表获取失败。

  2. tryRelease(int arg):释放独占式同步状态,释放操作会唤醒其后继节点获取同步状态。

  3. tryAcquireShared(int arg):共享式的获取同步状态,返回大于0代表获取成功,否则就是获取失败。

  4. tryReleaseShared(int arg):共享式的释放同步状态。

  5. isHeldExclusively():判断当前的线程是否已经获取到了同步状态。

这些方法是子类实现时可能实现的方法,通过上面的这些方法来判断是否获取了锁,然后再通过AQS本身的方法执行获取锁与未获取锁的过程。

以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively而支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared。

1.1.2 AQS本身的实现的方法

  1. acquire(int arg)/acquireInterruptibly(int arg):独占式的获取锁操作,独占式获取同步状态都调用这个方法,通过子类实现的tryAcquire方法判断是否获取了锁。Interruptibly后缀的方法带有中断异常的签名,表示可以响应中断异常,无此后缀的acquire方法则通过重新标记中断状态的方式响应中断。

  2. acquireShared(int arg)/acquireSharedInterruptibly:共享式的获取锁操作,在读写锁中用到,通过tryAcquireShared方法判断是否获取到了同步状态。Interruptibly后缀的方法带有中断异常的签名,表示可以响应中断异常,无此后缀的acquire方法则通过重新标记中断状态的方式响应中断。

  3. release(int arg):独占式的释放同步状态,通过tryRelease方法判断是否释放了独占式同步状态。

  4. releaseShared(int arg):共享式的释放同步状态,通过tryReleaseShared方法判断是否已经释放了共享同步状态。

从这两类方法可以看出,AQS为子类定义了一套获取锁和释放锁以后的操作,而具体的如何判断是否获取锁和释放锁都是交由不同的子类自己去实现其中的逻辑,这也是Java设计模式之一:模板模式的实现。有了AQS我们就可以实现一个属于自己的Lock。

2.源码分析(基于jdk1.8)

2.1 AQS类

首先是类图↓

从图中可以看出来,AbstractQueuedSynchronizer内部维护了一个Node节点类和一个ConditionObject内部类。Node内部类是一个双向的FIFO队列,用来保存阻塞中的线程以及获取同步状态的线程,而ConditionObject对应的是后面要讲的Lock中的等待和通知机制。↓

我们可以看到,AQS类是JUC框架的基石。为什么这么说?我们以ReentrantLock为例,ReentrantLock把所有Lock接口的操作都委派到一个自定义的内部类Sync类上,该类继承自AbstractQueuedSynchronizer。同时该类又有两个子类,NonfairSync 和 FairSync,实现非公平锁和公平锁。↓

1
2
公平锁:线程获取锁的顺序和调用lock的顺序一样,FIFO,先到先得;  
非公平锁:线程获取锁的顺序和调用lock的顺序无关,全凭运气。

同样的,CountDownLatch、Semaphore等其他类,也自定义了自己的Sync类和NonfairSync和FairSync,以达到功能的差异化。

2.2 AQS的属性

2.2.1 状态位state

AQS用的是一个32位的整型来表示同步状态的,它是用volatile修饰的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* The synchronization state.
* 在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。
* 同时AQS提供了getState()、setState()、compareAndSetState()方法来获取和修改该值:
*/
private volatile int state;

protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.2.2 当前持有独占锁的线程

1
2
3
4
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

2.2.3 获取锁的阻塞队列——CLH同步队列

2.2.3.1 head和tail属性

AQS内部维护着一个FIFO的CLH队列,用来保存阻塞中的线程以及获取同步状态的线程,每个node都封装着一个独立的线程,head指向的node可以简单理解为当前持有锁的线程,tail指向了等待队列的链尾。正因为head指向当前持有锁的线程,所以,真正的等待队列,不包括head

1
2
3
4
5
6
7
8
9
10
11
12
13
14

/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail

因为是CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他进出队列快、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,ASQ采用的是一种变种的CLH队列锁

  1. 原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。

  2. 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。

  3. head结点使用的是傀儡结点。虽然node对象(封装了线程)在获取到锁的时候,逻辑会将这个node置为head,看起来head表示的是当前正在拥有锁的node节点的意思。但看setHead方法就能知道,node赋值为head后,node封装的thread对象被清空,node成为一个空对象。

我们来看看这个队列结点的实现:

2.2.3.2 node实现

我们来看看node的源码 ↓

众多字段,我们一个一个来看:

1
2
3
4
5
6
/**共享模式是允许多个线程可以获取同一个锁,而独占模式则一个锁只能被一个线程持有,其他线程必须要等待。**/

// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;

下面的几个int常量是给waitStatus字段使用的,表示节点现在的状态

1
2
3
4
5
/**代表此线程取消了争抢这个锁
*场景:当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消(这里该线程在取消之前是等待状态)
*被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列
**/
static final int CANCELLED = 1;
1
2
3
4
5
6
/** waitStatus value to indicate successor's thread needs unparking 
* 场景:后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),
* 将会唤醒后继节点,使后继节点的线程得以运行;
* 当一个节点的状态为SIGNAL时就意味着在等待获取同步状态
**/
static final int SIGNAL = -1;
1
2
3
4
/** waitStatus value to indicate thread is waiting on condition
* 场景:节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,CONDITION状态的结点将从Condition等待队列转移到同步队列中,等待获取同步锁。
*/
static final int CONDITION = -2;
1
2
3
4
5
6
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* 场景:表示下一次共享式同步状态获取将会被无条件的被传播下去(读写锁中存在的状态,代表后续还有资源,可以多个线程同时拥有同步状态)
*/
static final int PROPAGATE = -3;

然后就是状态字段的主角了,上面的这些常量,都是给该字段赋值用的 ↓

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

前面说过,在AQS中,我们维护了一个链表,故而node节点中,也定义了前后驱。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
* 前驱节点,当节点加入同步队列的时候被设置(尾部添加)
*/
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
* 后继节点
*/
volatile Node next;

当然,还有node节点的最重要主角:被封装的线程

1
2
3
4
5
6
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
* 节点封装的线程
*/
volatile Thread thread;

还有一个很特殊的后驱节点,这个后驱,负责维护node节点参与的第二个链表(第一个就是AQS的同步等待链表)——condition等待链表,至于什么是condition,我们最后再来讨论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
* 指向condition等待队列中的下一个节点,或特殊值SHARED。
* 因为condition等待队列只有在保持独占模式时才被访问,
* 所以我们只需要一个简单的链表来在节点等待condition时保存节点。
* 然后将它们转移到队列中以重新获取。 并且因为条件只能是独占的,所以我们通过使用特殊值来指示共享模式来保存字段。
*/

// nextWaiter还有一个作用,是区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记
// 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;
// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。
Node nextWaiter;

你可以把node节点简单看作 thread + waitStatus + pre + next 四个属性的封装,从本质上来说,这是没错的,node几乎所有的api也都服务于这四个属性。

2.3 AQS的独占锁实现(以ReentrantLock的公平锁和非公平锁为例)

从上图可以看到,AQS的实现有许多种,我们以最典型的在ReentrantLock类内部定义的公平锁FairSync和非公平锁NonFairSync为例,来探讨一下AQS独占模式的同步原理。(ReentrantLock是典型的独占锁,真正管理锁的也是其内部实现类FairSync或者NonFairSync)

独占锁是独占的,排他的,因此在独占锁中有一个exclusiveOwnerThread属性,用来记录当前持有锁的线程。

我们一般怎么使用ReentrantLock呢?很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void needLockFunction() {
// 比如我们对下面这段逻辑加锁
reentrantLock.lock();
// 通常,lock 之后紧跟着 try 语句
try {
// 这块代码同一时间只能有一个线程进来(获取到锁的线程),
// 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
// 执行代码...
// 执行代码...
// 执行代码...
} finally {
// 释放锁
reentrantLock.unlock();
}
}

所以,我们可以从lock方法看起

2.3.1 公平锁的加锁逻辑

reentrantLock的lock方法调用的是reentrantLock内部的sync字段的lock方法,sync字段在reentrantLock的构造方法中就开始初始化默认是非公平锁:

ReentrantLock默认使用非公平锁是基于性能考虑,公平锁为了保证线程规规矩矩地排队,需要增加阻塞和唤醒的时间开销。如果直接插队获取非公平锁,跳过了对队列的处理,速度会更快

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

2.3.1.1 FairSync.lock()

我们来看看FairSync的lock(),很简单直接调用了acquire(); ↓

1
2
3
final void lock() {
acquire(1);
}

2.3.1.2 AQS.acquire()

所以lock()的重点都在acquire(),FairSync调用了AQS类中实现的acquire();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {//记住此时arg=1
// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。方法return,逻辑继续执行。
// 否则,acquireQueued方法会将当前线程压到队列中,阻塞在里面

//首先执行tryAcquire(1)一下,名字上就知道,这个只是试一试
// 因为有可能直接就成功了呢,也就不需要进队列排队了。
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//能到这里,说明当前线程已经从acquireQueued中返回了,即出阻塞队列了
//此时当前线程是被其他线程唤醒的,被设置了中断状态。

//selfInterrupt方法就是调用:Thread.currentThread().interrupt();中断当前线程
//之所以有这个逻辑,是因为acquire方法没有中断异常的签名,
//所以为了外层能继续响应中断,需要感应到acquireQueued内的中断,并在这里重新设置中断状态
//可抛出中断异常的acquire方法为acquireInterruptibly(),除了中断处理不同以外,其他实现与acquire大同小异。
//同理,其他带有Interruptibly后缀的方法,都是原方法的中断模式实现。
selfInterrupt();
}

一句话总结,AbstractQueuedSynchronizer.acquire()方法的作用是:先尝试获取锁,若成功则不用进队列阻塞,逻辑往下走(其实就是返回了)。否则封装当前线程为node,塞进队列,然后在acquireQueued方法中一直尝试,先期会自旋,如果在自旋期间内获得锁了,那么返回,返回结果是false,表示不需要调用selfInterrupt()做自我中断。如果是阻塞后才获取锁,返回,返回结果是true,表示要设置自我中断。(只是设置中断状态,至于到底何时中断,由线程本身决定)


2.3.1.3 FairSync.tryAcquire()

AQS类中实现的acquire()又调用了FairSync中实现的tryAcquire(1),我们来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
    /**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS的状态,我们之前说过,0未加锁,1已经加锁了,大于1表示重入数。
int c = getState();
//如果AQS未加锁
if (c == 0) {
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有别人在队列中等了半天了,没有才改变状态
//hasQueuedPredecessors()根据 当前线程是否是等待队列的第一个 来判断是否有等待更久的节点。
//记住,我们说过,等待队列不包括头结点head
//所以hasQueuedPredecessors判断的是当前线程是否为head的后驱next
if (!hasQueuedPredecessors() &&
//执行到这里,意味着队列中没有更老的节点,那么CAS置换状态为1或者大于1(重入时大于1)
compareAndSetState(0, acquires)) {
//设置拥有独占锁的线程是当前线程
//代码就一行,exclusiveOwnerThread = thread;
setExclusiveOwnerThread(current);
//获取到锁
return true;
}
}
//否则,说明AQS是被持有锁的状态,先判断持有锁的是不是自己
else if (current == getExclusiveOwnerThread()) {
//如果是自己,那么说明自己重入了,status+1
int nextc = c + acquires;
if (nextc < 0)
//nextc只有在重入次数超过int值上限,导致溢出为负时才会达到,报错超过最大锁计数
throw new Error("Maximum lock count exceeded");
setState(nextc);
//获取到锁
return true;
}
//上述逻辑都不满足,未获取到锁
return false;
}

一句话总结,FairSync.tryAcquire()方法的作用是:如果未加锁,那么判断自己是不是队列的头名,若是,设置独占锁线程,获得锁。否则,判断加锁的人是不是自己,如果是,那么重入,status+1,设置独占锁线程,获得锁。再否则,返回false,占锁失败。


2.3.1.2 AQS.addWaiter()

看完了tryAcquire方法,我们知道在acquire方法中,如果tryAcquire方法返回false,即没有获取到锁,那么将会执行addWaiter,将当前线程封装为node,addWaiter()是AbstractQueuedSynchronizer的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
//mode值有Node.EXCLUSIVE 和Node.SHARED,表示封装为独占模式还是分享模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//先使用快速入列法来尝试一下,如果失败,则进行更加完备的入列算法.
//只有在必要的情况下才会使用更加复杂耗时的算法,也就是乐观的态度
Node pred = tail;//tail为队尾指针
if (pred != null) {
//进行入列操作
node.prev = pred;//该节点的前趋指针指向tail
if (compareAndSetTail(pred, node)) {//cas将尾指针指向该节点
pred.next = node;//如果成功,让旧列尾节点的next指针指向该节点
return node;
}
}
//cas失败,或在pred == null时调用enq
enq(node);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private Node enq(final Node node) {
for (;;) { //cas无锁算法的标准for循环,不停的尝试,直到成功入队
Node t = tail;
if (t == null) { //t == null为真的话,说明队列为空,要初始化一个空队列,即只存在一个哨兵node的队列
//还是那句话,head是一个哨兵的作用,并不代表某个要获取锁的线程节点,所以并没有将node赋给head
//而是new了一个无关紧要的新node
//compareAndSetHead方法,用cas,期望head为null时将其更新为new Node()
if (compareAndSetHead(new Node()))
tail = head;
//注意,这里没有return,执行完后,还是要继续for循环,下一次,必定走else逻辑。
} else {
//和addWaiter中一致,不过有了外侧的无限循环,不停的尝试,相当于自旋锁
//将node的前驱改为原来的队尾node
node.prev = t;
//新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点
//基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功
//其他的都必须重新执行循环体
if (compareAndSetTail(t, node)) {//改变队尾指针的值
t.next = node;//原本的队尾的后驱设为node
return t;
}
}
}
}

一句话总结,AbstractQueuedSynchronizer.addWaiter()方法的作用是将当前线程封装为node,并将node节点塞入等待队列,塞入逻辑包括节点前后驱,head和tail指针的维护,以及必要时对空列表的初始化。然后返回封装好的node。


2.3.1.3 AQS.acquireQueued()

回到acquire方法

1
2
3
4
5
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//该方法就是调用:Thread.currentThread().interrupt();中断当前线程
selfInterrupt();

addWaiter()将当前线程封装为node,并将node节点塞入等待队列,紧接着,执行的是AbstractQueuedSynchronizer.acquireQueued()方法,这个方法就是入列后的node节点在队列中等待的逻辑,是自旋等待还是阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//不会一直自旋,如果判断还没轮到自己,那么线程会阻塞在这个for循环中的parkAndCheckInterrupt里面
for (;;) {
final Node p = node.predecessor();
//node的前驱p是head,就说明,node是将要获取锁的下一个节点.
if (p == head && tryAcquire(arg)) {//所以再次尝试获取锁
setHead(node);//如果成功,那么就将自己设置为head,前文说过,node会被置为傀儡,然后赋给head
p.next = null; // help GC
failed = false;
return interrupted;
//此时,还没有进入阻塞状态,所以直接返回false,表示不需要中断调用后面的selfInterrupt函数
//此时方法返回后,在acquire方法内也走完了所有逻辑,acquire方法返回,执行lock操作后的业务逻辑。
}
//判断是否要进入阻塞状态。
//如果shouldParkAfterFailedAcquire返回true,表示需要进入阻塞,则调用parkAndCheckInterrupt,进行阻塞;
//否则表示还可以再次尝试获取锁,继续进行for循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //前一个节点在等待锁释放的通知,所以还没那么快轮到自己,当前节点可以阻塞
return true;
//static final int CANCELLED = 1;节点状态中,只有CANCELLED大于0
//前一个节点处于取消获取锁的状态,所以,可以跳过去
if (ws > 0) {
do {
//前驱的前驱变为自己的前驱,即前驱在链表中被剔除了。
node.prev = pred = pred.prev;
//如果前驱都是取消,那么一直剔除。
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将上一个节点的状态设置为signal,返回false。
//但因为前驱被置为SIGNAL,外面的for循环下一次获取还失败后,该node也会返回true了。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //将AQS对象自己传入
//LockSupport.park()也能响应中断信号,但是跟Thread.sleep()之类不同的是它不会抛出InterruptedException
//那怎么知道线程是被unpark还是被中断的呢,根据线程的interrupted status。
//如果线程是被中断退出阻塞的那么该值被设置为true,通过Thread的interrupted和isInterrupted方法都能获取该值,不再赘述。

//线程waiting在park方法中,这个return返回,肯定是线程被唤醒后才会执行
//此时它表示的意思为:在waiting中时,是否是被中断唤醒了。如果是unpark,那么Thread.interrupted()为false。中断唤醒才为true;
//我们知道处理中断时最好不要将其吃掉,要么抛出新的中断异常,要么重新设置interrupted status


//结合上文,我们知道这里如果返回true,那么外面acquireQueued方法在获取到锁的时候也会返回interrupted = true;
//表示是中断唤醒的,届时acquireQueued方法会调用selfInterrupt();
//selfInterrupt()方法其实就是调用:Thread.currentThread().interrupt();中断当前线程
return Thread.interrupted();
}

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);//设置阻塞对象,用来记录线程被谁阻塞的,用于线程监控和分析工具来定位
UNSAFE.park(false, 0L);//让当前线程不再被线程调度,就是当前线程不再执行.
setBlocker(t, null);
}

最后我们回到acquireQueued方法的最后一步,finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//传入的方法参数是当前获取锁资源失败的节点
private void cancelAcquire(Node node) {
// 如果节点不存在则直接忽略
if (node == null)
return;

node.thread = null;

// 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点
Node predNext = pred.next;

//把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点
node.waitStatus = Node.CANCELLED;
//如果当前是尾节点,则直接删除,即出队
//注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0)
//再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点
compareAndSetNext(pred, predNext, next);
} else {
//进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

一句话总结,AbstractQueuedSynchronizer.acquireQueued()方法会一直循环来尝试获取锁,但并非一直自旋,而是会在每一次循环判断是否要进入阻塞,如果通过判断前置节点状态得知无法很快得到锁(这其中会将cancel状态的node踢出队列),那么该node会进入阻塞。

阻塞被唤醒后,如果是中断唤醒的,那么会将这个中断唤醒的标记往外层传,并再次尝试获取锁,如果还是失败,继续进入上述判断阻塞逻辑。直到获取到锁。

同时,如果tryAcquire()方法抛出异常,也会有体面的退出逻辑。

2.3.2 非公平锁的加锁逻辑

2.3.2.1 NonFairSync.lock()

再来看看NonFairSync的lock();

1
2
3
4
5
6
7
8
9
10
11
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {//不管三七二十一,先尝试获取一次锁
//CAS,成功了就设置拥有独占锁的线程是当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

2.3.2.2 AQS.acquire()

调用AQS.acquire()方法,前文已说过,不再赘述。可通过侧边导航快速回看。

2.3.2.3 NonFairSync.nonfairTryAcquire()

AQS.acquire()中重要的tryAcquire方法,非公平锁定义了自己的实现:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//和公平锁一样,获取AQS的状态,我们之前说过,0未加锁,1已经加锁了,大于1表示重入数。
//如果AQS未加锁
if (c == 0) {
//注意,这里和公平锁的区别来了,还记得公平锁中这里的实现么?
//公平锁中,如果AQS未加锁,逻辑即便到了这里,也会调用hasQueuedPredecessors()来判断等待线程是否有等了更久的node
//但是非公平锁不管,我到了这里是我本事,本身不讲究公平,直接CAS设置锁状态,抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//否则,说明AQS是被持有锁的状态,先判断持有锁的是不是自己
else if (current == getExclusiveOwnerThread()) {
//如果是自己,那么说明自己重入了,status+1,同样的,如果int值溢出,抛异常
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//上述逻辑都不满足,未获取到锁
return false;
}

一句话总结,NonFairSync.nonfairTryAcquire方法的作用是:如果未加锁,那么直接抢锁,而不是像公平锁一样去检查是否轮到自己。若AQS已经上锁,判断加锁的人是不是自己,如果是,那么重入,status+1,设置独占锁线程,获得锁。再否则,返回false,占锁失败。

剩下的加锁逻辑,则完全和公平锁没有区别了,因为实际都是调用的AQS的addWaiter()和acquireQueued()方法,不再赘述。

2.3.3 释放锁逻辑

释放锁的逻辑,公平锁和非公平锁没有区别,本质都是调用的AQS.release()方法和Sync.tryRelease()方法

2.3.3.1 ReentrantLock.unlock()

1
2
3
public void unlock() {
sync.release(1);
}

2.3.3.2 AQS.release()

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {//release就是先调用tryRelease来释放独占性变量。
//释放独占性变量,起始就是将status的值减1,因为acquire时是加1
Node h = head;
if (h != null && h.waitStatus != 0)//是否有等待锁的阻塞线程,0为waitStatus的初始值,表示未赋值任何状态
unparkSuccessor(h);//唤醒head的后继节点
return true;
}
return false;
}

2.3.3.3 AQS.tryRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
//由于只有一个线程可以获得独占先变量,也只有这个线程才能有效调用unlock,所以所有操作不需要考虑多线程并发
int c = getState() - releases;//对于重入场景,重入数-1,非重入场景,解锁。
if (Thread.currentThread() != getExclusiveOwnerThread())
//如果不是持有独占锁的线程执行unlock,抛出异常。
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果等于0,那么说明锁应该被释放了,否则表示当前线程有重入操作,该次解锁只是一次重入的释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

2.3.3.4 AQS.unparkSuccessor()

unparkSuccessor负责在释放锁的时候,唤醒head的后继节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
//注意这里的node是AQS.release()中塞进来的head节点
int ws = node.waitStatus;
if (ws < 0)//head的状态为非取消
compareAndSetWaitStatus(node, ws, 0);//将head的waitStatus置为0,即没有任何有实质意思的状态

//一般来说,需要唤醒的线程就是head的下一个节点,但是如果它获取锁的操作被取消,或在节点为null时
//就直接继续往后遍历,找到第一个未取消的后继节点.
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//将对应节点唤醒
}

调用了unpark方法后,进行lock操作被阻塞的线程就恢复到运行状态,就会再次执行acquireQueued中的无限for循环中的操作,再次尝试获取锁。

2.4 AQS的共享锁实现(以Semaphore为例)

前面我们学习了AQS独占锁的逻辑,对于独占锁而言,锁只能被一个线程独占持有,而对于共享锁而言,由于锁是可以被共享的,因此它可以被多个线程同时持有。

AQS的共享锁应用于Semaphore、ReentrantReadWriteLock等实现中,本次我们以Semaphore为例来剖析一下AQS的共享锁

共享锁的实现和独占锁是对应的,我们可以从下面这张表中看出

独占锁 共享锁
tryAcquire(int arg) tryAcquireShared(int arg)
tryAcquireNanos(int arg, long nanosTimeout) tryAcquireSharedNanos(int arg, long nanosTimeout)
acquire(int arg) acquireShared(int arg)
acquireQueued(final Node node, int arg) doAcquireShared(int arg)
acquireInterruptibly(int arg) acquireSharedInterruptibly(int arg)
doAcquireInterruptibly(int arg) doAcquireSharedInterruptibly(int arg)
doAcquireNanos(int arg, long nanosTimeout) doAcquireSharedNanos(int arg, long nanosTimeout)
release(int arg) releaseShared(int arg)
tryRelease(int arg) tryReleaseShared(int arg)
- doReleaseShared()
除了最后一个属于共享锁的doReleaseShared()方法没有对应外,其他的方法,独占锁和共享锁都是一一对应的。

2.4.1 Semaphore

在解析之前,我们先来了解一下Semaphore是什么。

Semaphore也叫信号量,在JDK1.5被引入,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。

打个比喻,Semaphore就像一道阀门,可以控制同时进入某一逻辑的线程数量(构造方法中指定),我们使用acquire方法来争取通行票,使用release方法来归还通行票。通行票只是一个比喻,一般我们称之为许可。

  • Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
    1
    2
    3
    public Semaphore(int permits) {
    sync = new NonfairSync(permits);
    }
  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。
  • Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
    1
    2
    3
    public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
  • 当初始值为1时,可以用作互斥锁,并具备不可重入的加锁语义。
  • Semaphore将AQS的同步状态(status字段)用于保存当前可用许可的数量。

我们调用Semaphore方法时,其实是在间接调用其内部类或AQS方法执行的。Semaphore类结构与ReetrantLock类相似,内部类Sync继承自AQS,然后其子类FairSync和NoFairSync分别实现公平锁和非公平锁的获取锁方法tryAcquireShared(int arg),而释放锁的tryReleaseShared(int arg)方法则有Sync类实现,因为非公平或公平锁的释放过程都是相同的。

2.4.2 Semaphore公平锁争锁逻辑

不论是公平锁还是非公平锁,Semaphore使用acquire()方法来争锁

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly方法,是定义在AQS中的,它可以响应中断异常,

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//如果当前线程被中断,抛出中断异常。
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//尝试获取共享锁,返回值小于0表示获取失败
doAcquireSharedInterruptibly(arg);//未获取成功则加入同步队列等待
}

2.4.2.1 Semaphore.FairSync.tryAcquireShared

我们知道AQS中,try开头的几个方法都是模板方法,需要各个实现自己重写,Semaphore的公平锁实现类FairSync同样实现了自己的tryAcquireShared。

tryAcquire的返回值是个boolean类型,表示是否成功获取到了锁,而tryAcquireShared的返回值是一个int类型,这表示tryAcquireShared的返回含义绝不止是或者否这么简单,它的返回有三种情况:

  • 小于0 : 表示获取锁失败,需要进入等待队列。
  • 等于0 : 表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的。
  • 大于0 : 表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected int tryAcquireShared(int acquires) {
for (;;) {//一直执行,直到要么失败,要么成功
//hasQueuedPredecessors我们在上文见过了,它实现在AQS中
//根据 当前线程是否是等待队列的第一个 来判断是否有等待更久的节点。
//因为是公平锁,所以要先判断先来后到
if (hasQueuedPredecessors())
return -1;//还没轮到你,所以获取锁失败,需要进入等待队列
int available = getState();//Semaphore的status表示许可总数量
int remaining = available - acquires;//总数量-索取量
//如果索取量超过剩余量,返回的是小于0,表示获取许可失败。
//如果CAS成功,表示获取许可成功,那么返回剩余量。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

一句话总结,FairSync.tryAcquireShared方法的作用是:重复判断是否轮到自己来获取许可了,如果不是,返回获取失败。否则检查剩余量,若许可的剩余量满足索取量,那么CAS获取许可,返回索取后的剩余量。

2.4.2.2 AQS.doAcquireSharedInterruptibly()

acquireSharedInterruptibly中,如果tryAcquireShared获取许可失败,那么逻辑就进入了doAcquireSharedInterruptibly方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共享模式调用addWaiter封装一个node,
//addWaiter前文已分析,具体可见右侧导航栏跳转2.3.1.2 AQS.addWaiter()
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {//重复执行
//获取node的前驱
final Node p = node.predecessor();
if (p == head) {//前驱如果是head,表示自己已经在队首
//tryAcquireShared再尝试一次
int r = tryAcquireShared(arg);
if (r >= 0) {
//这里是重点,获取到锁以后的唤醒操作,后面详细说
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;//获取许可成功
return;
}
}
//自己不在队首,或者取锁失败,调用AQS.shouldParkAfterFailedAcquire判断是否需要阻塞,
//若需要则调用parkAndCheckInterrupt,进行阻塞;
//否则表示还可以再次尝试获取锁,继续进行for循环
//这两个方法前文都已分析,具体可见右侧导航栏跳转2.3.1.3 AQS.acquireQueued()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//注意,非Interruptibly后缀的方法,在进入这里的时候是将它们定义的interrupted设为true;
//而Interruptibly后缀的方法则不需要向外传递中断状态,直接抛出中断异常即可
throw new InterruptedException();
}
} finally {
if (failed)
//cancelAcquire方法前文也已分析,体面的退出获取。只有报异常才会来到这里。
cancelAcquire(node);
}
}

一句话总结,AQS.doAcquireSharedInterruptibly方法的作用是:调用addWaiter封装当前线程,然后重复执行取锁逻辑,直到取到锁为止,如果取到锁,设置各个状态并唤醒后继线程,如果没有获取到锁,改变前驱节点状态,将其设置为signal,然后阻塞,等待唤醒。

2.4.2.3 AQS.setHeadAndPropagate()

获取到许可时,逻辑调用了AQS.setHeadAndPropagate(),从方法名就可以看出除了设置新的头结点以外还有一个传递动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void setHeadAndPropagate(Node node, int propagate) {
//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值
//注意上面说的,它可能大于0也可能等于0(小于0不可能来这)
Node h = head; //将老的头结点记录下来,用来下面if的check
//设置新的头节点,即把当前获取到锁的节点设置为头节点
//可以看到,每个获取到共享锁的线程,都会被设置为head
//获取到共享锁的线程有多个,head表示的是最近获取到共享锁的那个node
setHead(node);//此时head==node了
//这里意思是:有两种情况是需要执行唤醒操作
//1.propagate > 0 表示调用方指明了后继节点还可以被唤醒,因为许可还有
//2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点(h = head)还是新的头结点(node)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//此时node已经是head了,node的next就是在等待队列的队首
//如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
//doReleaseShared不会释放锁,别被名字误导,它会唤醒后续节点,讲到释放锁时重点说
//为什么有新的node获取到共享锁之后,要唤醒后续的那个节点争锁呢?
//因为propagate>0,也就是说还有剩余的许可没被占用
doReleaseShared();
}
}
1
2
3
final boolean isShared() {
return nextWaiter == SHARED;
}

我们知道,在条件队列中,nextWaiter是指向条件队列中的下一个节点的,它将条件队列中的节点串起来,构成了单链表。但是在同步队列中,我们只用prev/next属性来串联节点,形成双向链表,nextWaiter属性在这里只起到一个标记作用,不会串联节点,这里不要被Node SHARED = new Node()所指向的空节点迷惑,这个空节点并不属于同步队列,不代表任何线程,它只起到标记作用,仅仅用作判断节点是否处于共享模式的依据。

一句话总结,AQS.setHeadAndPropagate方法的作用是:设置head节点,并在许可还有剩余或者后继新旧head节点的后驱都应该被唤醒时(waitStatus < 0),唤醒head的后继,让其参与争锁。

2.4.3 Semaphore非公平锁争锁逻辑

前文说过,不论是公平锁还是非公平锁,Semaphore都使用acquire()方法来争锁

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly方法,是定义在AQS中的,它可以响应中断异常,这个前文介绍过了,不再多说,拷贝过来

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//如果当前线程被中断,抛出中断异常。
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//尝试获取共享锁,返回值小于0表示获取失败
doAcquireSharedInterruptibly(arg);//未获取成功则加入同步队列等待
}

2.4.3.1 Semaphore.nonfairTryAcquireShared()

acquireSharedInterruptibly方法中的tryAcquireShared是模板方法,在Semaphore的两个内部类NonfairSync和FairSync中有各自的实现,FairSync.tryAcquireShared我们讲过了,我们来看下NonfairSync.tryAcquireShared

1
2
3
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

直接调用了nonfairTryAcquireShared方法,该方法定义在Semaphore类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//一直执行,直到要么失败,要么成功
//在FairSync.tryAcquireShared中,此时会判断if (hasQueuedPredecessors())
//如果为true,则返回抢锁失败
//但非公平锁不讲究先来后到,直接争锁
int available = getState();//Semaphore的status表示许可总数量
int remaining = available - acquires;//总数量-索取量
//如果索取量超过剩余量,返回的是小于0,表示获取许可失败。
//如果CAS成功,表示获取许可成功,那么返回剩余量。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

一句话总结,Semaphore.nonfairTryAcquireShared方法的作用是:重复检查剩余量,若许可的剩余量满足索取量,那么CAS获取许可,返回索取后的剩余量。

2.4.4 Semaphore释放锁逻辑

2.4.4.1 AQS.releaseShared()

我们使用releaseShared(int arg)方法来释放共享锁:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

在独占锁模式下,由于头节点就是持有独占锁的节点,在它释放独占锁后,如果发现自己的waitStatus不为0,则它将负责唤醒它的后继节点。

在共享锁模式下,头节点也是持有共享锁的节点(每个获得共享锁的node都会当一段时间的head),在它释放共享锁后,它也应该唤醒它的后继节点,但是值得注意的是,我们在之前的setHeadAndPropagate方法中可能已经调用过该方法了,也就是说它可能会被同一个头节点调用两次,也有可能在我们从releaseShared方法中调用它时,当前的头节点已经易主了。

2.4.4.2 Semaphore.tryReleaseShared()

1
2
3
4
5
6
7
8
9
10
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//当前剩余的许可数
int next = current + releases;//剩余许可数+这次释放的许可数
if (next < current) // overflow //int型溢出才会有这种情况
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//成功释放锁,重置剩余许可数
return true;
}
}

一句话总结,Semaphore.tryReleaseShared方法的作用是:一直尝试将锁释放,CAS控制并发,将state值加回来

2.4.4.3 AQS.doReleaseShared()

doReleaseShared是共享锁中最难理解的部分,我们来看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void doReleaseShared() {
for (;;) {//循环check
Node h = head;//当前的head节点,执行到这里时,head可能是当前线程之前绑定的节点,也可能节点已经易主了
//如果当前线程获取到锁后没有其他线程再获取到共享锁,那么这个head就是之前自己绑定的节点,否则,就不是了。
if (h != null && h != tail) {//等待队列不为空
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//将head节点状态置为0,0为默认值,无特殊含义。
//CAS不成功,重新执行for循环判断。
//正是因为这个CAS的存在,保证即便doReleaseShared入口有setHeadAndPropagate跟release两个
//但同一时间也只会唤醒一个后继节点。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//CAS成功,执行唤醒操作,唤醒h.next
//注意,这里唤醒h.next后,逻辑就到了if (h == head)这里
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
//head的ws什么情况下会等于0?
//1.上面的if会将ws从Node.SIGNAL置为0
//但执行了if,在本次迭代中不会执行else,得等到下次循环,如果期间head节点没有易主,那就没有下次循环。
//如果易主了,这里的h和ws就指向的是新head节点和其waitStatus。
//所以情况1不成立。
//2.当前队列的最后一个节点成为了头节点
//因为只要有新节点入列,都会在shouldParkAfterFailedAcquire把前置节点的waitStatus置为Node.SIGNAL

//当队列里唯一的节点成为了头节点,那什么情况下compareAndSetWaitStatus(h, 0, Node.PROPAGATE)会失败呢?
//答案是:并发时,ws == 0判断刚过,就有新节点将ws改为Node.SIGNAL。
//但别忘了,for循环的第一个if,也是最外层的if是if (h != null && h != tail)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果这个方法期间head没有易主,说明没有其他线程在这个期间获取到共享锁,它就可以break了。
//如果head易主,说明方法执行过程中其他线程获取到了锁,
if (h == head) // loop if head changed
break;
}
}

该方法最难理解的是

1
2
3
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;

为什么要有这个continue呢??

根据上述的注解,我们知道要进入continue,得满足如下条件:

  • 队列中的最后一个节点成为了head,不然head的ws不会为0;
  • 判断if (h != null && h != tail)时,队列中不能只有head一个节点,否则不会进入if语句。
  • 判断else if (ws == 0时,ws还要为0,但紧接着compareAndSetWaitStatus(h, 0, Node.PROPAGATE))时却不能成功,说明此时ws不为0;

所以总结下来,只有一个极其短暂的瞬间,逻辑有可能走到这里:

  • 队列中的最后一个节点成为了head
  • 在当前线程中(我们称线程A)判断if (h != null && h != tail)之前,等待队列中新加入一个节点(执行入列逻辑的线程我们称为线程B),该节点即head的后驱(等待队列不包括head),使得if (h != null && h != tail)通过。
  • 此时线程B中,head的后驱尝试获取锁失败,于是准备入列,刚刚执行addWaiter加入队列,还没有将head的ws改写,ws还是为0
  • 然后线程A中我们判断if (ws == Node.SIGNAL) 不成立,进而判断else if (ws == 0 ,逻辑通过
  • 紧接着线程A判断compareAndSetWaitStatus(h, 0, Node.PROPAGATE)之前的一瞬间,ws在线程B中被head后驱通过调用shouldParkAfterFailedAcquire改写
  • 改写后,线程B中,head后驱在下个循环就会进入阻塞了,而此时线程A执行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)失败,触发了这个continue,而不是去进行if (h == head)判断。
    • 为什么要有这个continue?因为此时如果没有continue,那么进行if (h == head)判断,可能(其实是大概率)会通过,逻辑直接break了(break后就不会再唤醒后继了),逻辑A将唤醒后驱的逻辑break,而head的后驱却要阻塞,这使得head的后驱短时间内可能无人唤醒。
    • 有了这个continue,那么下次循环,if (ws == Node.SIGNAL)通过,会对head的后继进行一次unpark,如果此时线程B已经park,那么唤醒正好。如果线程B慢悠悠的还没执行到park,那么我们知道,当我们unpark一个并没有被park的线程时,该线程在下一次调用park方法时就不会被阻塞,这时候如果线程B再执行park,也不会被阻塞了。

(上述只是个人猜想总结,如有缺漏或错误,还请指正)

一句话总结,AQS.doReleaseShared方法的作用是:运用精细的逻辑控制,重复尝试唤醒head节点的后继。

2.5 AQS condition实现

2.5.1 condition是什么

在了解condition之前,我们先来说说java源生的condition:

用对象锁时,我们有Object.wait()、Object.notify()和Object.notifyAll()这三个api来操作对象的内部条件队列:

它的用法一般是这样(一个典型的生产者消费者模型,mBuf表示缓存队列):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static Object lockObj = new Object();

public class Producer{

public void produce() {
synchronized (lockObj) {
while (mBuf.isFull()) {
try {
lockObj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
mBuf.add();
lockObj.notifyAll();
}
}
}

public class Consumer{
public void consume() {
synchronized (lockObj) {
while (mBuf.isEmpty()) {
try {
lockObj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
mBuf.remove();
lockObj.notifyAll();
}
}
}

以Producer生产者为例,在实际中,我们会创建多个Producer实例出来,表示多个的生产者,他们并发的往mBuf里面add产品。

此时,使用一个静态的对象lockObj作为锁,那么produce产品的时候,所有的生产者都在争这个锁。

如果mBuf已经满了,那么lockObj.wait()方法会将获取到锁的当前线程休眠,并且释放锁。当前线程会被封装,然后进入到lockObj锁对象对应的monitor对象的waitSet队列中。

当消费者消费了一个产品,导致mBuf不满了,消费者会调用lockObj.notifyAll()唤醒所有在lockObj锁对象对应monitor对象的waitSet队列中的线程

和notifyAll()类似,lockObj.notify()表示唤醒一个线程,只不过notify()方法到底唤醒哪一个,则由操作系统决定。

总结一下:

  • 调用某个锁对象的wait()方法,会释放当前的锁,然后让出CPU,最后让当前线程阻塞,有个前提,当前线程必须拥有此锁对象的monitor(即锁)。
  • 调用某个锁对象的notify()方法能够唤醒一个正在等待这个锁对象的monitor的线程,如果有多个线程都在等待这个锁对象的monitor,则只能唤醒其中一个线程;
  • 调用notifyAll()方法能够唤醒所有正在等待这个锁对象的monitor的线程;

2.5.1.1 简介

condition又叫做条件队列,是AQS的一个内部实现,它能实现线程之间的通信,condition对象维护了一个FIFO的单向node链表,我们称之为条件等待队列(单向体现在只有后驱)(上文中争锁的队列我们叫做同步队列或者阻塞队列,以示区分,但其实他们的元素都是AQS.Node对象)。

我们在利用condition可以特定的场景下使线程休眠或被唤醒,和wait、notify实现的功能是一样的,但condition将休眠的对象放入等待队列,使其变得更为灵活。比如我们知道notify无法唤醒特定的一个线程,而是随机唤醒一个线程,但condition基于等待队列就能做到唤醒特定的一个线程(队首的线程),甚至我们还可以定义多个condition,使其能够互不干扰的休眠或唤醒。

condition是AQS的一个内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private static final long serialVersionUID = 7373984972572414691L;
...
...
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
...
...
}
}

2.5.1.2 语义

从语义上来看,AQS实现的锁机制对应的是对象内置锁(synchronized语义对应的同步机制)的语义,这很好理解,在AQS这种显性锁出现前,我们使用java内置的对象的monitor来当做锁(对象锁或者类锁,即synchronized关键字和 (lock)等 ),AQS使用精妙的逻辑,重新显性的实现了锁机制。

同理的,AQS.condition也是相对于内置锁的条件队列的一种显性存在。

  • 用AQS实现上述条件锁时,我们有condition.await(),condition.signal(),condition.signalAll()这三个api来操作AQS实现的锁的内部条件队列,分别对应Object的wait(),notify()和notifyAll()方法:
    • Condition提供了await()方法将当前线程阻塞,对应Object的wait()。线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入条件等待队列,同时释放锁,并挂起当前线程。
    • Condition提供了signal()方法支持当前线程将已经阻塞的队首线程唤醒,让他们重新争锁。对应Object的notify()。当前线程调用signal()方法前也必须获取锁,当执行signal()方法时将条件等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点。
    • Condition提供了signalAll()方法支持当前线程将已经阻塞的线程全部唤醒,让他们重新争锁。对应Object的notifyAll()方法。当前线程调用signalAll()方法前也必须获取锁,当执行signalAll()方法时将条件等待队列的节点全部移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点。

2.5.1.3 condition的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void doSomething(){
lock.lock();
System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
try {
System.out.println(String.format("%s线程,await",Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(2L); //模拟耗时业务逻辑执行
condition.await(); //await
System.out.println(String.format("%s线程,await被唤醒",Thread.currentThread().getName()));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName()));
lock.unlock();
}

public static void main(String[] args) throws InterruptedException {
ReentrantLockTest test = new ReentrantLockTest();
int total = 1;
while (total>0){
Thread t = new Thread(()->{
test.doSomething();
},"T-"+total);
t.start();

TimeUnit.MILLISECONDS.sleep(200L); //让子线程T-1率先获取到锁
lock.lock();
System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
test.condition.signal();
System.out.println(String.format("%s线程,signal",Thread.currentThread().getName()));
lock.unlock();
total--;
}
}

得到返回

1
2
3
4
5
6
T-1线程,获取到锁了
T-1线程,await
main线程,获取到锁了
main线程,signal
T-1线程,await被唤醒
T-1线程,业务执行完毕

2.5.2 condition源码解析

看完demo,我们来看源码,以ReentrantLock实现的condition为例,看看ReentrantLock是如何在AQS的condition上继承和实现的。

2.5.2.1 ReentrantLock.newCondition

如之前的demo所示,我们使用如下方式来获得condition对象

1
2
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

来看下newCondition()

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

2.5.2.2 AQS.ConditionObject

newCondition()返回的就是一个ConditionObject对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
...
...
...
}

condition通过firstWaiter字段和lastWaiter字段组成了一个单向队列,即等待队列,并且和AQS的同步队列相比,他们虽然都是FIFO的,但等待队列的首节点并不具备同步队列首节点的传播通知的功能。而且首节点是第一个阻塞的线程节点。

  • firstWaiter字段
    • 首个等待节点
  • lastWaiter字段
    • 最后一个等待节点

2.5.2.3 AQS.await()

我们先来看一下condition将当前线程挂起的方法:await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final void await() throws InterruptedException {
if (Thread.interrupted())//当前线程加入等待队列前先判断是否被中断,若是,得响应中断
throw new InterruptedException();
Node node = addConditionWaiter();//将同步队列中的当前线程构造成一个新的节点添加到等待队列尾部,后面详讲
//释放node的同步状态(即释放锁)并返回释放之前的同步状态,后面详讲
//因为condition的使用基于当前线程已经获取到锁了,所以release不会报错IllegalMonitorStateException
//IllegalMonitorStateException:未持有锁的线程去释放锁时报该异常
int savedState = fullyRelease(node);
int interruptMode = 0;
//第一次进入while,判断被唤醒的node是否已经转移到AQS的同步队列中,不在,则park线程,转移成功才退出循环
//后面被唤醒后的线程,将从await()方法中的while循环中退出
while (!isOnSyncQueue(node)) {//(isOnSyncQueue(Node node)方法返回true,表示节点状态不为condition,且已经在同步队列中)
//挂起线程,之后如果被被unpark或者发生中断时,也从此方法返回
LockSupport.park(this);
//被唤醒后来到这里,

//checkInterruptWhileWaiting方法有点绕,但其实不重要
//只需知道是为了发生中断的时候能够让node跳出while循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//如果看到这里,说明被唤醒了,但不是因为发生中断
//这时还需要继续判断是否进入同步队列,如果没有则继续循环,继续park。
}
//跳出循环了,说明被唤醒了,调用AQS的acquireQueued()方法加入到获取同步状态的竞争中。State还是之前释放锁时保存的status
//interruptMode != THROW_IE:如果不是因为中断异常而退出循环的话;
//AQS的acquireQueued()前文已分析,具体可见右侧导航栏跳转2.3.1.3 AQS.acquireQueued()
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//处理中断要么抛出中断异常,要么重设置中断态。
//这里排除了抛出异常,那么标记一下后面如果要处理中断,应该采用重置中断态的方式
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 从队首开始往后溯,清空条件等待队列中节点状态不为 CONDITION 的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 如果线程已经被中断,则根据之前获取的interruptMode的值来判断是继续中断还是抛出异常
//reportInterruptAfterWait方法作用:
//如果之前是抛出中断异常,那么这里要再次抛出
//如果之前是给自己设置中断状态,那么这里也要设置
reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {//该方法比较简单,构造node,并加入condition等待队列
Node t = lastWaiter;//获取等待队列队尾
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {//如果队尾被取消了,将其清理
unlinkCancelledWaiters();//该方法从队首开始往后清理被取消的节点
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);//构造node
if (t == null)//如果等待队列为空,加入队首
firstWaiter = node;
else
t.nextWaiter = node;//否则加入队尾
lastWaiter = node;
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {//fully的意思是将持有的锁完全释放:也就是说,即便有n次重入或许可,也要全部释放。
boolean failed = true;
try {
int savedState = getState();//因为要释放所有重入次数或者许可,所以要获取总量
if (release(savedState)) {//调用AQS的release一次释放总量,详见上文2.3.3.3 AQS.tryRelease()
failed = false;
return savedState;
} else {
//IllegalMonitorStateException:未持有锁的线程去释放锁时报该异常
//这里如果
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//释放锁失败的场景,只有IllegalMonitorStateException异常。这时可以认为该节点已经被取消。
node.waitStatus = Node.CANCELLED;
}
}

2.5.2.4 AQS.signal()

看完了挂起线程的方法,我们来看下唤醒线程的方法,signal方法会唤醒condition等待队列中队首的那个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())//该方法AQS未实现,ReentrantLock的实现是判断当前线程是不是持有锁的线程,是,则true
throw new IllegalMonitorStateException();
Node first = firstWaiter;//获取等待队列的队首
if (first != null)
doSignal(first);//下面我们来看看这个方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

doSignal进来就是一个do-while,我们先看transferForSignal,回头再看doSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 将一个node从等待队列转移至同步队列,
* 如果成功,返回true
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//尝试将node的状态置为无意义的0,如果失败,说明该节点已经被取消。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//node进入同步队列的入列操作,在前文2.3.1.2 AQS.addWaiter()有过介绍
//enq会返回入队前同步队列的队尾指针,即刚入队的node的前驱。
int ws = p.waitStatus;
//只有取消ws才会 大于0
//我们知道同步队列中的节点都是依靠前驱节点来唤醒
//如果入队后node的前驱已经被取消或者设置SIGNAL状态不成功,那么尝试唤醒当前node
//虽然唤醒了可能还是争不到锁,但该操作至少是无害的。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

总结:传进transferForSignal的node节点被取消,会返回false,成功入同步队列了,会返回true。这时我们再看doSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
//第一次进来,先把firstWaiter设置为first.nextWaiter,这相当于first已经脱离等待队列了。
//为什么这么果断呢,因为transferForSignal如果返回true,while循环结束
//那么说明first已经进入同步队列,确实应该取消

//如果transferForSignal返回false,那么说明first节点已经被canceled了,也应该脱离等待队列。
if ( (firstWaiter = first.nextWaiter) == null)//如果条件满足,说明等待队列已经为空了
lastWaiter = null;//则将lastWaiter指针也置空
first.nextWaiter = null;//first此时已经脱队,将其单向链表的后继也置空,first彻底脱离等待队列

//如果transferForSignal返回true,说明first入同步队列成功,while条件不满足,退出循环
//如果transferForSignal返回false,说明first被取消,如果此时等待队列已经空了,那么也退出循环,
//否则,将新的等待队列队首设置为first,执行重复逻辑。
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

2.5.2.5 AQS.signalAll()

signalAll可唤醒等待队列中的全部线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)//等待队列不为空,执行doSignalAll
doSignalAll(first);
}

不赘述,直接看doSignalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;//先把标示性的指针lastWaiter和firstWaiter清空,因为等待队列即将要空了。
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);//现将first转移至同步队列,不管成功或者失败,都要继续执行。
first = next;//将first改为原来first的后继
} while (first != null);//只要等待队列不空,就一直将队首移入同步队列。
}

逻辑很简单,将等待队列从队首无脑移入同步队列。

0%