JAVA中断机制

1 中断的含义

有人说中断是让某个线程停止运行的意思,那这如何解释中断可以用来唤醒阻塞中的线程呢?有人说中断是唤醒线程的意思,但很多时候我们确实使用它来停止线程。这到底是怎么回事?

其实中断的意思是:用强制的方式来改变线程的状态,一般使用抛出异常的方式,强行获取到CPU执行权,至于改变状态后,线程是唤醒还是停止,则取决于后续的逻辑:

  • 如果你在catch块捕获异常并什么都不处理,那么等于线程被唤醒了。
  • 如果你一直向上抛出异常,或者在catch块中优雅退出,那么实现的就是将线程停止的目的。

2 中断机制

java并未提供任何直接中断某线程的方法,只提供了中断机制。即线程A向线程B发出“请你改变状态”的请求(线程B也可以自己给自己发送此请求),但线程B并不会立刻停止运行,而是自行选择合适的时机以自己的方式响应中断,当然也可以忽略中断。

也就是说java的中断不能直接控制线程,而是需要被中断的线程自己决定怎么处理,好比是父母叮嘱在外的子女注意身体,但子女是否注意身体,怎么注意身体,则完全取决于他们自己。

3 中断的相关方法

3.1 interrupt()方法和基本概念

interrupt status(中断状态):请记住这个术语,中断机制就是围绕着这个字段来工作的。在Java源码中代表中断状态的字段是Thread类中的:

1
private volatile Interruptible blocker;

对“Interruptible”这个类不需要深入分析,对于“blocker”变量有以下几个操作。

  1.默认blocker=null; ®1

  2.调用方法“interrupt0();”将会导致“该线程的中断状态将被设置(JDK文档中术语)”。®2

  3.再次调用“interrupt0();”将会导致“其中断状态将被清除(同JDK文档中术语)”®3

interrupt()方法并不会中断正在运行的线程,当你的线程正在正常运行时,这个时候调用t.interrupt,除了给该线程置了一个标志位,其他什么反应都没有。当然,如果你在run方法中有判断这个标志位,当其中断为true时有优雅退出的逻辑,或者像很多jdk实现的抛出一个InterruptException异常,利用异常使线程结束,那则另当别论。

“中断“这个词有误区,在语言层面,我们不会真的中断一个运行中的线程,但中断确实可以“中断轻量级阻塞”或者说“唤醒轻量级阻塞”。

“轻量级阻塞”就是调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者线程类的 join()、join(long)、join(long, int)、sleep(long) 或 sleep(long, int) 方法这几个函数,所造成的线程阻塞。此时线程是处于Waiting状态,会响应t.interrupt,中断阻塞,并直接抛出InterruptException异常。

3.1.1 为什么调用interrupt()并不能中断线程?

