当前位置:主页 > 查看内容

Java 深入理解线程池

发布时间:2021-07-07 00:00| 位朋友查看

简介:文章目录 一、Java 中的线程池 1. 线程池状态 2. 线程池主要属性参数 3. 线程池的实现原理 3.1 ThreadPoolExecutor 线程池主要处理流程 3.2 线程池方法解析 4. 合理地配置线程池 5. 线程池的监控 二、手写线程池 1. 实现阻塞队列 2. 实现线程池 3. 测试 4.……

一、Java 中的线程池

1. 线程池状态

ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

在这里插入图片描述

线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示

  • 使用一个数来表示两个值的主要原因是:可以通过一次CAS同时更改两个属性的值
	// 原子整数,前3位保存了线程池的状态,剩余位保存的是线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
	//去掉前三位保存线程状态的位数,剩下的用于保存线程数量
    private static final int COUNT_BITS = Integer.SIZE - 3;

	// 2^COUNT_BITS次方,表示可以保存的最大线程数
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

获取线程池状态、线程数量以及合并两个值的操作

    // Packing and unpacking ctl
	
	// 传入 ctl 值 获取运行状态 该操作会让除高3位以外的数全部变为0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

	// 传入 ctl 值 获取运行线程数  该操作会让高3位为0
    private static int workerCountOf(int c)  { return c & CAPACITY; }

	// 传入 rs 运行状态 wc 线程数量 计算ctl新值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

2. 线程池主要属性参数

	//阻塞队列,用于存放来不及被核心线程执行的任务
    private final BlockingQueue<Runnable> workQueue;
	
	// 全局锁,解决创建销毁线程等线程安全问题
    private final ReentrantLock mainLock = new ReentrantLock();

	// 用于存放核心线程的容器,只有当持有锁时才能够获取其中的元素
    private final HashSet<Worker> workers = new HashSet<Worker>();

	//线程工厂,给线程取名字
    private volatile ThreadFactory threadFactory;

	// 拒绝执行处理器 处理拒绝策略
    private volatile RejectedExecutionHandler handler;

    // 救急线程(或者核心线程)空闲时的最大生存时间
    private volatile long keepAliveTime;

	// 核心线程数
    private volatile int corePoolSize;

	// 最大线程数 
	// 最大线程数 - 核心线程数 = 九级线程数
    private volatile int maximumPoolSize;

    // 默认拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
  • corePoolSize : (核心线程数量),如果调用了线程池的 prestartAllCoreThreads( ) 方法,线程池会提前创建并启动所有基本线程,否则是懒惰创建
  • workQueue:用于保存等待执行的任务的阻塞队列。可以选择以下几个具体实现
    • ArrayBlockingQueue:是一个基于数组的有界阻塞队列,按FIFO(先进先出原则)排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。
    • LinkedBlockingQuene:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。吞吐量通常要高于ArrayBlockingQueue 。静态工厂方法 Executors.newFixedThreadPool( ) 使用了该队列
    • ③ SynchronousQuene 是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene ,静态工厂方法 Executors.newCachedThreadPool() 用的是此队列
    • PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
  • maximumPoolSize:线程池最大线程数量,包括了核心线程数量和救急线程数量
  • threadFactory:线程工厂,可以给线程设置名字等
  • handler:拒绝执行处理器 处理拒绝策略 在处理过程中具体讲解
  • keepAliveTime:线程活动保持时间,线程池工作线程空闲后,保持存活的时间,所以,如果任务很多,并且每个任务执行时间很多,可以调大存活时间,提高线程利用率
  • unit:空闲线程存活时间单位

3. 线程池的实现原理

3.1 ThreadPoolExecutor 线程池主要处理流程

在这里插入图片描述

在这里插入图片描述

  1. 使用者 发布任务
  2. 如果当前运行的线程少于 核心线程数(corePoolSize),则创建新线程来执行任务(这一步需要获得全局锁,不然会引发线程安全问题)
  3. 如果运行的线程等于或者大于corePoolSize 则将任务加入阻塞队列(BlockQueue)
  4. 如果BlockQueue 已满,无法将任务加入队列,则创建新线程来处理任务(这同样需要获得全局锁)
    • 此处就用到救急线程,其数量就是最大线程数减去核心线程数的数量
  5. 如果创建新线程使当前运行的线程超出maximumPoolSize 任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法
    • 拒绝策略 jdk 提供了 4 种实现
      在这里插入图片描述

    • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略

    • CallerRunsPolicy:让调用者运行任务

    • DiscardPolicy:放弃本次任务

    • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之

