线程池源码解析

本篇主要解析线程池源码,最大的看点在于对shutdown()和shutdownNow()的比较分析以及对线程复用的深入研究,源码中还有很多巧妙的设计,比如ctl等,文章中相关的部分也会一并谈到。

线程池框架结构

首先有必要了解一下线程池中主要的类和接口,初学很容易将它们混淆,实际上框架的继承结构非常简单,继承结构中没有出现@Override:

  • Executor:一个接口,只有一个未实现的execute();
  • ExecutorService:一个接口,从实际的角度来看,拓展了Executor,最重要的可能就是添加了未实现的submit(),submit()的返回值是Future;当然,除此之外,还添加了一些未实现的方法,诸如shutdown()、isShutdown()、invokeAny()、invokeAll()等,这些方法的存在,让ExecutorService相对比较完备,所以在产生线程池实例时,完全可以用ExecutorService作为引用的类型;
  • AbstractExecutorService:一个抽象类,实现了ExecutorService的submit()、invokeAll和invokeAny();
  • ThreadPoolExecutor:线程池具体实现类,直接继承自AbstractExecutorService,对后者实现的方法不进行覆盖(submit()等),没有实现的方法均给出实现;
  • Executors:一个工厂类,大部分返回ThreadPoolExecutor实例;

由此可见,几乎所有线程池具体的实现逻辑,都发生在ThreadPoolExecutor内部。

ThreadPoolExecutor实现机理

构造 & 运行流程

下面是ThreadPoolExecutor带全参的构造器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize:核心池的容量;
  • maximumPoolSize:线程池的最大容量;
  • keepAliveTime:当实际容量大于核心容量时,线程如果超过这个时间仍然“无事可做”,则要求被回收;
  • unit:keepAliveTime单位,可有天、小时、分钟、毫秒、微秒和纳秒等;
  • workQueue:工作队列;
  • threadFactory:线程工厂,一般默认;
  • handler:拒绝策略;

要想真正知道这些参数的意涵,需要先了解ThreadPoolExecutor的工作流程:

图1

图中的1、2、3、4分别表示:

  1. execute()/submit()任务后,如果正在运行的线程在corePoolSize范围之内,则创建线程运行;
  2. 当运行的线程数已经达到corePoolSize容量的时候,将任务放入工作队列中;
  3. 当队列的容量也填满的时候,判断运行的线程是否达到了maximumPoolSize,没有则创建线程运行任务;
  4. 队列和maximumPoolSize容量都已满,则实行拒绝策略;

比如假设有一个线程池,corePoolSize为2,workQueue大小为3,如果corePoolSize和workQueue都满了的情况下,再多4个线程任务maximumPoolSize也将满额。现在添加7个线程任务,不妨设这些线程的线程名为1,2,3,4,5,6,7。那么执行的顺序就是1,2,6,7,3,4,5。

一般来说,推荐使用new ThreadPoolExecutor对象的方式创建线程池,而非Executors,因为后者使用默认参数构造对象,写出来的代码容易产生被忽视的问题,在文章最后附上了Executors创建的一些线程池的简要介绍,在这里只举newSingleThreadExecutor:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

通过代码,连测试都不需要就可以看出,keepAliveTime针对的是实际alive线程数大于核心容量时的情况,不然这个“单例”线程池没有任何构造的必要。

拒绝策略

默认的拒绝策略如下:

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

RejectedExecutionHandler是一个接口,内部只有一个待实现的方法:

1
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

拒绝策略包括四种,他们分别是:CallerRunsPolicy、AbortPolicy、DiscardPolicy以及DiscardOldestPolicy。

CallerRunsPolicy

首先给出源码:

1
2
3
4
5
6
7
8
9
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

从r.run()可以看出,该策略是直接让主线程执行r的run方法,这样就可能面临很多意想不到的问题,比如主线程阻塞等等。

AbortPolicy

1
2
3
4
5
6
7
8
9
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

AbortPolicy策略就是抛异常。

DiscardPolicy

1
2
3
4
5
6
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardPolicy就是什么也没做。

DiscardOldestPolicy

1
2
3
4
5
6
7
8
9
10
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

该策略就是将工作队列poll一个,然后再将任务提交到线程池。

以上就是线程池框架的基本结构,通过该结构,了解了一些类的基本信息和框架运行流程。

线程池生命周期

上一节最后列出的代码中多次出现isShutdown方法,这里就来研究一下线程池的生命周期。

代码中的注释是这样说的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*

对各状态进行一个简要的“翻译”吧:

