前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >程序员如何控制并发流程、CountDownLatch等四大工具

程序员如何控制并发流程、CountDownLatch等四大工具

原创
作者头像
Joseph_青椒
修改2023-09-03 20:35:26
1960
修改2023-09-03 20:35:26
举报
文章被收录于专栏:java_josephjava_joseph

程序员控制并发流程

线程协作、控制并发流程,我们开发,咋滴搞呢

什么是控制并发流程?

在不控制的情况下,并发下的多个线程,受线程调度器控制,不受我们程序员控制

控制并发流程的工具类,让程序员可以通过工具类控制线程合作,完成业务逻辑

比如,让线程A等待线程B执行完,再执行

有哪些控制并发流程的工具类

image-20230902205149683
image-20230902205149683

countDownLatch门闩

作用

控制并发流程

countDown是倒数,Latch是门闩

倒数门闩

比如,拼多多,人满才发货,就是这个思想

流程:开始--进入等待---倒数---继续工作

认识

CountDownLatch(int count)

一个构造函数,count是用来倒数的值

await:调用await方法的线程会被挂起,等待直到count为0才会执行

countDown,就是count--,为0时,线程会被重新唤起

image-20230902210335544
image-20230902210335544

图解认识下,

现在看代码演示

用法

用法一:

一个线程等待多个线程都执行完毕,再继续自己工作

这个场景是一等多的场景,比如去医院看病,检查完一项,盖一个章,盖好了再去主治医生进行下一步,或者拼多多,需要等其他人操作完,比如5个人拼团,你得等其他4个人执行countDown,才能拼单成功

代码语言:javascript
复制
/**
 * 工厂中,质检,5个人检查完才算通过
 */
public class CountDownLatchDemo1 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService  service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = () -> {
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println("No." + no + "完成了检查");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
?
            };
            service.submit(runnable);
        }
        System.out.println("等待5个人检查完.........");
        latch.await();
        System.out.println("所有人都检查完了");
    }
}

用法2:

多等一的场景,比如多个线程,要同时启动,等待一个人发令,类似于百米赛跑

真实场景:比如并发模拟的时候,压测,需要让线程都等着,同时的执行任务,模拟高并发场景,也会用到countDownLatch

代码语言:javascript
复制
/**
 * @Author:Joseph
 * 百米赛跑等射枪
 */