我们来看interrupt方法源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* Interrupts this thread.
*
* <p> Unless the current thread is interrupting itself, which is
* always permitted, the {@link #checkAccess() checkAccess} method
* of this thread is invoked, which may cause a {@link
* SecurityException} to be thrown.
*
* <p> If this thread is blocked in an invocation of the {@link
* Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
* Object#wait(long, int) wait(long, int)} methods of the {@link Object}
* class, or of the {@link #join()}, {@link #join(long)}, {@link
* #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)},
* methods of this class, then its interrupt status will be cleared and it
* will receive an {@link InterruptedException}.
*
* 如果线程堵塞在object.wait、Thread.join和Thread.sleep,将会抛出InterruptedException,同时清除线程的中断状态;
*
*
* <p> If this thread is blocked in an I/O operation upon an {@link
* java.nio.channels.InterruptibleChannel InterruptibleChannel}
* then the channel will be closed, the thread's interrupt
* status will be set, and the thread will receive a {@link
* java.nio.channels.ClosedByInterruptException}.
*

*如果该线程被阻塞在InterruptibleChannel 的I/O操作上,调用该方法,该channel会被关闭,中断状态会被设置,
*并且线程会收到ClosedByInterruptException异常
*
* <p> If this thread is blocked in a {@link java.nio.channels.Selector}
* then the thread's interrupt status will be set and it will return
* immediately from the selection operation, possibly with a non-zero
* value, just as if the selector's {@link
* java.nio.channels.Selector#wakeup wakeup} method were invoked.
*
*如果该线程被阻塞在nio的Selector上,调用该方法,中断状态会被设置,
*并且线程会携带一个非零值立刻返回(其实就是调用了java.nio.channels.Selector#wakeup方法)
*
*
* <p> If none of the previous conditions hold then this thread's interrupt
* status will be set. </p>
*
* <p> Interrupting a thread that is not alive need not have any effect.
*
* @throws SecurityException
* if the current thread cannot modify this thread
*
* @revised 6.0
* @spec JSR-51
*/
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag,只是设置标记
// Interruptible的interrupt方法在AbstractInterruptibleChannel和AbstractSelector类中有各自的实现
//故而如当前方法的注释说的那样(当前方法注释中中文翻译的部分),interrupt对这两种场景有效
//但对于普通线程,其实绝大多数不是上面的中断场景。
//因为thread默认注入的不是AbstractInterruptibleChannel和AbstractSelector类的实现
b.interrupt(this);
return;
}
}
interrupt0();
}

如上是Java源码中的代码,由此我们看出问题的答案。线程的blocker字段(也就是interrupt status)默认是null(®1)。调用interrupt()方法时,只是运行了interrupt0()(设置中断标记),并没有进入if语句,所以没调用真正执行中断的代码b.interrupt().

3.1.2 interrupt()方法使线程提前结束阻塞状态

interrupt()方法使线程提前结束阻塞状态,为什么呢?我们用sleep方法使线程阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Causes the currently executing thread to sleep (temporarily cease
* execution) for the specified number of milliseconds, subject to
* the precision and accuracy of system timers and schedulers. The thread
* does not lose ownership of any monitors.
*
* @param millis
* the length of time to sleep in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public static native void sleep(long millis) throws InterruptedException;

该本地方法会将blocker置为®2,因而此时调用interrupt方法,将执行if中的语句:

1
2
3
4
5
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}

其中,interrupt0();方法将中断标志清除,即置为®3。然后调用核心语句b.interrupt(this),真正的打断线程,并且抛出InterruptedException。

我们来看下b.interrupt(this)为什么能中断线程。

除了上文说到的Interruptible.interrupt在AbstractInterruptibleChannel和AbstractSelector类中有两种实现外,对于正常Thead类,我们注入的Interruptible b并非AbstractInterruptibleChannel和AbstractSelector类中的Interruptible实现。(这两类只在特定场景实现,可见下文:AbstractInterruptibleChannel类中的中断实现)

来看看Thread.blockedOn方法,Thread本身不会初始化它的blocker字段,blocker的值是注入进来的。

1
2
3
4
5
6
7
/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
*/
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}

注释说的很明白,该方法被via sun.misc.SharedSecrets调用,注入的b所执行的interrupt(this)方法,其实是native方法,定义在jvm.cpp中:

1
2
3
4
5
6
7
8
9
10
11
12
13
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");

// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END

JVM_Interrupt对参数进行了校验,然后直接调用Thread::interrupt:

1
2
3
4
5
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread);
}

Thread::interrupt调用os::interrupt方法实现,os::interrupt方法定义在os_linux.cpp:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");

//获取系统native线程对象
OSThread* osthread = thread->osthread();

if (!osthread->interrupted()) {
osthread->set_interrupted(true);
//内存屏障,使osthread的interrupted状态对其它线程立即可见
OrderAccess::fence();
//前文说过,_SleepEvent用于Thread.sleep,线程调用了sleep方法,则通过unpark唤醒
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}

//_parker用于concurrent相关的锁,此处同样通过unpark唤醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
//synchronized同步块和Object.wait() 唤醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;

}

由此可见,interrupt其实就是通过ParkEvent的unpark方法唤醒对象;

3.2 isInterrupted()方法和interrupted()方法

  • isInterrupted()只是判断自己是否已经被置上中断标志
  • interrupted()方法判断自己是否已经被置上中断标志。并且清除线程的中断状态

