由几个小例引发的对interrupt()、LockSupport.park()深入解析

在之前解析interrupt()源码的小文最后,曾经列出过一些例子,主要是关于标志位更改对LockSupport.park()的影响,这里我觉得有必要单独花一个篇幅来研究,因为AQS就涉及到了这方面的问题。通过本文,可以对Thread.interrupted()有一个牢固的掌握,对interrupt()有更进一步的理解,对LockSupport.park()、LockSupport.unpark()挂起唤醒机制有一个清晰的认识。本篇应该算是《线程生命周期 & 中断机制》的姊妹篇。

4个例子

首先是第1例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MultInterruptParkDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
ThreadDemo04 t4 = new ThreadDemo04();
t4.start();
t4.interrupt();
flag = false;
}
public static class ThreadDemo04 extends Thread {
@Override
public void run() {
while (flag) {
}
LockSupport.park();
System.out.println("本打印出现在第一个park()之后");
LockSupport.park();
System.out.println("本打印出现在第二个park()之后");
}
}
}

本例的输出为:

1
2
3
4
本打印出现在第一个park()之后
本打印出现在第二个park()之后
Process finished with exit code 0

也就是说,一次中断操作后无论线程调用多少次LockSupport.park(),程序都不会挂起,而是正常运行结束。

接下来把程序改成如下所示的第2例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MultInterruptParkDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
ThreadDemo04 t4 = new ThreadDemo04();
t4.start();
LockSupport.unpark(t4);
LockSupport.unpark(t4);
LockSupport.unpark(t4);
flag = false;
}
public static class ThreadDemo04 extends Thread {
@Override
public void run() {
while (flag) {
}
LockSupport.park();
System.out.println("本打印出现在第一个park()之后");
LockSupport.park();
System.out.println("本打印出现在第二个park()之后");
}
}
}

本例的输出为:

1
本打印出现在第一个park()之后

程序在输出了第一条打印语句后挂起,说明无论调用多少次LockSupport.unpark(t4),只会提供给线程一个许可。

继续更改代码,将更改的代码设为第3例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MultInterruptParkDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
ThreadDemo04 t4 = new ThreadDemo04();
t4.start();
t4.interrupt();
flag = false;
}
public static class ThreadDemo04 extends Thread {
@Override
public void run() {
while (flag) {
}
LockSupport.park();
System.out.println("本打印出现在第一个park()之后");
System.out.println(Thread.interrupted());
System.out.println(Thread.interrupted());
LockSupport.park();
System.out.println("本打印出现在第二个park()之后");
}
}
}

本例的输出为:

1
2
3
本打印出现在第一个park()之后
true
false

程序打印了上述结果后挂起,而第1例中interrupt()在没有两个System.out.println(Thread.interrupted())语句(采用两个的原因是方便看出标志位已经清零)之前,程序是会正常结束的。可能你认为这是理所当然的,因为标志位清零后第二句LockSupport.park()自动将线程挂起,那么我们在看下面这第4例,用一个sleep()取代第一个LockSupport.park():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MultInterruptParkDemo {
public static volatile boolean flag = true;
public static void main(String[] args) {
ThreadDemo04 t4 = new ThreadDemo04();
t4.start();
t4.interrupt();
flag = false;
}
public static class ThreadDemo04 extends Thread {
@Override
public void run() {
while (flag) {
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("本打印出现在第一个sleep()之后");
System.out.println(Thread.interrupted());
System.out.println(Thread.interrupted());
LockSupport.park();
System.out.println("本打印出现在第二个park()之后");
}
}
}

输出如下:

1
2
3
4
5
6
7
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.demo.MultInterruptParkDemo$ThreadDemo04.run(MultInterruptParkDemo.java:34)
本打印出现在第一个sleep()之后
false
false
本打印出现在第二个park()之后

程序运行结束,没有挂起。

上面的结果表示,虽然标志位更改了,但是之前响应中断的是sleep()而不是park(),那么第二个park()不会让程序阻塞。

从源码找答案

追踪Thread.interrupted()源码

interrupted()的API描述:

1
2
3
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

继续追踪isInterrupted():

1
private native boolean isInterrupted(boolean ClearInterrupted);

这是一个native方法,在本人之前的《线程生命周期 & 中断机制》中,追踪过interrupt0(),而追踪isInterrupted()的方法与之如出一辙,这里就省略过程,直接放上核心代码,需要说明的是,os在这里仍然选用Linux版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}

到这里基本上就已经明晰interrupted()的原理了,但为了更直白,还是把OSThread的interrupted、set_interrupted方法也列出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
class OSThread: public CHeapObj<mtThread> {
...
private:
volatile jint _interrupted; // Thread.isInterrupted state
public:
volatile bool interrupted() const {
return _interrupted != 0;
}
void set_interrupted(bool z) {
_interrupted = z ? 1 : 0;
}
...
}