明白了上述内容 我们就可以看看源码是如何实现的

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

		// 工作线程数小于核心线程数 就 调用addWorker 创建新线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

		// 如果线程池还在运行,并且可以加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

			// 重新检查线程池是否还在运行 或者有的线程已经死了 必要时回滚队列并且采用拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);

			// 线程池仍在运行 采用救急线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

		// 如果创建线程失败,则采用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

3.2 线程池方法解析

① 添加线程的addWorker( ) 方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {
            // 获取表示状态和线程数的原子整数
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);

            // 如果线程池状态不是 RUNNING 或者 阻塞队列中有任务 则创建线程失败
            if (rs >= SHUTDOWN &&
                    !(rs == SHUTDOWN &&
                            firstTask == null &&
                            !workQueue.isEmpty()))
                return false;

            for (; ; ) {
                // 获取线程个数
                int wc = workerCountOf(c);
                // 如果线程数大于容量 或者 大于核心线程数或者最大线程数(用哪个绑定取决于传入的core)则创建线程失败
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 在多线程情况下 如果CAS创建线程 修改 原子整数 失败 则回滚到retry 重新循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新获取 表示状态和线程个数的原子整数
                c = ctl.get();
                // 如果 运行状态和当初不同,则回滚重新循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {

            // 创建新线程处理任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 创建新线程需要获得全局锁
                mainLock.lock();
                try {
                    //加锁的同时再次检测 避免在释放锁之前调用了shut down
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        // 再次确认线程存活
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将该线程 加入到 HashSet集合中(线程池)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 更新标志位
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 如果工作线程成功添加,开始线程开始工作 并更新标志位
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程启动失败 
            // 调用addWorkerFailed(w)方法: 删除该工作线程 工作线程数减一,并且尝试终止线程池
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

② 向线程池提交任务

  • execute() : 上面已经分析过,用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
  • submit() :用于提交需要返回值的任务,线程池会返回一个 future 对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future 的get () 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,也可以使用带超时时间的get() ,这里着重分析该方法

使用

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        // 通过submit执行Callable中的call方法
        Future<String> future = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "OKK";
            }
        });

        try {
            // 通过future 来获得返回值
            System.out.println(future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

③ 关闭线程池的方法

shutdown() 将线程池的状态设置成 SHUTDOWN 中断没有执行任务的线程,其他线程执行完任务,自己消亡

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 获取全局锁
        mainLock.lock();
        try {
            // 通过安全管理器看是否有权关闭线程池
            checkShutdownAccess();
            // 将线程池状态设置为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 打断 空闲的工作线程
            interruptIdleWorkers();
            // 给子类提供一些扩展方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终结线程池
        tryTerminate();
    }
    final void tryTerminate() {
        for (; ; ) {

            // 获取存储状态和线程数量的 原子整数
            int c = ctl.get();
            // 如果存在以下三种情况,尝试终结线程池失败
            // 1、线程池状态为RUNNING
            // 2、线程池状态为 RUNNING SHUTDOWN STOP (状态值大于TIDYING)
            // 3、线程池状态为SHUTDOWN,但阻塞队列中还有任务等待执行
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 如果存活线程数不为0 打断空闲的线程
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            // 获取全局锁
            mainLock.lock();
            try {
                // 尝试使用CAS将线程池状态改为 TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        // 通过CAS将线程池状态改为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdownNow() 首先将线程池状态设置为STOP,然后尝试停止所有的正在执行或暂停人物的线程,并返回等待执行的任务的列表

    public List<Runnable> shutdownNow() {
        // 返回的任务列表
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        // 获取全局锁
        mainLock.lock();
        try {
            // 通过安全管理器看是否有权关闭线程池
            checkShutdownAccess();
            // 将线程池状态设置为STOP
            advanceRunState(STOP);
            // 遍历打断所有线程
            interruptWorkers();
            // 将未执行的任务从队列中移除,然后返回给调用者
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

4. 合理地配置线程池

可以从以下角度分析:

  • 任务的性质:CPU密集型任务、IO密集型任务、混合型任务
    • CPU密集型应该配置尽可能小的线程,通常采用cpu核数+1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费
    • IO密集型应该配置尽可能多的线程,因为CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行IO操作时、远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
  • 任务的优先级:高、中和低
  • 任务的执行时间:长、中和短
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接

注意

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。
有一次,我们系统里后台任务线程的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现的问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里,如果当时我们设置成无界队列,那么线程池的队列就会越来越多,最多可能会撑满内存,OutOfMemory,导致整个系统不可用

5. 线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,快速定位问题。可以通过线程池提供的参数进行监控:

  • largestPoolSize:线程池里曾经创建过的最大线程数量,可以判断知道线程池是否满过
  • completedTaskCount:线程池已完成的任务数量
  • getPoolSize( ):线程池的线程数量
  • getActiveCount():获取活动的线程数

也可以通过扩展线程池进行监控,通过继承线程池来自定义线程池,重写线程池的下列方法,在一些特定的时间段进行一些监控

  • protected void beforeExecute(Thread t, Runnable r) { }
  • protected void afterExecute(Runnable r, Throwable t) { }
  • protected void terminated() { }

二、手写线程池

1. 实现阻塞队列

主要字段

  • 任务队列queue 用于存放发布的任务
  • ReentrantLock加锁保证取放任务的线程安全
  • fullWaitSet 和 emptyWaitSet 作为任务队列满或者空 时的等待队列

主要方法

  • 线程池获取任务 T take()
    • 重载方法,分别用于没有时间限制的获取任务以及带超时的获取任务
    • 若任务队列为空 进入等待队列
    • 不为空则选取第一个
  • 主线程 添加任务 void put(T task)
  • 获取 任务队列的任务个数 size ()
/**
 * 阻塞队列
 * @param <T>
 */
class BlockQueue<T> {

    /**
     * 任务队列
     */
    private Deque<T> queue = new ArrayDeque<>();

    /**
     * 锁
     */
    private ReentrantLock lock = new ReentrantLock();

    /**
     * 任务队列满后 生产者进入等待队列等待
     */
    private Condition fullWaitSet = lock.newCondition();

    /**
     * 任务队列空时,消费者进入等待队列等待
     */
    private Condition emptyWaitSet = lock.newCondition();

    /**
     * 容量
     */
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 从任务队列获取任务
     * @return
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 有超时时间的获取任务
     * @param timeout
     * @param unit
     * @return
     */
    public T take(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //将timeout 转换成纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    //返回剩余的时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 添加任务方法
     * @param task
     */
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 获取任务队列任务数
     * @return
     */
    public int size () {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }
}

2. 实现线程池

主要方法

  1. void execute (Runnable task)
    • 如果 任务数量小于 线程池中的线程数,则创建Worker对象(实现Thread类)来执行任务
    • 如果 大于线程数,则先放入阻塞队列中存放
  2. Worker 的 run 方法
    • 如果传入的任务不为空 则执行传入的任务
    • 执行完成之后 继续执行任务队列中的任务
    • 全部结束之后删除该线程
/**
 * 线程池
 */
class ThreadPool {

    /**
     * 任务队列
     */
    private BlockQueue<Runnable> taskQueue;

    /**
     * 线程集合
     */
    private HashSet<Worker> workers = new HashSet();

    /**
     * 核心线程数
     */
    private int coreSize;

    /**
     * 获取任务超时时间
     */
    private long timeout;

    /**
     * 时间工具
     */
    private TimeUnit timeUnit;


    /**
     * 执行任务
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 当任务数没有超过核心线程数,直接交给线程执行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker对象"+worker);
                worker.start();
            }else { //超过核心线程数 就加入任务队列暂存
                System.out.println("线程数满,将任务加入任务队列" + task);
                taskQueue.put(task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
    }

    /**
     * 真正执行任务的线程
     */
    private class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行任务"+task);
                    task.run();
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("删除"+this+"线程");
                workers.remove(this);
            }
        }
    }
}

3. 测试

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);

        for (int i = 0; i < 5; i++) {
            int j = i;
            threadPool.execute(() -> {
                System.out.println("任务"+j + "执行完成");
            });
        }
    }

在这里插入图片描述

  • 设置线程数为2个,执行代码 前两个任务进入时 创建两个线程执行任务,之后任务则无法执行进入任务队列
  • 等两个线程执行完任务继续 获取任务队列中的任务执行,如果超过任务获取等待时间,退出执行任务循环,没有任务后删除线程。

以上实现了基本的线程池,但是如果任务数量庞大,并且执行任务比较缓慢,任务队列满后,迟迟等不到解决,并且有新的任务来,会一直处于等待状态,所以要添加拒绝策略

4. 拒绝策略

4.1 带超时的添加任务

    /**
     * 待超时的添加任务
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */
    public boolean put(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

4.2 拒绝策略(策略模式)

队列满后,新的任务可以选择继续死等,带超时的等待,放弃执行任务,抛出异常等等很多解决策略,所以应该把选择权交给工程师,提高可扩展性,把所有操作抽象成接口,让使用者自己实现

/**
 * 拒绝策略
 * @param <T>
 */
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}

给 ThreadPool 线程池加入拒绝策略属性,并在构造方法中初始化

    /**
     * 拒绝策略
     */
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

将执行任务中 加入任务队列的方法改成 tryPut() 尝试加入队列

    /**
     * 执行任务
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 当任务数没有超过核心线程数,直接交给线程执行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker对象"+worker);
                worker.start();
            }else { //超过核心线程数 就加入任务队列暂存
                //taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy, task);

            }
        }
    }

并且在阻塞队列中实现该方法

  • 上锁,保证队列的线程安全
  • 如果队列满了,则调用自己实现的拒绝策略处理
  • 没有满则直接加入任务队列
    /**
     * 尝试加入任务队列
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this,task);
            }else {
                System.out.println("加入任务队列"+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }

4.3 测试利用带超时时间的拒绝策略

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
                1, (queue, task) -> {
            queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务"+j + "执行完成");
            });
        }
    }

在这里插入图片描述

  • 设置阻塞队列大小为 1, 线程池线程数为1,执行第一个任务时, 第二个任务进入任务队列,第三个任务等待进入队列,0.5秒后第一个任务没有完成,任务队列还是满的,所以第三个任务放弃加入任务队列,所以最后只完成了两个任务。

5. 完整代码

package threadPool_Test;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Description: 自定义线程池
 * @Author: Aiguodala
 * @CreateDate: 2021/4/11 14:59
 */

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
                1, (queue, task) -> {
            queue.put(task,500,TimeUnit.MILLISECONDS);//测试使用有超时时间的拒绝策略
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务"+j + "执行完成");
            });
        }
    }
}

