J.U.C Concurrency Framework: AQS (Part 6), CountDownLatch

This post continues the AQS series with a walkthrough of CountDownLatch, with the goal of understanding how it works internally.

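How does a thread that calls await() respond to interruption? If several threads each call await() and park, are they all woken once the synchronizer's state drops to 0? This post answers these and similar questions.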

CountDownLatch example

As usual, here is the demo code first:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo implements Runnable {
    static final CountDownLatch latch = new CountDownLatch(6);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            exec.submit(demo);
        }
        Thread.sleep(3000);
        latch.await(); // blocks until all six workers have called countDown()
        System.out.println(Thread.currentThread().getName());
        exec.shutdown();
    }

    @Override
    public void run() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName());
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

The output looks like this (the order of the worker threads may differ between runs):

pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
main

CountDownLatch source code walkthrough

Like ReentrantLock, CountDownLatch internally maintains a synchronizer that extends AbstractQueuedSynchronizer.

Typically, when a CountDownLatch is created with new, an int is passed in (often the number of threads), and the constructor runs:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

The Sync constructor is:

Sync(int count) {
    setState(count);
}

So the int that is passed in ends up as the state value inside the synchronizer.
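The effect of the constructor can be observed from outside through the public getCount() method. The following is a minimal sketch; the class and method names (LatchInitialCount, initialCount, rejects) are made up for this illustration and are not part of the JDK.

```java
import java.util.concurrent.CountDownLatch;

class LatchInitialCount {
    // Returns the count reported right after construction,
    // which is the value stored into the synchronizer's state.
    static long initialCount(int count) {
        return new CountDownLatch(count).getCount();
    }

    // Returns true if the constructor rejects the given count.
    static boolean rejects(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(initialCount(6)); // the state starts at 6
        System.out.println(rejects(-1));     // negative counts are refused
    }
}
```

A count of 0 is accepted; such a latch is simply open from the start.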

countDown()

Source of countDown():

public void countDown() {
    sync.releaseShared(1);
}

Following sync.releaseShared(1):

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

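That is the core of countDown(). Three methods are involved: releaseShared(), tryReleaseShared() and doReleaseShared(). The first wraps the other two, and together they form the mechanism by which CountDownLatch releases the shared lock. tryReleaseShared is overridden inside CountDownLatch, while doReleaseShared comes from AQS.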

tryReleaseShared decides whether the shared lock can be released, and returns a boolean:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

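If the synchronizer's state is already 0 there is nothing to release, so it returns false. Otherwise it computes state minus 1 (nextc = c-1), which is exactly what countDown() is for, and tries to install the new value with CAS, retrying the loop if the CAS fails. It then checks whether the updated state is 0: if so, the shared lock can be released and it returns true; otherwise it returns false.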
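The same decrement loop can be tried in isolation. The sketch below swaps the AQS state for a plain AtomicInteger, so it is an analogy rather than the JDK code; CasCountdownSketch, tryRelease and current are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCountdownSketch {
    // Stand-in for the AQS state field (an assumption of this sketch).
    private final AtomicInteger state;

    CasCountdownSketch(int count) {
        state = new AtomicInteger(count);
    }

    // Same shape as tryReleaseShared: true only on the 1 -> 0 transition.
    boolean tryRelease() {
        for (;;) {
            int c = state.get();
            if (c == 0)
                return false;               // already at zero, nothing to release
            int nextc = c - 1;
            if (state.compareAndSet(c, nextc))
                return nextc == 0;          // signal only when we reached zero
            // CAS lost a race with another thread: re-read and retry
        }
    }

    int current() {
        return state.get();
    }

    public static void main(String[] args) {
        CasCountdownSketch s = new CasCountdownSketch(2);
        System.out.println(s.tryRelease()); // false: 2 -> 1
        System.out.println(s.tryRelease()); // true:  1 -> 0
        System.out.println(s.tryRelease()); // false: stays at 0
    }
}
```

Only one caller ever sees true, so the wake-up work that follows is triggered once, and extra calls after zero never push the value negative.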

If tryReleaseShared reports that the lock can be released, then besides updating state, the actual release work still has to happen. That is the job of doReleaseShared:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

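Remember the earlier post, J.U.C Concurrency Framework: AQS (Part 1)? The "Fake Node" is created in enq() by the statement compareAndSetHead(new Node()), after which head no longer points to null. In other words, head is non-null only once at least one thread has failed to acquire the lock and entered the wait queue.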

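Going back to the demo at the top: if the main thread's sleep is changed to a larger value, so that no thread has failed to acquire the lock before the countDown() calls happen, then head is still null and the for loop above breaks immediately (unless head is modified by another thread in the meantime, in which case it keeps looping).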
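Whether the queue has ever been initialized can be probed with the public AQS method hasContended(), which reports whether any acquire has ever blocked (in the JDK 8 source it is implemented as head != null). CountDownLatch does not expose its Sync, so the sketch below uses a hypothetical stand-in synchronizer written only for this demo; QueueInitSketch and its method names are invented here.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueueInitSketch {
    // Hypothetical stand-in for CountDownLatch.Sync, written only for this demo.
    static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) return nextc == 0;
            }
        }
    }

    // Release first, then acquire: no thread ever has to queue.
    static boolean contendedWhenReleasedFirst() {
        Sync sync = new Sync(1);
        sync.releaseShared(1);
        try {
            sync.acquireSharedInterruptibly(1); // returns at once, state is 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sync.hasContended();
    }

    // Let a waiter enqueue first, then release it.
    static boolean contendedWhenWaiterQueuesFirst() {
        Sync sync = new Sync(1);
        Thread waiter = new Thread(() -> {
            try {
                sync.acquireSharedInterruptibly(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (!sync.hasQueuedThreads()) { // spin until the waiter is enqueued
            Thread.yield();
        }
        sync.releaseShared(1);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sync.hasContended();
    }

    public static void main(String[] args) {
        System.out.println(contendedWhenReleasedFirst());
        System.out.println(contendedWhenWaiterQueuesFirst());
    }
}
```

The first method corresponds to the "main thread sleeps longer" scenario: every release finishes before anyone waits, so the queue is never built.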

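In the condition of the first if statement (h != null && h != tail), h != tail checks that the wait queue is not empty; the Fake Node does not count as a member of the wait queue. If the condition holds, the nested if / else if runs: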

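  • Nested if: when ws == Node.SIGNAL, ws is first reset to 0 via CAS and then the successor is woken up; if the CAS fails, the current iteration is abandoned and the loop starts over;
  • else if: only when ws == 0 is it updated to Node.PROPAGATE; if that update fails, the current iteration is abandoned and the loop starts over. As an aside, && short-circuits, so when ws is not 0 the CAS is skipped entirely.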

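From the analysis above, a latch instance created from CountDownLatch can be viewed as a lock: any thread can modify the state of the synchronizer held by the latch through countDown(), as if the threads were sharing one lock.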

Presumably await() has some enqueueing "ritual" that matches what was analyzed above, so await() is next.

await()

Its code is:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Following further:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

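That is the core of await(). Again three methods are involved: acquireSharedInterruptibly(), tryAcquireShared() and doAcquireSharedInterruptibly(). The first wraps the other two, and together they form the mechanism by which CountDownLatch acquires the shared lock. tryAcquireShared is overridden inside CountDownLatch, while doAcquireSharedInterruptibly comes from AQS.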

tryAcquireShared() looks like this; it decides whether the shared lock can be acquired:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

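The parameter is never used in the method body. It is declared anyway because the method overrides an AQS method and therefore has to keep that method's signature.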

If the synchronizer's state is 0 it returns 1, meaning the lock can be acquired; otherwise it returns -1, meaning it cannot.
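The two outcomes of tryAcquireShared can be seen without blocking forever by using the timed variant await(long, TimeUnit), which goes through the same tryAcquireShared check and returns false on timeout. A minimal sketch follows; TimedAwaitSketch and awaitBriefly are names chosen for this example, and the 50 ms timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitSketch {
    // Waits at most 50 ms; true means the latch was open (state == 0).
    static boolean awaitBriefly(CountDownLatch latch) {
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println(awaitBriefly(latch)); // false: state is 1, the wait times out
        latch.countDown();
        System.out.println(awaitBriefly(latch)); // true: state is 0, returns at once
    }
}
```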

If the shared lock could not be acquired, doAcquireSharedInterruptibly runs; its job can be seen as waiting for the right moment to acquire the shared lock:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

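addWaiter() creates a shared-mode node for the current thread and appends it to the tail of the queue. In the first if statement, if node's predecessor p is not head, control falls through to the second if at the same level. This part is similar to acquireQueued and is not examined in detail here; the thing to note is that when the second if condition holds (the thread was interrupted while parked), an InterruptedException is thrown directly.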
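Both interrupt paths seen so far, the Thread.interrupted() check on entry and the exception thrown after parkAndCheckInterrupt(), can be exercised from user code. This is a rough sketch with invented names (AwaitInterruptSketch and its two methods); it relies on a thread parked inside await() reporting Thread.State.WAITING.

```java
import java.util.concurrent.CountDownLatch;

class AwaitInterruptSketch {
    // Path 1: the interrupt flag is already set when await() is called.
    static boolean throwsWhenAlreadyInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();
        try {
            latch.await();
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            Thread.interrupted(); // leave the calling thread with a clean flag
        }
    }

    // Path 2: the thread is interrupted while parked inside await().
    static boolean throwsWhenInterruptedWhileWaiting() {
        CountDownLatch latch = new CountDownLatch(1);
        boolean[] sawException = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                sawException[0] = true;
            }
        });
        waiter.start();
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield(); // spin until the waiter has parked
        }
        waiter.interrupt();
        try {
            waiter.join(); // join also makes sawException[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawException[0];
    }

    public static void main(String[] args) {
        System.out.println(throwsWhenAlreadyInterrupted());
        System.out.println(throwsWhenInterruptedWhileWaiting());
    }
}
```

In both cases the latch count is never touched; the waiter simply leaves await() with an exception.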

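If node's predecessor == head, then as in acquireQueued it first tries to acquire the shared lock. The failure case is like the one analyzed earlier; assuming the acquire succeeds this time, setHeadAndPropagate is executed.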

Source of setHeadAndPropagate:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

setHead was analyzed in an earlier post, so the focus here is the if statement in the code.

s is node's successor; isShared is:

final boolean isShared() {
    return nextWaiter == SHARED;
}

Two things here invite curiosity:

First curiosity: nextWaiter looks familiar

The source comment says:

And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.

So nextWaiter means different things in exclusive mode and in shared mode; here it marks the node as being in shared mode.

Second curiosity: s == null as a condition

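The main job of doReleaseShared() was covered above, but the unparkSuccessor call inside it was not explained there because it was analyzed in an earlier post. In fact, unparkSuccessor contains code that handles a null successor: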

if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}

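Here s is the successor of the head node, and the head pointer has already been moved to node by this point, so s inside and outside the method both refer to node's successor. In the code above, if s is null or its ws is greater than 0, the queue is scanned backward from the tail for nodes whose ws is less than or equal to 0; since the loop keeps overwriting s, the match closest to the head is the one that remains and goes on to the follow-up handling.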

Summary

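Overall, because CountDownLatch does not need an explicit call to release the lock (unlike an exclusive lock, which needs lock.unlock()), waking the successor happens directly inside doAcquireSharedInterruptibly, through setHeadAndPropagate. So even if several threads are parked in await(), once the state of the synchronizer maintained by the latch reaches 0, all of those parked threads are woken.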
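This conclusion can be checked with a small experiment: park several threads on one latch, open it with a single countDown(), and count how many return from await(). The sketch below is illustrative only; WakeAllWaitersSketch and wokenAfterSingleCountDown are names invented for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class WakeAllWaitersSketch {
    // Parks `waiters` threads on one latch, opens it once,
    // and returns how many of them came back from await().
    static int wokenAfterSingleCountDown(int waiters) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger woken = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    woken.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            while (t.getState() != Thread.State.WAITING) {
                Thread.yield(); // spin until every waiter has parked
            }
        }
        latch.countDown(); // state 1 -> 0, a single release
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println(wokenAfterSingleCountDown(3)); // 3: every waiter was released
    }
}
```

Only the first waiter is unparked by the releasing thread; each woken waiter then passes the signal on to the next one, which is why one countDown() is enough.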

That concludes the analysis of the CountDownLatch implementation.