细谈线程池设计与监控

前言

万年八股文,在工作中肯定多多少少都使用了线程池,但有没有真正的去了解线程池的整个运行机制和核心关键点。今天分享一下。

线程池简介

先说 线程(Thread) , 在 Wiki 的解释是操作系统能进行运算调度的最小单元。包含在进程中,是进程实际运作的单元。 OK,先记住这一点,我们说 线程池(Thread Pool) ,顾名思义,基于池化思想的管理线程的手段(工具)。既然叫线程池了,可以 知道肯定是多线程的服务环境下。

我们都知道Server的资源是有限的,不可能无止境的扩张(从成本、运维上考虑)。线程过分的开辟的话,会导致整体服务器资源吃紧(线程带来的开销、创建、调度等等)。 池化的思想,线程池维护着多个线程,等待调度分配并发任务去执行。一方面避免有任务就创建线程带来的额外的开销,另一方面也是保护服务器产生过多的线程数量。

线程池解决什么问题

线程池解决的核心问题是资源管理问题,在并发环境下,系统和服务并不知道在什么时候需要多少资源。而 池化(Pooling) 思想,就是为了将资源统一管理的情况下,减小风险的思想。

WikiThreadPool

TheadPool

P.S : 池化思想举例

线程池核心设计

在看线程池的设计之前,先了解一下线程池的构造参数

  • corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
  • maximumPoolSize – the maximum number of threads to allow in the pool
  • keepAliveTime – when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
  • unit – the time unit for the keepAliveTime argument
  • workQueue – the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
  • threadFactory – the factory to use when the executor creates a new thread
  • handler – the handler to use when execution is blocked because the thread bounds and queue capacities are reached

废话太多了,直接上UML

线程池总框架设计

实现类 ThreadPoolExecutor UML

类图

  • 顶级接口 Executor 只有一个抽象方法 void execute(Runnable) 方法,这是将 任务线程 解耦,用户只需要提供 Runnable 对象,不需要关系如何创建线程、如何调度线程执行任务。
  • 下层的 ExecutorService 提供了诸多方法,这是对 Executor 进行了能力扩充:执行任务(同步、异步)Future 的方法、停止线程池和中断线程等方法
  • AbstractExecutorService 是一个抽象的实现,查看源码 invokeAll invokeAny doInvokeAny 能发现,这是将 线程任务 结合流转的实现,保证上层只需要关心 execute 方法就可以了。
  • 最下面的 ThreadPoolExecutor 实现了最麻烦的运行部分,同时维护这线程池的生命周期、还要管理线程和任务,在此,任务的提交线程管理监控等等

下面请看图

线程池执行流程

  • 从上图中看到看到了任务在提交后整个的执行流程、线程池内部就是一个构建了生产者``消费者模型,并将线程与任务进行了解耦。
  • 通过 缓冲任务 + 线程组 模式,达到复用线程的目的
  • 任务部分,生产者。提交任务,判断是进入哪种执行流程:第一、进入缓冲任务队列;第二、直接开辟线程执行;第三、执行拒绝
  • 线程管理,消费者。线程池维护一定的线程,线程执行完毕后会继续获取新的任务去执行,如果没有新的任务,线程被回收。

线程池的生命周期

要了解线程池的生命周期运行,就肯定要去看 ThreadPoolExecutor 的源码,从代码上可以看到一个核心的参数,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

从是私有类型可以知道,这个不是用户显性设置的,这是由线程池运行的时候,由内部来维护的。那它干啥用的呢?

首先,它是一个 AtomicInteger 类型,也即是它是主要的控制状态,包含了两个关键性概念(一个int 32 位

  • workerCount低29位)表示有效的线程数,以及开启的线程数
  • runState高3位)记录正在处于 RunningShutting down 状态,这个是主要的生命周期控制

