细谈线程池设计与监控
前言
万年八股文,在工作中肯定多多少少都使用了线程池,但有没有真正的去了解线程池的整个运行机制和核心关键点。今天分享一下。
线程池简介
先说 线程(Thread) , 在 Wiki 的解释是操作系统能进行运算调度的最小单元。包含在进程中,是进程实际运作的单元。
OK,先记住这一点,我们说 线程池(Thread Pool) ,顾名思义,基于池化思想的管理线程的手段(工具)。既然叫线程池了,可以
知道肯定是多线程的服务环境下。
我们都知道Server的资源是有限的,不可能无止境的扩张(从成本、运维上考虑)。线程过分的开辟的话,会导致整体服务器资源吃紧(线程带来的开销、创建、调度等等)。
池化的思想,线程池维护着多个线程,等待调度分配并发任务去执行。一方面避免有任务就创建线程带来的额外的开销,另一方面也是保护服务器产生过多的线程数量。
线程池解决什么问题
线程池解决的核心问题是资源管理问题,在并发环境下,系统和服务并不知道在什么时候需要多少资源。而 池化(Pooling) 思想,就是为了将资源统一管理的情况下,减小风险的思想。
Wiki上 ThreadPool图

P.S : 池化思想举例
线程池核心设计
在看线程池的设计之前,先了解一下线程池的构造参数
corePoolSize– the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is setmaximumPoolSize– the maximum number of threads to allow in the poolkeepAliveTime– when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.unit– the time unit for the keepAliveTime argumentworkQueue– the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.threadFactory– the factory to use when the executor creates a new threadhandler– the handler to use when execution is blocked because the thread bounds and queue capacities are reached
废话太多了,直接上UML图
线程池总框架设计
实现类 ThreadPoolExecutor UML 图

- 顶级接口
Executor只有一个抽象方法void execute(Runnable)方法,这是将任务和线程解耦,用户只需要提供Runnable对象,不需要关系如何创建线程、如何调度线程执行任务。 - 下层的
ExecutorService提供了诸多方法,这是对Executor进行了能力扩充:执行任务(同步、异步)Future的方法、停止线程池和中断线程等方法 AbstractExecutorService是一个抽象的实现,查看源码invokeAllinvokeAnydoInvokeAny能发现,这是将线程与任务结合流转的实现,保证上层只需要关心execute方法就可以了。- 最下面的
ThreadPoolExecutor实现了最麻烦的运行部分,同时维护这线程池的生命周期、还要管理线程和任务,在此,任务的提交、线程管理、监控等等
下面请看图

- 从上图中看到看到了任务在提交后整个的执行流程、线程池内部就是一个构建了
生产者``消费者模型,并将线程与任务进行了解耦。 - 通过
缓冲任务+线程组模式,达到复用线程的目的 - 任务部分,
生产者。提交任务,判断是进入哪种执行流程:第一、进入缓冲任务队列;第二、直接开辟线程执行;第三、执行拒绝 - 线程管理,
消费者。线程池维护一定的线程,线程执行完毕后会继续获取新的任务去执行,如果没有新的任务,线程被回收。
线程池的生命周期
要了解线程池的生命周期运行,就肯定要去看 ThreadPoolExecutor 的源码,从代码上可以看到一个核心的参数,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
从是私有类型可以知道,这个不是用户显性设置的,这是由线程池运行的时候,由内部来维护的。那它干啥用的呢?
首先,它是一个 AtomicInteger 类型,也即是它是主要的控制状态,包含了两个关键性概念(一个int 32 位)
workerCount(低29位)表示有效的线程数,以及开启的线程数runState(高3位)记录正在处于Running、Shutting down状态,这个是主要的生命周期控制
从注释上看出,runState 主要的取值有
RUNNING接收新的任务,并且处理进入Queue的任务SHUTDOWN不能接收新的任务,但是继续处理进入Queue的任务STOP不接受新的任务,也不处理进入Queue 的任务,并且当前正在处理的任务中断TIDYING当所有的任务都终止、workCount=0,线程池状态转换到TIDYING,后续运行terminate() hook方法。TERMINATEDterminate()方法执行完毕