接下来说一下”interrupted()”和”isInterrupted()”两个方法的相同点和不同点。在这之前看一下源码中两个方法的代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
return isInterrupted(false);
}
/**
* Tests if some Thread has been interrupted. The interrupted state
* is reset or not based on the value of ClearInterrupted that is
* passed.
*/
private native boolean isInterrupted(boolean ClearInterrupted);

相同点都是判断线程的interrupt status是否被设置,若被设置返回true,否则返回false.区别有两点:

  • 一:前者是static方法,调用者是current thread,而后者是普通方法,调用者是this current.
  • 二:它们其实都调用了Java中的一个native方法isInterrupted(boolean ClearInterrupted); 不同的是前者传入了参数true,后者传入了false.
  • 意义就是:前者将清除线程的interrupt state(®3),调用后者线程的interrupt state不受影响。

4 AbstractInterruptibleChannel类中的中断实现

如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannel的方法begin()中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);//设置当前线程的blocker为interruptor
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}

protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);//设置当前线程的blocker为null
Thread interrupted = this.interrupted;
//如果发生中断,Thread.interrupt方法会调用Interruptible的interrupt方法,
//设置this.interrupted为当前线程
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}

以上述代码为例,nio通道的ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上。当线程中断时,Thread.interrupt()触发回调接口Interruptible关闭io通道,导致read方法返回,最后在finally块中执行end()方法检查中断标记,抛出ClosedByInterruptException;

5 AbstractSelector类中的中断实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//java.nio.channels.spi.AbstractSelector
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
//sun.nio.ch.class EPollSelectorImpl
protected int doSelect(long timeout) throws IOException {
......
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
......
}

6 对于中断的处理

既然Java中断机制只是设置被中断线程的中断状态,那么被中断线程该做些什么?

6.1 中断状态的管理

  • 一般说来,当可能阻塞的方法声明中有抛出InterruptedException则暗示该方法是可中断的,如BlockingQueue#put、BlockingQueue#take、Object#wait、Thread#sleep等,如果程序捕获到这些可中断的阻塞方法抛出的InterruptedException或检测到中断后,这些中断信息该如何处理?一般有以下两个通用原则:

    • 如果遇到的是可中断的阻塞方法抛出InterruptedException,可以继续向方法调用栈的上层抛出该异常
    • 如果是检测到中断,则可清除中断状态并抛出InterruptedException,使当前方法也成为一个可中断的方法。
  • 若有时候不太方便在方法上抛出InterruptedException,比如要实现的某个接口中的方法签名上没有throws InterruptedException,这时就可以捕获可中断方法的InterruptedException并通过Thread.currentThread.interrupt()来重新设置中断状态。如果是检测并清除了中断状态,亦是如此。

一般的代码中,尤其是作为一个基础类库时,绝不应当吞掉中断(即捕获到InterruptedException后在catch里什么也不做,清除中断状态后又不重设中断状态也不抛出InterruptedException等)。因为吞掉中断状态会导致方法调用栈的上层得不到这些信息。

当然,凡事总有例外的时候,当你完全清楚自己的方法会被谁调用,而调用者也不会因为中断被吞掉了而遇到麻烦,就可以这么做。

  • 总得来说,就是要让方法调用栈的上层获知中断的发生。假设你写了一个类库,类库里有个方法amethod,在amethod中检测并清除了中断状态,而没有抛出InterruptedException,作为amethod的用户来说,他并不知道里面的细节,如果用户在调用amethod后也要使用中断来做些事情,那么在调用amethod之后他将永远也检测不到中断了,因为中断信息已经被amethod清除掉了。

6.2 中断的响应

  • 程序里发现中断后该怎么响应?这就得视实际情况而定了。有些程序可能一检测到中断就立马将线程终止,有些可能是退出当前执行的任务,继续执行下一个任务……作为一种协作机制,这要与中断方协商好,当调用interrupt会发生些什么都是事先知道的,如做一些事务回滚操作,一些清理工作,一些补偿操作等。若不确定调用某个线程的interrupt后该线程会做出什么样的响应,那就不应当中断该线程。
0%