细谈线程池设计与监控
前言
万年八股文,在工作中肯定多多少少都使用了线程池
,但有没有真正的去了解线程池
的整个运行机制和核心关键点。今天分享一下。
线程池简介
先说 线程(Thread)
, 在 Wiki
的解释是操作系统能进行运算调度的最小单元
。包含在进程中,是进程实际运作的单元
。
OK,先记住这一点,我们说 线程池(Thread Pool)
,顾名思义,基于池化思想
的管理线程的手段(工具)。既然叫线程池了,可以
知道肯定是多线程
的服务环境下。
我们都知道Server
的资源是有限的,不可能无止境的扩张(从成本、运维上考虑)。线程过分的开辟的话,会导致整体服务器资源吃紧(线程带来的开销、创建、调度等等)。
池化的思想,线程池维护着多个线程,等待调度分配并发任务去执行。一方面避免有任务就创建线程带来的额外的开销,另一方面也是保护服务器产生过多的线程数量。
线程池解决什么问题
线程池
解决的核心问题是资源管理
问题,在并发环境下,系统和服务并不知道在什么时候需要多少资源。而 池化(Pooling)
思想,就是为了将资源统一管理的情况下,减小风险的思想。
Wiki
上 ThreadPool
图
P.S : 池化思想举例
线程池核心设计
在看线程池的设计之前,先了解一下线程池的构造参数
corePoolSize
– the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is setmaximumPoolSize
– the maximum number of threads to allow in the poolkeepAliveTime
– when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.unit
– the time unit for the keepAliveTime argumentworkQueue
– the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.threadFactory
– the factory to use when the executor creates a new threadhandler
– the handler to use when execution is blocked because the thread bounds and queue capacities are reached
废话太多了,直接上UML
图
线程池总框架设计
实现类 ThreadPoolExecutor
UML
图
- 顶级接口
Executor
只有一个抽象方法void execute(Runnable)
方法,这是将任务
和线程
解耦,用户只需要提供Runnable
对象,不需要关系如何创建线程、如何调度线程执行任务。 - 下层的
ExecutorService
提供了诸多方法,这是对Executor
进行了能力扩充:执行任务(同步、异步)Future
的方法、停止线程池和中断线程等方法 AbstractExecutorService
是一个抽象的实现,查看源码invokeAll
invokeAny
doInvokeAny
能发现,这是将线程
与任务
结合流转的实现,保证上层只需要关心execute
方法就可以了。- 最下面的
ThreadPoolExecutor
实现了最麻烦的运行部分,同时维护这线程池的生命周期、还要管理线程和任务,在此,任务的提交
、线程管理
、监控
等等
下面请看图
- 从上图中看到看到了任务在提交后整个的执行流程、线程池内部就是一个构建了
生产者``消费者
模型,并将线程与任务进行了解耦。 - 通过
缓冲任务
+线程组
模式,达到复用线程的目的 - 任务部分,
生产者
。提交任务,判断是进入哪种执行流程:第一、进入缓冲任务队列;第二、直接开辟线程执行;第三、执行拒绝 - 线程管理,
消费者
。线程池维护一定的线程,线程执行完毕后会继续获取新的任务去执行,如果没有新的任务,线程被回收。
线程池的生命周期
要了解线程池的生命周期运行,就肯定要去看 ThreadPoolExecutor
的源码,从代码上可以看到一个核心的参数,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
从是私有类型可以知道,这个不是用户显性设置的,这是由线程池运行的时候,由内部来维护的。那它干啥用的呢?
首先,它是一个 AtomicInteger
类型,也即是它是主要的控制状态,包含了两个关键性概念(一个int 32 位
)
workerCount
(低29位
)表示有效的线程数,以及开启的线程数runState
(高3位
)记录正在处于Running
、Shutting down
状态,这个是主要的生命周期控制
从注释上看出,runState
主要的取值有
RUNNING
接收新的任务,并且处理进入Queue
的任务SHUTDOWN
不能接收新的任务,但是继续处理进入Queue
的任务STOP
不接受新的任务,也不处理进入Queue 的任务,并且当前正在处理的任务中断TIDYING
当所有的任务都终止、workCount=0
,线程池状态转换到TIDYING
,后续运行terminate() hook
方法。TERMINATED
terminate()
方法执行完毕
runState
的取值变化是有顺序的
RUNNING
->SHUTDOWN
On invocation of shutdown()
- (
RUNNING
ORSHUTDOWN
) ->STOP
On invocation of shutdownNow()
SHUTDOWN
->TIDYING
When both queue and pool are empty
STOP
->TIDYING
When pool is empty
TIDYING
->TERMINATED
When the terminated() hook method has completedå
线程池关键概念
上面提到线程池是解耦,任务
和 线程
的关系。这样的话就拆分出两个关键的概念点
任务
结合代码上从方法名称上我们可以看到几个核心的方法
Executor#execute
void execute(Runnable command);
ExecutorService#submit
Future<?> submit(Runnable task);
AbstractExecutorService#submit
实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
从上述上可以看出,任务的提交到执行的流程最终都是执行execute
的方法。
任务提交
任务提交,借助 newTaskFor
将 Runnable
对象构造成 RunnableFuture
对象作为任务传递进入。
任务调度执行
任务的调度就是 execute
方法, 主要的工作: 检查线程池运行状态、运行线程数、运行策略,决定后续走申请线程执行、或缓冲到队列中去,或直接拒绝该任务
大体执行的:
workCount
<corePoolSize
, 直接启动一个线程来执行新提交的任务(addWorker
内部 当前任务作为这个线程的firstTask
执行)workCount
>=corePoolSize
,workQueue
未满,则加入到workQueue
。workCount
>=corePoolSize
,workCount
<maximumPoolSize
,workQueue
已满,创建新的线程执行workCount
>=maximumPoolSize
,workQueue
已满,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workCount < corePoolSize数目,添加新的 worker 执行任务,当前的任务作为新加线程的 FirstTask
if (workerCountOf(c) < corePoolSize) {
// 添加成功,直接返回,不成功的话,说明当前线程池不允许提交任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 能执行到这里,说明了 workCount >= corePoolSize 或者 线程池不允许提交任务了
// 检查线程池 RUNNING 状态,把任务添加到 任务Queue中
if (isRunning(c) && workQueue.offer(command)) {
//进入当前分支, 任务添加进入了 workQueue中。workCount 在[0,corePoolSize) 无条件开启新的线程,当 > corePoolSize 任务加入Queue
int recheck = ctl.get();
//如果线程不处于Running状态,那么移除已经入队的任务,并执行 reject 策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池处于Running,且线程数量为 0 ,开启新的线程(这边是因为我们任务已经提交到Queue中,但是线程都关闭了怎么办)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 满了,以 maximumPoolSize 为最大值创建新的线程,创建失败的话,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
任务执行缓冲
workQueue
是线程池管理任务的核心部分, workQueue
从代码上看是一个阻塞队列。
private final BlockingQueue<Runnable> workQueue;
从注释上的看,这个queue
,用于保存任务,并且移交任务给 worker thread
。
- 当
Queue
为空时候,获取元素的线程等待Queue
变成非空 - 当
Queue
满了,存储元素的线程等待队列可用。
线程在执行中是执行任务,那么线程如何从 Queue
获取任务的呢?看 GetTask
方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 两种情况:
// 1. runState == SHUTDOWN && workQueue.isEmpty()
// 2. runState >= STOP
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
//CAS 操作, 减少 workCount 工作线程数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// workCount <= maximumPoolSize 且没有超时
try {
// workQueue 获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在注释上描述了调用此方法的几种场景:
- 任务执行阻塞,(
corePoolSize
之内的线程不会被回收,它们一直等待任务) - 定时等待,
KeepAliveTime
生效(这么长时间都没有任务,那么线程应该关闭) - 在发生下面的情况,必须返回
null
, 工作线程必须退出:- 线程池 中
> maximumPoolSize
个worker
线程存在(通过调用setMaximumPoolSize
设置) - 线程池处于
SHUTDOWN
,且workQueue
为空,就是不接受新的任务了 - 线程池处于
STOP
,不仅不接受新的任务,已经存在workQueue
的线程不在执行 - 这个线程等待任务超时,且超时的
Worker
线程随时可能被Terminate
,(workCount > corePoolSize
部分的 线程)
- 线程池 中
任务拒绝
为什么要拒绝任务呢,因为任务拒绝模块是线程池的保护机制,从上面我们知道,线程池有最大的容量,当线程池的任务缓存Queue
满了后,并且线程池中的线程数达到maximumPoolSize
时候,就拒绝该任务了,保护线程池。
在 execute(Runnable command)
方法中有 reject
方法,这是任务拒绝执行操作。
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在 ThreadPoolExecutor
有四个 RejectedExecutionHandler
的实现
ThreadPoolExecutor.AbortPolicy
丢弃并且直接抛出RejectedExecutionException
, 默认的策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
ThreadPoolExecutor.DiscardPolicy
什么都不做,直接丢弃任务,无任何反馈
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
ThreadPoolExecutor.DiscardOldestPolicy
丢弃最早的任务,然后重新提交被拒绝的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
ThreadPoolExecutor.CallerRunsPolicy
由调用线程来执行该任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
线程
线程池为了控制线程,维护线程的生命周期,设计了 Worker线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//....... 省略代码
}
工作线程
从上面看出,Worker
实现了Runnable
接口,持有Thread
,由 ThreadFactory
创建出来,
private boolean addWorker(Runnable firstTask, boolean core) {
//....... 省略代码
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
//....... 省略代码
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
工作线程的增加
从上面的代码能看出来 Worker
的创建,firstTask
参数不为空的时候,会立马执行,如果 firstTask
为 null
,就去创建一个线程去获取 workQueue
的任务去执行
执行的流程在 Worker
的 run()
方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
//....... 省略代码
public void run() {
runWorker(this);
}
//....... 省略代码
}
工作线程的释放 和 执行任务
线程的释放和 执行任务都在 runWorker
方法中,
任务的获取:
- 在一个
While
循环中一直在通过getTask()
获取任务 getTask()
方法阻塞队列中获取任务,如果获取到,则执行任务(在之前还检查线程池状态了,具体看注释),- 如果取到
null
则跳出循环,则跳出循环,执行processWorkerExit
方法,销毁线程
runWorker
方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
如何监控和改变
到现在我们了解了线程池的实现机制和原理。提出监控和改变是为了实时在线、精细化管控生产环境下的线程池任务、增加对应的监控和告警,应对突发情况。
针对各个场景下的,线程池的参数设置直接定死,其实不是安全的方式,从各个机器配置、环境配置上、应用的应对的服务请求量等等,而且线程池的执行情况也和各个任务类型相关性比较大。
那么我们就需要在 运行时环境下,我想根据真实情况下去更改线程池的参数,使其参数设置合理,这种方式就是将线程池的参数动态话。
如何动态化线程池的参数呢?
线程池的构造参数有上面提到的多个,但是最核心的还是 corePoolSize
、maximumPoolSize
和 workQueue
,所以这几个参数的配置直接影响了线程池的情况
我们先看一下 ThreadPoolExecutor
的 set
get
方法
动态化修改参数可以借助 set方法
修改几个核心的参数
监控方向,可以借助 get方法
获取线程池具体的执行情况
Ref:
- JDK16.0.2源码
- Pool_(computer_science)
- 线程池ThreadPool