public class CounDownLatchDemo2 {
?
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = ()->{
                System.out.println("No,"+no+"准备完毕,等待法令枪");
                try {
                    latch.await();
                    System.out.println("No,"+no+"开始跑步!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            service.submit(runnable);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("开枪了!!!!!!");
        latch.countDown();
    }
?
}

puls玩法,重点裁判记录最后一个人跑完

代码语言:javascript
复制
/**
 * @Author:Joseph
 * 百米赛跑等射枪
 * 终点等待最后一个结束
 */
public class CounDownLatchDemo3 {
?
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i+1;
            Runnable runnable = ()->{
                System.out.println("No,"+no+"准备完毕,等待法令枪");
                try {
                    begin.await();
                    System.out.println("No,"+no+"开始跑步!");
                    Thread.sleep((long) (Math.random()*10000));
                    System.out.println("No,"+no+"到终点了");
                    end.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            service.submit(runnable);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("开枪了!!!!!!");
        begin.countDown();
        end.await();
        System.out.println("所有人都到了,裁判宣布比赛结束");
    }
?
}

注意

countDownLatch是不能够重用的,结束了就失效了,无法进行复用

只能新new一个,或者用CyclicBarrier

总结:

构造方法,count代表倒数几次,

await等待,countDown倒数

用法:一等多,或者多等一,经典用法

注意不能回滚重置

Semaphore

认识

Semaphore不仅再java中有,操作系统os也是有的,很多人肯定学过!

java来模拟了一下这个而已

用途:限制或者管理有限资源的适用情况,

可以类比一个许可证,而且这个许可证是有限的,拿到许可证才能用这个任务

实际应用场景:面向B短的服务,一次查询数据超级 大,很容易拖垮线程,设计信号量为3,那么同时最多三个线程拿到这个资源进行适用。

比如:澡堂,一次只能容纳10个人,那么信号量的许可证设置为10,进去一个人,就-1,出来一个人就+1

代码层面

acquire获取许可证,release归还许可证

使用流程:

1:初始化Semphore并指定许可证数量

2:执行任务之前,调用acquire()方法

3:用完只会,调用release释放许可证

重要方法介绍

new Semaphore(int permits,boolean fair)

fair,ture是公平,不能插队,只能在等待对列里等待,

false的话,可以在一定情况下插队

acquire()

acquireUninterruptibly()

不响应中断,不会有异常来让这个线程处理

tryAcquire()

尝试获取许可证,不必一直等待,陷入阻塞,过一会儿在去查看许可证的空闲情况

tryAcqure(timeout)

等待几秒,获取不到,就去做别的事儿 ,与tryAcqure的区别就是会等几秒再走

release

归还许可证

用法

代码语言:javascript
复制
public class SemaphoreDemo {
?
    static Semaphore semaphore = new Semaphore(3,true);
?
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(50);
        Runnable runnable =  ()->{
            try {
                //可以获取多个许可证来分配权重,但是注意获取和释放许可证数量一样
                semaphore.acquire(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"释放了许可证");
            semaphore.release(3);
        };
        for (int i = 0; i < 100; i++) {
            service.submit(runnable);
        }
        service.shutdown();
    }
?
}

注意点

image-20230903132035896
image-20230903132035896
image-20230903132117875
image-20230903132117875

Condition接口(条件对象)

image-20230903184459900
image-20230903184459900

主要就是await,发起等待,等待其他线程的signal,这个和操作系统的semphore的方法名一样啊哈哈,学过操作系统的肯定直到,只不过这个,没有计数,

signalAll是唤醒全部的线程,

基本用法

代码语言:javascript
复制
/**
 * condition基本用法
 * 特点:绑定在锁上面
 */
public class ConditionDemo1  {
?
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    void methon1() throws InterruptedException {
        lock.lock();
        try {
            System.out.println("条件不满足,开始await");
            condition.await();
            System.out.println("条件满足了,执行后续任务");
        }finally {
            lock.unlock();
        }
    }
    void method2(){
        lock.lock();
        try {
            System.out.println("准备工作完成,开始唤醒其他的线程");
            condition.signal();
        }finally {
            lock.unlock();
        }
    }
?
    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 conditionDemo1 = new ConditionDemo1();
        //注意,在主线程中,不能await再signal,已经阻塞了,不能再通过自己唤醒,所有new一个线程做
        new Thread(()->{
            try {
                Thread.sleep(1000);
                conditionDemo1.method2();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //注意,需要主线程启动到子线程这里之后再进行await,不然阻塞在上面,无法执行到创建线程这里
        conditionDemo1.methon1();
    }
}

用condition实现生成消费模型

代码语言:javascript
复制
**
 * 演示用Condition,实现生产者消费者模式
 */
public class ConditionDemo2 {
    /**
     * 生产消费模型必备,对列
     */
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();
?
    public static void main(String[] args) {
        ConditionDemo2 conditionDemo2 = new ConditionDemo2();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }
     class Consumer extends Thread{
        @Override
        public void run() {
            consume();
        }
?
        private void consume() {
            while (true){
                lock.lock();
                try {
                    while (queue.size()==0){
                        System.out.println("对列空,等待数据");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从对列中取走了一个数据,对列剩余"+queue.size()+"个元素");
                }finally {
                    lock.unlock();
                }
            }
        }
    }class Producer extends Thread{
        @Override
        public void run() {
            producer();
        }
?
        private void producer() {
            while (true){
                lock.lock();
                try {
                    while (queue.size()==queueSize){
                        System.out.println("对列满,不能再发数据");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("从对列中插入了一个数据,对列剩余"+queue.size()+"个元素");
                }finally {
                    lock.unlock();
                }
            }
        }
    }
?
?
}

注意:

Lock代替synchronzed,那么Conditon就是代替Object的wait,notify的,对应await signal

用法,和性质,几乎一样

await方法会自动释放持有的Lock锁,和Obect.wait,不 需要手动的释放锁,调用等待的方法的适合,就会释放锁,然后进入阻塞

所以两者的等待唤醒方法,调用的时候,必须持有锁,否则会抛出异常

conditon 优点是: condition可以更灵活的去做等待。

CyclicBarrier循环栅栏

CyclicBarrier和CountDownLatch类似,都可以实现不同的任务等待,然后同时运行,

CyclicBarrier与countDownLatch区别,

image-20230903195557776
image-20230903195557776

另外,cyclicBarrier到条件之后,可以再执行一个线程任务

看例子

代码语言:javascript
复制
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到了,统一出发");
            }
        });
        //cyclicBarrier相比与countDownLatch是可以重复用的,这里5改为10,相当于用了2次,可以自己改一下
        for (int i = 0; i < 5; i++) {
            new Thread(new Task(i,cyclicBarrier)).start();
        }
?
    }
    static class Task implements Runnable{
?
        private int id;
        private CyclicBarrier cyclicBarrier;
?
        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }
?
        @Override
        public void run() {
            System.out.println("线程"+id+"现在前往集合地点");
            try {
                Thread.sleep((long) (Math.random()*10000));
                System.out.println("线程"+id+"到了集合地点,等待其他人到达");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
?
}

总结分析

互相等待的时候,countDownLatch门闩很好用,想要复用工具的话,可以使用CyclicBarrier循环栅栏,

两者区别是countDownLatch扣减次数是以事件维度扣减,而CyclicBarrier是以线程扣减

涉及资源的有限分配,就可以用Semphore信号量,通过分配许可证来限制并发的数量

Condition条件对象是和lock配合的,通过这个可以实现线程的阻塞,和Object的wait、notify很像,

都是锁与阻塞配合。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是控制并发流程?
  • 有哪些控制并发流程的工具类
    • 作用
      • 认识
        • 用法
          • 注意
            • 总结:
            • Semaphore
              • 认识
                • 使用流程:
                  • 重要方法介绍
                    • 用法
                      • 注意点
                      • Condition接口(条件对象)
                        • 基本用法
                          • 用condition实现生成消费模型
                            • 注意:
                            • CyclicBarrier循环栅栏
                            • 总结分析
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
                            http://www.vxiaotou.com