1. 概念
在分析 Java 并发包java.util.concurrent源码的时候,少不了需要了解AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,因为它是Java并发包的基础工具类,是实现ReentrantLock、CountDownLatch、Semaphore、FutureTask等类的基础,许多实现都依赖其所提供的队列式同步器。
AQS的功能可以分为两类:独占锁和共享锁。它的所有子类中,要么实现并使用了它独占锁的API,要么使用了共享锁的API,而不会同时使用两套API,即便是它最有名的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的,到目前为止,我们只需要明白AQS在功能上有独占锁和共享锁两种功能即可。
1.1 如何使用AQS
AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore用它来表现剩余的许可数,ReentrantLock用它来表现拥有它的线程已经请求了多少次锁;FutureTask用它来表现任务的状态(尚未开始、运行、完成和取消)。
AQS有众多方法,大致可以分为两类:
1.1.1 需要子类实现的方法(模板方法)
tryAcquire(int arg):独占式的获取锁,返回值是boolean类型的,true代表获取锁,false代表获取失败。
tryRelease(int arg):释放独占式同步状态,释放操作会唤醒其后继节点获取同步状态。
tryAcquireShared(int arg):共享式的获取同步状态,返回大于0代表获取成功,否则就是获取失败。
tryReleaseShared(int arg):共享式的释放同步状态。
isHeldExclusively():判断当前的线程是否已经获取到了同步状态。
这些方法是子类实现时可能实现的方法,通过上面的这些方法来判断是否获取了锁,然后再通过AQS本身的方法执行获取锁与未获取锁的过程。
以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquire、 tryRelease、isHeldExclusively而支持共享获取的同步器应该实现tryAcquireShared、tryReleaseShared。
1.1.2 AQS本身的实现的方法
acquire(int arg)/acquireInterruptibly(int arg):独占式的获取锁操作,独占式获取同步状态都调用这个方法,通过子类实现的tryAcquire方法判断是否获取了锁。Interruptibly后缀的方法带有中断异常的签名,表示可以响应中断异常,无此后缀的acquire方法则通过重新标记中断状态的方式响应中断。
acquireShared(int arg)/acquireSharedInterruptibly:共享式的获取锁操作,在读写锁中用到,通过tryAcquireShared方法判断是否获取到了同步状态。Interruptibly后缀的方法带有中断异常的签名,表示可以响应中断异常,无此后缀的acquire方法则通过重新标记中断状态的方式响应中断。
release(int arg):独占式的释放同步状态,通过tryRelease方法判断是否释放了独占式同步状态。
releaseShared(int arg):共享式的释放同步状态,通过tryReleaseShared方法判断是否已经释放了共享同步状态。
从这两类方法可以看出,AQS为子类定义了一套获取锁和释放锁以后的操作,而具体的如何判断是否获取锁和释放锁都是交由不同的子类自己去实现其中的逻辑,这也是Java设计模式之一:模板模式的实现。有了AQS我们就可以实现一个属于自己的Lock。
2.源码分析(基于jdk1.8)
2.1 AQS类
首先是类图↓
从图中可以看出来,AbstractQueuedSynchronizer内部维护了一个Node节点类和一个ConditionObject内部类。Node内部类是一个双向的FIFO队列,用来保存阻塞中的线程以及获取同步状态的线程,而ConditionObject对应的是后面要讲的Lock中的等待和通知机制。↓
我们可以看到,AQS类是JUC框架的基石。为什么这么说?我们以ReentrantLock为例,ReentrantLock把所有Lock接口的操作都委派到一个自定义的内部类Sync类上,该类继承自AbstractQueuedSynchronizer。同时该类又有两个子类,NonfairSync 和 FairSync,实现非公平锁和公平锁。↓
1 | 公平锁:线程获取锁的顺序和调用lock的顺序一样,FIFO,先到先得; |
同样的,CountDownLatch、Semaphore等其他类,也自定义了自己的Sync类和NonfairSync和FairSync,以达到功能的差异化。
2.2 AQS的属性
2.2.1 状态位state
AQS用的是一个32位的整型来表示同步状态的,它是用volatile修饰的:
1 | /** |
2.2.2 当前持有独占锁的线程
1 | // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入 |
2.2.3 获取锁的阻塞队列——CLH同步队列
2.2.3.1 head和tail属性
AQS内部维护着一个FIFO的CLH队列,用来保存阻塞中的线程以及获取同步状态的线程,每个node都封装着一个独立的线程,head指向的node可以简单理解为当前持有锁的线程,tail指向了等待队列的链尾。正因为head指向当前持有锁的线程,所以,真正的等待队列,不包括head。
1 |
|
因为是CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他进出队列快、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,ASQ采用的是一种变种的CLH队列锁:
原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。
为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
head结点使用的是傀儡结点。虽然node对象(封装了线程)在获取到锁的时候,逻辑会将这个node置为head,看起来head表示的是当前正在拥有锁的node节点的意思。但看setHead方法就能知道,node赋值为head后,node封装的thread对象被清空,node成为一个空对象。
我们来看看这个队列结点的实现:
2.2.3.2 node实现
我们来看看node的源码 ↓
众多字段,我们一个一个来看:
1 | /**共享模式是允许多个线程可以获取同一个锁,而独占模式则一个锁只能被一个线程持有,其他线程必须要等待。**/ |
下面的几个int常量是给waitStatus字段使用的,表示节点现在的状态
1 | /**代表此线程取消了争抢这个锁 |
1 | /** waitStatus value to indicate successor's thread needs unparking |
1 | /** waitStatus value to indicate thread is waiting on condition |
1 | /** |
然后就是状态字段的主角了,上面的这些常量,都是给该字段赋值用的 ↓
1 | /** |
前面说过,在AQS中,我们维护了一个链表,故而node节点中,也定义了前后驱。
1 | /** |
当然,还有node节点的最重要主角:被封装的线程
1 | /** |
还有一个很特殊的后驱节点,这个后驱,负责维护node节点参与的第二个链表(第一个就是AQS的同步等待链表)——condition等待链表,至于什么是condition,我们最后再来讨论。
1 | /** |
你可以把node节点简单看作 thread + waitStatus + pre + next 四个属性的封装,从本质上来说,这是没错的,node几乎所有的api也都服务于这四个属性。
2.3 AQS的独占锁实现(以ReentrantLock的公平锁和非公平锁为例)
从上图可以看到,AQS的实现有许多种,我们以最典型的在ReentrantLock类内部定义的公平锁FairSync和非公平锁NonFairSync为例,来探讨一下AQS独占模式的同步原理。(ReentrantLock是典型的独占锁,真正管理锁的也是其内部实现类FairSync或者NonFairSync)
独占锁是独占的,排他的,因此在独占锁中有一个exclusiveOwnerThread属性,用来记录当前持有锁的线程。
我们一般怎么使用ReentrantLock呢?很简单:
1 | public void needLockFunction() { |
所以,我们可以从lock方法看起
2.3.1 公平锁的加锁逻辑
reentrantLock的lock方法调用的是reentrantLock内部的sync字段的lock方法,sync字段在reentrantLock的构造方法中就开始初始化默认是非公平锁:
ReentrantLock默认使用非公平锁是基于性能考虑,公平锁为了保证线程规规矩矩地排队,需要增加阻塞和唤醒的时间开销。如果直接插队获取非公平锁,跳过了对队列的处理,速度会更快
1 | /** |
2.3.1.1 FairSync.lock()
我们来看看FairSync的lock(),很简单直接调用了acquire(); ↓
1 | final void lock() { |
2.3.1.2 AQS.acquire()
所以lock()的重点都在acquire(),FairSync调用了AQS类中实现的acquire();
1 | /** |
一句话总结,AbstractQueuedSynchronizer.acquire()方法的作用是:先尝试获取锁,若成功则不用进队列阻塞,逻辑往下走(其实就是返回了)。否则封装当前线程为node,塞进队列,然后在acquireQueued方法中一直尝试,先期会自旋,如果在自旋期间内获得锁了,那么返回,返回结果是false,表示不需要调用selfInterrupt()做自我中断。如果是阻塞后才获取锁,返回,返回结果是true,表示要设置自我中断。(只是设置中断状态,至于到底何时中断,由线程本身决定)
2.3.1.3 FairSync.tryAcquire()
AQS类中实现的acquire()又调用了FairSync中实现的tryAcquire(1),我们来看看:
1 | /** |
一句话总结,FairSync.tryAcquire()方法的作用是:如果未加锁,那么判断自己是不是队列的头名,若是,设置独占锁线程,获得锁。否则,判断加锁的人是不是自己,如果是,那么重入,status+1,设置独占锁线程,获得锁。再否则,返回false,占锁失败。
2.3.1.2 AQS.addWaiter()
看完了tryAcquire方法,我们知道在acquire方法中,如果tryAcquire方法返回false,即没有获取到锁,那么将会执行addWaiter,将当前线程封装为node,addWaiter()是AbstractQueuedSynchronizer的方法:
1 | /** |
1 | private Node enq(final Node node) { |
一句话总结,AbstractQueuedSynchronizer.addWaiter()方法的作用是将当前线程封装为node,并将node节点塞入等待队列,塞入逻辑包括节点前后驱,head和tail指针的维护,以及必要时对空列表的初始化。然后返回封装好的node。
2.3.1.3 AQS.acquireQueued()
回到acquire方法
1 | if (!tryAcquire(arg) && |
addWaiter()将当前线程封装为node,并将node节点塞入等待队列,紧接着,执行的是AbstractQueuedSynchronizer.acquireQueued()方法,这个方法就是入列后的node节点在队列中等待的逻辑,是自旋等待还是阻塞等待。
1 | /** |
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
最后我们回到acquireQueued方法的最后一步,finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:
1 | //传入的方法参数是当前获取锁资源失败的节点 |
一句话总结,AbstractQueuedSynchronizer.acquireQueued()方法会一直循环来尝试获取锁,但并非一直自旋,而是会在每一次循环判断是否要进入阻塞,如果通过判断前置节点状态得知无法很快得到锁(这其中会将cancel状态的node踢出队列),那么该node会进入阻塞。
阻塞被唤醒后,如果是中断唤醒的,那么会将这个中断唤醒的标记往外层传,并再次尝试获取锁,如果还是失败,继续进入上述判断阻塞逻辑。直到获取到锁。
同时,如果tryAcquire()方法抛出异常,也会有体面的退出逻辑。
2.3.2 非公平锁的加锁逻辑
2.3.2.1 NonFairSync.lock()
再来看看NonFairSync的lock();
1 | /** |
2.3.2.2 AQS.acquire()
调用AQS.acquire()方法,前文已说过,不再赘述。可通过侧边导航快速回看。
2.3.2.3 NonFairSync.nonfairTryAcquire()
AQS.acquire()中重要的tryAcquire方法,非公平锁定义了自己的实现:
1 | protected final boolean tryAcquire(int acquires) { |
1 | /** |
一句话总结,NonFairSync.nonfairTryAcquire方法的作用是:如果未加锁,那么直接抢锁,而不是像公平锁一样去检查是否轮到自己。若AQS已经上锁,判断加锁的人是不是自己,如果是,那么重入,status+1,设置独占锁线程,获得锁。再否则,返回false,占锁失败。
剩下的加锁逻辑,则完全和公平锁没有区别了,因为实际都是调用的AQS的addWaiter()和acquireQueued()方法,不再赘述。
2.3.3 释放锁逻辑
释放锁的逻辑,公平锁和非公平锁没有区别,本质都是调用的AQS.release()方法和Sync.tryRelease()方法
2.3.3.1 ReentrantLock.unlock()
1 | public void unlock() { |
2.3.3.2 AQS.release()
1 | public final boolean release(int arg) { |
2.3.3.3 AQS.tryRelease()
1 | protected final boolean tryRelease(int releases) { |
2.3.3.4 AQS.unparkSuccessor()
unparkSuccessor负责在释放锁的时候,唤醒head的后继节点。
1 | private void unparkSuccessor(Node node) { |
调用了unpark方法后,进行lock操作被阻塞的线程就恢复到运行状态,就会再次执行acquireQueued中的无限for循环中的操作,再次尝试获取锁。
2.4 AQS的共享锁实现(以Semaphore为例)
前面我们学习了AQS独占锁的逻辑,对于独占锁而言,锁只能被一个线程独占持有,而对于共享锁而言,由于锁是可以被共享的,因此它可以被多个线程同时持有。
AQS的共享锁应用于Semaphore、ReentrantReadWriteLock等实现中,本次我们以Semaphore为例来剖析一下AQS的共享锁
共享锁的实现和独占锁是对应的,我们可以从下面这张表中看出
独占锁 | 共享锁 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
除了最后一个属于共享锁的doReleaseShared()方法没有对应外,其他的方法,独占锁和共享锁都是一一对应的。 |
2.4.1 Semaphore
在解析之前,我们先来了解一下Semaphore是什么。
Semaphore也叫信号量,在JDK1.5被引入,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。
打个比喻,Semaphore就像一道阀门,可以控制同时进入某一逻辑的线程数量(构造方法中指定),我们使用acquire方法来争取通行票,使用release方法来归还通行票。通行票只是一个比喻,一般我们称之为许可。
- Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
1
2
3public Semaphore(int permits) {
sync = new NonfairSync(permits);
} - 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
- 访问资源后,使用release释放许可。
- Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
1
2
3public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
} - 当初始值为1时,可以用作互斥锁,并具备不可重入的加锁语义。
- Semaphore将AQS的同步状态(status字段)用于保存当前可用许可的数量。
我们调用Semaphore方法时,其实是在间接调用其内部类或AQS方法执行的。Semaphore类结构与ReetrantLock类相似,内部类Sync继承自AQS,然后其子类FairSync和NoFairSync分别实现公平锁和非公平锁的获取锁方法tryAcquireShared(int arg),而释放锁的tryReleaseShared(int arg)方法则有Sync类实现,因为非公平或公平锁的释放过程都是相同的。
2.4.2 Semaphore公平锁争锁逻辑
不论是公平锁还是非公平锁,Semaphore使用acquire()方法来争锁
1 | public void acquire() throws InterruptedException { |
acquireSharedInterruptibly方法,是定义在AQS中的,它可以响应中断异常,
1 | public final void acquireSharedInterruptibly(int arg) |
2.4.2.1 Semaphore.FairSync.tryAcquireShared
我们知道AQS中,try开头的几个方法都是模板方法,需要各个实现自己重写,Semaphore的公平锁实现类FairSync同样实现了自己的tryAcquireShared。
tryAcquire的返回值是个boolean类型,表示是否成功获取到了锁,而tryAcquireShared的返回值是一个int类型,这表示tryAcquireShared的返回含义绝不止是或者否这么简单,它的返回有三种情况:
- 小于0 : 表示获取锁失败,需要进入等待队列。
- 等于0 : 表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的。
- 大于0 : 表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功。
1 | protected int tryAcquireShared(int acquires) { |
一句话总结,FairSync.tryAcquireShared方法的作用是:重复判断是否轮到自己来获取许可了,如果不是,返回获取失败。否则检查剩余量,若许可的剩余量满足索取量,那么CAS获取许可,返回索取后的剩余量。
2.4.2.2 AQS.doAcquireSharedInterruptibly()
acquireSharedInterruptibly中,如果tryAcquireShared获取许可失败,那么逻辑就进入了doAcquireSharedInterruptibly方法中。
1 | /** |
一句话总结,AQS.doAcquireSharedInterruptibly方法的作用是:调用addWaiter封装当前线程,然后重复执行取锁逻辑,直到取到锁为止,如果取到锁,设置各个状态并唤醒后继线程,如果没有获取到锁,改变前驱节点状态,将其设置为signal,然后阻塞,等待唤醒。
2.4.2.3 AQS.setHeadAndPropagate()
获取到许可时,逻辑调用了AQS.setHeadAndPropagate(),从方法名就可以看出除了设置新的头结点以外还有一个传递动作:
1 | private void setHeadAndPropagate(Node node, int propagate) { |
1 | final boolean isShared() { |
我们知道,在条件队列中,nextWaiter是指向条件队列中的下一个节点的,它将条件队列中的节点串起来,构成了单链表。但是在同步队列中,我们只用prev/next属性来串联节点,形成双向链表,nextWaiter属性在这里只起到一个标记作用,不会串联节点,这里不要被Node SHARED = new Node()所指向的空节点迷惑,这个空节点并不属于同步队列,不代表任何线程,它只起到标记作用,仅仅用作判断节点是否处于共享模式的依据。
一句话总结,AQS.setHeadAndPropagate方法的作用是:设置head节点,并在许可还有剩余或者后继新旧head节点的后驱都应该被唤醒时(waitStatus < 0),唤醒head的后继,让其参与争锁。
2.4.3 Semaphore非公平锁争锁逻辑
前文说过,不论是公平锁还是非公平锁,Semaphore都使用acquire()方法来争锁
1 | public void acquire() throws InterruptedException { |
acquireSharedInterruptibly方法,是定义在AQS中的,它可以响应中断异常,这个前文介绍过了,不再多说,拷贝过来
1 | public final void acquireSharedInterruptibly(int arg) |
2.4.3.1 Semaphore.nonfairTryAcquireShared()
acquireSharedInterruptibly方法中的tryAcquireShared是模板方法,在Semaphore的两个内部类NonfairSync和FairSync中有各自的实现,FairSync.tryAcquireShared我们讲过了,我们来看下NonfairSync.tryAcquireShared
1 | protected int tryAcquireShared(int acquires) { |
直接调用了nonfairTryAcquireShared方法,该方法定义在Semaphore类中
1 | final int nonfairTryAcquireShared(int acquires) { |
一句话总结,Semaphore.nonfairTryAcquireShared方法的作用是:重复检查剩余量,若许可的剩余量满足索取量,那么CAS获取许可,返回索取后的剩余量。
2.4.4 Semaphore释放锁逻辑
2.4.4.1 AQS.releaseShared()
我们使用releaseShared(int arg)方法来释放共享锁:
1 | public final boolean releaseShared(int arg) { |
在独占锁模式下,由于头节点就是持有独占锁的节点,在它释放独占锁后,如果发现自己的waitStatus不为0,则它将负责唤醒它的后继节点。
在共享锁模式下,头节点也是持有共享锁的节点(每个获得共享锁的node都会当一段时间的head),在它释放共享锁后,它也应该唤醒它的后继节点,但是值得注意的是,我们在之前的setHeadAndPropagate方法中可能已经调用过该方法了,也就是说它可能会被同一个头节点调用两次,也有可能在我们从releaseShared方法中调用它时,当前的头节点已经易主了。
2.4.4.2 Semaphore.tryReleaseShared()
1 | protected final boolean tryReleaseShared(int releases) { |
一句话总结,Semaphore.tryReleaseShared方法的作用是:一直尝试将锁释放,CAS控制并发,将state值加回来
2.4.4.3 AQS.doReleaseShared()
doReleaseShared是共享锁中最难理解的部分,我们来看一下。
1 | private void doReleaseShared() { |
该方法最难理解的是
1 | else if (ws == 0 && |
为什么要有这个continue呢??
根据上述的注解,我们知道要进入continue,得满足如下条件:
- 队列中的最后一个节点成为了head,不然head的ws不会为0;
- 判断
if (h != null && h != tail)
时,队列中不能只有head一个节点,否则不会进入if语句。 - 判断
else if (ws == 0
时,ws还要为0,但紧接着compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
时却不能成功,说明此时ws不为0;
所以总结下来,只有一个极其短暂的瞬间,逻辑有可能走到这里:
- 队列中的最后一个节点成为了head
- 在当前线程中(我们称线程A)判断
if (h != null && h != tail)
之前,等待队列中新加入一个节点(执行入列逻辑的线程我们称为线程B),该节点即head的后驱(等待队列不包括head),使得if (h != null && h != tail)
通过。 - 此时线程B中,head的后驱尝试获取锁失败,于是准备入列,刚刚执行addWaiter加入队列,还没有将head的ws改写,ws还是为0
- 然后线程A中我们判断
if (ws == Node.SIGNAL)
不成立,进而判断else if (ws == 0
,逻辑通过 - 紧接着线程A判断
compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
之前的一瞬间,ws在线程B中被head后驱通过调用shouldParkAfterFailedAcquire
改写 - 改写后,线程B中,head后驱在下个循环就会进入阻塞了,而此时线程A执行
compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
失败,触发了这个continue,而不是去进行if (h == head)
判断。- 为什么要有这个continue?因为此时如果没有continue,那么进行
if (h == head)
判断,可能(其实是大概率)会通过,逻辑直接break了(break后就不会再唤醒后继了),逻辑A将唤醒后驱的逻辑break,而head的后驱却要阻塞,这使得head的后驱短时间内可能无人唤醒。 - 有了这个continue,那么下次循环,
if (ws == Node.SIGNAL)
通过,会对head的后继进行一次unpark,如果此时线程B已经park,那么唤醒正好。如果线程B慢悠悠的还没执行到park,那么我们知道,当我们unpark一个并没有被park的线程时,该线程在下一次调用park方法时就不会被阻塞,这时候如果线程B再执行park,也不会被阻塞了。
- 为什么要有这个continue?因为此时如果没有continue,那么进行
(上述只是个人猜想总结,如有缺漏或错误,还请指正)
一句话总结,AQS.doReleaseShared方法的作用是:运用精细的逻辑控制,重复尝试唤醒head节点的后继。
2.5 AQS condition实现
2.5.1 condition是什么
在了解condition之前,我们先来说说java源生的condition:
用对象锁时,我们有Object.wait()、Object.notify()和Object.notifyAll()这三个api来操作对象的内部条件队列:
它的用法一般是这样(一个典型的生产者消费者模型,mBuf表示缓存队列):
1 | static Object lockObj = new Object(); |
以Producer生产者为例,在实际中,我们会创建多个Producer实例出来,表示多个的生产者,他们并发的往mBuf里面add产品。
此时,使用一个静态的对象lockObj作为锁,那么produce产品的时候,所有的生产者都在争这个锁。
如果mBuf已经满了,那么lockObj.wait()方法会将获取到锁的当前线程休眠,并且释放锁。当前线程会被封装,然后进入到lockObj锁对象对应的monitor对象的waitSet队列中。
当消费者消费了一个产品,导致mBuf不满了,消费者会调用lockObj.notifyAll()唤醒所有在lockObj锁对象对应monitor对象的waitSet队列中的线程。
和notifyAll()类似,lockObj.notify()表示唤醒一个线程,只不过notify()方法到底唤醒哪一个,则由操作系统决定。
总结一下:
- 调用某个锁对象的wait()方法,会释放当前的锁,然后让出CPU,最后让当前线程阻塞,有个前提,当前线程必须拥有此锁对象的monitor(即锁)。
- 调用某个锁对象的notify()方法能够唤醒一个正在等待这个锁对象的monitor的线程,如果有多个线程都在等待这个锁对象的monitor,则只能唤醒其中一个线程;
- 调用notifyAll()方法能够唤醒所有正在等待这个锁对象的monitor的线程;
2.5.1.1 简介
condition又叫做条件队列,是AQS的一个内部实现,它能实现线程之间的通信,condition对象维护了一个FIFO的单向node链表,我们称之为条件等待队列(单向体现在只有后驱)(上文中争锁的队列我们叫做同步队列或者阻塞队列,以示区分,但其实他们的元素都是AQS.Node对象)。
我们在利用condition可以特定的场景下使线程休眠或被唤醒,和wait、notify实现的功能是一样的,但condition将休眠的对象放入等待队列,使其变得更为灵活。比如我们知道notify无法唤醒特定的一个线程,而是随机唤醒一个线程,但condition基于等待队列就能做到唤醒特定的一个线程(队首的线程),甚至我们还可以定义多个condition,使其能够互不干扰的休眠或唤醒。
condition是AQS的一个内部类
1 | public abstract class AbstractQueuedSynchronizer |
2.5.1.2 语义
从语义上来看,AQS实现的锁机制对应的是对象内置锁(synchronized语义对应的同步机制)的语义,这很好理解,在AQS这种显性锁出现前,我们使用java内置的对象的monitor来当做锁(对象锁或者类锁,即synchronized关键字和 (lock)等 ),AQS使用精妙的逻辑,重新显性的实现了锁机制。
同理的,AQS.condition也是相对于内置锁的条件队列的一种显性存在。
- 用AQS实现上述条件锁时,我们有condition.await(),condition.signal(),condition.signalAll()这三个api来操作AQS实现的锁的内部条件队列,分别对应Object的wait(),notify()和notifyAll()方法:
- Condition提供了await()方法将当前线程阻塞,对应Object的wait()。线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入条件等待队列,同时释放锁,并挂起当前线程。
- Condition提供了signal()方法支持当前线程将已经阻塞的队首线程唤醒,让他们重新争锁。对应Object的notify()。当前线程调用signal()方法前也必须获取锁,当执行signal()方法时将条件等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点。
- Condition提供了signalAll()方法支持当前线程将已经阻塞的线程全部唤醒,让他们重新争锁。对应Object的notifyAll()方法。当前线程调用signalAll()方法前也必须获取锁,当执行signalAll()方法时将条件等待队列的节点全部移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点。
2.5.1.3 condition的使用
1 | static Lock lock = new ReentrantLock(); |
得到返回
1 | T-1线程,获取到锁了 |
2.5.2 condition源码解析
看完demo,我们来看源码,以ReentrantLock实现的condition为例,看看ReentrantLock是如何在AQS的condition上继承和实现的。
2.5.2.1 ReentrantLock.newCondition
如之前的demo所示,我们使用如下方式来获得condition对象
1 | Lock lock = new ReentrantLock(); |
来看下newCondition()
1 | final ConditionObject newCondition() { |
2.5.2.2 AQS.ConditionObject
newCondition()返回的就是一个ConditionObject对象
1 | public class ConditionObject implements Condition, java.io.Serializable { |
condition通过firstWaiter字段和lastWaiter字段组成了一个单向队列,即等待队列,并且和AQS的同步队列相比,他们虽然都是FIFO的,但等待队列的首节点并不具备同步队列首节点的传播通知的功能。而且首节点是第一个阻塞的线程节点。
- firstWaiter字段
- 首个等待节点
- lastWaiter字段
- 最后一个等待节点
2.5.2.3 AQS.await()
我们先来看一下condition将当前线程挂起的方法:await()
1 | public final void await() throws InterruptedException { |
1 | /** |
1 | /** |
2.5.2.4 AQS.signal()
看完了挂起线程的方法,我们来看下唤醒线程的方法,signal方法会唤醒condition等待队列中队首的那个线程
1 | /** |
1 | /** |
doSignal进来就是一个do-while,我们先看transferForSignal,回头再看doSignal
1 | /** |
总结:传进transferForSignal的node节点被取消,会返回false,成功入同步队列了,会返回true。这时我们再看doSignal
1 |
|
2.5.2.5 AQS.signalAll()
signalAll可唤醒等待队列中的全部线程:
1 | /** |
不赘述,直接看doSignalAll
1 | /** |
逻辑很简单,将等待队列从队首无脑移入同步队列。