线程池源码分析--ThreadPoolExecutor

序言

我们知道,线程池帮我们重复管理线程,避免创建大量的线程增加开销。
合理的使用线程池能够带来3个很明显的好处:

  1. 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
  2. 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
  3. 提高线程的可管理性:线程池可以统一管理、分配、调优和监控。
    java源生的线程池,实现于ThreadPoolExecutor类,这也是我们今天讨论的重点

    1. ThreadPoolExecutor类构造方法

    Jdk使用ThreadPoolExecutor类来创建线程池,我们来看看它的构造方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • int corePoolSize, //核心线程的数量

  • int maximumPoolSize, //最大线程数量

  • long keepAliveTime, //超出核心线程数量以外的线程空闲时,线程存活的时间

  • TimeUnit unit, //存活时间的单位,有如下几种选择

    1
    2
    3
    4
    5
    6
    7
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS; //小时
    TimeUnit.MINUTES; //分钟
    TimeUnit.SECONDS; //秒
    TimeUnit.MILLISECONDS; //毫秒
    TimeUnit.MICROSECONDS; //微妙
    TimeUnit.NANOSECONDS; //纳秒
  • BlockingQueue workQueue, //保存待执行任务的队列,常见的也有如下几种:

    1
    2
    3
    4
    5
    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;
    PriorityBlockingQueue
    ...

  • ThreadFactory threadFactory, //创建新线程使用的工厂

  • RejectedExecutionHandler handler // 当任务无法执行时的处理器(线程拒绝策略)

    2. 核心类变量

    2.1 ctl变量

    ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量,它一个int值可以储存两个概念的信息:

  • workerCount:表明当前池中有效的线程数,通过workerCountOf方法获得,workerCount上限是(2^29)-1。(最后存放在ctl的低29bit)

  • runState:表明当前线程池的状态,通过workerCountOf方法获得,最后存放在ctl的高3bit中,他们是整个线程池的运行生命周期,有如下取值,分别的含义是:

    1. RUNNING:可以新加线程,同时可以处理queue中的线程。线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,
    2. SHUTDOWN:不增加新线程,但是可以处理queue中的线程。调用线程池的shutdown()方法时,线程池由RUNNING -> SHUTDOWN。
    3. STOP 不增加新线程,同时不处理queue中的线程,会中断正在处理任务的线程。调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
    4. TIDYING 当所有的任务已终止,ctl记录的”任务数量”为0,阻塞队列为空,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
    5. TERMINATED 线程池彻底终止,就变成TERMINATED状态。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

COUNT_BITS=32(integer的size)-3=29,于是五种状态左移29位分别是:

  • RUNNING: 11100000000000000000000000000000
  • SHUTDOWN: 00000000000000000000000000000000
  • STOP: 00100000000000000000000000000000
  • TIDYING: 01000000000000000000000000000000
  • TERMINATED:01100000000000000000000000000000
    而ThreadPoolExecutor是通过runStateOf和workerCountOf获得者两个概念的值的。

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。

runStateOf和workerCountOf方法是如何剥离出ctl变量的两个有效值呢?这其中我们可以看到CAPACITY是实现一个字段存两个值的最重要的字段。

2.2 CAPACITY变量

CAPACITY=(1 << COUNT_BITS) – 1 转成二进制为:000 11111111111111111111111111111,他是线程池理论上可以允许的最大的线程数。
所以很明显,它的重点在于,其高3bit为0,低29bit为1;
这样,workderCountOf方法中,CAPACITY和ctl进行&运算时,它能获得高3位都是0,低29位和ctl低29位相同的值,这个值就是workerCount
同理,runStateOf方法,CAPACITY的取反和ctl进行&操作,获得高3位和ctl高三位相等,低29位都为0的值,这个值就是runState

2.3 workQueue

1
2
3
4
5
6
7
8
9
10
11
12
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;

一个BlockingQueue队列,本身的结构可以保证访问的线程安全(这里不展开了)。这是一个排队等待队列。当我们线程池里线程达到corePoolSize的时候,一些需要等待执行的线程就放在这个队列里等待。

2.4 workers

1
2
3
4
5
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

一个HashSet的集合。线程池里所有可以立即执行的线程都放在这个集合里。这也是我们直观理解的线程的池子

2.5 mainLock

