前言
- CountDownLatch:一个单调递减的同步计数器,但无法重置计数的数量。
- 比喻1:CountDownLatch可以看做一个过山车,假设10个座位,那么司机这个线程,就要调用await()方法,等待10个线程到齐。
- 每个线程坐进来,就调用一下countDown()方法,表示占用个位置,然后做自己事情去了。最后一个线程进来的时候,调用一下countDown()方法,计数为0了,最后一个线程就会唤醒await的线程,也就是司机线程。
- Semaphore:一个同步的许可证资源池,负责同步给线程提供一个分发许可和归还许可的地方。
- 做个比喻,老师让同学们上来在黑板上解题,只有两根粉笔,也就是说,同时只能有两个同学在答题。两根粉笔就可以用Semaphore控制,许可是2。
- 学生们调用acquire()方法争夺粉笔,同时只有两个人争夺到,答完题后,调用release()方法归还粉笔,这时才会有其他人能获取到粉笔。
- CyclicBarrier:同步的栅栏,拦住规定数量的线程,让他们阻塞,等到线程数量齐了,让他们执行我们指定的逻辑。
- 做个比喻,CyclicBarrier可以比作一个牌局,需要固定的参与者都到场才行,先到的人阻塞在那。人到齐后,执行一遍我们指定的逻辑,此时其他线程还在阻塞,由最后一个到场的线程完成指定的逻辑,完成后,这局散场,所有人醒来,各自离开。
- CyclicBarrier可以完成一局后再开启新的一局,即等待新的参与者到来,并达到指定的数量,然后开局,从此往复。
1 CountDownLatch
CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。
CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await()方法的线程会一直处于阻塞状态,直到其他线程调用countDown()方法使当前计数器的值逐渐减少,到0为止,每次调用countDown计数器的值减1。
当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。
CountDownLatch实现的是AQS的共享锁机制。
CountDownLatch出现以前,类似功能我们使用线程的join()方法实现。
1.1 重要方法
1.1.1 构造器
1 | public CountDownLatch(int count) { |
其实CountDownLatch的构造器很简单,count入参表示计数器的次数,CountDownLatch将其赋给了state字段,使用AQS的状态值来表示计数器值。
1.1.2 await()
当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:
- 当指定数量的线程都调用了CountDownLatch对象的countDown方法后,也就是说计时器值为0的时候。
- 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。tryAcquireShared方法是CountDownLatch.sync类实现AQS的接口,只判断了getState()是否等于0,这是计数器有别于其他传统共享锁的核心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//这个1,其实在CountDownLatch里面没有用
}
//AQS的获取共享资源时候可被中断的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
//如果线程被中断则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试看当前是否计数值为0,为0则直接返回,否则进入AQS的队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);//该方法前文已经论述,详见2.4.2.2 AQS.doAcquireSharedInterruptibly()
}
//CountDownLatch.sync类实现的AQS的接口
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}1.1.3 await(long timeout, TimeUnit unit)
当线程调用了CountDownLatch对象的await(long timeout, TimeUnit unit)方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回: - 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计时器值为0的时候,这时候返回true
- 设置的timeout时间到了,因为超时而返回false;
- 其它线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException 异常后返回。
也就是相比于await,引入了一个timeout的概念
1 | public boolean await(long timeout, TimeUnit unit) |
1.1.4 countDown()
当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为0则会唤醒所有调用await方法而被阻塞的线程,否则什么都不做,接下来看一下countDown()方法内部是如何调用AQS的方法的,源码如下:
1 | public void countDown() { |
1.2 CountDownLatch的用法
CountDownLatch典型用法:
- 某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1,即调用countDown()。当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
1.3 使用demo
1 | package com.lscherish; |
运行结果
1 | 等待斗地主玩家进场 |
1.4 与join()相比
CountDownLatch与join方法的区别,一个是调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕,而CountDownLatch则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是CountDownLatch可以在子线程运行任何时候让await方法返回而不一定必须等到线程结束;
另外使用线程池来管理线程时候一般都是直接添加 Runable到线程池这时候就没有办法在调用线程的join方法了,countDownLatch相比Join方法让我们对线程同步有更灵活的控制。
2 Semaphore
Semaphore也叫信号量,在JDK1.5被引入,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。
打个比喻,Semaphore就像一道阀门,可以控制同时进入某一逻辑的线程数量(构造方法中指定),我们使用acquire方法来争取通行票,使用release方法来归还通行票。通行票只是一个比喻,一般我们称之为许可。
Semaphore和countDownLatch挺像,但countDownLatch的许可只能减少,而Semaphore可以获取/归还
- Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
public Semaphore(int permits) { sync = new NonfairSync(permits); }
- 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
- 访问资源后,使用release释放许可。
- Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
- 当初始值为1时,可以用作互斥锁,并具备不可重入的加锁语义。
- Semaphore将AQS的同步状态(status字段)用于保存当前可用许可的数量。
我们调用Semaphore方法时,其实是在间接调用其内部类或AQS方法执行的。
Semaphore类结构与ReetrantLock类相似,内部类Sync继承自AQS,然后其子类FairSync和NoFairSync分别实现公平锁和非公平锁的获取锁方法tryAcquireShared(int arg)。
而释放锁的tryReleaseShared(int arg)方法则有Sync类实现,因为非公平或公平锁的释放过程都是相同的。
2.1 重要方法
Semaphore在JAVA并发之AQS详解2.4节中有过描述,不论是其公平锁实现还是非公平锁实现,故本文不再赘述,欲了解源码可以阅读JAVA并发之AQS详解2.4节
2.2 使用demo
场景:老师需要4个学生到讲台上填写一张表,但是老师只有2支笔,因此同一时刻只能保证2个学生拿到笔进行填写,没有拿到笔的学生只能等前面的学生填写完毕,再去拿笔进行填写。
1 | package com.lscherish; |
得到结果
1 | 同学10想要拿到笔=== |
3 CyclicBarrier
在之前的介绍中,我们知道CountDownLatch可以实现多个线程协调,在所有指定线程完成后,主线程才执行任务。
和CountDownLatch一样,在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。
但是,CountDownLatch有个缺陷,这点JDK的文档中也说了:他只能使用一次。在有些场合,似乎有些浪费,需要不停的创建 CountDownLatch实例,JDK在CountDownLatch的文档中向我们介绍了CyclicBarrier——循环栅栏,或者叫循环屏障,同步屏障。
CyclicBarrier采用一种屏障的方式来控制线程,让所有线程停在某一点。先到的线程将处于阻塞的状态,直到许可都发出去了才会往下执行。
CyclicBarrier并不是严格意义上的基于AQS实现的,他只不过持有一个ReentrantLock和Condition,重用了AQS的部分逻辑来完善自身。
3.1 属性
1 | /** 定义一个排他锁 */ |
如果将CyclicBarrier比作一个牌桌,需要xx位选手都到齐后才能够发牌
- condition就像是为已经到达,在等待开局的玩家准备的休息室或者等待区
- lock就是进入等待区大门的锁
- parties是表示一共需要多少位选手才能开局
- count表示现在还差多少位选手才能开局。
- generation表示当前的牌局
- barrierCommand表示人到齐后进行的活动,可自定义,如果是斗地主,那么barrierCommand就是斗地主的逻辑,如果是炸金花,那就是炸金花的逻辑。
3.2 构造器
1 | public CyclicBarrier(int parties, Runnable barrierAction) { |
3.3 重要方法
CyclicBarrier 的最重要的方法就是await方法,和CountDownLatch的await方法一样,该方法会将当前线程阻塞,就像是树立了一个栅栏,将线程挡住了,只有所有的线程都到了这个栅栏上,栅栏才会打开。
下面的注解,我们都用斗地主的比喻来描述,使得代码逻辑更加便于理解
3.3.1 await()
1 | public int await() throws InterruptedException, BrokenBarrierException { |
1 | private int dowait(boolean timed, long nanos) |
3.3.2 reset()
reset方法可以重置CyclicBarrier至初始状态
1 | public void reset() { |
1 |
|
1 | /** |