解析ReentrantLock,就是解析AQS!但后者毕竟是一个框架,大而全,且部分功能交由子类重现,不容易找到一个阅读源码的切入点,所以不妨从ReentrantLock开始,先理解AQS实现的独占锁。在解析的过程中,对一些过于简单的细节,不再堆砌,只抓核心功能的实现部分。本解析采用的是jdk1.8.0_171。
实验用例
首先释出一个并发的程序:
|
|
运行后,程序先打印出main和Thread-0,过了5秒后打印出Thread-1,又过了五秒程序结束。此程序可以用来作为解析ReentrantLock源码的例子。
锁未被占用时的线程执行
Thread-0和Thread-1都尝试过获得锁,但是它们的“际遇”是不一样的,首先我们看看Thread-0。
- 在自定义的线程类中一开始创建了ReentrantLock对象,而ReentrantLock无参构造方法默认产生的是NonfairSync实例:
所以lock是不公平锁(这是什么现在不重要);public ReentrantLock() { sync = new NonfairSync(); }
- t1.start():程序开始执行run()中的代码;
- lock.lock()调用的方法如下:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
- 执行之后的打印语句与睡眠语句;
- 最后还会执行finally下的lock.unlock()释放锁;
这里重点将第3步一窥究竟。
一开始会进行一个compareAndSetState(0, 1)的判断,这是基于CAS,继续追踪其源码如下:
|
|
this指的就是当前锁持有的同步器对象,这里就是第1步里得到的sync,stateOffset是state的底层地址偏移量(用到了反射,也用到了unsafe方法来获得这个值),expect是期望值,update是要更新成的值。state最开始的值为0,所以这个时候if判断语句为true,执行setExclusiveOwnerThread(Thread.currentThread()),将当前所在的线程设置为exclusiveOwnerThread,表明它是独占模式同步锁的持有者。
锁被占用时的线程执行
让我们再来看看Thread-1,同样的,它也会尝试执行Thread-0一样的流程,但是当它在执行第3步的时候,出现了不一样的情况,它会去执行acquire(1),这个方法的构造是这样的:
|
|
调用上面的方法会先调用tryAcquire(arg):
|
|
而返回的nonfairTryAcquire()长这样:
|
|
这段代码虽然比之前的稍长,但不难理解。Thread.currentThread()获得当前线程,即Thread1,此时的state是1,所以c等于1,第一个if的判断语句为false,直接跳转到else if,但Thread1并不是exclusiveOwnerThread,所以直接跳转到最后的返回语句,返回false。
接着看看acquireQueued(addWaiter(Node.EXCLUSIVE), arg))发生了什么。
先是addWaiter():
|
|
该方法体第一句的作用是根据当前所在的线程,创建一个新的Node实例。由于之前没有提到过Node,这里稍稍解释一下,AQS内部维护一个基于”CLH”(Craig, Landin, and Hagersten)队列修改得到的FIFO队列,每个元素是一个Node实例,而每个Node实例封装了一个线程对象以及一些相关字段和方法,在这里可以先简单理解为一个Node实例代表了一个线程。方法的形参mode传入的是Node.EXCLUSIVE,它将模式标记为独占。
第二句为Node pred = tail,在当前的情况下,所谓的队列还是空的,没有添加任何节点,所以第一个if语句直接跳过,进入到enq()中,if语句的作用注释已经说的很清楚“Try the fast path of enq”,尝试最快入列。
enq():
|
|
在for循环里,第一个if进行一个null判断,由于当前t是null,所以执行内部语句,内部又嵌入了一个if语句,compareAndSetHead方法是一个CAS方法,作用是如果当前head节点为null,那么就设置为指向一个新创建的Node对象。而当前head节点的确是null,所以head = new Node(),然后执行tail = head语句。程序运行到此时AQS内部结构如下:
head和tail指向相同的Node对象,之前本人说过,为了方便,一个Node实例可以简单的被理解为代表一个线程,但是在这里它内部的thread指向的是null,所以这个Node对象是一个“Fake Node”。
注意this后面的打印输出“[State = 1, empty queue]”,也就是说此时被认为队列是空的,我们追踪程序的相关代码,发现判断内部是否存在线程队列,主要在于方法hasQueuedThread:
|
|
至此,AQS维护一个队列的方式也就清楚了,它并不会主动在内部创建Queue对象,而是通过一头(head)一尾(tail)来操作。
方法的执行到这里并没有返回和结束,它会继续执行for循环,这个时候t不再是null,开始执行else语句,词句段的作用相当于将代表Thread-1的Node对象设置为队尾,AQS结构如下:
此时的队列是“nonempty”,一个作为队列头Fake Node实例,双向连接于代表Thread-1的作为队列队尾的Node实例。
回到起初的acquire方法,分析方法内调用的acquireQueued(),它的作用在我看来,就是入队时:尝试获得锁,如果未遂,进而挂起;出队时:挂起被唤醒,尝试去获取锁:
|
|
一些参数和变量:addWaiter方法return了代表Thread-1的node,这里作为实参传入,arg在这里为1,方法内声明failed这个标志字段,并将其设置为true,还设置了interrupted字段,此interrupted非彼interrupted(Thread的静态方法),用来作为是否中断过的标志。
接下来是一个for循环,p是node的前驱,在本例子中,p == head返回true,然后Thread-1还会进行一次获取同步锁的尝试,我个人是觉得两个if的顺序改变不会带来特别大的变化,所以如果非要问为什么一进入方法首先要再次重新尝试tryAcquire(),本人能给出可能的原因是线程在不同状态下切换是一个很“重”的操作,所以赶在切换成等待状态之前,看看能不能获得锁。
如果执行到第一个if语句内部,那么证明该线程拿到了锁,此时setHead(node)做了这么几件事:
- 将head指针指向自己; // head = node;
- 将内部持有的线程字段置为null; // node.thread = null;
- 将前驱指针置为null; // node.prev = null;
这样一来,该节点也演变为“Fake Node”。将p.next = null,这样原本的head指向的“Fake Node”内已经没有指向任何对象,而p又只是一个局部变量,所以没有强引用指向该节点对象,那么它将会被GC。
第一个if的判断语句返回false,不能继续执行括号内的语句,接着跳转到第二个if,shouldParkAfterFailedAcquire方法作用如其名,放上源码:
|
|
waitStatus是节点的等待状态,它包括SIGNAL、CANCELLED、CONDITION、PROPAGATE等,这里先重点说说SIGNAL和CANCELLED:SIGNAL的值设置为-1,它表示后继节点的线程需要被唤醒(unparking),所以显然,当一个节点前驱的waitStatus处于SIGNAL时,那么它就可以放心睡眠;CANCELLED的值为1,它表示线程已经注销。
结合当前的例子,第一次执行该方法时,ws为0,然后他会跳转到else语句,compareAndSetWaitStatus将ws设置为Node.SIGNAL,然后返回false。继续执行acquireQueued()的for循环,由于当前线程仍然不能拿到同步锁,所以又开始执行shouldParkAfterFailedAcquire(),直到前驱节点的ws的值为Node.SIGNAL,整个方法返回true,简单描述过程:
- 如果前驱节点的ws为Node.SIGNAL,那就直接返回;
- 如果当前的前驱节点ws值大于1,那么就不断递归查找ws不再大于0的节点作为前驱节点;
- 与找到的新的前驱节点进行双向链接;
- 如果前驱节点的ws不大于0,那么将它的ws设置为Node.SIGNAL;
接着就是执行parkAndCheckInterrupt():
|
|
LockSupport.park(this)是不会更改标志位的值的,所以一旦调用了interrupt方法,线程被唤醒,此时Thread.interrupted()的值为true,作为方法的返回值,中断标志位清零。这里多说一下,清零是很重要的动作(虽然默认调用interrupted()就会完成这个动作),因为在两个park()之间,如果不进行标志位清零,for循环又一次执行到parkAndCheckInterrupt(),线程不会再挂起。而且interrupted()只会响应中断操作,如果是调用了unpark方法唤醒线程,interrupted方法只会返回false(写到这里让我产生interrupted()就是为AQS而生的感觉)。
总之,要了解interrupt()和unpark()作用于等待队列中线程所产生的区别。比如,有一个处于睡眠状态的线程,通过中断机制唤醒后,他会尝试获得锁,如果没有获得锁,它会挂起,不会执行到selfInterrupt()语句,一旦获得锁,会执行到selfInterrupt(),他处于RUNNABLE状态,也不会立即响应中断,但此时它的标志位的值是true。而如果是通过unpark()唤醒,无论它获没获得锁,由于interrupted的值是false,所以永远不会执行到selfInterrupt()语句。
以下代码说明了上述结论:
|
|
输出结果为:
|
|
Thread-0在执行的时候,Thread-1进入等待队列,中途Thread-1被interrupt(),当Thread-1获得锁后开始执行,selfInterrupt()更改了中断标志位,Thread-1执行到sleep()语句时,立马响应中断抛出异常。
小结
本篇主要介绍线程获取锁的过程,由于跟随实例来进行分析,所以中间有些细节跳过了,但是这些细节都不是需要烧脑理解的部分,比如在shouldParkAfterFailedAcquire方法里,ws > 0的时候,前驱节点的线程被注销,此时应该不断执行node.prev = pred = pred.prev,直到pred.waitStatus <= 0。本篇小文还解析了在线程获取锁过程中,中断操作对线程运行的影响。
参考
JDK-6503247:acquireQueued()添加finally机制来处理这种bug;