AQS实现之CountDownLatch/Semaphore/CyclicBarrier

前言

  • CountDownLatch:一个单调递减的同步计数器,但无法重置计数的数量。
    • 比喻1:CountDownLatch可以看做一个过山车,假设10个座位,那么司机这个线程,就要调用await()方法,等待10个线程到齐。
    • 每个线程坐进来,就调用一下countDown()方法,表示占用个位置,然后做自己事情去了。最后一个线程进来的时候,调用一下countDown()方法,计数为0了,最后一个线程就会唤醒await的线程,也就是司机线程。
  • Semaphore:一个同步的许可证资源池,负责同步给线程提供一个分发许可和归还许可的地方。
    • 做个比喻,老师让同学们上来在黑板上解题,只有两根粉笔,也就是说,同时只能有两个同学在答题。两根粉笔就可以用Semaphore控制,许可是2。
    • 学生们调用acquire()方法争夺粉笔,同时只有两个人争夺到,答完题后,调用release()方法归还粉笔,这时才会有其他人能获取到粉笔。
  • CyclicBarrier:同步的栅栏,拦住规定数量的线程,让他们阻塞,等到线程数量齐了,让他们执行我们指定的逻辑。
    • 做个比喻,CyclicBarrier可以比作一个牌局,需要固定的参与者都到场才行,先到的人阻塞在那。人到齐后,执行一遍我们指定的逻辑,此时其他线程还在阻塞,由最后一个到场的线程完成指定的逻辑,完成后,这局散场,所有人醒来,各自离开。
    • CyclicBarrier可以完成一局后再开启新的一局,即等待新的参与者到来,并达到指定的数量,然后开局,从此往复。

1 CountDownLatch

  • CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。

  • CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await()方法的线程会一直处于阻塞状态,直到其他线程调用countDown()方法使当前计数器的值逐渐减少,到0为止,每次调用countDown计数器的值减1。

  • 当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。

  • CountDownLatch实现的是AQS的共享锁机制。

  • CountDownLatch出现以前,类似功能我们使用线程的join()方法实现。

1.1 重要方法

1.1.1 构造器

1
2
3
4
5
6
7
8
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

Sync(int count) {
setState(count);
}

其实CountDownLatch的构造器很简单,count入参表示计数器的次数,CountDownLatch将其赋给了state字段,使用AQS的状态值来表示计数器值。

1.1.2 await()

当前线程调用了CountDownLatch对象的await方法后,当前线程会被阻塞,直到下面的情况之一才会返回:

  • 当指定数量的线程都调用了CountDownLatch对象的countDown方法后,也就是说计时器值为0的时候。
  • 其他线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException异常后返回。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);//这个1,其实在CountDownLatch里面没有用
    }

    //AQS的获取共享资源时候可被中断的方法
    public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    //如果线程被中断则抛异常
    if (Thread.interrupted())
    throw new InterruptedException();
    //尝试看当前是否计数值为0,为0则直接返回,否则进入AQS的队列等待
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);//该方法前文已经论述,详见2.4.2.2 AQS.doAcquireSharedInterruptibly()
    }

    //CountDownLatch.sync类实现的AQS的接口
    protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }
    tryAcquireShared方法是CountDownLatch.sync类实现AQS的接口,只判断了getState()是否等于0,这是计数器有别于其他传统共享锁的核心

    1.1.3 await(long timeout, TimeUnit unit)

    当线程调用了CountDownLatch对象的await(long timeout, TimeUnit unit)方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
  • 当所有线程都调用了CountDownLatch对象的countDown方法后,也就是计时器值为0的时候,这时候返回true
  • 设置的timeout时间到了,因为超时而返回false;
  • 其它线程调用了当前线程的interrupt()方法中断了当前线程,当前线程会抛出InterruptedException 异常后返回。

也就是相比于await,引入了一个timeout的概念

1
2
3
4
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

1.1.4 countDown()

