J.U.C并发框架之AQS(一):ReentrantLock获取锁

解析ReentrantLock,就是解析AQS!但后者毕竟是一个框架,大而全,且部分功能交由子类重现,不容易找到一个阅读源码的切入点,所以不妨从ReentrantLock开始,先理解AQS实现的独占锁。在解析的过程中,对一些过于简单的细节,不再堆砌,只抓核心功能的实现部分。本解析采用的是jdk1.8.0_171。

实验用例

首先释出一个并发的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReentrantLockDemo02 {
public static void main(String[] args) {
Thread t1 = new ThreadDemo01();
Thread t2 = new ThreadDemo01();
System.out.println(Thread.currentThread().getName());
t1.start();
t2.start();
}
public static class ThreadDemo01 extends Thread {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}

运行后,程序先打印出main和Thread-0,过了5秒后打印出Thread-1,又过了五秒程序结束。此程序可以用来作为解析ReentrantLock源码的例子。

锁未被占用时的线程执行

Thread-0和Thread-1都尝试过获得锁,但是它们的“际遇”是不一样的,首先我们看看Thread-0。

  1. 在自定义的线程类中一开始创建了ReentrantLock对象,而ReentrantLock无参构造方法默认产生的是NonfairSync实例:
    public ReentrantLock() {
      sync = new NonfairSync();
    }
    
    所以lock是不公平锁(这是什么现在不重要);
  2. t1.start():程序开始执行run()中的代码;
  3. lock.lock()调用的方法如下:
    final void lock() {
      if (compareAndSetState(0, 1))
          setExclusiveOwnerThread(Thread.currentThread());
      else
          acquire(1);
    }
    
  4. 执行之后的打印语句与睡眠语句;
  5. 最后还会执行finally下的lock.unlock()释放锁;

这里重点将第3步一窥究竟。

一开始会进行一个compareAndSetState(0, 1)的判断,这是基于CAS,继续追踪其源码如下:

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

this指的就是当前锁持有的同步器对象,这里就是第1步里得到的sync,stateOffset是state的底层地址偏移量(用到了反射,也用到了unsafe方法来获得这个值),expect是期望值,update是要更新成的值。state最开始的值为0,所以这个时候if判断语句为true,执行setExclusiveOwnerThread(Thread.currentThread()),将当前所在的线程设置为exclusiveOwnerThread,表明它是独占模式同步锁的持有者。

锁被占用时的线程执行

让我们再来看看Thread-1,同样的,它也会尝试执行Thread-0一样的流程,但是当它在执行第3步的时候,出现了不一样的情况,它会去执行acquire(1),这个方法的构造是这样的:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

调用上面的方法会先调用tryAcquire(arg):

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

而返回的nonfairTryAcquire()长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这段代码虽然比之前的稍长,但不难理解。Thread.currentThread()获得当前线程,即Thread1,此时的state是1,所以c等于1,第一个if的判断语句为false,直接跳转到else if,但Thread1并不是exclusiveOwnerThread,所以直接跳转到最后的返回语句,返回false。

接着看看acquireQueued(addWaiter(Node.EXCLUSIVE), arg))发生了什么。

先是addWaiter():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

该方法体第一句的作用是根据当前所在的线程,创建一个新的Node实例。由于之前没有提到过Node,这里稍稍解释一下,AQS内部维护一个基于”CLH”(Craig, Landin, and Hagersten)队列修改得到的FIFO队列,每个元素是一个Node实例,而每个Node实例封装了一个线程对象以及一些相关字段和方法,在这里可以先简单理解为一个Node实例代表了一个线程。方法的形参mode传入的是Node.EXCLUSIVE,它将模式标记为独占。

第二句为Node pred = tail,在当前的情况下,所谓的队列还是空的,没有添加任何节点,所以第一个if语句直接跳过,进入到enq()中,if语句的作用注释已经说的很清楚“Try the fast path of enq”,尝试最快入列。

enq():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

在for循环里,第一个if进行一个null判断,由于当前t是null,所以执行内部语句,内部又嵌入了一个if语句,compareAndSetHead方法是一个CAS方法,作用是如果当前head节点为null,那么就设置为指向一个新创建的Node对象。而当前head节点的确是null,所以head = new Node(),然后执行tail = head语句。程序运行到此时AQS内部结构如下:

enq01

head和tail指向相同的Node对象,之前本人说过,为了方便,一个Node实例可以简单的被理解为代表一个线程,但是在这里它内部的thread指向的是null,所以这个Node对象是一个“Fake Node”。

注意this后面的打印输出“[State = 1, empty queue]”,也就是说此时被认为队列是空的,我们追踪程序的相关代码,发现判断内部是否存在线程队列,主要在于方法hasQueuedThread:

1
2
3
public final boolean hasQueuedThreads() {
return head != tail;
}

至此,AQS维护一个队列的方式也就清楚了,它并不会主动在内部创建Queue对象,而是通过一头(head)一尾(tail)来操作。

方法的执行到这里并没有返回和结束,它会继续执行for循环,这个时候t不再是null,开始执行else语句,词句段的作用相当于将代表Thread-1的Node对象设置为队尾,AQS结构如下:

enq02

此时的队列是“nonempty”,一个作为队列头Fake Node实例,双向连接于代表Thread-1的作为队列队尾的Node实例。

