🟢 ThreadPoolExecutor
2022年6月20日
- Java
🟢 ThreadPoolExecutor
1. 类注释
- 作用
- 降低线程创建、关闭的时间消耗,提高响应速度
- 大量线程会占用大量内存资源,提高线程利用率,降低资源消耗
- 提供了对相关状态的监控
- 可控制最大并发数,避免过度竞争
- 执行流程
按需创建
- 默认情况下,即使是核心线程也采用懒加载的方式,即只有第一个任务到达时才会进行创建 & 启动
- 但可以通过 重写
prestartCoreThread
&prestartAllCoreThreads
方法来按需启动
创建新线程
- 通过 ThreadFactory 来创建新线程
- 若没有特别指定,则会使用 defaultThreadFactory
- defaultThreadFactory 创建的线程都将在同一个
ThreadGroup
中,且 线程优先级均为NORM_PRIORITY
,且均非守护线程
- 若是自己提供 ThreadFactory,可以设置线程名称、线程组、优先级、是否为守护线程等
- 若是 ThreadFactory 创建新线程失败,返回一个 null 的新线程,线程池会继续进行,但可能不会执行任何任务
存活时间
- 若当前线程池的线程数 > corePoolSize,多余的线程将被终止,若是它们的空闲时间 > keepAliveTime
- 这样就能节省资源的消耗,当线程池不再被活跃使用,当线程池之后又变得活跃,新线程将会被创建
- keepAliveTime 这个参数可以在运行期间通过
setKeepAliveTime(long, TimeUnit)
动态调整 - 默认情况下,只有
当前线程池的线程数 > corePoolSize
这一情况下,才会执行这一策略 - 但可以通过
allowCoreThreadTimeOut(boolean)
这一方法让其同样适用于 核心线程
工作队列
- 队列可以是 任何
BlockingQueue
- 有三种常见策略:
- 不存元素队列
- 一个默认选择就是
SynchronousQueue
,这个队列不存储任务,当有任务时,会交给线程执行,因此只要有任务,就会创建线程执行这个任务 - 该策略下 maximumPoolSizes 通常无界,以保证新任务提交时,在任务处理速度小于任务处理速度时,不会被拒绝
- 一个默认选择就是
- 无界队列
- 可以选择
LinkedBlockingQueue
,这会导致当 核心线程满时,提交的新任务会被加入到工作队列中,永远不会继续创建新线程 - 适用于每个任务都是互相独立,任何任务的结果都不会影响其他任务的执行,例如网页服务端
- 可以选择
- 有界队列
- 可以选择
ArrayBlockingQueue
,借助 maximumPoolSizes 可以防止资源耗尽,但是这更难调整和控制 - 使用
大队列 + 小线程池
可以降低 CPU 的使用,OS 内核资源,以及线程切换负担,但可能导致吞吐量降低 - 使用
小队列 + 大线程池
将使 CPU 更加繁忙,但可能遇到更多的调度开销,从而也导致吞吐量降低
- 可以选择
- 不存元素队列
- 队列可以是 任何
拒绝策略
- 什么情况下发生:
- 有界工作队列已满 & 达到最大线程数
- 线程池被关闭,shutdown() | shutdownNow()
- 4 种已经被定义好的拒绝策略:
- ThreadPoolExecutor 类下的
AbortPolicy
静态内部类- 拒绝新任务 & 抛出 RejectedExecutionException 异常
- 默认拒绝策略
- ThreadPoolExecutor.
CallerRunsPolicy
- 哪个线程执行的提交任务,该线程负责执行这个任务
- 这种策略是一种简单反馈控制,可以降低新任务的提交速度
- ThreadPoolExecutor.
DiscardPolicy
- 若执行器没有被关闭,那么这个无法被提交的新任务将被直接丢弃
- ThreadPoolExecutor.
DiscardOldestPolicy
- 若执行器没有被关闭,那么工作队列的队头任务会被丢弃,然后重新尝试提交新任务,但也许还是提交失败,导致这个过程一直被重复
- ThreadPoolExecutor 类下的
- 其他拒绝策略
- 可以定义 & 使用其他类型的 RejectedExecutionHandler,但需要谨慎
- 什么情况下发生:
钩子方法
- ThreadPoolExecutor 提供了 protected 修饰的、可以被重写的方法
- 例如,
beforeExecute(Thread, Runnable)
&afterExecute(Runnable, Throwable)
可以在每个任务被执行前和后被调用- 可以用来设置一些执行环境,例如,初始化 ThreadLocals、整合统计结果、或添加日志等
- 除此之外,
terminated()
也可以被重写,来满足某些一旦执行器完全终止就需要做的特殊工作 - 若是钩子方法抛出了异常,内部工作线程可能会依次失败并突然终止
队列的维护
- 通过 getQueue() 方法可以对工作队列进行监控和调试,但不推荐使用该方法进行其他目的
- 提供的
remove(Runnable)
&purge()
在大量任务被取消时,可以协助内存的回收
终止
- 若是线程池不再被任何项目引用,且没有任何遗留的线程,那么线程池将被自动 shutdown
- 若在用户忘记使用 shutdown 函数的情况下,想要确保线程池被回收,则必须保证没有被使用的线程均已经死亡,可以通过
- 设置合适的线程存活时间
- 核心线程数 < 0 | 设置 allowCoreThreadTimeOut(boolean)
2. 类图
public class ThreadPoolExecutor extends AbstractExecutorService {
// ......
}
3. 常用属性
/**
* ctl 是原子类整数,它有两个作用:
* workerCount : 记录有效线程数
* runState : 记录线程池状态,是运行中还是被关闭等
*
* 为了整合到一个 int 中,
* workerCount 被限制最大为 (2^29)-1 (about 5 亿) 个线程, 而非 (2^31)-1 (20 亿)
* 若是这个数量值在未来还不够,成为一个问题,那么 AtomicInteger 将被替换为 AtomicLong,但现在还不需要
*
* workerCount 表示的是已经开始但是还没有结束的 workers 数量
* 这个值可能和实际存活的线程数有短时间的不同
* 例如,当 ThreadFactory 创建一个新线程失败,或者正在终止的线程在正式终止之前仍被记录在内
*
* runState 记录线程池生命周期状态,主要有以下几种情况:
*
* RUNNING: 接受新任务 & 处理队列中的任务
* SHUTDOWN: 不接受新任务 & 处理队列中的任务
* STOP: 不接受新任务 & 不处理队列中的任务 & 中断正在执行的任务
* TIDYING: 所有任务都已经终止,工作队列没有待处理的任务,将执行 terminated() 方法
* TERMINATED: terminated() 方法已经完成
*
* 线程池状态的转换关系:状态数值单调递增,但不必每个状态都会经历
*
* RUNNING -> SHUTDOWN shutdown() 被调用
* (RUNNING or SHUTDOWN) -> STOP shutdownNow() 被调用
* SHUTDOWN -> TIDYING 队列 & 线程池均空
* STOP -> TIDYING 线程池为空
* TIDYING -> TERMINATED terminated() 钩子方法已经执行完毕
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 初始化 ctl
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 == 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大工作线程数
// 高位存放线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 从 ctl 中获取 runState | workerCount,将 runState & workerCount 封装成 ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* runState 状态值的比较,可以直接比较,无需进行 ctl 中 runState 的拆分工作,当然前提是 workerCount 绝不可能是 负数
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* CAS 增加 ctl 中 workerCount 的值
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* CAS 实现 workerCount --
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* CAS 实现 workerCount --
* 这种情况只发生在 processWorkerExit 线程的突然终止
* 以及与 getTask() 方法相关
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 用来存放任务 & 传递任务给 工作线程
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 虽然可以使用某种并发集合,但事实证明通常最好使用锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 存储线程池中所有的工作线程,只在持有 mainLock 的情况下进行操作
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录达到过的最大线程池数量,必须在持有 mainLock 的情况下进行操作
*/
private int largestPoolSize;
/**
* 已经完成的任务个数,只有当某个任务已经被终止时才进行更新,必须在持有 mainLock 的情况下进行操作
*/
private long completedTaskCount;
/*
* 以下是用户控制参数,所有变量均为 volatile 类型
* 保证运行过程中拿到的都是最新值,且不需要进行加锁操作,减少开销
* 这些变量没有其他程序需要对它们做一些内部的联合变动,因此也没有必要加锁
*/
/**
* 创建线程的工厂
* 所有线程均通过该工厂进行创建
* 必须提供创建线程失败的回调,这可能会影响到系统 | 用户限制线程数量的相关策略
* 尽管这不是一个错误,但是线程创建失败可能导致新任务被拒绝 | 任务被阻塞在队列中
*/
private volatile ThreadFactory threadFactory;
/**
* 饱和 | 关闭线程池 时会执行的拒绝策略
*/
private volatile RejectedExecutionHandler handler;
/**
* 线程存活时间,默认纳秒为单位
*/
private volatile long keepAliveTime;
/**
* 默认 == false,此时,即使核心线程为空闲状态,仍存活
* 若 == true,此时,核心线程也参与到 keepAliveTime 中,过期删除
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程数
*/
private volatile int corePoolSize;
/**
* 最大线程数,受限于线程池最大线程数 CAPACITY
*/
private volatile int maximumPoolSize;
/**
* 默认拒绝策略 - 拒绝新任务 & 抛出异常
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
* shutdown 和 shutdownNow 的调用者所需的权限
*/
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
/* The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;
4. 构造函数
/**
* 创建带 默认线程工厂 & 拒绝策略 的线程池
* 这种情况下,使用 Executors 类提供的常用线程池更方便一些
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
defaultHandler);
}
/**
* 创建 带默认拒绝策略的线程池
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
defaultHandler);
}
/**
* 创建带 默认线程工厂 的线程池
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
}
/**
* 创建一个全部参数均指定的线程池
*
* @param corePoolSize 核心线程数,即使线程空闲也会存在,除非使用 allowCoreThreadTimeOut
* @param maximumPoolSize t最大线程数
* @param keepAliveTime 线程数 > 核心线程数的前提下,多余的线程最多空闲这么长时间就会被回收
* @param unit 时间单位
* @param workQueue 在任务被执行之前暂存任务,只存放通过 execute 方法提交的 Runnable 类的任务
* @param threadFactory 线程工厂
* @param handler 拒绝策略
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
4. 内部类
/**
* Worker 类主要维护线程运行任务的中断控制状态,以及其他次要的记录。
* 此类扩展 AbstractQueuedSynchronizer 以简化获取和释放围绕每个任务执行的锁。
* 这可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。
* 实现了不可重入互斥锁而不是使用 ReentrantLock,是因为不希望在调用 setCorePoolSize 等线程池控制方法时能够重新获取锁。
* 此外,为了在线程真正开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)。
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
/**
* 该类永远不会被序列化,仍然提供是为了挂起一个 javac 的警告
*/
private static final long serialVersionUID = 6138294804551838833L;
// 当前 worker 正在运行的线程,当 线程工厂失败时 为 null
final Thread thread;
// 执行的初始任务,可能为 null
Runnable firstTask;
// 每个线程完成的任务数
volatile long completedTasks;
/**
* 从线程工厂创建一个线程,并带有初始任务
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // 抑制中断,直到 worker 工作时恢复
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 线程工厂创建一个线程,执行该 Runnable 类
}
// run() 方法
public void run() {
runWorker(this);
}
// 判断是否处于独占状态
protected boolean isHeldExclusively() {
return getState() != 0; // 0 - 未被锁住,1 表示在锁住状态
}
// AQS 自带的需要实现的方法 - 可以看出是只有 0 1 状态,为不可重入独占锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// AQS 自带需要实现的方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/*
* 以下为设置控制状态的方法
*/
/**
* 修改当前任务的状态
* 将 runState 转换为给定的目标,或者如果至少已经是给定的目标,则不理会它。
*
* @param targetState 只能为 SHUTDOWN or STOP,
* 不可以为 TIDYING or TERMINATED,若要设置为该状态,使用下面的 tryTerminate() 方法
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break; // c >= 给定状态 || 成功设置为给定状态
}
}
/**
* 将线程池状态设置为 TERMINATED
* 若不满足 TERMINATED 的前置条件,直接返回;否则执行 terminated(),然后更改状态为 TERMINATED
* 只有在 (SHUTDOWN & 线程池为空 & 队列为空) || (STOP & 线程池为空) 时才有效
* 若有资格 进入 TERMINATED 状态,此时需要中断一个空闲的 worker 来保证 shutdown 信号的传播
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return; // 以上情况终止线程池失败
if (workerCountOf(c) != 0) { // 有资格进入 TERMINATED 状态
interruptIdleWorkers(ONLY_ONE); // 中断一个空闲的 worker
return;
}
// 可以直接 TERMINATED 线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
/*
* 以下方法为控制 woker 线程的中断
*/
/**
* 如果设置了安全管理,需要确保调用者拥有 关闭线程的权限
* 若未设置安全管理,需要确保调用者可被允许中断 worker 线程
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* 中断 wokers 集合中的所有线程,即使线程是运行状态
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* 中断可能正在等待任务的线程,以便它们可以检查终止或配置更改。忽略 SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
*
* @param onlyOne 若 == true,最多中断一个 worker,只有在 tryTerminate() 中被调用
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* 中断所有空闲线程的封装,为了避免忘记到底是 true 还是 false 来中断所有空闲线程
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/*
* 其他共用类,大部分 ScheduledThreadPoolExecutor 也可能被用到
*/
/**
* 为给定命令调用被拒绝的执行处理程序。受包保护,供 ScheduledThreadPoolExecutor 使用。
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* 在调用关闭时执行运行状态转换后的任何进一步清理。
* 此处为无操作,但由 ScheduledThreadPoolExecutor 用于取消延迟的任务。
*/
void onShutdown() {
}
/**
* ScheduledThreadPoolExecutor 需要进行状态检查以在关机期间启用正在运行的任务。
*
* @param shutdownOK true if should return true if SHUTDOWN
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* 将任务队列排空到一个新列表中,通常使用 drainTo。
* 但是,如果队列是 DelayQueue 或任何其他类型的队列,其 poll 或 drainTo 可能无法删除某些元素,它会一一删除它们。
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/*
* 用来创建 worker、运行 worker 以及 worker 的清理工作
*/
/**
* 添加 worker
*
* @param firstTask 创建的新线程首先需要执行的任务
*
* @param core 若 == true,添加到 核心线程,若 == false,添加到最大线程
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 获取当前线程池状态
// 特殊情况特判
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 获取当前工作线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; // 线程数超范
if (compareAndIncrementWorkerCount(c))
break retry; // 成功增加工作线程数,此时不需要再继续尝试,退出
c = ctl.get(); // 重新获取一次 ctl
if (runStateOf(c) != rs)
continue retry; // ctl 两次发生改变,说明有其他线程也在更改 ctl,且修改成功,需要再次重试
// else CAS failed due to workerCount change; retry inner loop
}
}
// ctl 修改成功,继续下步操作
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 向工作线程集合中添加当前 worker
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 执行 worker 添加失败的措施
}
return workerStarted;
}
/**
* worker 添加失败后的回滚操作
* - 从 workers 集合中删除当前 worker
* - ctl 的 workcCount --
* - 判断当前线程是否可以进入 终止状态
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* 清理死去的线程,并记录
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 向总完成任务数中增加当前 worker 已完成任务数
workers.remove(w); // 从 workers 集合中删除当前 worker
} finally {
mainLock.unlock();
}
tryTerminate(); // 判断线程池现在是否可以进入 终止 状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 若是删掉该 worker 之后,小于核心线程数,且核心线程数必须存在,此时添加没有任何任务的空 worker
addWorker(null, false);
}
}
/**
* 执行阻塞或定时等待任务,具体取决于当前配置设置,
* 若 Worker 由于以下任何原因必须退出,返回 null:
* 1. 工作人员超过 maximumPoolSize(由于调用 setMaximumPoolSize)。
* 2. 池停止。
* 3. 池关闭,队列为空。
* 4、这个worker超时等待一个任务,超时 Worker 在定时等待前后都会被终止
* (即allowCoreThreadTimeOut || workerCount > corePoolSize),
* 如果队列非空,这个worker不是池中的最后一个线程。
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 特殊情况下即使有任务也无需执行
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 核心线程也可被回收 | 当前有除核心线程以外的线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果设置超时时间,则在规定时间内从 queue 中取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 如果超时,则返回 null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* 循环运行 Worker 类。反复从队列中获取任务并执行它们,同时处理许多问题:
* 1. 我们可能从一个初始任务开始,在这种情况下我们不需要获取第一个任务。
* 否则,只要 pool 正在运行,我们就从 getTask 获取任务。
* 如果它返回 null ,则 Worker 会因池状态或配置参数的更改而退出。
* 其他退出是由外部代码中的异常抛出导致的,在这种情况下,completedAbruptly 成立,这通常会导致 processWorkerExit 替换该线程。
*
* 2. 在运行任何任务之前,获取锁以防止在任务执行过程中发生其他池中断,然后我们确保除非池停止,否则该线程没有设置其中断。
*
* 3. 每个任务运行之前都会调用 beforeExecute,
* 这可能会抛出异常,在这种情况下,我们会导致线程死掉(用 completedAbruptly true 中断循环)而不处理任务。
*
* 4. 假设 beforeExecute 正常完成,我们运行任务,收集它抛出的任何异常发送给 afterExecute。
* 我们分别处理 RuntimeException、Error(规范保证我们捕获)和任意 Throwables。
* 因为我们不能在 Runnable.run 中重新抛出 Throwables,所以我们在退出时将它们包装在 Errors 中(到线程的 UncaughtExceptionHandler)。
* 任何抛出的异常也会保守地导致线程死亡。
*
* 5、task.run完成后,我们调用afterExecute,也可能会抛出异常,也会导致线程死掉。
* 根据 JLS Sec 14.20,即使 task.run 抛出,此异常也会生效。
* 异常机制的最终效果是 afterExecute 和线程的 UncaughtExceptionHandler 具有我们可以提供的关于用户代码遇到的任何问题的准确信息。
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 运行期间允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 前置钩子
Throwable thrown = null;
try {
task.run(); // 运行 task
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); // 后置钩子
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
5. ThreadPoolExecutor 常见方法
1) execute(Runnable command)
/**
* 提交任务,若提交失败,执行 RejectedExecutionHandler 策略
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 小于核心线程,添加任务到核心线程
if (addWorker(command, true))
return;
c = ctl.get(); // 添加失败,重新获取 ctl
}
if (isRunning(c) && workQueue.offer(command)) { // 添加到工作队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command); // 不处于运行状态,拒绝新任务,则删除当前任务,执行拒绝策略
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 添加到非核心线程
reject(command); // 添加失败,执行拒绝策略
}
2) shutdown()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 方法在 worker 类里面,判断是否有权限执行该操作
advanceRunState(SHUTDOWN); // worker 类里面,修改线程池状态,修改之后再提交任务由于线程池的状态,所以提交肯定失败
interruptIdleWorkers(); // 中断所有空闲线程,这里可以看出正在执行的任务并不会被中断
onShutdown(); // ScheduledThreadPoolExecutor 中被用到,其他情况为 空函数
} finally {
mainLock.unlock();
}
tryTerminate();
}
3) shutdownNow()
/**
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 操作权限判断
advanceRunState(STOP); // 修改线程池状态,此时再提交新任务肯定失败
interruptWorkers(); // 中断所有的线程,即使正在运行
tasks = drainQueue(); // 取出工作队列中所有任务,作为返回值
} finally {
mainLock.unlock();
}
tryTerminate(); // 判断是否进入线程池终止状态
return tasks;
}
4) prestartAllCoreThreads
/**
* 启动所有核心线程,使它们空闲等待工作。这会覆盖仅在执行新任务时启动核心线程的默认策略。
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
// 启动一个核心线程,使其空闲等待工作。这会覆盖仅在执行新任务时启动核心线程的默认策略。
// 如果所有核心线程都已启动,此方法将返回 false。
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize
&& addWorker(null, true);
}
6. 提供的拒绝策略
策略模式
- 可以看出这是一种策略模式
1) RejectedExecutionHandler
package java.util.concurrent;
/**
* 不能由ThreadPoolExecutor执行的任务的处理程序
*/
public interface RejectedExecutionHandler {
/**
* 当执行不能接受任务时,ThreadPoolExecutor可以调用该方法。
* 这种情况可能发生在没有更多线程或队列槽可用的情况下,因为它们的边界将被超出,或者执行程序关闭时。
*
* <p>在没有其他替代方法的情况下,该方法可能抛出未检查的RejectedExecutionException,该异常将传播给execute的调用者。
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
2) 4 种具体实现类
/**
* 要加入线程池的任务是一个 Runnable 类,不加入线程池中,该 Runnable 类自己执行 run() 方法
* 就说明用它所在的线程执行任务
* 若不在 running 状态,那么 不论是 shutdown 还是 stop 等状态都无需执行该任务
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* @param r 要执行的任务
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* 抛出异常 RejectedExecutionException
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 直接丢弃被拒绝的任务,什么也不用做
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 空函数
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 取出工作队列队头任务,也就是最早加进去但还没有被执行的任务
// 执行被拒绝的任务,可以看出,如果 execute() 方法还是失败,那么只要策略不变,还是会重新进入该方法
// 然后再从队列中删除一个再最早的任务,一直重复直到成功
e.execute(r);
}
}
}
7. AbstractExecutorService
1) 类描述
ThreadPoolExecutor
继承了该类,因此多了submit()
等方法- 该类为 抽象类
- 实现
ExecutorService
接口
2) 类图
public abstract class AbstractExecutorService implements ExecutorService {
// ...
}
public interface ExecutorService extends Executor {
// ...
}
public interface Executor {
/**
* 执行 command
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
3) 常用方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// RunnableFuture 为继承了 Runnable & Future 类只有一个 run() 的接口
// 而 FutureTask 类实现了 RunnableFuture 接口
// 将 Runnable task 转换为 FutureTask 对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 执行 继承的 Executor 类的 execute(Runnable command)
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* 根据给定 runnable & value,将其转换为 FutureTask 对象
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// FutureTask 中一个重要属性就是 Callable<V> callable,因此
// runnable 会通过 Executors.callable(runnable, result) 实现 Callable 类的方法
// 这里有个 适配器 的操作
return new FutureTask<T>(runnable, value);
}
/**
* 根据给定 callable,将其转换为 FutureTask 对象
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
8. 几种常见线程池
public class Executors {
// 创建指定 线程数 的线程池,工作队列无界
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 创建指定 线程数 & 线程工厂 的线程池,工作队列无界
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
// 创建 只有一个线程 的线程池,工作队列无界
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 创建 只有一个线程 & 指定线程工厂 的线程池,工作队列无界
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
// 创建 最大线程数无界 的线程池,工作队列不能存储任务,任务放入后必须被拿走才能继续放入
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
// 创建 ScheduledThreadPoolExecutor,针对 ScheduledThreadPoolExecutor 还有其他几种创建,这里没有添加
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 创建一个线程池,可以安排命令在给定延迟后运行,或定期执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
// ...
}
9. 如何使用
1) 自定义 ThreadFactory
- ThreadFactory 源码
public interface ThreadFactory {
/**
* 生产一个新的线程
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
- Executors 类的 默认静态工厂,可通过
Executors.defaultThreadFactory()
快速得到一个线程工厂
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group,
r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); // 中等优先级
return t;
}
}
- 自定义 ThreadFactory
public class YhThreadFactory implements ThreadFactory {
private final String name; // 线程名称
private final AtomicInteger threadNumber = new AtomicInteger(1); // 线程 id
/**
* 创建一个可指定名字的线程生产工厂
*/
public YhThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " - " + threadNum.incrementAndGet()); // 修改名称为指定名称
return t;
}
}
2) 应用
// 自定义 Callable 类
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回执行当前 Callable 的线程名字
return Thread.currentThread().getName();
}
}
// 测试类
public class MyTest {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 30L;
public static void main(String[] args) {
// 创建 ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new YhThreadFactory("liuxianzhishou"),
new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<String>> futureList = new ArrayList<>(); // 存放线程池所有线程 call() 后的返回结果
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
// 提交任务,可以使用 submit | execute
Future<String> future = executor.submit(callable);
futureList.add(future); // 将返回结果 future 加入集合
}
// 打印 集合结果
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + " : " + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//关闭线程池
executor.shutdown();
}
}
10. ScheduledThreadPoolExecutor
1) 原理
- 调用
schedule
|scheduleAtFixedRate
|scheduleWithFixedDelay
,把任务添加到ScheduledThreadPoolExecutor
去执行 ScheduledThreadPoolExecutor
接收到任务之后,根据请求参数的延迟时间计算出真正需要执行任务的时间,然后把任务封装成RunnableScheduledFuture
- 将
RunnableScheduledFuture
添加到DelayQueue
中,- 把时间越小的任务放在队列头,
- 如果时间一样,则先提交的先执行
- 线程池从
DelayQueue
中获取任务执行,- 如果是一次性的任务,执行之后删除队列中的任务,
- 如果是重复执行的,则再次计算时间,然后把任务添加到
DelayQueue
中
2) 源码
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
// --------------------- 构造函数 -------------------------
// ---------------- 最大线程数 无界 ----------
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
// ------------------ implements ScheduledExecutorService 的方法 ----------------------
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
delayedExecute(t); // 向工作队列中添加 任务
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
// 固定周期执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
// 在上一次执行完成之后,隔多长时间执行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
// ----------------- extends ThreadPoolExecutor 的方法 ----------------
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
// --------------- 内部类 ---------------------
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 实现了 堆 的上浮 & 下沉
}
}