当前线程调用了该方法后,会递减计数器的值,递减后如果计数器为0则会唤醒所有调用await方法而被阻塞的线程,否则什么都不做,接下来看一下countDown()方法内部是如何调用AQS的方法的,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void countDown() {
sync.releaseShared(1);//委托sync调用AQS的方法
}


//AQS的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//AQS的释放资源方法,会唤醒head的后继
//后继争锁成功再唤醒后继,使得所有挂起的线程都被唤醒。详见2.4.4.3 AQS.doReleaseShared()
return true;
}
return false;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//循环进行cas,直到当前线程成功完成cas使计数值(状态值state)减一并更新到state
for (;;) {
int c = getState();
if (c == 0)//如果当前状态值为0则直接返回返回false,
return false;//返回false就不用调用doReleaseShared()了
//这里的if (c == 0)貌似是多余的,其实不然,之所以添加if (c == 0)是为了防止计数器值为 0 后,其他线程又调用了countDown方法,如果没有这里,状态值就会变成负数。


int nextc = c-1;//否则,state-1
if (compareAndSetState(c, nextc))
return nextc == 0;//这里如果返回true,说明当前线程是最后一个调用countDown()方法的线程
//那么该线程除了让计数器减一外,还需要唤醒调用CountDownLatch的await方法而被阻塞的线程。
//所以它返回true,tryReleaseShared中则会调用doReleaseShared,唤醒其他节点。
}
}

1.2 CountDownLatch的用法

CountDownLatch典型用法:

  1. 某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1,即调用countDown()。当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  1. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

1.3 使用demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.lscherish;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class CountDownLatchTest {

private static AtomicInteger id = new AtomicInteger();

// 创建一个CountDownLatch实例,管理计数为ThreadNum
private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

public static void main(String[] args) throws InterruptedException {

Thread threadOne = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
countDownLatch.countDown();
}
});

Thread threadTwo = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
countDownLatch.countDown();

}
});

Thread threadThree = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println("【玩家" + id.getAndIncrement() + "】已入场");
countDownLatch.countDown();

}
});

// 启动子线程
threadOne.start();
threadTwo.start();
threadThree.start();
System.out.println("等待斗地主玩家进场");

// 等待子线程执行完毕,返回
countDownLatch.await();

System.out.println("斗地主玩家已经满人,开始发牌.....");

}
}

运行结果

1
2
3
4
5
等待斗地主玩家进场
【玩家0】已入场
【玩家1】已入场
【玩家2】已入场
斗地主玩家已经满人,开始发牌.....

1.4 与join()相比

CountDownLatch与join方法的区别,一个是调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕,而CountDownLatch则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是CountDownLatch可以在子线程运行任何时候让await方法返回而不一定必须等到线程结束;

另外使用线程池来管理线程时候一般都是直接添加 Runable到线程池这时候就没有办法在调用线程的join方法了,countDownLatch相比Join方法让我们对线程同步有更灵活的控制。

2 Semaphore

Semaphore也叫信号量,在JDK1.5被引入,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。

打个比喻,Semaphore就像一道阀门,可以控制同时进入某一逻辑的线程数量(构造方法中指定),我们使用acquire方法来争取通行票,使用release方法来归还通行票。通行票只是一个比喻,一般我们称之为许可。

Semaphore和countDownLatch挺像,但countDownLatch的许可只能减少,而Semaphore可以获取/归还

  • Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
    • public Semaphore(int permits) { sync = new NonfairSync(permits); }
  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。
  • Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
    • public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
  • 当初始值为1时,可以用作互斥锁,并具备不可重入的加锁语义。
  • Semaphore将AQS的同步状态(status字段)用于保存当前可用许可的数量。

我们调用Semaphore方法时,其实是在间接调用其内部类或AQS方法执行的。

Semaphore类结构与ReetrantLock类相似,内部类Sync继承自AQS,然后其子类FairSync和NoFairSync分别实现公平锁和非公平锁的获取锁方法tryAcquireShared(int arg)。