从注释上看出,runState 主要的取值有

  • RUNNING 接收新的任务,并且处理进入Queue的任务
  • SHUTDOWN 不能接收新的任务,但是继续处理进入Queue的任务
  • STOP 不接受新的任务,也不处理进入Queue 的任务,并且当前正在处理的任务中断
  • TIDYING 当所有的任务都终止、workCount=0,线程池状态转换到 TIDYING,后续运行 terminate() hook 方法。
  • TERMINATED terminate()方法执行完毕

线程池生命周期状态变化

runState 的取值变化是有顺序的

  • RUNNING -> SHUTDOWN

    On invocation of shutdown()

  • (RUNNING OR SHUTDOWN) -> STOP

    On invocation of shutdownNow()

  • SHUTDOWN -> TIDYING

    When both queue and pool are empty

  • STOP -> TIDYING

    When pool is empty

  • TIDYING -> TERMINATED

    When the terminated() hook method has completedå

线程池关键概念

上面提到线程池是解耦,任务线程 的关系。这样的话就拆分出两个关键的概念点

任务

结合代码上从方法名称上我们可以看到几个核心的方法

  • Executor#execute
  void execute(Runnable command);
  • ExecutorService#submit
  Future<?> submit(Runnable task);
  • AbstractExecutorService#submit 实现
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

从上述上可以看出,任务的提交到执行的流程最终都是执行execute的方法。

任务提交

任务提交,借助 newTaskForRunnable 对象构造成 RunnableFuture 对象作为任务传递进入。

任务调度执行

任务的调度就是 execute 方法, 主要的工作: 检查线程池运行状态、运行线程数、运行策略,决定后续走申请线程执行、或缓冲到队列中去,或直接拒绝该任务

大体执行的:

  • workCount < corePoolSize, 直接启动一个线程来执行新提交的任务(addWorker内部 当前任务作为这个线程的firstTask执行)
  • workCount >= corePoolSize, workQueue 未满,则加入到 workQueue
  • workCount >= corePoolSize, workCount < maximumPoolSize, workQueue 已满,创建新的线程执行
  • workCount >= maximumPoolSize, workQueue 已满,执行拒绝策略
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // workCount < corePoolSize数目,添加新的 worker 执行任务,当前的任务作为新加线程的 FirstTask
        if (workerCountOf(c) < corePoolSize) {
            // 添加成功,直接返回,不成功的话,说明当前线程池不允许提交任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        // 能执行到这里,说明了 workCount >= corePoolSize 或者 线程池不允许提交任务了
        
        // 检查线程池 RUNNING 状态,把任务添加到 任务Queue中
        if (isRunning(c) && workQueue.offer(command)) {
            //进入当前分支, 任务添加进入了 workQueue中。workCount 在[0,corePoolSize) 无条件开启新的线程,当 > corePoolSize 任务加入Queue
            int recheck = ctl.get();
            //如果线程不处于Running状态,那么移除已经入队的任务,并执行 reject 策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程池处于Running,且线程数量为 0 ,开启新的线程(这边是因为我们任务已经提交到Queue中,但是线程都关闭了怎么办)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果 workQueue 满了,以 maximumPoolSize 为最大值创建新的线程,创建失败的话,则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

任务执行缓冲

workQueue 是线程池管理任务的核心部分, workQueue 从代码上看是一个阻塞队列。

    private final BlockingQueue<Runnable> workQueue;

从注释上的看,这个queue,用于保存任务,并且移交任务给 worker thread

  • Queue 为空时候,获取元素的线程等待 Queue 变成非空
  • Queue 满了,存储元素的线程等待队列可用。

线程在执行中是执行任务,那么线程如何从 Queue 获取任务的呢?看 GetTask 方法

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 两种情况: 
            //   1. runState == SHUTDOWN && workQueue.isEmpty()
            //   2. runState >= STOP
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                //CAS 操作, 减少 workCount 工作线程数
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            // workCount <= maximumPoolSize 且没有超时
            try {
                // workQueue 获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

在注释上描述了调用此方法的几种场景:

  • 任务执行阻塞,( corePoolSize 之内的线程不会被回收,它们一直等待任务)
  • 定时等待,KeepAliveTime 生效(这么长时间都没有任务,那么线程应该关闭)
  • 在发生下面的情况,必须返回 null, 工作线程必须退出:
    • 线程池 中 > maximumPoolSizeworker 线程存在(通过调用 setMaximumPoolSize 设置)
    • 线程池处于 SHUTDOWN,且 workQueue 为空,就是不接受新的任务了
    • 线程池处于 STOP,不仅不接受新的任务,已经存在 workQueue 的线程不在执行
    • 这个线程等待任务超时,且超时的 Worker线程随时可能被 Terminate,(workCount > corePoolSize 部分的 线程)

任务拒绝

为什么要拒绝任务呢,因为任务拒绝模块是线程池的保护机制,从上面我们知道,线程池有最大的容量,当线程池的任务缓存Queue满了后,并且线程池中的线程数达到maximumPoolSize 时候,就拒绝该任务了,保护线程池。

execute(Runnable command) 方法中有 reject 方法,这是任务拒绝执行操作。

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

ThreadPoolExecutor 有四个 RejectedExecutionHandler 的实现

  • ThreadPoolExecutor.AbortPolicy 丢弃并且直接抛出 RejectedExecutionException, 默认的策略
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
    }
  • ThreadPoolExecutor.DiscardPolicy 什么都不做,直接丢弃任务,无任何反馈
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   }
  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃最早的任务,然后重新提交被拒绝的任务
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
          e.getQueue().poll();
          e.execute(r);
      }
   }
  • ThreadPoolExecutor.CallerRunsPolicy 由调用线程来执行该任务。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }

