J.U.C并发框架之AQS(五):Condition

通常ReentrantLock和Condition配套使用,后者提供了await()以及signal()等方法,本篇将重点解析await()和signal()的实现原理,以期了解它们的工作流程。

signal()会随意唤醒条件队列中的线程吗?调用signal()会立马唤醒条件队列的线程吗?中断条件队列中的等待线程会抛异常吗?很多问题将通过本篇获解。

Condition基本用法

照例先放一段演示代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ReenConditionDemo {
public static final ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread t = new Thread(()
-> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " begin");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println(Thread.currentThread().getName() + " over");
}, "Task");
t.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
System.out.println("Main begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signal();
lock.unlock();
System.out.println("Main over");
}
}

运行输出如下:

1
2
3
4
Task begin
Main begin
Main over
Task over

Task线程先拿到锁,执行到condition.await()开始等待,并释放锁,主线程拿到锁开始执行,执行到condition.signal()将Task线程唤醒,主线程unlock()后,Task线程开始执行。

Condition似乎可以看作模拟Syncronized锁住的对象,它调用await()和signal()对应wait()和notify()。

下图列出了Condition接口全部方法:

Condition01

Condition实现解析

追踪Condition实现的相关代码:

1
2
3
4
5
6
7
8
9
// 追踪的lock.newCondition()实现:
public Condition newCondition() {
return sync.newCondition();
}
// 追踪的sync.newCondition()实现:
final ConditionObject newCondition() {
return new ConditionObject();
}

ConditionObject是AQS的一个内部类,实现了Condition和序列化接口。

await()解析

按照之前演示的顺序,先解析await(),它的代码并不冗长:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

ConditionObject维护了一个自己的等待队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

初始的时候内部的指针都为null,比如lastWaiter,而第一个if语句是处理ws为CANCELLED等情况,这里就不去说明了。

创建代表当前线程的节点,将ws设置为CONDITION,然后就是入队返回了,这里的队列也没有创建专门的Queue,就是用的普通链表形式。

接下来是fullyRelease方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

这个方法的作用是释放锁,并保存同步器的状态值。release(savedState)正常情况应该返回true,也就是tryRelease(savedState)应该返回true,不仅如此,在tryRelease内还会调用setExclusiveOwnerThread(null),并setState(0)(具体参看ReentrantLock源码中的tryRelease()部分),这样就让出了同步器构造的锁。tryRelease后执行release第一个if内的语句,会唤醒后继节点,具体来说就是本站AQS第一篇解析的内容了。

接下来的代码,将单独抽出来:

1
2
3
4
5
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

继续追踪isOnSyncQueue方法的源码,该方法作用如其名,就是判断是否在同步队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

逐句解释:

  • 如果ws是Node.CONDITION或者该节点前驱指针指向为null则说明不在同步队列,之前的解析文章中已经解释过,head == tail表示队列为空;
  • 如果一个节点有后继节点,那么就说明它在同步队列;
  • 如果不属于以上情况,那么从队列的尾部开始遍历,查找是否有和传入的node相等的节点;

while语句内部,本人关注这段代码:

1
2
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;

进而追踪checkInterruptWhileWaiting:

1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

这个方法的名字告诉本人,这是一个在等待期间检测中断的一个方法。这里强调一下,unpark()是不会更改中断标志位的,所以如果是signal()(内部调用的是unpark(),后续会分析),那么该方法只会返回0,假设此时中断标志位不为0,来展开后续分析。

来看看transferAfterCancelledWait方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

(node, Node.CONDITION, 0)对应(Node node, int expect, int update),如果在signal()之前,那么当前节点的ws当然是Node.CONDITION,这样就继续入队,并返回true,入队后怎么重新获得锁,参考本站AQS第一篇的内容。

如果在signal()之后发生中断动作,那么ws将不再是Node.CONDITION,但是也有可能代表当前线程的节点并没有完成入队,所以用一个while语句spin。