而释放锁的tryReleaseShared(int arg)方法则有Sync类实现,因为非公平或公平锁的释放过程都是相同的。

2.1 重要方法

Semaphore在JAVA并发之AQS详解2.4节中有过描述,不论是其公平锁实现还是非公平锁实现,故本文不再赘述,欲了解源码可以阅读JAVA并发之AQS详解2.4节

2.2 使用demo

场景:老师需要4个学生到讲台上填写一张表,但是老师只有2支笔,因此同一时刻只能保证2个学生拿到笔进行填写,没有拿到笔的学生只能等前面的学生填写完毕,再去拿笔进行填写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.lscherish;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
// 2支笔
private static Semaphore semaphore = new Semaphore(2, true);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(5);
// 5个学生
for (int i = 1; i <=5; i++) {
service.execute(() -> {
try {
System.out.println("同学"+Thread.currentThread().getId() + "想要拿到笔===");
semaphore.acquire();
System.out.println("同学"+Thread.currentThread().getId() + "拿到笔---");
System.out.println("同学"+Thread.currentThread().getId() + "填写中...");
TimeUnit.SECONDS.sleep(2);
System.out.println("同学"+Thread.currentThread().getId() + "填写完毕,马上归还笔。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
service.shutdown();
}
}

得到结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
同学10想要拿到笔===
同学10拿到笔---
同学10填写中...
同学11想要拿到笔===
同学11拿到笔---
同学11填写中...
同学12想要拿到笔===
同学13想要拿到笔===
同学14想要拿到笔===
同学10填写完毕,马上归还笔。。。
同学11填写完毕,马上归还笔。。。
同学13拿到笔---
同学13填写中...
同学12拿到笔---
同学12填写中...
同学13填写完毕,马上归还笔。。。
同学14拿到笔---
同学14填写中...
同学12填写完毕,马上归还笔。。。
同学14填写完毕,马上归还笔。。。

3 CyclicBarrier

  • 在之前的介绍中,我们知道CountDownLatch可以实现多个线程协调,在所有指定线程完成后,主线程才执行任务。

  • 和CountDownLatch一样,在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。

  • 但是,CountDownLatch有个缺陷,这点JDK的文档中也说了:他只能使用一次。在有些场合,似乎有些浪费,需要不停的创建 CountDownLatch实例,JDK在CountDownLatch的文档中向我们介绍了CyclicBarrier——循环栅栏,或者叫循环屏障,同步屏障。

  • CyclicBarrier采用一种屏障的方式来控制线程,让所有线程停在某一点。先到的线程将处于阻塞的状态,直到许可都发出去了才会往下执行。

  • CyclicBarrier并不是严格意义上的基于AQS实现的,他只不过持有一个ReentrantLock和Condition,重用了AQS的部分逻辑来完善自身。

3.1 属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** 定义一个排他锁 */
private final ReentrantLock lock = new ReentrantLock();

/** 创建一个等待队列 ,利用它来对线程进行阻塞 */
private final Condition trip = lock.newCondition();

/** 等待线程的数量,该值在构造时进行赋值。*/
private final int parties;

/* 当栅栏被释放后执行的线程 */
private final Runnable barrierCommand;

/** 当前的一代线程组 他只有一个成员变量来标识当前的barrier是否已“损坏”
* Generation代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。*/
private Generation generation = new Generation();

private static class Generation {
boolean broken = false;
}

/**
* 内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒
* count值是在创建新的一代线程时被重置。
*/
private int count;

如果将CyclicBarrier比作一个牌桌,需要xx位选手都到齐后才能够发牌

  • condition就像是为已经到达,在等待开局的玩家准备的休息室或者等待区
  • lock就是进入等待区大门的锁
  • parties是表示一共需要多少位选手才能开局
  • count表示现在还差多少位选手才能开局。
  • generation表示当前的牌局
  • barrierCommand表示人到齐后进行的活动,可自定义,如果是斗地主,那么barrierCommand就是斗地主的逻辑,如果是炸金花,那就是炸金花的逻辑。

3.2 构造器

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;//计数器count的初始值被设置为parties
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}

3.3 重要方法

CyclicBarrier 的最重要的方法就是await方法,和CountDownLatch的await方法一样,该方法会将当前线程阻塞,就像是树立了一个栅栏,将线程挡住了,只有所有的线程都到了这个栅栏上,栅栏才会打开。

下面的注解,我们都用斗地主的比喻来描述,使得代码逻辑更加便于理解

3.3.1 await()

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//上锁,表示每次只能有一个玩家会进入等待区
try {
final Generation g = generation;//获取“当前这局牌局”
if (g.broken)// 如果这局牌局损坏,取消了,抛出异常
throw new BrokenBarrierException();

if (Thread.interrupted()) {// 如果这位玩家临时不玩了(线程中断了),抛出异常
breakBarrier();// 将当前这局牌局(Generation)的损坏状态设置为 true
// 并将count重置为parties的值
// 唤醒其他阻塞在此栅栏上的线程

//相当于这局斗地主还没开就被强行取消了
//要把把这局游戏状态置为取消,然后将还差多少位选手的数量重置为需要的总数量,
//并告诉等待区的玩家:这局取消了,大家出来吧。
throw new InterruptedException();
}

int index = --count;//到这里,说明有一个玩家进入等待区了,表示还差的玩家数量减一
if (index == 0) { // 如果是0 ,离 开局还差的玩家数量是0,表示你刚好是最后一个到齐的人,大家就等你了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;//既然人到齐了,开搞,将设置好的活动取出来
if (command != null)//如果有设置好的活动
command.run();//开搞这个活动,斗地主的斗地主,炸金花的炸金花
ranAction = true;//打个标记,command内容我们玩耍过了
//这里需要注意,我们在哪里搞呢?其实可以理解为在休息室里搞,因为执行command的时候,这局的玩家还困在休息室(阻塞)当中。


nextGeneration();
//开启新局,包括:请现在在等待区的玩家们醒来,你们的牌局结束了,快醒过来离开这里吧
//生成一个新的牌局
//将离新局还差的人数置为需要的总人数
return 0;
} finally {
if (!ranAction)//如果玩耍过程除了问题导致标记没打上
breakBarrier();//就认为有异常,把这局牌局取消,大伙解散
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {//如果到这里了,说明上面的if (index == 0) 你没进去,现在不是三缺一在等你,还有其他人要等
try {
if (!timed)//如果没有时间限制,那么久进入休息室等待
trip.await();//进休息室等吧,注意,到时候出来的时候,也是从这里出来
else if (nanos > 0L)//如果开局时间有时限,则等待指定时间
nanos = trip.awaitNanos(nanos);//时间一过,玩家等不到开局,我就走
} catch (InterruptedException ie) {//如果遇到中断异常,也就等同于遇到当前玩家有急事不能玩的情况
if (g == generation && ! g.broken) {//如果这个玩家是当前正在进行的牌局的玩家,并且当前牌局没有被取消
breakBarrier();//那么就手动取消,解散大伙,因为有个玩家退出了,大家玩不成了
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();//否则,说明这个玩家不是当前正在进行的牌局的玩家
//标记个中断状态,往上传递,不影响现在正在进行的牌局
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)//如果牌局过期了,新牌局已经开始了,说明是正常的
//我从休息室(trip.await()方法)出来,出来的时候新牌局开始了,那么返回index
return index;
// 如果有时间限制,且时间小于等于0,取消牌局,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();//你进入休息室,不管是一进来就可以开局,立刻打完牌局,还是需要在里面继续等,都要在上锁后解锁。
}
}

3.3.2 reset()

reset方法可以重置CyclicBarrier至初始状态

1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10

/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
1
2
3
4
5
6
7
8
9
10
11
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
0%