jint是一个JNI对Java数据类型的映射,这里对应Java里的int,现在可以分析出Java中Thread.interrupt()的原理了:

  1. 首先调用的是currentThread().isInterrupted(true)并将其返回;
  2. isInterrupted是一个native方法,底层调用的是系统的is_interrupted方法;
  3. 该方法获得Java线程对应的系统线程osthread,调用其方法interrupted获得中断标志_interrupted的值,并将该值赋给bool的局部变量interrupted;
  4. 由于clear_interrupted传入的参数为true,如果interrupted为true,那么执行osthread的set_interrupted方法,将标志位更改位false;
  5. 最后返回的是第3步的interrupted,但实际上此时标志位的值已经在第4步得到更改;

这就是为什么调用了Thread.interrupt()后,默认会更改中断标志位的值,到此其原理已经解析完毕。

追踪park()、unpark()底层源码

接下来,将通过分析park()和unpark()源码,来解释之前四个例子的输出结果。

unpark()底层源码

所要解析的unpark()的底层源码在hotspot-327ea6f9647c/src/os/linux/vm/os_linux.cpp下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// thread might be parked
if (_cur_index != -1) {
// thread is definitely parked
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}

《多线程编程指南》介绍了pthread_mutex_lock方法,该方法可以锁定mutex所指向的互斥锁,所以在调用pthread_mutex_unlock(_mutex)解锁之前,其它线程要访问被锁定的内容将阻塞。

在被锁定的同步语句内,首先将_counter的值赋给s,然后更改_counter为1,这就是普遍说的给予一个“许可”的底层操作,然后判断s(也就是之前的_counter值)是否大于等于1,如果大于等于1,说明目标线程并没有挂起;如果小于1,说明目标线程有可能挂起,则调用pthread_cond_signal方法将其唤醒。

从以上分析也可以看出,_counter的更改不是累加的过程,而是每次都写死为1,这就解释了第2例的输出结果,即无论之前调用多少次LockSupport.unpark(),只会更改_counter为1,提供一个逻辑“许可”。

park()底层源码

park()的底层源码行数比较多,这里针对本文所要解决的问题,按从前到后的顺序列出关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
void Parker::park(bool isAbsolute, jlong time) {
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) {
return;
}
// Don't wait if cannot get lock since interference arises from
// unblocking. Also. check interrupt before trying wait
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
...
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
...
}

只要理解了这段代码,第1、3、4例的输出结果也就能解释

一开始的Atomic::xchg方法通过原子操作将_counter设置为0,而返回的是未设置之前的值,如果这个值大于0,证明获得了unpark()的许可,则park方法直接return不会挂起,至此第4个例子的输出就能解释了,虽然Thread.sleep()响应了t4.interrupt(),但是并没有更改_counter,而interrupt()在《线程生命周期 & 中断机制》就解析过,底层调用的正是parker的unpark方法,所以_counter是大于0的。

继续对底层的park()进行解析,通过Thread* thread = Thread::current()获得当前线程,紧接着是一个if语句,Thread::is_interrupted(thread, false)中传入false的作用是不执行osthread->set_interrupted(false),返回的仍然是中断标志位的值,所以调用park()不会对标志位清零。而此时中断标志位的值如果是true,则park方法直接return不会挂起,至此第1个例子的输出也能解释了:一次中断操作后无论线程调用多少次LockSupport.park(),程序都不会挂起的原因,就在于中断标志位没有清零。

在第3例中,之所以第二个LockSupport.park()能挂起,是因为System.out.println(Thread.interrupted())清零了标志位,Thread.interrupted()在前文早已完全解析。程序在执行到下面这段代码时:

1
2
3
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}

两个条件分句返回都是false,所以park()仍然没有return,那么后续会调用相关方法将线程挂起。所以第3个例的输出结果便得到了解释

小结

本文通过四个例子引发思考,通过解析源码解决困惑。在AQS源码中,acquireQueued方法的for循环内,parkAndCheckInterrupt()之所以要return Thread.interrupted(),正是因为如果不清零标志位,那么后续将永远不会再挂起,如同第1例中出现的那样。

中断标志位未清零,_counter == 1,只要满足这两者其中一个条件,park()就不会将线程挂起。

参考

pthread_cond_wait为什么需要传递mutex参数:算是本文读源码衍生出来的问题;

C++和JNI的数据转换:讲解了Java类型映射本地类型的相关知识;

《多线程编程指南》:帮助解答底层的困惑;