回到起初的acquire方法,分析方法内调用的acquireQueued(),它的作用在我看来,就是入队时:尝试获得锁,如果未遂,进而挂起;出队时:挂起被唤醒,尝试去获取锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

一些参数和变量:addWaiter方法return了代表Thread-1的node,这里作为实参传入,arg在这里为1,方法内声明failed这个标志字段,并将其设置为true,还设置了interrupted字段,此interrupted非彼interrupted(Thread的静态方法),用来作为是否中断过的标志。

接下来是一个for循环,p是node的前驱,在本例子中,p == head返回true,然后Thread-1还会进行一次获取同步锁的尝试,我个人是觉得两个if的顺序改变不会带来特别大的变化,所以如果非要问为什么一进入方法首先要再次重新尝试tryAcquire(),本人能给出可能的原因是线程在不同状态下切换是一个很“重”的操作,所以赶在切换成等待状态之前,看看能不能获得锁。

如果执行到第一个if语句内部,那么证明该线程拿到了锁,此时setHead(node)做了这么几件事:

  • 将head指针指向自己; // head = node;
  • 将内部持有的线程字段置为null; // node.thread = null;
  • 将前驱指针置为null; // node.prev = null;

这样一来,该节点也演变为“Fake Node”。将p.next = null,这样原本的head指向的“Fake Node”内已经没有指向任何对象,而p又只是一个局部变量,所以没有强引用指向该节点对象,那么它将会被GC。

第一个if的判断语句返回false,不能继续执行括号内的语句,接着跳转到第二个if,shouldParkAfterFailedAcquire方法作用如其名,放上源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

waitStatus是节点的等待状态,它包括SIGNAL、CANCELLED、CONDITION、PROPAGATE等,这里先重点说说SIGNAL和CANCELLED:SIGNAL的值设置为-1,它表示后继节点的线程需要被唤醒(unparking),所以显然,当一个节点前驱的waitStatus处于SIGNAL时,那么它就可以放心睡眠;CANCELLED的值为1,它表示线程已经注销。

结合当前的例子,第一次执行该方法时,ws为0,然后他会跳转到else语句,compareAndSetWaitStatus将ws设置为Node.SIGNAL,然后返回false。继续执行acquireQueued()的for循环,由于当前线程仍然不能拿到同步锁,所以又开始执行shouldParkAfterFailedAcquire(),直到前驱节点的ws的值为Node.SIGNAL,整个方法返回true,简单描述过程:

  • 如果前驱节点的ws为Node.SIGNAL,那就直接返回;
  • 如果当前的前驱节点ws值大于1,那么就不断递归查找ws不再大于0的节点作为前驱节点;
  • 与找到的新的前驱节点进行双向链接;
  • 如果前驱节点的ws不大于0,那么将它的ws设置为Node.SIGNAL;

接着就是执行parkAndCheckInterrupt():

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

LockSupport.park(this)是不会更改标志位的值的,所以一旦调用了interrupt方法,线程被唤醒,此时Thread.interrupted()的值为true,作为方法的返回值,中断标志位清零。这里多说一下,清零是很重要的动作(虽然默认调用interrupted()就会完成这个动作),因为在两个park()之间,如果不进行标志位清零,for循环又一次执行到parkAndCheckInterrupt(),线程不会再挂起。而且interrupted()只会响应中断操作,如果是调用了unpark方法唤醒线程,interrupted方法只会返回false(写到这里让我产生interrupted()就是为AQS而生的感觉)。

总之,要了解interrupt()和unpark()作用于等待队列中线程所产生的区别。比如,有一个处于睡眠状态的线程,通过中断机制唤醒后,他会尝试获得锁,如果没有获得锁,它会挂起,不会执行到selfInterrupt()语句,一旦获得锁,会执行到selfInterrupt(),他处于RUNNABLE状态,也不会立即响应中断,但此时它的标志位的值是true。而如果是通过unpark()唤醒,无论它获没获得锁,由于interrupted的值是false,所以永远不会执行到selfInterrupt()语句。

以下代码说明了上述结论:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new ThreadDemo01();
Thread t2 = new ThreadDemo01();
System.out.println(Thread.currentThread().getName());
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();
}
public static class ThreadDemo01 extends Thread {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5000);
System.out.println("线程" + Thread.currentThread() + "执行完sleep方法");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}

输出结果为:

1
2
3
4
5
6
7
8
9
main
Thread-0
线程Thread[Thread-0,5,main]执行完sleep方法
Thread-1
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.demo.ReentrantLockDemo$ThreadDemo01.run(ReentrantLockDemo.java:24)
Process finished with exit code 0

Thread-0在执行的时候,Thread-1进入等待队列,中途Thread-1被interrupt(),当Thread-1获得锁后开始执行,selfInterrupt()更改了中断标志位,Thread-1执行到sleep()语句时,立马响应中断抛出异常。

小结

本篇主要介绍线程获取锁的过程,由于跟随实例来进行分析,所以中间有些细节跳过了,但是这些细节都不是需要烧脑理解的部分,比如在shouldParkAfterFailedAcquire方法里,ws > 0的时候,前驱节点的线程被注销,此时应该不断执行node.prev = pred = pred.prev,直到pred.waitStatus <= 0。本篇小文还解析了在线程获取锁过程中,中断操作对线程运行的影响。

参考

JDK-6503247:acquireQueued()添加finally机制来处理这种bug;