runState 的取值变化是有顺序的
RUNNING->SHUTDOWNOn invocation of shutdown()
- (
RUNNINGORSHUTDOWN) ->STOPOn invocation of shutdownNow()
SHUTDOWN->TIDYINGWhen both queue and pool are empty
STOP->TIDYINGWhen pool is empty
TIDYING->TERMINATEDWhen the terminated() hook method has completedå
线程池关键概念
上面提到线程池是解耦,任务 和 线程 的关系。这样的话就拆分出两个关键的概念点
任务
结合代码上从方法名称上我们可以看到几个核心的方法
Executor#execute
void execute(Runnable command);
ExecutorService#submit
Future<?> submit(Runnable task);
AbstractExecutorService#submit实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
从上述上可以看出,任务的提交到执行的流程最终都是执行execute的方法。
任务提交
任务提交,借助 newTaskFor 将 Runnable 对象构造成 RunnableFuture 对象作为任务传递进入。
任务调度执行
任务的调度就是 execute 方法, 主要的工作: 检查线程池运行状态、运行线程数、运行策略,决定后续走申请线程执行、或缓冲到队列中去,或直接拒绝该任务
大体执行的:
workCount<corePoolSize, 直接启动一个线程来执行新提交的任务(addWorker内部 当前任务作为这个线程的firstTask执行)workCount>=corePoolSize,workQueue未满,则加入到workQueue。workCount>=corePoolSize,workCount<maximumPoolSize,workQueue已满,创建新的线程执行workCount>=maximumPoolSize,workQueue已满,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workCount < corePoolSize数目,添加新的 worker 执行任务,当前的任务作为新加线程的 FirstTask
if (workerCountOf(c) < corePoolSize) {
// 添加成功,直接返回,不成功的话,说明当前线程池不允许提交任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 能执行到这里,说明了 workCount >= corePoolSize 或者 线程池不允许提交任务了
// 检查线程池 RUNNING 状态,把任务添加到 任务Queue中
if (isRunning(c) && workQueue.offer(command)) {
//进入当前分支, 任务添加进入了 workQueue中。workCount 在[0,corePoolSize) 无条件开启新的线程,当 > corePoolSize 任务加入Queue
int recheck = ctl.get();
//如果线程不处于Running状态,那么移除已经入队的任务,并执行 reject 策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池处于Running,且线程数量为 0 ,开启新的线程(这边是因为我们任务已经提交到Queue中,但是线程都关闭了怎么办)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 满了,以 maximumPoolSize 为最大值创建新的线程,创建失败的话,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
任务执行缓冲
workQueue 是线程池管理任务的核心部分, workQueue 从代码上看是一个阻塞队列。
private final BlockingQueue<Runnable> workQueue;
从注释上的看,这个queue,用于保存任务,并且移交任务给 worker thread。
- 当
Queue为空时候,获取元素的线程等待Queue变成非空 - 当
Queue满了,存储元素的线程等待队列可用。
线程在执行中是执行任务,那么线程如何从 Queue 获取任务的呢?看 GetTask 方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 两种情况:
// 1. runState == SHUTDOWN && workQueue.isEmpty()
// 2. runState >= STOP
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
//CAS 操作, 减少 workCount 工作线程数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// workCount <= maximumPoolSize 且没有超时
try {
// workQueue 获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在注释上描述了调用此方法的几种场景:
- 任务执行阻塞,(
corePoolSize之内的线程不会被回收,它们一直等待任务) - 定时等待,
KeepAliveTime生效(这么长时间都没有任务,那么线程应该关闭) - 在发生下面的情况,必须返回
null, 工作线程必须退出:- 线程池 中
> maximumPoolSize个worker线程存在(通过调用setMaximumPoolSize设置) - 线程池处于
SHUTDOWN,且workQueue为空,就是不接受新的任务了 - 线程池处于
STOP,不仅不接受新的任务,已经存在workQueue的线程不在执行 - 这个线程等待任务超时,且超时的
Worker线程随时可能被Terminate,(workCount > corePoolSize部分的 线程)
- 线程池 中
任务拒绝
为什么要拒绝任务呢,因为任务拒绝模块是线程池的保护机制,从上面我们知道,线程池有最大的容量,当线程池的任务缓存Queue满了后,并且线程池中的线程数达到maximumPoolSize
时候,就拒绝该任务了,保护线程池。
在 execute(Runnable command) 方法中有 reject 方法,这是任务拒绝执行操作。
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在 ThreadPoolExecutor 有四个 RejectedExecutionHandler 的实现
ThreadPoolExecutor.AbortPolicy丢弃并且直接抛出RejectedExecutionException, 默认的策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
ThreadPoolExecutor.DiscardPolicy什么都不做,直接丢弃任务,无任何反馈
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
ThreadPoolExecutor.DiscardOldestPolicy丢弃最早的任务,然后重新提交被拒绝的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
ThreadPoolExecutor.CallerRunsPolicy由调用线程来执行该任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
线程
线程池为了控制线程,维护线程的生命周期,设计了 Worker线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//....... 省略代码
}
工作线程
从上面看出,Worker 实现了Runnable接口,持有Thread,由 ThreadFactory 创建出来,
private boolean addWorker(Runnable firstTask, boolean core) {
//....... 省略代码
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
//....... 省略代码
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
工作线程的增加
从上面的代码能看出来 Worker 的创建,firstTask 参数不为空的时候,会立马执行,如果 firstTask 为 null,就去创建一个线程去获取 workQueue 的任务去执行
执行的流程在 Worker 的 run() 方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
//....... 省略代码
public void run() {
runWorker(this);
}
//....... 省略代码
}
工作线程的释放 和 执行任务
线程的释放和 执行任务都在 runWorker 方法中,
任务的获取:
- 在一个
While循环中一直在通过getTask()获取任务 getTask()方法阻塞队列中获取任务,如果获取到,则执行任务(在之前还检查线程池状态了,具体看注释),- 如果取到
null则跳出循环,则跳出循环,执行processWorkerExit方法,销毁线程
runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
如何监控和改变
到现在我们了解了线程池的实现机制和原理。提出监控和改变是为了实时在线、精细化管控生产环境下的线程池任务、增加对应的监控和告警,应对突发情况。
针对各个场景下的,线程池的参数设置直接定死,其实不是安全的方式,从各个机器配置、环境配置上、应用的应对的服务请求量等等,而且线程池的执行情况也和各个任务类型相关性比较大。
那么我们就需要在 运行时环境下,我想根据真实情况下去更改线程池的参数,使其参数设置合理,这种方式就是将线程池的参数动态话。
如何动态化线程池的参数呢?
线程池的构造参数有上面提到的多个,但是最核心的还是 corePoolSize、maximumPoolSize 和 workQueue,所以这几个参数的配置直接影响了线程池的情况
我们先看一下 ThreadPoolExecutor 的 set get 方法

动态化修改参数可以借助 set方法 修改几个核心的参数

监控方向,可以借助 get方法获取线程池具体的执行情况
Ref:
- JDK16.0.2源码
- Pool_(computer_science)
- 线程池ThreadPool