在signal()之前中断不是正常的程序,理应推出等待抛出异常,所以用THROW_IE表示,值为-1;而在signal()之后中断,就是重复多次中断,就用REINTERRUPT表示,值为1。

后续还有以下代码:

1
2
3
4
5
6
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);

简要说明如下:

  • 第一个if:之前的解析文章中已知,acquireQueued方法,如果遭遇中断事件也会返回true,那么在不是THROW_IE的情况下,也可以将interruptMode设置为REINTERRUPT;
  • 第二个if:unlinkCancelledWaiters方法是用来清除在条件队列中却不处于条件状态的节点;
  • 第三个if:主要用来抛中断异常,如果interruptMode == REINTERRUPT还会自我调用interrupt();

signal()解析

在之前的解析中,存在如下代码:

1
2
3
while (!isOnSyncQueue(node))
Thread.yield();
return false;

我的原话是:“如果在signal()之后发生中断动作,那么ws将不再是Node.CONDITION,但是也有可能代表当前线程的节点并没有完成入队,所以用一个while语句spin。”

那么等待的入队动作,发生在什么地方呢?答案可能就在signal()源码内。

signal():

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

继续追踪:

1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

firstWaiter被设置为first.nextWaiter,后续还执行first.nextWaiter = null,这样条件队列的头节点就被“剥离”。transferForSignal(first)返回true会停止循环。

继续追踪transferForSignal方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

如果node(第一次进入while条件语句指代头节点first)状态不是Node.CONDITION,那么if语句返回false,则继续while循环找寻下一个等待队列中的节点;反之则node的ws更新为0,继续向下执行transferForSignal中的代码。

enq()已经在之前的文章已经解析过,它是这么一个方法,内部有一个局部变量t,这个t原本指向的是尾节点,当node作为参数传入后,将node接在尾节点后,此时t相当于node的前驱节点并双向链接于node。

前驱节点ws > 0,又或者这个前驱节点设置ws为Node.SIGNAL失败,则会执行:

1
LockSupport.unpark(node.thread);

从上述代码,至少揭示了两件事:

  • signal()会按照条件队列的入队顺序进行操作;
  • signal()通常不会立马唤醒条件队列中的线程;

以本篇文章开头的例子来说,只有两个线程,一个是主线程,一个是开的Task线程,代表Task线程的node的前驱节点就是head,head的ws并没有大于0,而且也可以成功设置为Node.SIGNAL。

将主线程的lock.unlock()注释掉,在源码上打断点进行说明:

condition02

在执行if语句之前:

condition03

执行if语句之后:

condition04

Node@693代表的就是Task线程的节点,而Node@694是head指针指向的节点,也是p代表的节点。它是693的前驱,693是它的后继。

if语句内的LockSupport.unpark(node.thread);并没有执行(之前打过断点,直接跳过,上图并未打断点)。而p的ws设置为了-1,当主线程释放锁,Task线程才会唤醒执行。

所以:

  • signal()之后,通常是将挂起的线程直接放到同步队列中;
  • 即便挂起的线程被唤醒,一般也是通过acquireQueued方法,在同步等待队列的机制下完成后续操作;

signalAll()和signal()的实现大体上相同,最大的不同在于doSignal和doSignalAll,这里把后者代码也贴出来:

1
2
3
4
5
6
7
8
9
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

两个相比区别主要在于while的条件语句,signalAll()会依次将条件队列中的Node实例放到同步等待队列中。

小结

Condition作为ReentrantLock的配套工具,它的实现不是孤立的,和ReentrantLock内部维护的同步器联系紧密,特别是它巧妙的利用了同步器的等待队列。

一些问题的本人理解

1
while (!isOnSyncQueue(node)) {}

在await中唤醒而未入同步等待队列的情况,除了中断,还可能包括unpark(),所以while()中的条件有它存在的必要性。