Java并发32:CyclicBarrier的基本方法和应用场景实例
2018-04-01 17:17
731 查看
[超级链接:Java并发学习系列-绪论]
本章主要对CyclicBarrier进行学习。
所谓Cyclic即 循环 的意思,所谓Barrier即 屏障 的意思。
所以综合起来,CyclicBarrier指的就是 循环屏障,虽然这个叫法很奇怪,但是确能很好地表示它的作用。
其作用在JDK注释中是这样描述的:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
翻译过来,如下:
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮助。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的。
CyclicBarrier的简单理解
其实,我更喜欢[人满发车]这个词来理解CyclicBarrier的作用:
长途汽车站提供长途客运服务。
当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。
等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。
CyclicBarrier的应用场景
CyclicBarrier常用于多线程分组计算。
——CyclicBarrier(parties)
初始化相互等待的线程数量的构造方法。
——CyclicBarrier(parties,Runnable barrierAction)
初始化相互等待的线程数量以及屏障线程的构造方法。
屏障线程的运行时机:等待的线程数量=parties之后,CyclicBarrier打开屏障之前。
举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。
——getParties()
获取CyclicBarrier打开屏障的线程数量,也成为方数。
——getNumberWaiting()
获取正在CyclicBarrier上等待的线程数量。
——await()
在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——await(timeout,TimeUnit)
在CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——isBroken()
获取是否破损标志位broken的值,此值有以下几种情况:
CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
——reset()
使得CyclicBarrier回归初始状态,直观来看它做了两件事:
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
了解CyclicBarrier(parties)/getParties()/await()/getNumberWaiting()的基本用法。
理解循环的意义。
示例代码:
运行结果:
从运行结果,可以更好的理解循环的意义。
熟悉reset()的用法
理解回归初始状态的意义
实例代码:
执行结果:
练习await()/await(timeout,TimeUnit)/isBroken()的使用方法
理解破损标志位broken的状态转换
实例代码:
运行结果:
练习CyclicBarrier(int parties, Runnable barrierAction)的用法
理解屏障线程的意义
实例代码:
运行结果:
模拟多线程分组计算
有一个大小为50000的随机数组,用5个线程分别计算10000个元素的和
然后在将计算结果进行合并,得出最后的结果。
重点分析:
用5个线程分别计算:定义一个大小为5的线程池。
计算结果进行合并:定义一个屏障线程,将上面5个线程计算的子结果信息合并。
实例代码:
运行结果:
本章主要对CyclicBarrier进行学习。
1.CyclicBarrier简介
CyclicBarrier,是JDK1.5的java.util.concurrent并发包中提供的一个并发工具类。所谓Cyclic即 循环 的意思,所谓Barrier即 屏障 的意思。
所以综合起来,CyclicBarrier指的就是 循环屏障,虽然这个叫法很奇怪,但是确能很好地表示它的作用。
其作用在JDK注释中是这样描述的:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
翻译过来,如下:
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮助。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的。
CyclicBarrier的简单理解
其实,我更喜欢[人满发车]这个词来理解CyclicBarrier的作用:
长途汽车站提供长途客运服务。
当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。
等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。
CyclicBarrier的应用场景
CyclicBarrier常用于多线程分组计算。
2.CyclicBarrier方法说明
CyclicBarrier提供的方法有:——CyclicBarrier(parties)
初始化相互等待的线程数量的构造方法。
——CyclicBarrier(parties,Runnable barrierAction)
初始化相互等待的线程数量以及屏障线程的构造方法。
屏障线程的运行时机:等待的线程数量=parties之后,CyclicBarrier打开屏障之前。
举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。
——getParties()
获取CyclicBarrier打开屏障的线程数量,也成为方数。
——getNumberWaiting()
获取正在CyclicBarrier上等待的线程数量。
——await()
在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——await(timeout,TimeUnit)
在CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一:
在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。
当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。
当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。
其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。
——isBroken()
获取是否破损标志位broken的值,此值有以下几种情况:
CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
——reset()
使得CyclicBarrier回归初始状态,直观来看它做了两件事:
如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。
3.CyclicBarrier方法练习
3.1.练习一
练习目的:了解CyclicBarrier(parties)/getParties()/await()/getNumberWaiting()的基本用法。
理解循环的意义。
示例代码:
//构造函数1:初始化-开启屏障的方数 CyclicBarrier barrier0 = new CyclicBarrier(2); //通过barrier.getParties()获取开启屏障的方数 LOGGER.info("barrier.getParties()获取开启屏障的方数:" + barrier0.getParties()); System.out.println(); //通过barrier.getNumberWaiting()获取正在等待的线程数 LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:初始----" + barrier0.getNumberWaiting()); System.out.println(); new Thread(() -> { //添加一个等待线程 LOGGER.info("添加第1个等待线程----" + Thread.currentThread().getName()); try { barrier0.await(); LOGGER.info(Thread.currentThread().getName() + " is running..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } LOGGER.info(Thread.currentThread().getName() + " is terminated."); }).start(); Thread.sleep(10); //通过barrier.getNumberWaiting()获取正在等待的线程数 LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---" + barrier0.getNumberWaiting()); Thread.sleep(10); System.out.println(); new Thread(() -> { //添加一个等待线程 LOGGER.info("添加第2个等待线程----" + Thread.currentThread().getName()); try { barrier0.await(); LOGGER.info(Thread.currentThread().getName() + " is running..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } LOGGER.info(Thread.currentThread().getName() + " is terminated."); }).start(); Thread.sleep(100); System.out.println(); //通过barrier.getNumberWaiting()获取正在等待的线程数 LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting()); //已经打开的屏障,再次有线程等待的话,还会重新生效--视为循环 new Thread(() -> { LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName()); try { //BrokenBarrierException barrier0.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } LOGGER.info(Thread.currentThread().getName() + " is terminated."); }).start(); System.out.println(); Thread.sleep(10); LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting()); Thread.sleep(10); new Thread(() -> { LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName()); try { //BrokenBarrierException barrier0.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } LOGGER.info(Thread.currentThread().getName() + " is terminated."); }).start(); Thread.sleep(10); LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());
运行结果:
2018-04-01 13:27:55 INFO - barrier.getParties()获取开启屏障的方数:2 2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:初始----0 2018-04-01 13:27:55 INFO - 添加第1个等待线程----Thread-0 2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---1 2018-04-01 13:27:55 INFO - 添加第2个等待线程----Thread-1 2018-04-01 13:27:55 INFO - Thread-1 is running... 2018-04-01 13:27:55 INFO - Thread-0 is running... 2018-04-01 13:27:55 INFO - Thread-1 is terminated. 2018-04-01 13:27:55 INFO - Thread-0 is terminated. 2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0 2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-2 2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---1 2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-3 2018-04-01 13:27:55 INFO - Thread-3 is terminated. 2018-04-01 13:27:55 INFO - Thread-2 is terminated. 2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0
从运行结果,可以更好的理解循环的意义。
3.2.练习二
练习目的:熟悉reset()的用法
理解回归初始状态的意义
实例代码:
CyclicBarrier barrier2 = new CyclicBarrier(2); //如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生 LOGGER.info("如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生"); barrier2.reset(); System.out.println(); Thread.sleep(100); //如果是一个已经打开一次的CyclicBarrier,则reset()之后,什么也不会发生 ExecutorService executorService2 = Executors.newCachedThreadPool(); //等待两次 for (int i = 0; i < 2; i++) { executorService2.submit(() -> { try { barrier2.await(); LOGGER.info("222屏障已经打开."); } catch (InterruptedException e) { //e.printStackTrace(); LOGGER.info("222被中断"); } catch (BrokenBarrierException e) { //e.printStackTrace(); LOGGER.info("222被重置"); } }); } barrier2.reset(); Thread.sleep(100); System.out.println(); //如果是一个 有线程正在等待的线程,则reset()方法会使正在等待的线程抛出异常 executorService2.submit(() -> { executorService2.submit(() -> { try { barrier2.await(); LOGGER.info("333屏障已经打开."); } catch (InterruptedException e) { //e.printStackTrace(); LOGGER.info("333被中断"); } catch (BrokenBarrierException e) { LOGGER.info("在等待过程中,执行reset()方法,等待的线程抛出BrokenBarrierException异常,并不再等待"); //e.printStackTrace(); } }); }); Thread.sleep(100); barrier2.reset(); executorService2.shutdown(); break;
执行结果:
2018-04-01 16:53:12 INFO - 如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生 2018-04-01 16:53:13 INFO - 222屏障已经打开. 2018-04-01 16:53:13 INFO - 222屏障已经打开. 2018-04-01 16:53:13 INFO - 在等待过程中,执行reset()方法,等待的线程跑出BrokenBarrierException异常,并不再等待
3.3.练习三
练习目的:练习await()/await(timeout,TimeUnit)/isBroken()的使用方法
理解破损标志位broken的状态转换
实例代码:
CyclicBarrier barrier1 = new CyclicBarrier(3); ExecutorService executorService = Executors.newCachedThreadPool(); //添加一个用await()等待的线程 executorService.submit(() -> { try { //等待,除非:1.屏障打开;2.本线程被interrupt;3.其他等待线程被interrupted;4.其他等待线程timeout;5.其他线程调用reset() barrier1.await(); } catch (InterruptedException e) { LOGGER.info(Thread.currentThread().getName() + " is interrupted."); //e.printStackTrace(); } catch (BrokenBarrierException e) { LOGGER.info(Thread.currentThread().getName() + " is been broken."); //e.printStackTrace(); } }); Thread.sleep(10); LOGGER.info("刚开始,屏障是否破损:" + barrier1.isBroken()); //添加一个等待线程-并超时 executorService.submit(() -> { try { //等待1s,除非:1.屏障打开(返回true);2.本线程被interrupt;3.本线程timeout;4.其他等待线程被interrupted;5.其他等待线程timeout;6.其他线程调用reset() barrier1.await(1, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.info(Thread.currentThread().getName() + " is interrupted."); //e.printStackTrace(); } catch (BrokenBarrierException e) { LOGGER.info(Thread.currentThread().getName() + " is been reset()."); //e.printStackTrace(); } catch (TimeoutException e) { LOGGER.info(Thread.currentThread().getName() + " is timeout."); //e.printStackTrace(); } }); Thread.sleep(100); LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting()); Thread.sleep(1000); LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting()); LOGGER.info("当等待的线程timeout时,当前屏障是否破损:" + barrier1.isBroken()); LOGGER.info("等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。"); System.out.println(); Thread.sleep(5000); //通过reset()重置屏障回初始状态,也包括是否破损 barrier1.reset(); LOGGER.info("reset()之后,当前屏障是否破损:" + barrier1.isBroken()); LOGGER.info("reset()之后,当前等待线程数量:" + barrier1.getNumberWaiting()); executorService.shutdown();
运行结果:
2018-04-01 17:01:16 INFO - 刚开始,屏障是否破损:false 2018-04-01 17:01:16 INFO - 当前等待线程数量:2 2018-04-01 17:01:17 INFO - pool-1-thread-1 is been broken. 2018-04-01 17:01:17 INFO - pool-1-thread-2 is timeout. 2018-04-01 17:01:17 INFO - 当前等待线程数量:0 2018-04-01 17:01:17 INFO - 当等待的线程timeout时,当前屏障是否破损:true 2018-04-01 17:01:17 INFO - 等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。 2018-04-01 17:01:22 INFO - reset()之后,当前屏障是否破损:false 2018-04-01 17:01:22 INFO - reset()之后,当前等待线程数量:0
3.4.练习四
练习目的:练习CyclicBarrier(int parties, Runnable barrierAction)的用法
理解屏障线程的意义
实例代码:
//构造器:设置屏障放开前做的事情 CyclicBarrier barrier3 = new CyclicBarrier(2, () -> { LOGGER.info("屏障放开,[屏障线程]先运行!"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } LOGGER.info("[屏障线程]的事情做完了!"); }); for (int i = 0; i < 2; i++) { new Thread(() -> { LOGGER.info(Thread.currentThread().getName() + " 等待屏障放开"); try { barrier3.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } LOGGER.info(Thread.currentThread().getName() + "开始干活...干活结束"); }).start(); }
运行结果:
2018-04-01 17:01:56 INFO - Thread-0 等待屏障放开 2018-04-01 17:01:56 INFO - Thread-1 等待屏障放开 2018-04-01 17:01:56 INFO - 屏障放开,[屏障线程]先运行! 2018-04-01 17:01:58 INFO - [屏障线程]的事情做完了! 2018-04-01 17:01:58 INFO - Thread-1开始干活...干活结束 2018-04-01 17:01:58 INFO - Thread-0开始干活...干活结束
4.应用场景
场景说明:模拟多线程分组计算
有一个大小为50000的随机数组,用5个线程分别计算10000个元素的和
然后在将计算结果进行合并,得出最后的结果。
重点分析:
用5个线程分别计算:定义一个大小为5的线程池。
计算结果进行合并:定义一个屏障线程,将上面5个线程计算的子结果信息合并。
实例代码:
/** * <p>CyclicBarrier-循环屏障-模拟多线程计算</p> * * @author hanchao 2018/3/29 22:48 **/ public static void main(String[] args) { //数组大小 int size = 50000; //定义数组 int[] numbers = new int[size]; //随机初始化数组 for (int i = 0; i < size; i++) { numbers[i] = RandomUtils.nextInt(100, 1000); } //单线程计算结果 System.out.println(); Long sum = 0L; for (int i = 0; i < size; i++) { sum += numbers[i]; } LOGGER.info("单线程计算结果:" + sum); //多线程计算结果 //定义线程池 ExecutorService executorService = Executors.newFixedThreadPool(5); //定义五个Future去保存子数组计算结果 final int[] results = new int[5]; //定义一个循环屏障,在屏障线程中进行计算结果合并 CyclicBarrier barrier = new CyclicBarrier(5, () -> { int sums = 0; for (int i = 0; i < 5; i++) { sums += results[i]; } LOGGER.info("多线程计算结果:" + sums); }); //子数组长度 int length = 10000; //定义五个线程去计算 for (int i = 0; i < 5; i++) { //定义子数组 int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length)); //盛放计算结果 int finalI = i; executorService.submit(() -> { for (int j = 0; j < subNumbers.length; j++) { results[finalI] += subNumbers[j]; } //等待其他线程进行计算 try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); } //关闭线程池 executorService.shutdown(); }
运行结果:
2018-04-01 17:05:47 INFO - 单线程计算结果:27487277 2018-04-01 17:05:47 INFO - 多线程计算结果:27487277
相关文章推荐
- Java并发33:Semaphore基本方法与应用场景实例
- Java中CountDownLatch、CyclicBarrier、Thread.join方法基本应用
- Java并发28:ThreadLocal学习笔记-简介、基本方法及应用场景
- Java并发19:Lock系列-Lock接口基本方法学习实例
- Java并发编程核心方法与框架-CyclicBarrier的使用
- Java并发20:Lock系列-Condition接口基本方法学习实例
- Java并发21:Lock系列-ReadWriteLock接口和ReentrantReadWriteLock类基本方法学习实例
- Java并发实例之CyclicBarrier的使用
- Java并发编程核心方法与框架-CyclicBarrier的使用
- Java多线程与并发库高级应用之公共屏障点CyclicBarrier
- Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
- Java并发06:Thread的基本方法(3)-yield方法的分析与实例说明
- Java多线程并发常用类实例之:CyclicBarrier
- Java 多线程CountDownLatch、CyclicBarrier、Thread.join方法基本用法
- java 并发编程实战第三章同步辅助类CyclicBarrier解析
- java 1.5 并发流程控制CountDownLatch,CyclicBarrier,Semaphore
- java多线程之CyclicBarrier类基本用法
- Java并发学习笔记(四)-栅栏CyclicBarrier
- java并发中的协同工具类介绍-CountDownLatch-CyclicBarrier-Semphone-Exchanger
- JAVA四种基本排序方法实例总结