本篇主要解析线程池源码,最大的看点在于对shutdown()和shutdownNow()的比较分析以及对线程复用的深入研究,源码中还有很多巧妙的设计,比如ctl等,文章中相关的部分也会一并谈到。
线程池框架结构
首先有必要了解一下线程池中主要的类和接口,初学很容易将它们混淆,实际上框架的继承结构非常简单,继承结构中没有出现@Override:
- Executor:一个接口,只有一个未实现的execute();
- ExecutorService:一个接口,从实际的角度来看,拓展了Executor,最重要的可能就是添加了未实现的submit(),submit()的返回值是Future;当然,除此之外,还添加了一些未实现的方法,诸如shutdown()、isShutdown()、invokeAny()、invokeAll()等,这些方法的存在,让ExecutorService相对比较完备,所以在产生线程池实例时,完全可以用ExecutorService作为引用的类型;
- AbstractExecutorService:一个抽象类,实现了ExecutorService的submit()、invokeAll和invokeAny();
- ThreadPoolExecutor:线程池具体实现类,直接继承自AbstractExecutorService,对后者实现的方法不进行覆盖(submit()等),没有实现的方法均给出实现;
- Executors:一个工厂类,大部分返回ThreadPoolExecutor实例;
由此可见,几乎所有线程池具体的实现逻辑,都发生在ThreadPoolExecutor内部。
ThreadPoolExecutor实现机理
构造 & 运行流程
下面是ThreadPoolExecutor带全参的构造器:
|
|
- corePoolSize:核心池的容量;
- maximumPoolSize:线程池的最大容量;
- keepAliveTime:当实际容量大于核心容量时,线程如果超过这个时间仍然“无事可做”,则要求被回收;
- unit:keepAliveTime单位,可有天、小时、分钟、毫秒、微秒和纳秒等;
- workQueue:工作队列;
- threadFactory:线程工厂,一般默认;
- handler:拒绝策略;
要想真正知道这些参数的意涵,需要先了解ThreadPoolExecutor的工作流程:
图中的1、2、3、4分别表示:
- execute()/submit()任务后,如果正在运行的线程在corePoolSize范围之内,则创建线程运行;
- 当运行的线程数已经达到corePoolSize容量的时候,将任务放入工作队列中;
- 当队列的容量也填满的时候,判断运行的线程是否达到了maximumPoolSize,没有则创建线程运行任务;
- 队列和maximumPoolSize容量都已满,则实行拒绝策略;
比如假设有一个线程池,corePoolSize为2,workQueue大小为3,如果corePoolSize和workQueue都满了的情况下,再多4个线程任务maximumPoolSize也将满额。现在添加7个线程任务,不妨设这些线程的线程名为1,2,3,4,5,6,7。那么执行的顺序就是1,2,6,7,3,4,5。
一般来说,推荐使用new ThreadPoolExecutor对象的方式创建线程池,而非Executors,因为后者使用默认参数构造对象,写出来的代码容易产生被忽视的问题,在文章最后附上了Executors创建的一些线程池的简要介绍,在这里只举newSingleThreadExecutor:
|
|
通过代码,连测试都不需要就可以看出,keepAliveTime针对的是实际alive线程数大于核心容量时的情况,不然这个“单例”线程池没有任何构造的必要。
拒绝策略
默认的拒绝策略如下:
|
|
RejectedExecutionHandler是一个接口,内部只有一个待实现的方法:
|
|
拒绝策略包括四种,他们分别是:CallerRunsPolicy、AbortPolicy、DiscardPolicy以及DiscardOldestPolicy。
CallerRunsPolicy
首先给出源码:
|
|
从r.run()可以看出,该策略是直接让主线程执行r的run方法,这样就可能面临很多意想不到的问题,比如主线程阻塞等等。
AbortPolicy
|
|
AbortPolicy策略就是抛异常。
DiscardPolicy
|
|
DiscardPolicy就是什么也没做。
DiscardOldestPolicy
|
|
该策略就是将工作队列poll一个,然后再将任务提交到线程池。
以上就是线程池框架的基本结构,通过该结构,了解了一些类的基本信息和框架运行流程。
线程池生命周期
上一节最后列出的代码中多次出现isShutdown方法,这里就来研究一下线程池的生命周期。
代码中的注释是这样说的:
|
|
对各状态进行一个简要的“翻译”吧:
RUNNING:正常运行状态,能接受新任务,也能处理工作队列的任务;
SHUTDOWN:不接受新的任务,但是会处理还在工作队列中的任务;
STOP:不接受新的任务,也不处理工作队列的任务,还会interrupt正在运行的任务;
TIDYING:看名字就知道是一个过渡状态,所有任务销毁了,workerCount也变成0了,transitioning到该状态的时候,将运行terminated(),这是一个钩子方法,也就是空方法,待具体实现类去实现;
TERMINATED:terminated()完成的状态;
RUNNING -> SHUTDOWN 调用shutdown(),而(RUNNING or SHUTDOWN) -> STOP 调用shutdownNow(),这是两个可以直接干预生命周期的方法。
shutdown()
shutdown()的源代码如下:
|
|
- checkShutdownAccess():判断线程是否可shutdown,比如在内部实现中,某种情况会调用checkAccess(),如果线程为空会抛出空指针异常等等;
- advanceRunState(SHUTDOWN):将状态设置为SHUTDOWN;
- interruptIdleWorkers():中断空闲线程(此方法待与后面篇幅进行比较);
- tryTerminate():内部是一个循环,在满足条件的循环内,只要workerCountOf(c) != 0,就持续调用interruptIdleWorkers()。tryTerminate()会不断的像它的名字一样,尝试将执行完空闲出来的线程terminate掉;
shutdownNow()
shutdownNow()的源代码如下:
|
|
和shutdown()最大的不同在于返回了一个集合,并且中断方法也不一样了,换成了interruptWorkers()。在interruptWorkers内,主要执行的代码如下:
|
|
而interruptIfStarted()核心代码是:
|
|
之所以需要判断getState() >= 0,是因为每个worker在接到第一个任务的时候后,会将state设置为-1(Worker类本身就继承了AQS,它也是个同步器),目的就是为了防止还没有执行runWorker方法之前就调用中断方法,“工人还没开工”,此时调用interrupt方法,不会更改中断标志位,使shutdownNow()失效。除此之外,只要线程不为null且没有被中断标识过都会调用interrupt方法。
而在shutdown()代码中,interruptIdleWorkers()方法最大的特点在于以下这段代码:
|
|
也就是说,在shutdown()中,要运行t.interrupt(),他首先需要尝试去拿到Worker对象持有的锁,拿不到锁它不会让线程对象调用interrupt(),这是shutdown()和shutdownNow()的区别之一。
除此之外,shutdownNow()内还调用了drainQueue方法,看代码:
|
|
drainQueue()的作用主要就是用来清空workQueue到一个新的列表中。在调用了q.drainTo(taskList)后还进行if (!q.isEmpty()),是为了应对工作队列使用DelayQueue等其它方式实现的情况。一方面对线程池中的线程调用中断方法,另一方面清空工作队列中的任务,所以在之前源码注释中有这么一行:STOP -> TIDYING When pool is empty,只需要pool为empty的条件。
ctl
ctl是一个线程池管理的工具,它打包了两个变量:workerCount和runState,前者表示活动线程数,后者就是前文说提到过的线程池状态。
在代码的实现中ctl用AtomicInteger表示,采用位运算来生成值,低29位保存workerCount,高三位保存runState。比如RUNNING状态用下面代码表示:
|
|
然后用ctlOf方法来合成:
|
|
wc就是workerCount,这样就把两个变量合成在一串二进制数字中了,当要分别取出它们的时候可以调用下列方法:
|
|
CAPACITY这个中间变量又是通过下面代码计算得来:
|
|
线程池中线程运行机制
线程“包裹”在Worker类中,下面就开始研究线程池中线程的运行机制。
Worker类
首先Worker的继承结构如下:
|
|
先行注释有这样一段:
This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run.
这就是shutdown()方法可以在内部调用tryLock()来判断空闲线程的原因所在,Worker继承了AQS,利用了它提供的锁的机制。
Worker类构造函数和run方法:
|
|
变量thread是worker运行的线程,firstTask是初始运行的任务,可以为null。setState(-1)之前在剖析shutdownNow方法的时候已经提到过了,主要就是防止在runWorker之前就调用中断方法,造成线程池提供的shutdownNow方法“失效”。getThreadFactory().newThread方法传入this,为后续开启线程时运行Worker的run方法,而这个方法进一步代理了runWorker(this)。
runWorker()
接下来继续看runWorker()代码,限于篇幅,本人把重要的部分抽离出来:
|
|
这段代码在设计上很好的表明了作者的意图,比如if语句中的Thread.interrupted()
,它的目的注释写得很清楚:If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted. interrupted方法在这里调用后会清空标志位。
getTask()
接下来研究getTask()代码,我将一些状态判断的代码抽离掉,毕竟前面已经分析够多次相似的情形,而且逻辑也并不复杂(但抽离掉的部分会在一定条件下返回null,这构成前文提到的while循环执行的判断条件,注意一下即可),余下的代码如下:
|
|
workQueue通常是阻塞队列(不是阻塞队列的情形更简单),一旦队列内为空,方法体将进入阻塞状态,类似生产者消费者模式。runWorker方法内的循环中,task = getTask()也会处于等待,这个时候,线程池中的这一线程并没有真正执行任务,那么无论之前已调用shutdown()还是shutdownNow(),都将执行Worker对象的interrupt方法。下面用一个小例说明。
下例中的线程池采用newFixedThreadPool,内部是LinkedBlockingQueue队列,先考虑程序中执行了shutdown()的情况:
|
|
这段代码的结果是在sleep了三秒后正常结束,睡三秒是为了保证pool.shutdown()在sleep之前已经调用,如果调用的是pool.shutdownNow(),结果是程序抛中断异常然后退出。
如果将上述代码中pool.shutdown()语句注释掉,结果是程序将始终挂起。
为什么在while循环之前要调用w.unlock()也就清楚了,程序执行在runWorker()的while循环的判断语句时,如果出现阻塞,会被认为线程处于空闲状态,自然而然不应该持有锁。
到目前为止,已经明白了线程池是如何对所持有的线程进行复用的了。
小结
线程池算是并发编程里面的一个“集大成”者,很多技术在线程池里都有涉及,部分技术的理解和运用参考本站其它内容,特别是本人之前对interrupt的分析,对理解线程池很有帮助。
附录
Executors创建的线程池简介
Executors提供了如下诸多线程池实现:
- newFixedThreadPool
- newSingleThreadExecutor
- newCachedThreadPool
- newWorkStealingPool
- newScheduledThreadPool
- newSingleThreadScheduledExecutor
newFixedThreadPool
的实现:
|
|
corePoolSize和maximumPoolSize的大小相等,keepAliveTime当然只能为0了,这里可以看出它的工作队列采用的是LinkedBlockingQueue,此构造方法需要传入线程工厂实例,但在另一个重载的构造方法中,默认了ThreadFactory参数,所以该参数是可选的。
newSingleThreadExecutor
:
|
|
和newFixedThreadPool最大不一样是将线程池中的线程数固定为1,它同样提供了重载的构造方法,可以自选线程工厂实现。
newCachedThreadPool
:
|
|
corePoolSize设为了0,maximumPoolSize为Integer.MAX_VALUE,可以看作无限大(很难找到需要这么多线程的实际场景),keepAliveTime设置了60L个单位,工作队列指定为SynchronousQueue,它同样提供了重载的构造方法,可以自选线程工厂实现。
后续JDK版本新增的newWorkStealingPool
:
|
|
ForkJoinPool内部实现了工作窃取算法,Runtime.getRuntime().availableProcessors()获取了处理器的核心数,ForkJoinPool.defaultForkJoinWorkerThreadFactory提供了线程工厂,拒绝策略设为null,同步模式为true。同时,存在一个重构方法,第一个参数可以指定为某个值,而不必采用处理器的核心数。
newScheduledThreadPool
:
|
|
返回的是一个ScheduledThreadPoolExecutor实例,同样的它也存在一个重载构造方法,可以用来自定义线程工厂。继续追踪ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口,它的构造方法如下:
|
|
内部维护一个DelayedWorkQueue对象作为工作队列。Executors还提供newSingleThreadScheduledExecutor
方法,可归在newScheduledThreadPool系列中。
参考资料
Introduction to Thread Pools in Java
Java里一个线程调用了Thread.interrupt()到底意味着什么?