概述
什么是线程池
线程池(Thread Pool)是一种基于池化思想用于管理线程的工具,常出现在多线程服务器中,比如 MySQL
线程过多会有额外的开销,例如创建销毁线程的开销、调度线程的开销等,同时也降低了机器的性能
线程池同时维护多个线程,等待分配可并发执行的任务;这种做饭,一方面避免了处理任务时创建销毁线程的开销,另一方面避免了线程数量膨胀导致的过分调度问题
线程池的优点:
- 降低资源消耗: 通过池化技术重复利用已创建的线程,降低线程创建和销毁的开销
- 提高响应速度: 有任务需要执行时,可以立刻开始执行
- 提高线程的可管理性: 线程是稀缺资源,如果无限创建,不仅会消耗系统资源,还会因线程的不合理分布导致**资源调度失衡:**降低系统稳定性,使用线程池可以进行统一的分配、调优和监控
- 提供更多功能: 线程池有可拓展性,可以添加许多其他功能,比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或者定期执行
线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题,在并发环境下,系统不能确定在任意时刻,有多少任务需要执行,有多少资源需要投入,这种不确定性会带来下面的问题:
- 频繁申请/销毁资源和调度资源,带来额外的消耗,可能会非常巨大
- 对资源无线申请缺少抑制手段,容易引发系统资源耗尽
- 系统无法合理管理内部的资源分布,会降低系统的稳定性
为了解决资源分配这个问题,线程池采用了“池化”(Pooling)思想
池化:为了最大化收益和最小化风险,将资源统一在一起管理的一种思想
线程池的核心设计与实现
总体设计
继承关系
Java 中线程池核心实现类是 ThreadPoolExecutor,下面是它的继承关系
Executor
顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需要提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架来完成线程的调配和任务的执行部分
ExecutorService
ExecutorService 接口增加了一些能力:
- 扩充执行任务的能力,扩充可以为一个或一批异步任务生成 Future 的方法
- 提供了管控线程池的能力,比如停止线程池的运行
AbstractExecutorService
AbstractExecutorService 是上层的抽象类,将执行任务的流程串联了起来,包装下层的实现只需关注一个执行任务的方法
ThreadPoolExecutor
最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者更好的结合从而执行并行任务
运行机制
线程池在内部构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分为两部分:任务管理、线程管理
任务管理部分充当生产者角色,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务
- 缓冲到队列中等待线程执行
- 拒绝该任务
线程管理部分是消费者,它们背统一维护在线程池中,根据任务请求进行线程的分配,当线程执行完任务后会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收
生命周期管理
线程池运行的状态并不是用户手动设置的,而是随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将这两个关键参数的维护放在了一起,如下所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0);
ctl
是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分信息:线程池的运行状态(runState)和线程数量(workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰
用过用过变量去存储两个值,可以避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源
线程池中经常出现要同时判断线程池运行状态和线程数量的情况,线程池也提供了许多方法去供用户获得当前线程池的运行状态、线程个数,这里都是使用的位运算,相比于基本运算,速度会快很多
获取生命周期状态、获取线程池线程数量方法如下:
private static int runStateOf(int c) { return c & -CAPACITY; } // 计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线 程数⽣成ctl
ThreadPoolExecutor 的运行有 5 种状态
其生命周期转换如下所示
任务执行机制
任务调度
首先,所有任务的调度都是由 execute 方法完成的,这部分的工作是:
- 首先检测当前线程池的运行状态,如果不是 RUNNING,则直接拒绝,线程池要包装在 RUNNING 的状态下执行任务
- 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务
- 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到阻塞队列中去
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
- 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认处理方式是直接抛异常
具体流程如下:
任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。线程池中是以生产者消费者模型,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。两个附加操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空
- 当队列满时,存储元素的线程会等待队列可用
阻塞队列常用于生产者和消费者的唱K,阻塞队列就是生产者存放匀速的容器,而消费者也只从容器中拿元素
下图线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素
不同的队列可以实现不一样的任务存取策略,下面是阻塞队列的成员:
任务申请
任务的执行有两种情况:
- 任务直接由新创建的线程执行
- 线程从任务队列中获取任务然后执行,执行完任务的空闲线程再次去队列中申请任务再去执行
第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务的巨大多数情况
线程需要从任务缓存模块中不断的取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask 方法实现,执行流程如下:
getTask 这部分做了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回 null 值。工作线程 Worker 会布顿接受新任务去执行,而当工作线程 Worker 接收不到任务的时候,就会开始回收
任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池
拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口定制自己的拒绝策略,也可以选择提供的四种拒绝策略
Worker线程管理
Worker线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。部分代码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker 这个工作线程,实现了 Runnable 接口,并持有一个线程 thread,一个初始化任务 firstTask。
- thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,可以用来执行任务
- firstTask 用来保存传入的第一个任务,这个任务可以有 也可以为 null
- 如果这个值非空,线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况
- 如果这个值是 null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建
Worker 执行任务的模型如下所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张 Hash 表去持有线程的引用,这样可以通过添加引用、移除引用操作来控制线程的生命周期,这时候重要的是如何判断线程是否在运行。
Worker 是通过继承 AQS,使用 AQS 来实现独占锁这个功能。没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态
- lock 方法一旦获取了独占锁,表示当前线程正在执行任务中
- 如果正在执行任务,则不应该中断线程
- 如果该线程现在不是独占锁状态,也就是空闲状态,说明它没有在处理任务,这时可以对该线程进行中断
- 线程池在执行 shutdown 方法或者 tryTerminate 方法时会调用 interruptldleWorkers 方法来中断空闲的线程,interruptldleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收
回收过程如下: