跳到主要内容

1 篇博文 含有标签「Work线程」

查看所有标签

概述

什么是线程池

线程池(Thread Pool)是一种基于池化思想用于管理线程的工具,常出现在多线程服务器中,比如 MySQL

线程过多会有额外的开销,例如创建销毁线程的开销、调度线程的开销等,同时也降低了机器的性能

线程池同时维护多个线程,等待分配可并发执行的任务;这种做饭,一方面避免了处理任务时创建销毁线程的开销,另一方面避免了线程数量膨胀导致的过分调度问题

线程池的优点:

  • 降低资源消耗: 通过池化技术重复利用已创建的线程,降低线程创建和销毁的开销
  • 提高响应速度: 有任务需要执行时,可以立刻开始执行
  • 提高线程的可管理性: 线程是稀缺资源,如果无限创建,不仅会消耗系统资源,还会因线程的不合理分布导致**资源调度失衡:**降低系统稳定性,使用线程池可以进行统一的分配、调优和监控
  • 提供更多功能: 线程池有可拓展性,可以添加许多其他功能,比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或者定期执行

线程池解决的问题是什么

线程池解决的核心问题就是资源管理问题,在并发环境下,系统不能确定在任意时刻,有多少任务需要执行,有多少资源需要投入,这种不确定性会带来下面的问题:

  1. 频繁申请/销毁资源和调度资源,带来额外的消耗,可能会非常巨大
  2. 对资源无线申请缺少抑制手段,容易引发系统资源耗尽
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性

为了解决资源分配这个问题,线程池采用了“池化”(Pooling)思想

池化:为了最大化收益和最小化风险,将资源统一在一起管理的一种思想

线程池的核心设计与实现

总体设计

继承关系

Java 中线程池核心实现类是 ThreadPoolExecutor,下面是它的继承关系

20240721115348

Executor

顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需要提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架来完成线程的调配和任务的执行部分

ExecutorService

ExecutorService 接口增加了一些能力:

  1. 扩充执行任务的能力,扩充可以为一个或一批异步任务生成 Future 的方法
  2. 提供了管控线程池的能力,比如停止线程池的运行

AbstractExecutorService

AbstractExecutorService 是上层的抽象类,将执行任务的流程串联了起来,包装下层的实现只需关注一个执行任务的方法

ThreadPoolExecutor

最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者更好的结合从而执行并行任务

运行机制

20240721115421

线程池在内部构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分为两部分:任务管理、线程管理

任务管理部分充当生产者角色,当任务提交后,线程池会判断该任务后续的流转:

  1. 直接申请线程执行该任务
  2. 缓冲到队列中等待线程执行
  3. 拒绝该任务

线程管理部分是消费者,它们背统一维护在线程池中,根据任务请求进行线程的分配,当线程执行完任务后会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收

生命周期管理

线程池运行的状态并不是用户手动设置的,而是随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将这两个关键参数的维护放在了一起,如下所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0);

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含两部分信息:线程池的运行状态(runState)和线程数量(workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰

用过用过变量去存储两个值,可以避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源

线程池中经常出现要同时判断线程池运行状态和线程数量的情况,线程池也提供了许多方法去供用户获得当前线程池的运行状态、线程个数,这里都是使用的位运算,相比于基本运算,速度会快很多

获取生命周期状态、获取线程池线程数量方法如下:

private static int runStateOf(int c) { return c & -CAPACITY; } // 计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数⽣成ctl

ThreadPoolExecutor 的运行有 5 种状态

20240721115545

其生命周期转换如下所示

20240721115552

任务执行机制

任务调度

首先,所有任务的调度都是由 execute 方法完成的,这部分的工作是:

  • 首先检测当前线程池的运行状态,如果不是 RUNNING,则直接拒绝,线程池要包装在 RUNNING 的状态下执行任务
  • 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务
  • 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到阻塞队列中去
  • 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
  • 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认处理方式是直接抛异常

具体流程如下:

20240721115605

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池中是以生产者消费者模型,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。两个附加操作是:

  1. 在队列为空时,获取元素的线程会等待队列变为非空
  2. 当队列满时,存储元素的线程会等待队列可用

阻塞队列常用于生产者和消费者的唱K,阻塞队列就是生产者存放匀速的容器,而消费者也只从容器中拿元素

下图线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素

20240721115618

不同的队列可以实现不一样的任务存取策略,下面是阻塞队列的成员:

20240721115628

任务申请

任务的执行有两种情况:

  1. 任务直接由新创建的线程执行
  2. 线程从任务队列中获取任务然后执行,执行完任务的空闲线程再次去队列中申请任务再去执行

第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务的巨大多数情况

线程需要从任务缓存模块中不断的取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask 方法实现,执行流程如下:

20240721115642

getTask 这部分做了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回 null 值。工作线程 Worker 会布顿接受新任务去执行,而当工作线程 Worker 接收不到任务的时候,就会开始回收

任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数达到 maximumPoolSize 时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池

拒绝策略是一个接口,其设计如下:

public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现这个接口定制自己的拒绝策略,也可以选择提供的四种拒绝策略

20240721115704

Worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。部分代码如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}

Worker 这个工作线程,实现了 Runnable 接口,并持有一个线程 thread,一个初始化任务 firstTask。

  • thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,可以用来执行任务
  • firstTask 用来保存传入的第一个任务,这个任务可以有也可以为 null
    • 如果这个值非空,线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况
    • 如果这个值是 null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建

Worker 执行任务的模型如下所示:

20240721115722

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张 Hash 表去持有线程的引用,这样可以通过添加引用、移除引用操作来控制线程的生命周期,这时候重要的是如何判断线程是否在运行。

Worker 是通过继承 AQS,使用 AQS 来实现独占锁这个功能。没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态

  1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中
  2. 如果正在执行任务,则不应该中断线程
  3. 如果该线程现在不是独占锁状态,也就是空闲状态,说明它没有在处理任务,这时可以对该线程进行中断
  4. 线程池在执行 shutdown 方法或者 tryTerminate 方法时会调用 interruptldleWorkers 方法来中断空闲的线程,interruptldleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收

回收过程如下:

20240721115735

Worker线程增加

增加线程是通过线程池的 addWorker 方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否操作成功

addWorker 方法有两个参数:firstTask、core

  • firstTask 参数用于指定新增的线程执行的第一个任务,该参数可以为空
  • core 参数为 true 表示在新增线程时会判断当前活动线程是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程树是否少于 maximumPoolSize

执行流程如下:

20240721115750

Worker线程回收

线程池中线程的销毁依赖 JVM 自动回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断的进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无非获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用

try {
while (task != null || (task = getTask()) != null) {
// 执⾏任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收⾃⼰
}

线程回收工作是在 processWorkerExit 方法完成的

20240721115805

在这个方法中,将线程引用移除线程池就已经结束了线程销毁的部分。但是由于引用线程销毁的可能性有很多,线程池也要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程

Worker线程执行任务

在 Worker 类中的 run 方法调用了 runWorker 方法来执行任务,runWorker 方法的执行过程如下:

  1. while 循环不断的通过getTask()方法获取任务
  2. getTask()方法从阻塞队列中获取任务
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态
  4. 执行任务
  5. 如果 getTask 结果为 null 则跳出循环,执行processWorkerExit(),销毁线程

执行流程如下:

20240721115817

线程池在业务中的实践

业务背景

使用线程池的典型场景如下:

  1. 快速响应用户需求:用户发起请求,追求响应时间,需要将多维的信息或数据聚合起来
  2. 快速处理批量任务:离线的大量计算任务,需要快速执行

问题以及方案思考

线程池使用时的核心问题就是:线程池的参数不好配置

一方面线程池的机制不是很好理解,配置需要强依赖开发人员的个人经验;另一方面,线程池的执行情况喝任务类型相关性很大,IO 密集型和 CPU 密集型的任务运行起来差异非常大

例如:

  • 使用线程池做并行计算时,如果没有预估好调用的流量,导致最大核心线程数设置偏小,会导致大量任务被拒绝,大量抛出 RejectedExecutionException 异常
  • 使用线程池做任务隔离时,如果队列设置过长,最大线程数设置失效,导致请求数量增加时,大量任务堆积在队列中,任务执行时间过长

现在并没有一种合理的计算公式能够计算出某种场景中的线程池应该是什么参数,其他的技术方案要么有其他缺陷要么不够成熟

ThreadPool线程池任务执行Work线程阅读需 14 分钟