1
private final ReentrantLock mainLock = new ReentrantLock();

mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2.6 其他重要属性

1
2
3
4
5
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数

private long completedTaskCount; //用来记录已经执行完毕的任务个数

private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间

3 核心内部类

3.1 Worker

Worker类是线程池中具化一个线程的对象,线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
// 在调用runWorker()前,禁止interrupt中断,在interruptIfStarted()方法中会判断 getState()>=0
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//根据当前worker创建一个线程对象
//当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,而是调用当前worker.run()时调用firstTask.run()
//后面在addworker中,我们会启动worker对象中组合的Thread,而我们的执行逻辑runWorker方法是在worker的run方法中被调用。
//为什么执行thread的run方法会调用worker的run方法呢,原因就是在这里进行了注入,将worker本身this注入到了thread中
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}//runWorker()是ThreadPoolExecutor的方法

// Lock methods
//
// The value 0 represents the unlocked state. 0代表“没被锁定”状态
// The value 1 represents the locked state. 1代表“锁定”状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 尝试获取锁
* 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
*/
protected boolean tryAcquire(int unused) {
//尝试一次将state从0设置为1,即“锁定”状态,但由于每次都是state 0->1,而不是+1,那么说明不可重入
//且state==-1时也不会获取到锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 尝试释放锁
* 不是state-1,而是置为0
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中断(如果运行)
* shutdownNow时会循环对worker线程执行
* 且不需要获取worker锁,即使在worker运行时也可以中断
*/
void interruptIfStarted() {
Thread t;
//如果state>=0、t!=null、且t没有被中断
//new Worker()时state==-1,说明不能中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

Worker这个工作线程实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;

firstTask用来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

之所以要将thread和firstTask封装成一个Worker,是因为线程池需要利用Worker来实现工作线程的加锁,以及控制中断。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

4 核心方法

好了,基本上我们将线程池的几个主角,ctl,workQueue,workers,Worker简单介绍了一遍,现在,我们来看看线程池是怎么玩的。

4.1 线程的运行

我们先来看一个简单的线程池的运行流程图:

4.1.1 execute方法

这是线程池实现类外露供给外部实现提交线程任务command的核心方法,对于无需了解线程池内部的使用者来说,这个方法就是把某个任务交给线程池,正常情况下,这个任务会在未来某个时刻被执行,实现和注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了)
* 或者 从我们进入这个方法后,pool被关闭了
* 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启一个线程
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)
* 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
*/
int c = ctl.get();
// 1、如果当前线程数少于corePoolSize(addWorker()操作已经包含对线程池状态的判断,所以此处没判断状态,而入workQueue前判断了)
if (workerCountOf(c) < corePoolSize) {
//则创建并启动一个线程来执行这个任务
//第一个参数为command,说明表示新建一个worker线程,指定firstTask初始任务为command
//第二个参数为true代表占用corePoolSize,false占用maxPoolSize
if (addWorker(command, true))
return;

/**
* 如果没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
* 失败的原因可能是:
* 1、线程池已经shutdown,shutdown的线程池不再接收新任务
* 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
*/
c = ctl.get();
}
/**
* 2、此时,workerCount >= corePoolSize,任务要加入队列,如果线程池是RUNNING状态,且入队列成功(阻塞队列未满)
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();

/**
* 再次校验放入workerQueue中的任务是否能被执行。
* 如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务,执行拒绝策略
*/
//如果再次校验过程中,线程池不是RUNNING状态,那么任务出列,remove(command)--workQueue.remove()成功
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);

//走到这里,说明线程池是RUNNING状态。
//如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
//为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢??
//只保证有一个worker线程可以从queue中获取任务执行就行了??
//是的,因为只要还有活动的worker线程,就可以消费workerQueue中的任务
else if (workerCountOf(recheck) == 0)
//第一个参数为null,说明只为新建一个worker线程,没有指定firstTask初始任务
//第二个参数为true代表占用corePoolSize,false占用maxPoolSize
addWorker(null, false);
}
/**
* 3、如果线程池不是running状态 或者 无法入队列
* 尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
*/
else if (!addWorker(command, false))
reject(command);
}

我们可以简单归纳如下:

4.1.2 addWorker

在execute方法中,我们看到核心的逻辑是由addWorker方法来实现的,当我们将一个任务提交给线程池,线程池会如何处理,就是主要由这个方法加以规范:

该方法有两个参数:

  1. firstTask: worker线程的初始任务,可以为空
  2. core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限

排列组合,addWorker方法有4种传参的方式:

  1. addWorker(command, true)
  2. addWorker(command, false)
  3. addWorker(null, false)
  4. addWorker(null, true)

在execute方法中就使用了前3种,结合这个核心方法进行以下分析

  • 第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
  • 第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
  • 第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
  • 第四个:这个方法就是放一个空的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
* 检查根据当前线程池的状态和给定的边界(core or maximum)是否可以创建一个新的worker
* 如果是这样的话,worker的数量做相应的调整,如果可能的话,创建一个新的worker并启动,参数中的firstTask作为worker的第一个任务
* 如果方法返回false,可能因为pool已经关闭或者调用过了shutdown
* 如果线程工厂创建线程失败,也会失败,返回false
* 如果线程创建失败,要么是因为线程工厂返回null,要么是发生了OutOfMemoryError
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//外层循环,负责判断线程池状态
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/**
* 线程池的state越小越是运行状态,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
* 要想这个if为true,线程池state必须已经至少是shutdown状态了
* 这时候以下3个条件任意一个是false都会进入if语句,即无法addWorker():
* 1,rs == SHUTDOWN (隐含:rs>=SHUTDOWN)false情况: 线程池状态已经超过shutdown,
* 可能是stop、tidying、terminated其中一个,即线程池已经终止
* 2,firstTask == null (隐含:rs==SHUTDOWN)false情况: firstTask不为空,rs==SHUTDOWN 且 firstTask不为空,
* return false,场景是在线程池已经shutdown后,还要添加新的任务,拒绝
* 3,! workQueue.isEmpty() (隐含:rs==SHUTDOWN,firstTask==null)false情况: workQueue为空,
* 当firstTask为空时是为了创建一个没有任务的线程,再从workQueue中获取任务,
* 如果workQueue已经为空,那么就没有添加新worker线程的必要了
* return false,
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环,负责worker数量+1
for (;;) {
int wc = workerCountOf(c);
//入参core在这里起作用,表示加入的worker是加入corePool还是非corepool,换句话说,受到哪个size的约束
//如果worker数量>线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)
//或者( worker数量>corePoolSize 或 worker数量>maximumPoolSize ),即已经超过了给定的边界,不添加worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS尝试增加线程数,,如果成功加了wc,那么break跳出检查
//如果失败,证明有竞争,那么重新到retry。
if (compareAndIncrementWorkerCount(c))
break retry;
//如果不成功,重新获取状态继续检查
c = ctl.get(); // Re-read ctl
//如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
}
}
//worker数量+1成功的后续操作
// 添加到workers Set集合,并启动worker线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建worker//构造方法做了三件事//1、设置worker这个AQS锁的同步状态state=-1
w = new Worker(firstTask); //2、将firstTask设置给worker的成员变量firstTask
//3、使用worker自身这个runnable,调用ThreadFactory创建一个线程,并设置给worker的成员变量thread
final Thread t = w.thread;
if (t != null) {
//获取重入锁,并且锁上
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs!=SHUTDOWN ||firstTask!=null
// 如果线程池在运行running<shutdown 或者
// 线程池已经shutdown,且firstTask==null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
// worker数量-1的操作在最后的addWorkerFailed()
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // // precheck that t is startable 线程已经启动,抛非法线程状态异常
throw new IllegalThreadStateException();
workers.add(w);
//设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {//如果往HashSet中添加worker成功,启动线程
//通过t.start()方法正式执行线程。在这里一个线程才算是真正的执行起来了。
t.start();
workerStarted = true;
}
}
} finally {
//如果启动线程失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

同样的,我们可以归纳一下:

4.1.3 runWorker 方法

在addWorker方法中,我们将一个新增进去的worker所组合的线程属性thread启动了,但我们知道,在worker的构造方法中,它将自己本身注入到了thread的target属性里,所以绕了一圈,线程启动后,调用的还是worker的run方法,而在这里面,runWorker定义了线程执行的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
* 我们可能使用一个初始化任务开始,即firstTask为null
* 然后只要线程池在运行,我们就从getTask()获取任务
* 如果getTask()返回null,则worker由于改变了线程池状态或参数配置而退出
* 其它退出因为外部代码抛异常了,这会使得completedAbruptly为true,这会导致在processWorkerExit()方法中替换当前线程
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
* 在任何任务执行之前,都需要对worker加锁去防止在任务运行时,其它的线程池中断操作
* clearInterruptsForTaskRun保证除非线程池正在stoping,线程不会被设置中断标示
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
* 每个任务执行前会调用beforeExecute(),其中可能抛出一个异常,这种情况下会导致线程die(跳出循环,且completedAbruptly==true),没有执行任务
* 因为beforeExecute()的异常没有cache住,会上抛,跳出循环
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//标识线程是不是异常终止的
boolean completedAbruptly = true;
try {
//task不为null的情况一般是线程数小于核心数时,如果task为null,则去队列中取任务--->getTask()
//可以看到,只要getTask方法被调用且返回null,那么worker必定被销毁,因为一旦while进不去,就会执行processWorkerExit方法,销毁线程。

//而确定一个线程能否获取得到task的逻辑,在getTask方法中
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//线程开始执行之前执行此方法,可以实现Worker未执行退出,本类中未实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//runWorker方法最本质的存在意义,就是调用task的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//线程执行后执行,可以实现标识Worker异常中断的功能,本类中未实现
afterExecute(task, thrown);
}
} finally {
task = null;//运行过的task标null
w.completedTasks++;
w.unlock();
}
}
//标识线程不是异常终止的,是因为不满足while条件,被迫销毁的
completedAbruptly = false;
} finally {
//处理worker退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}

我们归纳:

4.1.4 getTask方法

runWorker方法中的getTask()方法是线程处理完一个任务后,从队列中获取新任务的实现,也是处理判断一个线程是否应该被销毁的逻辑所在:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of: 以下情况会返回null
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 超过了maximumPoolSize设置的线程数量(因为调用了setMaximumPoolSize())
* 2. The pool is stopped.
* 线程池被stop
* 3. The pool is shutdown and the queue is empty.
* 线程池被shutdown,并且workQueue空了
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
* 线程等待任务超时
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
* 返回null表示这个worker要结束了,这种情况下workerCount-1
*/
private Runnable getTask() {
// timedOut 主要是判断后面的poll是否要超时
boolean timedOut = false; // Did the last poll() time out?

/**
* 用于判断线程池状态
*/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/**
* 对线程池状态的判断,两种情况会workerCount-1,并且返回null
* 1,线程池状态为shutdown,且workQueue为空(反映了shutdown状态的线程池还是要执行workQueue中剩余的任务的)
* 2,线程池状态为>=stop(只有TIDYING和TERMINATED会大于stop)(shutdownNow()会导致变成STOP)(此时不用考虑workQueue的情况)
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//循环的CAS减少worker数量,直到成功
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?

//allowCoreThreadTimeOut字段,表示是否允许核心线程超过闲置时间后被摧毁,默认为false
//我们前面说过,如果getTask方法返回null,那么这个worker只有被销毁一途
//于是这个timed有3种情况
//(1)当线程数没有超过核心线程数,且默认allowCoreThreadTimeOut为false时
// timed值为false。看下面if的判断逻辑,除非目前线程数大于最大值,否则下面的if始终进不去,该方法不可能返回null,worker也就不会被销毁。
// 因为前提"线程数不超过核心线程数"与"线程数大于最大值"两个命题互斥,所以(1)情况,逻辑进入下面的if(返回null的线程销毁逻辑)的可能性不存在。
// 也就是说,当线程数没有超过核心线程数时,线程不会被销毁。
//(2)当当前线程数超过核心线程数,且默认allowCoreThreadTimeOut为false时
// timed值为true。
//(3)如果allowCoreThreadTimeOut为true,则timed始终为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//wc > maximumPoolSize则必销毁,因为这情况下,wc>1也肯定为true
//wc <= maximumPoolSize,且(timed && timedOut) = true,这种情况下一般也意味着worker要被销毁,因为超时一般是由阻塞队列为空造成的,所以workQueue.isEmpty()也大概率为真,进入if逻辑。

//一般情况是这样,那不一般的情况呢?阻塞队列没有为空,但是因为一些原因,还是超时了,这时候取决于wc > 1,它为真就销毁,为假就不销毁。
// 也就是说,如果阻塞队列还有任务,但是wc=1,线程池里只剩下自己这个线程了,那么就不能销毁,这个if不满足,我们的代码继续往下走
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//如果timed为true那么使用poll取线程。否则使用take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
workQueue.take();
//workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
//如果正常返回,那么返回取到的task。
if (r != null)
return r;
//否则,设为超时,重新执行循环,
timedOut = true;
} catch (InterruptedException retry) {
//当线程阻塞在从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程
timedOut = false;
}
}

归纳:

4.1.5 processWorkerExit方法

在runWorker方法中,我们看到当不满足while条件后,线程池会执行退出线程的操作,这个操作,就封装在processWorkerExit方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//参数:
//worker: 要结束的worker
//completedAbruptly: 是否突然完成(是否因为异常退出)

/**
* 1、worker数量-1
* 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
* 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊
decrementWorkerCount();

/**
* 2、从Workers Set中移除worker
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数
workers.remove(w); //从HashSet<Worker>中移除
} finally {
mainLock.unlock();
}

/**
* 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
* 主要是判断线程池是否满足终止的状态
* 如果状态满足,但线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
* 没有线程了,更新状态为tidying->terminated
*/
tryTerminate();

/**
* 4、是否需要增加worker线程
* 线程池状态是running 或 shutdown
* 如果当前线程是突然终止的,addWorker()
* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
* 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
*/
int c = ctl.get();
//如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker
if (runStateLessThan(c, STOP)) {
//不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize

//如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;

//如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
if (workerCountOf(c) >= min)
return; // replacement not needed
}

//添加一个没有firstTask的worker
//只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
addWorker(null, false);
}
}

