本篇延续之前的AQS系列,解析CountDownLatch,目的在于摸清CountDownLatch的工作机理。
线程内调用await()是如何响应中断的?多个线程各自调用await()并挂起,当同步器的state更新为0后会全部唤醒执行吗?通过本篇文章,诸如此类的问题将得到解决。
CountDownLatch示例
照例先放上演示代码:
|
|
运行的结果如下:
|
|
CountDownLatch源码解析
CountDownLatch和ReentrantLock类似,内部都维护了一个继承自AbstractQueuedSynchronizer的同步器。
通常,在new一个CountDownLatch对象的时候,会传入一个int值(往往是线程数),此时会执行构造器方法:
|
|
而Sync的构造器如下:
|
|
可见,传入的int值,最终会设置为同步器内的state值。
countDown()
countDown()源码:
|
|
继续追踪sync.releaseShared(1)
:
|
|
以上就是countDown()的主要实现,涉及三个方法:releaseShared()、tryReleaseShared()、doReleaseShared(),第一个封装了后两个,组成了CountDownLatch释放共享锁的机制,其中tryReleaseShared
代码在CountDownLatch内重写,而doReleaseShared
方法来自AQS。
tryReleaseShared
的作用是判断是否可以释放共享锁
,它返回一个布尔值:
|
|
如果同步器的state等于0,自然无所谓释放锁,返回false,否则将state的值减去1(nextc = c-1),这符合countDown()的目的。然后采用CAS的方式将state更新为新值,判断更新后的state是否为0,如果为0说明可以释放共享锁,返回true,否则返回false。
如果通过tryReleaseShared
判断后得到可以释放锁的结果,那么除了将state更新以外,还需要继续进行释放锁的一些实质操作
,采用的是doReleaseShared
方法:
|
|
还记得之前写的J.U.C并发框架之AQS(一)内容吗?“Fake Node”是在enq()中通过执行compareAndSetHead(new Node())语句产生的,此时head不再指向null。也就是说,至少需要一个尝试获取锁未遂并进入等待队列的线程出现过,head才不为空。
结合开头的演示例子,如果将主线程睡眠时间换成更大的值,这样就确保了countDown()之前没有出现过尝试获取锁失败的线程,那么以上for循环将直接break(除非head在这段时间被其它线程修改,这样的话那么继续loop)。
第一个if语句的条件(h != null && h != tail)
中,h != tail
要求判断等待队列不为empty,Fake Node是不算等待队列的成员的,如果条件满足,内部内嵌if - else if句式:
- 内嵌if语句:如果ws == Node.SIGNAL,唤醒后继节点,期间通过CAS更新ws值,如果失败跳出当次循环,重新loop;
- else if语句:只有在ws == 0的时候,将其值更新为Node.PROPAGATE,更新失败跳出当次循环,重新loop。啰嗦一句,&&是短路运算符,ws不为0直接跳过执行;
从以上分析可见,CountDownLatch类创建的latch实例可以看作是一把锁,一个线程通过countDown()可以修改latch持有的同步器的state,好似共享了一把锁一样。
想必await()会有一个入队的“仪式”,这样才能和前面解析的内容配套,接下来就解析await()。
await()
它的代码如下:
|
|
继续追踪:
|
|
以上就是await()的主要实现,同样涉及三个方法:acquireSharedInterruptibly()、tryAcquireShared()、doAcquireSharedInterruptibly(),第一个封装了后两个,组成了CountDownLatch获取共享锁的机制,其中tryAcquireShared
代码在CountDownLatch内重写,而doAcquireSharedInterruptibly
方法来自AQS。
tryAcquireShared()
长这样,它的作用是判断是否可以获取共享锁
:
|
|
传入的参数并没有在方法体中发挥功效
,这样声明方法,可能是因为重写AQS方法时需要继承方法签名。
如果同步器的state为0,则返回1,表示可以获取锁;否则返回-1,表示不能获取锁。
一旦没能获取共享锁,那么执行doAcquireSharedInterruptibly
,它的作用可认为是等待时机以获取共享锁
:
|
|
addWaiter()根据线程创建了共享模式下的实例node,并放入队尾。第一个if语句中,如果p的前驱不是head,就跳到并行的后一个if,这部分和acquireQueued方法类似,不细究了,值得注意的是if条件满足会直接抛中断异常。
如果node的前驱 == head,和acquireQueued类似,首先还是会尝试获取共享锁,获取失败的情形和前面分析的类似,现假设此时获取成功,那么将执行setHeadAndPropagate
方法。
setHeadAndPropagate
源码如下:
|
|
setHead方法在之前的文章已经分析过了,这里主要关注代码中的if语句。
s是node的后继节点,isShared
方法如下:
|
|
这里会产生一些好奇:
第一个好奇:nextWaiter眼熟
注释是这么说的:
And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.
所以nextWaiter在独断模式和共享模式的意义是不一样的,在这里用来表示node处于共享模式。
第二个好奇:s == null作为条件语句
doReleaseShared()主要功能在前面已经解析过,但其中的unparkSuccessor由于是之前文章解析过的方法,前面并没有具体说明。实际上,其内部存在处理后继节点为null的代码:
|
|
这里的s是头节点的后继,而代表头节点的指针head在之前就已然指向了node,所以方法内外的s都是node的后继,而在上述代码中,s如果为null,或者s的ws值大于0,将从队列的尾端开始向前查找ws小于等于0的node,然后进行后续处理。
小结
总的来说,由于CountDownLatch不需要显示调用方法释放锁(比如独断锁的lock.lock()),唤醒后继节点的操作直接在doAcquireSharedInterruptibly内完成。所以即便有多个线程通过await()挂起,一旦latch内维护的同步器的state更新为0,那么多个挂起的线程都将被唤醒。
以上就是CountDownLatch的实现解析。