ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示
// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//去掉前三位保存线程状态的位数,剩下的用于保存线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS次方,表示可以保存的最大线程数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
获取线程池状态、线程数量以及合并两个值的操作
// Packing and unpacking ctl
// 传入 ctl 值 获取运行状态 该操作会让除高3位以外的数全部变为0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 传入 ctl 值 获取运行线程数 该操作会让高3位为0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 传入 rs 运行状态 wc 线程数量 计算ctl新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
//阻塞队列,用于存放来不及被核心线程执行的任务
private final BlockingQueue<Runnable> workQueue;
// 全局锁,解决创建销毁线程等线程安全问题
private final ReentrantLock mainLock = new ReentrantLock();
// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素
private final HashSet<Worker> workers = new HashSet<Worker>();
//线程工厂,给线程取名字
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器 处理拒绝策略
private volatile RejectedExecutionHandler handler;
// 救急线程(或者核心线程)空闲时的最大生存时间
private volatile long keepAliveTime;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
// 最大线程数 - 核心线程数 = 九级线程数
private volatile int maximumPoolSize;
// 默认拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
拒绝策略 jdk 提供了 4 种实现
AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy:让调用者运行任务
DiscardPolicy:放弃本次任务
DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
明白了上述内容 我们就可以看看源码是如何实现的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数小于核心线程数 就 调用addWorker 创建新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池还在运行,并且可以加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新检查线程池是否还在运行 或者有的线程已经死了 必要时回滚队列并且采用拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池仍在运行 采用救急线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果创建线程失败,则采用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
① 添加线程的addWorker( ) 方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
// 获取表示状态和线程数的原子整数
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 如果线程池状态不是 RUNNING 或者 阻塞队列中有任务 则创建线程失败
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
// 获取线程个数
int wc = workerCountOf(c);
// 如果线程数大于容量 或者 大于核心线程数或者最大线程数(用哪个绑定取决于传入的core)则创建线程失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 在多线程情况下 如果CAS创建线程 修改 原子整数 失败 则回滚到retry 重新循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取 表示状态和线程个数的原子整数
c = ctl.get();
// 如果 运行状态和当初不同,则回滚重新循环
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新线程处理任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 创建新线程需要获得全局锁
mainLock.lock();
try {
//加锁的同时再次检测 避免在释放锁之前调用了shut down
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 再次确认线程存活
if (t.isAlive())
throw new IllegalThreadStateException();
// 将该线程 加入到 HashSet集合中(线程池)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 更新标志位
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果工作线程成功添加,开始线程开始工作 并更新标志位
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程启动失败
// 调用addWorkerFailed(w)方法: 删除该工作线程 工作线程数减一,并且尝试终止线程池
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
② 向线程池提交任务
使用
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 通过submit执行Callable中的call方法
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "OKK";
}
});
try {
// 通过future 来获得返回值
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
③ 关闭线程池的方法
shutdown() 将线程池的状态设置成 SHUTDOWN 中断没有执行任务的线程,其他线程执行完任务,自己消亡
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 通过安全管理器看是否有权关闭线程池
checkShutdownAccess();
// 将线程池状态设置为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 打断 空闲的工作线程
interruptIdleWorkers();
// 给子类提供一些扩展方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结线程池
tryTerminate();
}
final void tryTerminate() {
for (; ; ) {
// 获取存储状态和线程数量的 原子整数
int c = ctl.get();
// 如果存在以下三种情况,尝试终结线程池失败
// 1、线程池状态为RUNNING
// 2、线程池状态为 RUNNING SHUTDOWN STOP (状态值大于TIDYING)
// 3、线程池状态为SHUTDOWN,但阻塞队列中还有任务等待执行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 如果存活线程数不为0 打断空闲的线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 尝试使用CAS将线程池状态改为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 通过CAS将线程池状态改为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow() 首先将线程池状态设置为STOP,然后尝试停止所有的正在执行或暂停人物的线程,并返回等待执行的任务的列表
public List<Runnable> shutdownNow() {
// 返回的任务列表
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 获取全局锁
mainLock.lock();
try {
// 通过安全管理器看是否有权关闭线程池
checkShutdownAccess();
// 将线程池状态设置为STOP
advanceRunState(STOP);
// 遍历打断所有线程
interruptWorkers();
// 将未执行的任务从队列中移除,然后返回给调用者
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
可以从以下角度分析:
注意:
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
有一次,我们系统里后台任务线程的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现的问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里,如果当时我们设置成无界队列,那么线程池的队列就会越来越多,最多可能会撑满内存,OutOfMemory,导致整个系统不可用
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,快速定位问题。可以通过线程池提供的参数进行监控:
也可以通过扩展线程池进行监控,通过继承线程池来自定义线程池,重写线程池的下列方法,在一些特定的时间段进行一些监控
主要字段
主要方法
/**
* 阻塞队列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任务队列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任务队列满后 生产者进入等待队列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任务队列空时,消费者进入等待队列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 从任务队列获取任务
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超时时间的获取任务
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout 转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//返回剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任务方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 获取任务队列任务数
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}
主要方法
/**
* 线程池
*/
class ThreadPool {
/**
* 任务队列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 线程集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心线程数
*/
private int coreSize;
/**
* 获取任务超时时间
*/
private long timeout;
/**
* 时间工具
*/
private TimeUnit timeUnit;
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else { //超过核心线程数 就加入任务队列暂存
System.out.println("线程数满,将任务加入任务队列" + task);
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
}
/**
* 真正执行任务的线程
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行任务"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("删除"+this+"线程");
workers.remove(this);
}
}
}
}
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
System.out.println("任务"+j + "执行完成");
});
}
}
以上实现了基本的线程池,但是如果任务数量庞大,并且执行任务比较缓慢,任务队列满后,迟迟等不到解决,并且有新的任务来,会一直处于等待状态,所以要添加拒绝策略
/**
* 待超时的添加任务
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
队列满后,新的任务可以选择继续死等,带超时的等待,放弃执行任务,抛出异常等等很多解决策略,所以应该把选择权交给工程师,提高可扩展性,把所有操作抽象成接口,让使用者自己实现
/**
* 拒绝策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
给 ThreadPool 线程池加入拒绝策略属性,并在构造方法中初始化
/**
* 拒绝策略
*/
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
将执行任务中 加入任务队列的方法改成 tryPut() 尝试加入队列
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else { //超过核心线程数 就加入任务队列暂存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
并且在阻塞队列中实现该方法
/**
* 尝试加入任务队列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务"+j + "执行完成");
});
}
}
package threadPool_Test;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description: 自定义线程池
* @Author: Aiguodala
* @CreateDate: 2021/4/11 14:59
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务"+j + "执行完成");
});
}
}
}
/**
* 拒绝策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
/**
* 线程池
*/
class ThreadPool {
/**
* 任务队列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 线程集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心线程数
*/
private int coreSize;
/**
* 获取任务超时时间
*/
private long timeout;
/**
* 时间工具
*/
private TimeUnit timeUnit;
/**
* 拒绝策略
*/
private RejectPolicy<Runnable> rejectPolicy;
/**
* 执行任务
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 当任务数没有超过核心线程数,直接交给线程执行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker对象"+worker);
worker.start();
}else { //超过核心线程数 就加入任务队列暂存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 真正执行任务的线程
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在执行任务"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("删除"+this+"线程");
workers.remove(this);
}
}
}
}
/**
* 阻塞队列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任务队列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 锁
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任务队列满后 生产者进入等待队列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任务队列空时,消费者进入等待队列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 从任务队列获取任务
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超时时间的获取任务
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout 转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//返回剩余的时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任务方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println("等待加入任务队列"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 待超时的添加任务
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
System.out.println("等待加入任务队列");
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
/**
* 获取任务队列任务数
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
/**
* 尝试加入任务队列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任务队列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
参考《Java 并发编程的艺术》
例如: 我们在百度中搜索 尊托云数,则网址后面的参数就是 http://www.baidu.com/...
我之前写过一个简易版的自动+手动轮播图: 简易轮播图 但是这个轮播图在切换的时...
如果你很在意你的终端的外观的话,一个跨 shell 的提示符可以让你轻松地定制和配...
接着昨天的,如果forEach中的items类型是map或者Collection类型的,怎样使用增强...
本文实例为大家分享了js实现电灯开关效果的具体代码,供大家参考,具体内容如下 ...
关于我的SWFObject V1.5的使用过程,以 上篇 中的介绍暂时告一段落了,下面我将...
Asp 解析 XML并分页显示,示例源码如下: 复制代码 代码如下: !DOCTYPE html PUB...
主要目的 a. 掌握获取 GridPanel 当前行的各个字段值的方法 b. 掌握如何将前台数...
发现每个编辑实例都可以加载不同的css样式表,而且其样式不继承 页面的css。 于...
Mysql数据库五——mysql事务及引擎 一、事务 1、事务的概念 2、事务的ACID特点 ...