RUNNING:正常运行状态,能接受新任务,也能处理工作队列的任务;
SHUTDOWN:不接受新的任务,但是会处理还在工作队列中的任务;
STOP:不接受新的任务,也不处理工作队列的任务,还会interrupt正在运行的任务;
TIDYING:看名字就知道是一个过渡状态,所有任务销毁了,workerCount也变成0了,transitioning到该状态的时候,将运行terminated(),这是一个钩子方法,也就是空方法,待具体实现类去实现;
TERMINATED:terminated()完成的状态;

RUNNING -> SHUTDOWN 调用shutdown(),而(RUNNING or SHUTDOWN) -> STOP 调用shutdownNow(),这是两个可以直接干预生命周期的方法。

shutdown()

shutdown()的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
  • checkShutdownAccess():判断线程是否可shutdown,比如在内部实现中,某种情况会调用checkAccess(),如果线程为空会抛出空指针异常等等;
  • advanceRunState(SHUTDOWN):将状态设置为SHUTDOWN;
  • interruptIdleWorkers():中断空闲线程(此方法待与后面篇幅进行比较);
  • tryTerminate():内部是一个循环,在满足条件的循环内,只要workerCountOf(c) != 0,就持续调用interruptIdleWorkers()。tryTerminate()会不断的像它的名字一样,尝试将执行完空闲出来的线程terminate掉;

shutdownNow()

shutdownNow()的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

和shutdown()最大的不同在于返回了一个集合,并且中断方法也不一样了,换成了interruptWorkers()。在interruptWorkers内,主要执行的代码如下:

1
2
for (Worker w : workers)
w.interruptIfStarted();

而interruptIfStarted()核心代码是:

1
2
3
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
t.interrupt();
}

之所以需要判断getState() >= 0,是因为每个worker在接到第一个任务的时候后,会将state设置为-1(Worker类本身就继承了AQS,它也是个同步器),目的就是为了防止还没有执行runWorker方法之前就调用中断方法,“工人还没开工”,此时调用interrupt方法,不会更改中断标志位,使shutdownNow()失效。除此之外,只要线程不为null且没有被中断标识过都会调用interrupt方法。

而在shutdown()代码中,interruptIdleWorkers()方法最大的特点在于以下这段代码:

1
2
if (!t.isInterrupted() && w.tryLock())
t.interrupt();

也就是说,在shutdown()中,要运行t.interrupt(),他首先需要尝试去拿到Worker对象持有的锁,拿不到锁它不会让线程对象调用interrupt(),这是shutdown()和shutdownNow()的区别之一。

除此之外,shutdownNow()内还调用了drainQueue方法,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

drainQueue()的作用主要就是用来清空workQueue到一个新的列表中。在调用了q.drainTo(taskList)后还进行if (!q.isEmpty()),是为了应对工作队列使用DelayQueue等其它方式实现的情况。一方面对线程池中的线程调用中断方法,另一方面清空工作队列中的任务,所以在之前源码注释中有这么一行:STOP -> TIDYING When pool is empty,只需要pool为empty的条件。

ctl

ctl是一个线程池管理的工具,它打包了两个变量:workerCount和runState,前者表示活动线程数,后者就是前文说提到过的线程池状态。

在代码的实现中ctl用AtomicInteger表示,采用位运算来生成值,低29位保存workerCount,高三位保存runState。比如RUNNING状态用下面代码表示:

1
private static final int RUNNING = -1 << COUNT_BITS;

然后用ctlOf方法来合成:

1
private static int ctlOf(int rs, int wc) { return rs | wc; }

wc就是workerCount,这样就把两个变量合成在一串二进制数字中了,当要分别取出它们的时候可以调用下列方法:

1
2
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }

CAPACITY这个中间变量又是通过下面代码计算得来:

1
2
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

线程池中线程运行机制

线程“包裹”在Worker类中,下面就开始研究线程池中线程的运行机制。

Worker类

首先Worker的继承结构如下:

1
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {}

先行注释有这样一段:

This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run.

这就是shutdown()方法可以在内部调用tryLock()来判断空闲线程的原因所在,Worker继承了AQS,利用了它提供的锁的机制。

Worker类构造函数和run方法:

1
2
3
4
5
6
7
8
9
10
11
12
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}

变量thread是worker运行的线程,firstTask是初始运行的任务,可以为null。setState(-1)之前在剖析shutdownNow方法的时候已经提到过了,主要就是防止在runWorker之前就调用中断方法,造成线程池提供的shutdownNow方法“失效”。getThreadFactory().newThread方法传入this,为后续开启线程时运行Worker的run方法,而这个方法进一步代理了runWorker(this)。

