上一篇文章介绍了 Thread 类,可知线程随着任务的执行结束而被销毁。但是,由于线程的创建与销毁操作涉及到系统调用,开销较大,因此需要将线程的生命周期与任务进行解耦。使用线程池来管理线程,可以有效地重复利用线程来执行任务。本文将介绍线程池最基础的实现类 ThreadPoolExecutor。
本文基于 jdk1.8.0_91
类型 名称 描述
接口 Executor 最上层的接口,提供了任务提交的基础方法
接口 ExecutorService 继承了 Executor 接口,扩展了提交任务、获取异步任务执行结果、线程池销毁等方法
接口 ScheduledExecutorService 继承了 ExecutorService 接口,增加了延迟执行任务、定时执行任务的方法
抽象类 AbstractExecutorService 提供了 ExecutorService 接口的默认实现,提供 newTaskFor 方法将任务转换为 RunnableFuture,以便提交给 Executor 执行
实现类 ThreadPoolExecutor 基础、标准的线程池实现
实现类 ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService 中相关延迟任务、定时任务的方法
实现类 ForkJoinPool JDK7 加入的线程池,是 Fork/Join 框架的核心实现,只允许执行 ForkJoinTask 任务
普通类 Executors 创建各种线程池的工具类
线程池可以解决两个问题:
JDK 中建议使用较为方便的 Executors 工厂方法,它们均为大多数使用场景预定义了设置:
阿里 Java 开发手册 对线程池的使用进行了限制,可作参考:
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
Executors 内部都是调用 ThreadPoolExecutor 的构造方法来实现的:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize 核心线程数量
* @param maximumPoolSize 总的线程数量
* @param keepAliveTime 空闲线程的存活时间
* @param unit keepAliveTime的单位
* @param workQueue 任务队列, 保存已经提交但尚未被执行的线程
* @param threadFactory 线程工厂(用于指定如何创建一个线程)
* @param handler 拒绝策略 (当任务太多导致工作队列满时的处理策略)
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
Java 官方对 ThreadPoolExecutor 的使用说明:
ThreadPoolExecutor 将根据 corePoolSize 和 maximumPoolSize 自动调整线程池中的线程数量,以及判断任务是否进行排队。
当提交新任务时:
如果运行的线程多于 corePoolSize 而少于 maximumPoolSize:
如果线程数大于 maximumPoolSize,新提交的任务将会根据拒绝策略来处理。
在大多数情况下,corePoolSize 和 maximumPoolSize 是通过构造函数来设置的,不过也可以使用 ThreadPoolExecutor 的 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。注意,核心线程和非核心线程只是一种逻辑上的区分,线程池中只有一种类型的线程,称为工作线程(Worker)。
默认情况下,核心线程只是在新任务到达时才创建和启动的。
可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 对其进行动态重写。
一般在构造带有非空队列的线程池时,会希望先启动线程。
使用 ThreadFactory 来创建新线程。
在 Executors 中默认使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的分组和优先级,且都是非守护线程。
如果 ThreadFactory 创建新线程失败,此时线程池可以继续运行,但不能执行任何任务。
默认情况下,保持活动策略只应用在非核心线程上,即当线程的空闲时间超过 keepAliveTime 时将会终止。
若使用 allowCoreThreadTimeOut(boolean) 方法则会把保活策略也应用于核心线程。
当线程池处于非活跃状态时,可以减少资源消耗。如果线程重新变得活跃,则会创建新的线程。
可以使用方法 setKeepAliveTime(time, timeUnit) 动态地更改此参数,若入参使用 Long.MAX_VALUE 和 TimeUnit.NANOSECONDS 则不会终止空闲线程,除非线程池关闭。
所有 BlockingQueue 都可用于传输和保持提交的任务。当线程池中的线程数量大于 corePoolSize 而少于 maximumPoolSize 时,新任务会加入同步队列。
排队有三种通用策略:
直接提交(Direct handoffs)
无界队列(Unbounded queues)
有界队列(Bounded queues)
当线程池已关闭,或者线程池中的线程数量和队列容量已饱和时,继续提交新任务会被拒绝,会触发 RejectedExecutionHandler#rejectedExecution 方法。
ThreadPoolExecutor 定义了四种拒绝策略:
可以自定义 RejectedExecutionHandler 类来实现拒绝策略。需要注意的是,拒绝策略的运行需要指定线程池和队列的容量。
ThreadPoolExecutor 中提供 beforeExecute 和 afterExecute 方法,在执行每个任务之前和之后调用。提供 terminated 方法,在线程池关闭之前做一些收尾工作。
如果钩子方法抛出异常,则内部工作线程将依次失败并终止。
方法 getQueue() 允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。
remove() 和 purge() 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。
当线程池的引用变为不可达,并且线程池中没有遗留的线程(通过设置 allowCoreThreadTimeOut 把非活动的核心线程销毁),此时线程池会自动 shutdown。
ThreadPoolExecutor 中使用一个 AtomicInteger 类型的变量 ctl 来管理线程池。
其中,低 29 位保存线程数,高 3 位保存线程池状态。
线程池中最大的线程数为 2^29-1。
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads // 工作线程数量
* runState, indicating whether running, shutting down etc // 线程池运行状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29
// 最大线程数: 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0001 1111 1111 1111 1111 1111 1111 1111
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 高三位 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位 000
private static final int STOP = 1 << COUNT_BITS; // 高三位 001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位 010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位 011
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 运行的工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 封装运行状态和任务线程
ThreadPoolExecutor 一共定义了 5 种线程池状态:
线程池状态的转移:
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 操作 workers 集合使用到的锁
private final ReentrantLock mainLock = new ReentrantLock();
在线程池中具有一个 Worker 集合,一个 Worker 对应一个工作线程。当线程池启动时,对应的 Worker 会执行池中的任务,执行完毕后从阻塞队列里获取一个新的任务继续执行。
持有 mainLock 时才能操作 Worker 集合,Java 官方对使用 mainLock 而不是并发集合的说明:
Worker 是 ThreadPoolExecutor 的内部类,其继承体系如下:
Worker 使用 AQS 中的 state 属性表示是否持有锁:
Worker 开始工作时,会先执行 unlock() 方法设置 state 为 0,后续再使用 CAS 对 state 来加锁。具体见 ThreadPoolExecutor#runWorker。
注意,线程池中的工作线程在逻辑上分为核心线程和非核心线程,但是在 Worker 类中并没有相关属性标记当前线程是否是核心线程!
而是在运行期间动态指定的:
这样设计的目的,只是为了动态维持线程池中的核心线程数量不超过 corePoolSize,是一种松散的控制。
java.util.concurrent.ThreadPoolExecutor.Worker
private final class Worker // 不可重入的互斥锁
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread; // 工作线程
/** Initial task to run. Possibly null. */
Runnable firstTask; // 初始运行任务
/** Per-thread task counter */
volatile long completedTasks; // 任务完成计数
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
ThreadPoolExecutor 具有属性 threadFactory 表示线程工厂。
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread.start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/
private volatile ThreadFactory threadFactory;
Worker 中包含属性 thread 表示工作线程,在 Worker 构造函数中通过线程工厂来创建线程(即 Thread#new,注意不能启动线程!)。
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
在 Executors 工具类中定义了内部类 DefaultThreadFactory 作为默认的线程工厂,用于统一设置线程信息:
java.util.concurrent.Executors#defaultThreadFactory
/**
* Returns a default thread factory used to create new threads.
* This factory creates all new threads used by an Executor in the
* same {@link ThreadGroup}. If there is a {@link
* java.lang.SecurityManager}, it uses the group of {@link
* System#getSecurityManager}, else the group of the thread
* invoking this {@code defaultThreadFactory} method. Each new
* thread is created as a non-daemon thread with priority set to
* the smaller of {@code Thread.NORM_PRIORITY} and the maximum
* priority permitted in the thread group. New threads have names
* accessible via {@link Thread#getName} of
* <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
* number of this factory, and <em>M</em> is the sequence number
* of the thread created by this factory.
* @return a thread factory
*/
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
java.util.concurrent.Executors.DefaultThreadFactory
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
线程池的顶层接口 Executor 中只有一个 execute 方法,用于把任务提交到线程池,ThreadPoolExecutor 对它进行了实现。
方法说明:
代码流程:
java.util.concurrent.ThreadPoolExecutor#execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 添加新的核心线程,并把当前任务交给它执行
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 线程池未关闭,且非阻塞入队成功,则进入下一步
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 线程池已关闭,不接收新任务,需要出队并拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) // 入队成功但工作线程为空,则添加非核心线程且不指定任务
addWorker(null, false); // 没有任务的工作线程会从同步队列中拉取任务去执行
}
else if (!addWorker(command, false)) // 队列已满,且创建非核心线程失败,则拒绝任务
reject(command);
}
在任务提交(execute方法)、更新核心线程数(setCorePoolSize方法)、预启动线程(prestartCoreThread方法)中都会调用 addWorker 方法添加新的工作线程。
addWorker 入参指定该工作线程需要执行的任务,以及该工作线程是否核心线程。
代码主要流程:
检查线程池状态(注:SHUTDOWN 不接收新的任务,但是会处理队列里的任务):
检查线程数量限制(注:工作线程在逻辑上分为核心线程、非核心线程):
java.util.concurrent.ThreadPoolExecutor#addWorker
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary. // 检查线程池状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 检查线程数量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // workerCount 自增,结束自旋
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 线程池状态发生变化,重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建工作线程,指定初始任务(Executors.DefaultThreadFactory 中会执行 Thread#new,但是不会调用 Thread#start)
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁用于操作 workers 集合
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException(); // 如果线程工厂已经提前启动线程了(Thread#start),则报错
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新最大池容量
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程,由 JVM 在操作系统层面创建线程并执行 Thread#run
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 线程添加失败,回滚操作
}
return workerStarted;
}
注意,创建新线程的过程中,需要区分 new Thread()
和 new Thread().start
的不同:
另外,在 ThreadPoolExecutor#addWorker 方法中执行 Thread t = w.thread; t.start()
会触发执行 ThreadPoolExecutor#runWorker,该过程简化如下:
private final class Worker implements Runnable {
final Thread thread; // 工作线程
public Worker() {
thread = new Thread(this);
System.out.println("addWorker!");
}
@Override
public void run() {
System.out.println("runWorker!");
}
}
/**
* 测试在 addWorker 中触发 runWorker
*/
@Test
public void test() throws InterruptedException {
Worker worker = new Worker();
worker.thread.start();
worker.thread.join();
}
在 ThreadPoolExecutor#addWorker 中添加工作线程之后,会启动工作线程(Thread#start),触发工作线程执行任务(Thread#run)。
runWorker 代码流程:
对于 Worker#lock,官方的说明:
Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, and then we ensure that unless pool is stopping, this thread does not have its interrupt set.
java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts // 初始化 state 为 0
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // firstTask 不为空,或者从队列拉取到任务不为空
w.lock(); // 加锁,确保除非线程池关闭,否则没有其他线程能够中断当前任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) && // 如果线程池状态 >= STOP,则中断当前线程,不需要执行新任务
!wt.isInterrupted()) // 这里可能会两次执行 isInterrupted,是为了避免 shutdownNow 过程中清除了线程中断状态
wt.interrupt();
try {
beforeExecute(wt, task); // 前置工作,预留
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 后置工作,预留
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false; // 走到这里说明线程没有任务可执行
} finally {
processWorkerExit(w, completedAbruptly); // 工作线程退出
}
}
在 ThreadPoolExecutor#runWorker 中,工作线程执行任务之前,如果 firstTask 为空,则调用 getTask() 从队列中获取任务。
工作线程从队列中拉取任务之前,需要进行校验,如果出现以下任意一种情况会直接退出:
java.util.concurrent.ThreadPoolExecutor#getTask
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 校验线程池状态:1.线程池状态为 SHUTDOWN 且队列为空;2.线程池状态 >= STOP
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 当前线程是否允许超时,true 表示具有超时时间(keepAliveTime)
if ((wc > maximumPoolSize || (timed && timedOut)) // 校验工作线程状态:1.工作线程数超过 maximumPoolSize;2.当前工作线程已超时;3.队列为空
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞直到拉取成功或超时
workQueue.take(); // 阻塞直到拉取成功
if (r != null)
return r;
timedOut = true; // 表示直到超时,都没有获取任务
} catch (InterruptedException retry) { // 拉取时被中断唤醒,继续自旋
timedOut = false;
}
}
}
工作线程从线程池中拉取任务,具有两种方式:
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
:阻塞直到拉取任务成功或超时。workQueue.take()
:阻塞直到拉取任务成功。代码中通过 allowCoreThreadTimeOut || wc > corePoolSize
这个表达式来控制使用哪种拉取任务方式。
该表达式为 true 时,如果线程在 keepAliveTime 时间内没有拉取到任务,则会被销毁,表现为“非核心线程”。
但是,由于工作线程数量 wc 是会实时发生变化的,因此同一个线程在运行期间可能会先后使用不同的方式拉取任务。
也就是说,工作线程在运行期间可能会在 “核心线程” 和 “非核心线程” 两种形态之间切换。
而实际上 ThreadPoolExecutor 区分 “核心线程” 和 “非核心线程” 只是为了利用 corePoolSize 来控制活跃线程数量以及任务是否进入队列中排队等待,并不关心 Worker 到底是不是“核心线程”。
在 runWorker() 中,如果通过 getTask() 识别到空闲线程(timedOut = true),或者工作线程在执行任务过程中出现异常,会调用 processWorkerExit() 退出工作线程。
代码流程:
注意,当前线程在执行完 processWorkerExit 方法之后会自动结束运行,Thread#isAlive 返回 false。
因此在当前线程终止之前,如果满足以下条件之一,则会创建新的非核心线程来替换当前线程:
Java 官方的说明:
replaces the worker if either it exited due to user task exception or if fewer than corePoolSize workers are running or queue is non-empty but there are no workers.
java.util.concurrent.ThreadPoolExecutor#processWorkerExit
/**
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 当前线程执行任务时出现异常,需要扣减 workerCount
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 统计所有线程完成的任务数
workers.remove(w); // 移除当前线程的 worker
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // RUNNING、SHUTDOWN,即线程池尚未停止
if (!completedAbruptly) {
// 没有出现异常,说明当前线程是非活跃线程:
// 1. allowCoreThreadTimeOut 为 false,则 min 为 corePoolSize。若 workerCountOf(c) >= min 说明当前终止的是非核心线程,无需补充新线程
// 2. allowCoreThreadTimeOut 为 true,且队列为空,则 min 为 0。 若 workerCountOf(c) >= min 说明当前没有任务需要处理,无需补充新线程
// 3. allowCoreThreadTimeOut 为 true,且队列非空,则 min 为 1。 若 workerCountOf(c) >= min 说明具有活跃的线程处理任务,无需补充新线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false); // 创建新的线程替换当前线程
}
}
tryTerminate 用于尝试终止线程池,在 shutdow()、shutdownNow()、remove() 中均是通过此方法来终止线程池。
此方法必须在任何可能导致线程终止的行为之后被调用,例如减少工作线程数,移除队列中的任务,或者是在工作线程运行完毕后处理工作线程退出逻辑的方法(processWorkerExit)。
代码流程:
java.util.concurrent.ThreadPoolExecutor#tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 校验线程池状态,只有状态为 STOP,或者(状态为 SHUTDOWN 且队列为空)的情况下,才可以往下执行,否则直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) || // TIDYING、TERMINATED
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // 仅中断一个工作线程,由它来传递线程池关闭消息
return;
}
final ReentrantLock mainLock = this.mainLock; // 来到这里,说明线程池中没有工作线程了
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // (STOP or SHUTDOWN) -> TIDYING
try {
terminated(); // 钩子方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // TIDYING -> TERMINATED
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
理解了 tryTerminate() 如何尝试关闭线程池后,再来看一下发起线程池关闭的方法:shutdown()、shutdownNow()。
关闭线程池,不接收新的任务,但是会处理队列里的任务。
java.util.concurrent.ThreadPoolExecutor#shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查关闭权限
advanceRunState(SHUTDOWN); // 修改线程池状态
interruptIdleWorkers(); // 依次中断所有空闲线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试关闭线程池
}
理解 shutdown() 如何做到在关闭线程池之前,不接受新任务,并且继续处理已有任务,关键在于两个操作:
设置线程池状态为 SHUTDOWN 之后:
java.util.concurrent.ThreadPoolExecutor#interruptIdleWorkers()
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) { // 能够获取锁,说明当前线程没有在执行任务,是“空闲”的
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers() 在中断线程之前,使用 tryLock() 尝试一次性获取锁,再中断任务。
关闭线程池,不接收新任务,也不会处理队列里的任务,并且中断正在运行的任务。
java.util.concurrent.ThreadPoolExecutor#shutdownNow
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 移除等待队列中的所有任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
理解 shutdownNow() 如何做到在关闭线程池时,不接受新任务,也不会处理队列里的任务,并且中断正在运行的任务。关键在于三个操作:
设置线程池状态为 STOP 之后:
java.util.concurrent.ThreadPoolExecutor#interruptWorkers
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
java.util.concurrent.ThreadPoolExecutor.Worker#interruptIfStarted
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 注意这里没有获取锁!
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
与 interruptIdleWorkers() 相比,interruptWorkers() 在中断线程之前,只需校验 getState() >= 0,无需获取锁即可强行中断运行中的线程。
java.util.concurrent.ThreadPoolExecutor#drainQueue
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList); // 批量将队列中的任务转移到 taskList
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
将队列 workQueue 中未处理的任务全部拉取到 taskList 中,不再处理任务。
合理地配置线程池:
《Java 并发编程实战》提出了一个线程数计算公式。
定义:
$$ N_{cpu} = CPU 核心数 $$
$$ U_{cpu} = 目标 CPU 利用率, 0 \leqslant U_{cpu} \leqslant 1 $$
$$ \frac{ W }{ C } = 等待时间和计算时间的比例 $$
要使得处理器达到期望的使用率,线程数的最优大小等于:
$$ N_{threads} = N_{cpu} * U_{cpu} * ( 1 + \frac{ W }{ C } ) $$
可以通过 Runtime 来获取 CPU 核心数:
int N_CPU = Runtime.getRuntime().availableProcessors();
回顾一下ThreadPoolExecutor 的内部结构:
作者:Sumkor
链接:https://segmentfault.com/a/11...
还记得我在《2020 年 JavaScript 状态调研报告小结》中提到的 2020 年全球开发者...
macromedia dreamweaver 8序列号 激活码: wpd800-50438-28032-39991 wpd800-599...
Do not use these html elements in html pages. Presentational elements shoul...
1、使用css精灵。 好处是将css中使用的小图片可以合并为一张大图片减少了对服务...
dreamweaver软件: 点此下载 1、熟悉网页设计的网友就知道,调用STYLE的方法很多...
这是一款基于HTML5和JavaScript的进度条应用,这款进度条插件非常有特点,它在进...
windows 下默认的滚动条样式巨丑,项目中又有比较多地方会显示滚动条, 故回头翻...
Dreamweaver网页中的banner图片需要切换,我们可以添加按钮来切换图片,下面我们...
在开发中,如果遇到需要使用canvas同时绘制多张图片,但因为图片大小的不一样,...
随着网络时代的发展与进步,我们的学习工作和生活早已离不开互联网,智能家居、...