/**
 * 拒绝策略
 * @param <T>
 */
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockQueue<T> queue, T task);
}

/**
 * 线程池
 */
class ThreadPool {

    /**
     * 任务队列
     */
    private BlockQueue<Runnable> taskQueue;

    /**
     * 线程集合
     */
    private HashSet<Worker> workers = new HashSet();

    /**
     * 核心线程数
     */
    private int coreSize;

    /**
     * 获取任务超时时间
     */
    private long timeout;

    /**
     * 时间工具
     */
    private TimeUnit timeUnit;

    /**
     * 拒绝策略
     */
    private RejectPolicy<Runnable> rejectPolicy;


    /**
     * 执行任务
     * @param task
     */
    public void execute (Runnable task) {
        synchronized (workers) {
            // 当任务数没有超过核心线程数,直接交给线程执行
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                System.out.println("新增worker对象"+worker);
                worker.start();
            }else { //超过核心线程数 就加入任务队列暂存
                //taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy, task);

            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 真正执行任务的线程
     */
    private class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
                try {
                    System.out.println("正在执行任务"+task);
                    task.run();
                }catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println("删除"+this+"线程");
                workers.remove(this);
            }
        }
    }
}

/**
 * 阻塞队列
 * @param <T>
 */
class BlockQueue<T> {