runWorker()

接下来继续看runWorker()代码,限于篇幅,本人把重要的部分抽离出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
task.run();
task = null;
w.completedTasks++;
w.unlock();
}
}

这段代码在设计上很好的表明了作者的意图,比如if语句中的Thread.interrupted(),它的目的注释写得很清楚:If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted. interrupted方法在这里调用后会清空标志位。

getTask()

接下来研究getTask()代码,我将一些状态判断的代码抽离掉,毕竟前面已经分析够多次相似的情形,而且逻辑也并不复杂(但抽离掉的部分会在一定条件下返回null,这构成前文提到的while循环执行的判断条件,注意一下即可),余下的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
nt c = ctl.get();
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

workQueue通常是阻塞队列(不是阻塞队列的情形更简单),一旦队列内为空,方法体将进入阻塞状态,类似生产者消费者模式。runWorker方法内的循环中,task = getTask()也会处于等待,这个时候,线程池中的这一线程并没有真正执行任务,那么无论之前已调用shutdown()还是shutdownNow(),都将执行Worker对象的interrupt方法。下面用一个小例说明。

下例中的线程池采用newFixedThreadPool,内部是LinkedBlockingQueue队列,先考虑程序中执行了shutdown()的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadPoolDemo03 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
Runnable task01 = () -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
pool.execute(task01);
pool.shutdown(); // 注释与未注释结果截然不同
// pool.shutdownNow();
}
}

这段代码的结果是在sleep了三秒后正常结束,睡三秒是为了保证pool.shutdown()在sleep之前已经调用,如果调用的是pool.shutdownNow(),结果是程序抛中断异常然后退出。

如果将上述代码中pool.shutdown()语句注释掉,结果是程序将始终挂起。

为什么在while循环之前要调用w.unlock()也就清楚了,程序执行在runWorker()的while循环的判断语句时,如果出现阻塞,会被认为线程处于空闲状态,自然而然不应该持有锁。

到目前为止,已经明白了线程池是如何对所持有的线程进行复用的了。

小结

线程池算是并发编程里面的一个“集大成”者,很多技术在线程池里都有涉及,部分技术的理解和运用参考本站其它内容,特别是本人之前对interrupt的分析,对理解线程池很有帮助。

附录

Executors创建的线程池简介

Executors提供了如下诸多线程池实现:

  1. newFixedThreadPool
  2. newSingleThreadExecutor
  3. newCachedThreadPool
  4. newWorkStealingPool
  5. newScheduledThreadPool
  6. newSingleThreadScheduledExecutor

newFixedThreadPool的实现:

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

corePoolSize和maximumPoolSize的大小相等,keepAliveTime当然只能为0了,这里可以看出它的工作队列采用的是LinkedBlockingQueue,此构造方法需要传入线程工厂实例,但在另一个重载的构造方法中,默认了ThreadFactory参数,所以该参数是可选的。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

和newFixedThreadPool最大不一样是将线程池中的线程数固定为1,它同样提供了重载的构造方法,可以自选线程工厂实现。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

corePoolSize设为了0,maximumPoolSize为Integer.MAX_VALUE,可以看作无限大(很难找到需要这么多线程的实际场景),keepAliveTime设置了60L个单位,工作队列指定为SynchronousQueue,它同样提供了重载的构造方法,可以自选线程工厂实现。

后续JDK版本新增的newWorkStealingPool

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

ForkJoinPool内部实现了工作窃取算法,Runtime.getRuntime().availableProcessors()获取了处理器的核心数,ForkJoinPool.defaultForkJoinWorkerThreadFactory提供了线程工厂,拒绝策略设为null,同步模式为true。同时,存在一个重构方法,第一个参数可以指定为某个值,而不必采用处理器的核心数。

newScheduledThreadPool

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

返回的是一个ScheduledThreadPoolExecutor实例,同样的它也存在一个重载构造方法,可以用来自定义线程工厂。继续追踪ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口,它的构造方法如下:

1
2
3
4
5
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

内部维护一个DelayedWorkQueue对象作为工作队列。Executors还提供newSingleThreadScheduledExecutor方法,可归在newScheduledThreadPool系列中。

参考资料

《Java并发编程的艺术》

Introduction to Thread Pools in Java

Java里一个线程调用了Thread.interrupt()到底意味着什么?

toArray() vs. toArray(new Object[0])

Thread pools and work queues

源码分析之ThreadPoolExecutor