总而言之:如果线程池还没有完全终止,就仍需要保持一定数量的线程。

线程池状态是running 或 shutdown的情况下:

  • 如果当前线程是突然终止的,addWorker()
  • 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()

故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程。


4.1.6 submit方法

前面我们讲过execute方法,其作用是将一个任务提交给线程池,以期在未来的某个时间点被执行。

submit方法在作用上,和execute方法是一样的,将某个任务提交给线程池,让线程池调度线程去执行它。

那么它和execute方法有什么区别呢?我们来看看submit方法的源码:
submit方法的实现在ThreadPoolExecutor的父类AbstractExecutorService类中,有三种重载方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    /**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该Future的get方法在成功完成时将会返回null。
* submit 参数: task - 要提交的任务 返回:表示任务等待完成的 Future
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* 提交一个Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。
* submit 参数: task - 要提交的任务 result - 完成任务时要求返回的结果
* 返回: 表示任务等待完成的 Future
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* 提交一个Callable的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get
方法在成功完成时将会返回该任务的结果。
* 如果想立即阻塞任务的等待,则可以使用 result =
exec.submit(aCallable).get(); 形式的构造。
* 参数: task - 要提交的任务 返回: 表示任务等待完成的Future
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

源码很简单,submit方法,将任务task封装成FutureTask(newTaskFor方法中就是new了一个FutureTask),然后调用execute。所以submit方法和execute的所有区别,都在这FutureTask所带来的差异化实现上

总而言之,submit方法将一个任务task用future模式封装成FutureTask对象,提交给线程执行,并将这个FutureTask对象返回,以供主线程该任务被线程池执行之后得到执行结果

注意,获得执行结果的方法FutureTask.get(),会阻塞执行该方法的线程,尤其是当任务被DiscardPolicy策略和DiscardOldestPolicy拒绝的时候,get方法会一直阻塞在那里,所以我们最好使用自带超时时间的future。

4.2 线程池的关闭

4.2.1 shutdown方法

讲完了线程池的基本运转过程,在方法章的最后,我们来看看负责线程池生命周期最后收尾工作的几个重要方法,首先是shutdown方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 开始一个顺序的shutdown操作,shutdown之前被执行的已提交任务,新的任务不会再被接收了。如果线程池已经被shutdown了,该方法的调用没有其他任何效果了。
* **该方法不会等待之前已经提交的任务执行完毕**,awaitTermination方法才有这个效果。
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断是否可以操作关闭目标线程。
checkShutdownAccess();
//advanceRunState方法,参数:目标状态;作用:一直执行,直到成功利用CAS将状态置为目标值。
//设置线程池状态为SHUTDOWN,此处之后,线程池中不会增加新Task
advanceRunState(SHUTDOWN);
//中断所有的空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试进行terminate操作,但其实我们上面将状态置为shutdown,就已经算是“中止”了一个线程池了,它不会再执行任务,于外部而言,已经失去了作用。而这里,也只是尝试去将线程池的状态一撸到底而已,并不是一定要terminate掉。该方法我们后面会说到。
tryTerminate();
}

我们可以看到,shutdown方法只不过是中断唤醒了所有阻塞的线程,并且把线程池状态置为shutdown,正如注释所说的,它没有等待所有正在执行任务的线程执行完任务,把状态置为shutdown,已经足够线程池丧失基本的功能了。

在该方法中,线程池如何中断线程是我们最需要关心的,我们来看一下interruptIdleWorkers方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void interruptIdleWorkers(boolean onlyOne) {//参数onlyOne表示是否只中断一个线程就退出,在shutdown中该值为false。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历workers 对所有worker做中断处理。
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock()对Worker获取锁,因为正在执行的worker已经加锁了(见runWorker方法,w.lock()语句)
//所以这保证了正在运行执行Task的Worker不会被中断。只有阻塞在getTask方法的空闲线程才会进这个if判断(被中断),但中断不代表线程立刻停止,它要继续处理到阻塞队列为空时才会被销毁。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

我们可以看到,在中断方法中,我们调用了worker的tryLock方法去尝试获取worker的锁,所以我们说,worker类这一层的封装,是用来控制线程中断的,正在执行任务的线程已经上了锁,无法被中断,只有在获取阻塞队列中的任务的线程(我们称为空闲线程)才会有被中断的可能。

之前我们看过getTask方法,在这个方法中, worker是不加锁的,所以可以被中断。我们为什么说“中断不代表线程立刻停止,它要继续处理到阻塞队列为空时才会被销毁”呢?具体逻辑,我们再来看一下getTask的源码,以及我们的注释(我们模拟中断发生时的场景):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

/**
* 当执行过程中抛出InterruptedException 的时候,该异常被catch住,逻辑重新回到这个for循环
* catch块在getTask方法的最后。
*/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/**
* 因为逻辑是在抛出中断异常后来到这里的,那说明线程池的状态已经在shutdown方法中被置为shutdown了,rs >= SHUTDOWN为true,rs >=STOP为false(只有TIDYING和TERMINATED状态会大于stop)
* 这时候,如果workQueue为空,判断为真,线程被销毁。
* 否则,workQueue为非空,判断为假,线程不会进入销毁逻辑。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//循环的CAS减少worker数量,直到成功
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?

//因为在catch块中,timeOut已经为false了。
//所以只要不发生当前线程数超过最大线程数这种极端情况,命题(wc > maximumPoolSize || (timed && timedOut)一定为false,线程依旧不被销毁。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//继续执行正常的从阻塞队列中取任务的逻辑,直到阻塞队列彻底为空,这时候,上面第一个if判断符合,线程被销毁,寿命彻底结束。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果正常返回,那么返回取到的task。
if (r != null)
return r;
//否则,设为超时,重新执行循环,
timedOut = true;
} catch (InterruptedException retry) {
//捕获中断异常
timedOut = false;
}
}
}

总结:正阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务。捕获中断异常后,将继续循环到getTask()最开始的判断线程池状态的逻辑,当线程池是shutdown状态,且workQueue.isEmpty时,return null,进行worker线程退出逻辑

所以,这就是我们为什么说,shutdown方法不会立刻停止线程池,它的作用是阻止新的任务被添加进来(逻辑在addWorker方法的第一个if判断中,可以返回去看一下),并且继续处理完剩下的任务,然后tryTerminated,尝试关闭。

4.2.2 tryTerminate方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
* 在以下情况将线程池变为TERMINATED终止状态
* shutdown 且 正在运行的worker 和 workQueue队列 都empty
* stop 且 没有正在运行的worker
*
* 这个方法必须在任何可能导致线程池终止的情况下被调用,如:
* 减少worker数量
* shutdown时从queue中移除任务
*
* 这个方法不是私有的,所以允许子类ScheduledThreadPoolExecutor调用
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 线程池是否需要终止
* 如果以下3中情况任一为true,return,不进行终止
* 1、还在运行状态
* 2、状态是TIDYING、或 TERMINATED,已经终止过了
* 3、SHUTDOWN 且 workQueue不为空
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* 只有shutdown状态 且 workQueue为空,或者 stop状态能执行到这一步
* 如果此时线程池还有线程(正在运行任务或正在等待任务,总之count不等于0)
* 中断唤醒一个正在等任务的空闲worker
*(中断唤醒的意思就是让阻塞在阻塞队列中的worker抛出异常,然后重新判断状态,getTask方法逻辑)
* 线程被唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程(runWorker逻辑)
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);//中断workers集合中的空闲任务,参数为true,只中断一个。(该逻辑的意义应该在于通知被阻塞在队列中的线程:别瞎jb等了,这个线程池都要倒闭了,赶紧收拾铺盖准备销毁吧你个逼玩意儿)。
//尝试终止失败,返回。可能大家会有疑问,shutdown只调用了一次tryTerminate方法,如果一次尝试失败了,是不是就意味着shutdown方法很可能最终无法终止线程池?
//其实看注释,我们知道线程池在进行所有负面效益的操作时都会调用该方法尝试终止,上面我们中断了一个阻塞线程让他被销毁,他销毁时也会尝试终止(这其中又唤醒了一个阻塞线程去销毁),以此类推,直到最后一个线程执行tryTerminate时,逻辑才有可能走到下面去。
return;
}
/**
* 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//方法为空,需子类实现
terminated();
} finally {
//将状态置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//最后执行termination.signalAll(),并唤醒所有等待线程池终止这个Condition的线程(也就是调用了awaitTermination方法的线程,这个方法的作用是阻塞调用它的线程,直到调用该方法的线程池真的已经被终止了。)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

总结一下:tryTerminate被调用的时机主要有:

  1. shutdown方法时
  2. processWorkerExit方法销毁一个线程时
  3. addWorkerFailed方法添加线程失败或启动线程失败时
  4. remove方法,从阻塞队列中删掉一个任务时

4.2.3 shutdownNow方法

我们知道,shutdown后线程池将变成shutdown状态,此时不接收新任务,但会处理完正在运行的 和 在阻塞队列中等待处理的任务。

我们接下来要说的shutdownNow方法,作用是:shutdownNow后线程池将变成stop状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程。
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
* 这个任务列表是从任务队列中排出(删除)的
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
* 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
* 除了尽力尝试停止运行中的任务,没有任何保证
* 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断调用者是否有权限shutdown线程池
checkShutdownAccess();
//CAS+循环设置线程池状态为stop
advanceRunState(STOP);
//中断所有线程,包括正在运行任务的
interruptWorkers();
//将workQueue中的元素放入一个List并返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
//返回workQueue中未执行的任务
return tasks;
}

interruptWorkers 很简单,循环对所有worker调用 interruptIfStarted,其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt

需要注意的是,对于运行中的线程调用Thread.interrupt并不能保证线程被终止,task.run内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束

4.2.4 awaitTermination方法

该方法的作用是等待线程池终止,参数是timeout:超时时间和unit: timeout超时时间的单位,返回结果:true:线程池终止,false:超过timeout指定时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
//是否terminated终止
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
//是否已经超过超时时间
if (nanos <= 0)
return false;
//核心逻辑:看注释我们能知道,该方法让调用线程等待一段时间,直到被唤醒(有且仅有之前我们说过的tryTerminate方法中的 termination.signalAll()),或者被异常中断,或者传入了nanos时间参数流逝完。
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待

阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):

  1. 如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出

  2. 如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败

  3. 如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞

综上,要想优雅的关闭线程池,我们应该:

1
2
3
4
5
6
7
8
9
executorService.shutdown();
try{
while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for terminate");
}
}
catch (InterruptedException e) {
//中断处理
}

5 拒绝策略

我们最后来看一下线程池构造函数的最后一个参数:RejectedExecutionHandler。

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。

拒绝策略是一个接口,其设计如下:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

四个拒绝策略的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}


/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

核心的rejectedExecution方法,在ThreadPoolExecutor中被reject方法调用:

1
2
3
4
5
6
7
8

/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

而reject方法在execute方法中被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 /**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已经关闭,并且当前任务成功从队列中移除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
// 如果添加任务至队列中失败,执行拒绝策略
reject(command);
}
0%