CountDownLatch
以及CyclicBarrier
都是Java
里面的同步工具之一,本文介绍了两者的基本原理以及基本使用方法。
CountDownLatch
CountDownLatch
是一个同步工具类,常见的使用场景包括:
比如考虑这样一个场景,在一个电商网站中,用户点击了首页,需要一部分的商品,同时显示它们的价格,那么,调用的流程应该是:
解决这样的问题可以使用串行化或并行化操作,串行化就是逐一计算商品的售价,并返回,并行化就是获取商品后,并行计算每一个商品的售价,最后返回,显然后一种方案要比前一种要好,那么这时候就可以用上CountDownLatch
了。
一份简单的模拟代码如下:
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException{
List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
//计数器大小为商品列表的长度
final CountDownLatch latch = new CountDownLatch(list.size());
//线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
list.forEach(p-> executor.execute(()->{
System.out.println("Product "+p.id+" start calculate price ");
try{
//随机休眠模拟业务操作耗时
TimeUnit.SECONDS.sleep(current().nextInt(10));
p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
System.out.println("Product "+p.id+" calculate price completed");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
//每完成计算一个商品,将计数器减1,注意需要放在finally中
latch.countDown();
}
}));
//主线程阻塞直到所有的计数器为0,也就是等待所有的子任务计算价格完毕
latch.await();
System.out.println("All of prices calculate finished");
//手动终止,不然不会结束运行
executor.shutdown();
}
private static class Price{
private final int id;
private double price;
public Price(int id) {
this.id = id;
}
public int getId() {
return id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
}
输出:
代码比较简单,关键地方用上了注释,可以看到代码执行顺序如下:
值得注意的是计数器减1的操作需要放在finally
中,因为有可能会出现异常,如果出现异常导致计数器不能减少,那么主线程会一直阻塞。
另外,CountDownLatch
还有一个await(long timeout,TimeUnit unit)
方法,是带有超时参数的,也就是说,如果在超时时间内,计数器的值还是大于0(还有任务没执行完成),会使得当前线程退出阻塞状态。
CyclicBarrier
CyclicBarrier
与CountDownLatch
有很多类似的地方,也是一个同步工具类,允许多个线程在执行完相应的操作之后彼此等待到达同一个barrier point
(屏障点)。CyclicBarrier
也适合某个串行化的任务被拆分为多个并行化任务,这点与CountDownLatch
类似,但是CyclicBarrier
具备的一个更强大的功能是,CyclicBarrier
可以被重复使用。
先简单说一下CyclicBarrier
的实现原理:
CyclicBarrier
,传入一个int
参数,表示分片(parites
),通常意义上来说分片数就是任务的数量await()
,等待其他线程也到达barrier point
常见的使用方法是设置分片数为任务数+1,这样,可以在主线程中执行await()
,等待所有子任务完成。比如下面是使用CyclicBarrier
实现同样功能的模拟代码:
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
final CyclicBarrier barrier = new CyclicBarrier(11);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
list.forEach(p-> executor.execute(()->{
System.out.println("Product "+p.id+" start calculate price ");
try{
TimeUnit.SECONDS.sleep(current().nextInt(10));
p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
System.out.println("Product "+p.id+" calculate price completed");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
try{
barrier.await();
}catch (InterruptedException | BrokenBarrierException e){
e.printStackTrace();
}
}
}));
barrier.await();
System.out.println("All of prices calculate finished");
executor.shutdown();
}
private static class Price{
private final int id;
private double price;
public Price(int id) {
this.id = id;
}
public int getId() {
return id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
}
输出相同,代码大部分相似,不同的地方有:
latch.countDown()
替换成了barrier.await()
latch.await()
替换成了barrier.await()
10
await()
方法会等待所有的线程到达barrier point
,上面代码执行流程简述如下:
CyclicBarrier
,分片数为11(子线程数+1)await()
,等待子线程执行完成await()
,等待其他线程也到达barrier point
注意一个很大的不同就是这里的线程池核心线程数目改成了 10,那么,为什么需要10?
因为如果是设置一个小于10的核心线程个数,由于线程池是会先创建核心线程来执行任务,核心线程满了之后,放进任务队列中,而假设只有5个核心线程,那么:
这样的话,会出现死锁,因为计算中的线程需要队列中的任务到达barrier point
才能结束,而队列中的任务需要核心线程计算完毕后,才能调度出来计算,这样死锁就出现了。
CyclicBarrier
与CountDownLatch
的一个最大不同是,CyclicBarrier
可以被重复使用,原理上来说,await()
会将内部计数器减1,当计数器减为0时,会自动进行计数器(分片数)重置。比如,在上面的代码中,由于遇上促销活动,需要对商品的价格再次进行计算:
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.ThreadLocalRandom.current;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException,BrokenBarrierException{
List<Price> list = IntStream.rangeClosed(1,10).mapToObj(Price::new).collect(Collectors.toList());
final CyclicBarrier barrier = new CyclicBarrier(11);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
list.forEach(p-> executor.execute(()->{
System.out.println("Product "+p.id+" start calculate price.");
try{
TimeUnit.SECONDS.sleep(current().nextInt(10));
p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
System.out.println("Product "+p.id+" calculate price completed.");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
try{
barrier.await();
}catch (InterruptedException | BrokenBarrierException e){
e.printStackTrace();
}
}
}));
barrier.await();
System.out.println("All of prices calculate finished.");
//复制的一段相同代码
list.forEach(p-> executor.execute(()->{
System.out.println("Product "+p.id+" start calculate price again.");
try{
TimeUnit.SECONDS.sleep(current().nextInt(10));
p.setPrice(p.getPrice()*((p.getId() & 1) == 1 ? 0.9 : 0.7));
System.out.println("Product "+p.id+" calculate price completed.");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
try{
barrier.await();
}catch (InterruptedException | BrokenBarrierException e){
e.printStackTrace();
}
}
}));
barrier.await();
System.out.println("All of prices calculate finished again.");
executor.shutdown();
}
private static class Price{
private final int id;
private double price;
public Price(int id) {
this.id = id;
}
public int getId() {
return id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
}
将计算价格的代码复制一遍,其中没有手动修改计数器,只是调用await()
,输出如下:
可以看到,并没有对CycliBarrier
进行类似reset
之类的操作,但是依然能按正常逻辑运行,这是因为await()
内部会维护一个计数器,当计数器为0的时候,会自动进行重置,下面是await()
在OpenJDK 11
下的源码:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return this.dowait(false, 0L);
} catch (TimeoutException var2) {
throw new Error(var2);
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
ReentrantLock lock = this.lock;
lock.lock();
byte var9;
try {
//...
int index = --this.count;
if (index != 0) {
//计数器不为0的情况
//....
}
boolean ranAction = false;
try {
Runnable command = this.barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
this.nextGeneration();
var9 = 0;
} finally {
if (!ranAction) {
this.breakBarrier();
}
}
} finally {
lock.unlock();
}
return var9;
}
private void nextGeneration() {
this.trip.signalAll();
this.count = this.parties;
this.generation = new CyclicBarrier.Generation();
}
当计数器为0时,会生成新的Generation
,并将var9
置为0,最后返回var9
(在这个方法中var9
只有一处赋值,就是代码中的var9=0
,可以理解成直接返回0)。
CyclicBarrier
其他的一些常用方法CyclicBarrier(int parties,Runnable barrierAction)
:构造的时候传入一个Runnable
,表示所有线程到达barrier point
时,会调用该Runnable
await(long timeout,TimeUnit unit)
:与无参的await()
类似,底层调用的是相同的doWait()
,不过增加了超时功能isBroken()
:返回broken
状态,某个线程由于执行await
而进入阻塞,此时如果执行了中断操作(比如interrupt
),那么isBroken()
会返回true
。需要注意,处于broken
状态的CyclicBarrier
不能被直接使用,需要调用reset()
进行重置下面是CountDownLatch
与CyclicBarrier
的一些简单比较,相同点如下:
java.util.concurrent
包下的线程同步工具类不同点:
CountDownLatch
的await()
方法会等待计数器归0,而CyclicBarrier
的await()
会等待其他线程到达barrier point
CyclicBarrier
内部的计数器是可以被重置的,但是CountDownLatch
不可以CyclicBarrier
是由Lock
和Condition
实现的,而CountDownLatch
是由同步控制器AQS
实现的CyclicBarrier
不允许parties
为0,而CountDownLatch
允许count
为0事件的防抖和节流 防抖和节流函数是我们经常用到的函数,在实际的开发过程中,如...
一、概述 上一篇文章介绍了 使用 docker 部署 spring boot 并接入 skywalking ,...
使用Dreamweaver设计了一个个人简历,想要给简历添加自己的照片,该怎么添加并排...
再次感谢大家对 Flutter Engage China 活动 的关注和积极参与!我们在活动前后收...
An Lock Free ID Generator for Golang implementation View on GitHub . Snowfl...
记录下一个疑问,最近在重新看canvas做点Demo这样,时间是写在2019年11月5日,以...
前言 在上一篇文章中我为大家介绍了Simpe项目的一些 背景知识 以及如何使用 有限...
Dreamweaver中想要给文字中的个别文字添加背景色,该怎么添加呢?下面我们就来看...
研发背景,解决什么问题 点击约束:某个按钮触发一次点击后,待接口调用有结果都...
先看效果: 实现: 1.定义导航栏的文字标签: div class=tou sapn class=logo 北...