线程

线程池为了控制线程,维护线程的生命周期,设计了 Worker线程

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
  final Thread thread;
  Runnable firstTask;
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
  }
    //....... 省略代码
}

工作线程

从上面看出,Worker 实现了Runnable接口,持有Thread,由 ThreadFactory 创建出来,

 private boolean addWorker(Runnable firstTask, boolean core) {
        //....... 省略代码
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            //....... 省略代码
        } finally {
            if (! workerStarted)
            addWorkerFailed(w);
        }
        return workerStarted;
 }

工作线程的增加

从上面的代码能看出来 Worker 的创建,firstTask 参数不为空的时候,会立马执行,如果 firstTasknull,就去创建一个线程去获取 workQueue 的任务去执行 执行的流程在 Workerrun() 方法

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
  //....... 省略代码  
  public void run() {
    runWorker(this);
  }
  //....... 省略代码
}

工作线程的释放 和 执行任务

线程的释放和 执行任务都在 runWorker 方法中, 任务的获取:

  • 在一个 While 循环中一直在通过 getTask() 获取任务
  • getTask() 方法阻塞队列中获取任务,如果获取到,则执行任务(在之前还检查线程池状态了,具体看注释),
  • 如果取到 null 则跳出循环,则跳出循环,执行 processWorkerExit 方法,销毁线程

runWorker 方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

如何监控和改变

到现在我们了解了线程池的实现机制和原理。提出监控和改变是为了实时在线、精细化管控生产环境下的线程池任务、增加对应的监控和告警,应对突发情况。

针对各个场景下的,线程池的参数设置直接定死,其实不是安全的方式,从各个机器配置、环境配置上、应用的应对的服务请求量等等,而且线程池的执行情况也和各个任务类型相关性比较大。

那么我们就需要在 运行时环境下,我想根据真实情况下去更改线程池的参数,使其参数设置合理,这种方式就是将线程池的参数动态话。

如何动态化线程池的参数呢?

线程池的构造参数有上面提到的多个,但是最核心的还是 corePoolSizemaximumPoolSizeworkQueue,所以这几个参数的配置直接影响了线程池的情况

我们先看一下 ThreadPoolExecutorset get 方法

set方法

动态化修改参数可以借助 set方法 修改几个核心的参数

get方法

监控方向,可以借助 get方法获取线程池具体的执行情况

Ref:

comments powered by Disqus