    /**
     * 任务队列
     */
    private Deque<T> queue = new ArrayDeque<>();

    /**
     * 锁
     */
    private ReentrantLock lock = new ReentrantLock();

    /**
     * 任务队列满后 生产者进入等待队列等待
     */
    private Condition fullWaitSet = lock.newCondition();

    /**
     * 任务队列空时,消费者进入等待队列等待
     */
    private Condition emptyWaitSet = lock.newCondition();

    /**
     * 容量
     */
    private int capacity;

    public BlockQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * 从任务队列获取任务
     * @return
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 有超时时间的获取任务
     * @param timeout
     * @param unit
     * @return
     */
    public T take(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            //将timeout 转换成纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    //返回剩余的时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            fullWaitSet.signal();
            return task;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 添加任务方法
     * @param task
     */
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println("等待加入任务队列"+task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }


    /**
     * 待超时的添加任务
     * @param task
     * @param timeout
     * @param timeUnit
     * @return
     */
    public boolean put(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    System.out.println("等待加入任务队列");
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("加入任务队列"+task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    /**
     * 获取任务队列任务数
     * @return
     */
    public int size () {
        lock.lock();
        try {
            return queue.size();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 尝试加入任务队列
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this,task);
            }else {
                System.out.println("加入任务队列"+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

参考《Java 并发编程的艺术》

;原文链接:https://blog.csdn.net/weixin_48922154/article/details/115681974
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文


随机推荐