学习笔记六:线程间的协作与通信之并发工具类
2018-07-26 17:15
519 查看
继续 上一篇 《 学习笔记五:线程间的协作与通信》
在jdk的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段,如下主要介绍工具类的使用。
CountDownLatch类实际上是使用计数器的方式去控制的,当我们初始化CountDownLatch的时候传入了一个int变量这个时候在类的内部初始化一个int的变量,每当我们调用countDownt()方法的时候就使得这个变量的值减1,而对于await()方法则去判断这个int的变量的值是否为0,是则表示所有的操作都已经完成,否则继续等待。
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。计数器的值必须大于等于0,只是等于0的时候,计数器就是零,调用await方法时不会阻塞当前线程;这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值;一个线程调用countDown方法happen-before,另外一个线程调用await方法。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
示例:boss等待所有员工来开会,当所有人员都到齐之后,boss宣布开始会议!!!
运行结果:注意,至于是谁先到会议室,每次运行结果都会不一样。因为主线程和子线程的调用时由CPU决定的
如果某个人缺席会议,我们不能让主线程一直等待,所以可以使用另外一个带指定时间的await方法-await(long time,TimeUtil unit)的那个带指定时间后,就好不再阻塞当前线程。jion也有类似的方法。
CyclicBarrier默认构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达屏障了,然后当前线程被阻塞。 示例代码如下:
运行结果:因为主线程和子线程的调用时由CPU决定的,两个线程都有可能先执行,所以会产生不同的结果
如果把new CyclicBarrier(2) 修改为new CyclicBarrier(3),则主线程和子线程会永远等待,不会继续执行,因为第三个还没有到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供了高级构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在现场到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码如下:
运行结果:当两个线程都到达屏障后,优先执行对象MyThread 的任务。
运行结果:最终结果:4
[b]所以CyclicBarrier可以处理更为复杂的业务场景,如:计算发生错误,可以重置计数器,并让线程重新执行一次。[/b]
CyclicBarrier其他用法及源代码如下,如:
isBroken() :阻塞的线程是否被中断,返回值boolean类型
getNumberWaiting() :获取Cyclic-Barrier阻塞的线程数量
假如有个需求,要读几万个文件的数据,因为是IO密集型任务,我们可以启动几十个线程并发读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库链接保存数据,否则会报错无法获取数据库链接。这时可以使用Semaphore来做流量控制,代码示例如下:
运行结果:
Semaphore简单用法如下:
上一篇:学习笔记五:线程间的协作与通信
下一篇:待续....
在jdk的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段,如下主要介绍工具类的使用。
1、等待多线程完成的CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch类实际上是使用计数器的方式去控制的,当我们初始化CountDownLatch的时候传入了一个int变量这个时候在类的内部初始化一个int的变量,每当我们调用countDownt()方法的时候就使得这个变量的值减1,而对于await()方法则去判断这个int的变量的值是否为0,是则表示所有的操作都已经完成,否则继续等待。
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。计数器的值必须大于等于0,只是等于0的时候,计数器就是零,调用await方法时不会阻塞当前线程;这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值;一个线程调用countDown方法happen-before,另外一个线程调用await方法。
与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
示例:boss等待所有员工来开会,当所有人员都到齐之后,boss宣布开始会议!!!
package com.black.example.mutilThread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CountDownLatchDemo { //声明countDownLatch 变量,初始化线程数量(内部计数器) static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) { new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); new Thread(new MyRunner(countDownLatch, "小张", 4000)).start(); new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); try { System.out.println("等待员工到来开会。。。。。。。"); //注意这里是await。主线程将会一直等待在这里,当所有线程都执行 countDownLatch.countDown();之后当前线程才会继续执行 countDownLatch.await(); startMeeting("老板"); } catch (InterruptedException e) { e.printStackTrace(); } } private static void startMeeting(String name) { System.out.println(name + "说:人齐了。会议开始!!"); } static class MyRunner implements Runnable { CountDownLatch countDownLatch; String name; int time; public MyRunner(CountDownLatch countDownLatch, String name, int time) { this.countDownLatch = countDownLatch; this.name = name; this.time = time; } @Override public void run() { try { System.out.println(name + " 开始出发去公司。"); TimeUnit.SECONDS.sleep(1); System.out.println(name + " 终于到会议室!!!"); countDownLatch.countDown(); System.out.println(name + " 准备好了!!"); } catch (Exception e) { e.printStackTrace(); } } } }
运行结果:注意,至于是谁先到会议室,每次运行结果都会不一样。因为主线程和子线程的调用时由CPU决定的
如果某个人缺席会议,我们不能让主线程一直等待,所以可以使用另外一个带指定时间的await方法-await(long time,TimeUtil unit)的那个带指定时间后,就好不再阻塞当前线程。jion也有类似的方法。
2、同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认构造方法时CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达屏障了,然后当前线程被阻塞。 示例代码如下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; /** * Created by 10250H on 2018/7/26. */ public class CyclicBarrierDemo1 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new MyThread(1)).start(); new Thread(new MyThread(2)).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("当前值输出:"+counter); TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行"); } catch (InterruptedException e) { e.printStackTrace(); }catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
运行结果:因为主线程和子线程的调用时由CPU决定的,两个线程都有可能先执行,所以会产生不同的结果
如果把new CyclicBarrier(2) 修改为new CyclicBarrier(3),则主线程和子线程会永远等待,不会继续执行,因为第三个还没有到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供了高级构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在现场到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码如下:
package com.black.example.mutilThread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; public class CyclicBarrierDemo2 { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread(3)); public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用计数当前值:1"); cyclicBarrier.await(); System.out.println("默认执行通用线程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { System.out.println("通用计数当前值:2"); cyclicBarrier.await(); System.out.println("默认执行通用线程2"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }).start(); } static class MyThread implements Runnable{ private int counter; public MyThread(int counter){ this.counter = counter; } @Override public void run() { try { System.out.println("优先执行:"+counter); TimeUnit.SECONDS.sleep(1); System.out.println("counter="+counter+",线程="+Thread.currentThread().getName()+",继续执行"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果:当两个线程都到达屏障后,优先执行对象MyThread 的任务。
CyclicBarrier的应用场景
可以用于多线程计算数据,最后合并计算结果的场景。例如:用一个Excel保存用户的所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水。最后再用barrierAction总结出整个Excel的日均银行流水,示例代码如下:package com.black.example.mutilThread; import java.util.Map; import java.util.concurrent.*; public class BankWaterService implements Runnable { //创建4个屏障,处理完之后执行当前类的run方法 private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this); //假设只有4个sheet,所以启动4个线程 private Executor executor = Executors.newFixedThreadPool(4); //保存每个sheet计算出的银行流水结果 private ConcurrentHashMap<String,Integer> sheetCountMap = new ConcurrentHashMap<String,Integer>(); private void count(){ for (int i=0;i<4;i++){ executor.execute(new Runnable() { @Override public void run() { //1:计算当前sheet的银行流水数据,计算代码省略....伪代码如下: sheetCountMap.put(Thread.currentThread().getName(),1); //计算完成,插入屏障 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result=0; //汇总每个sheet的计算结果 for (Map.Entry<String,Integer> entry:sheetCountMap.entrySet()){ result+=entry.getValue(); } //输出结果 sheetCountMap.put("result",result); System.out.println("最终结果:"+result); } public static void main(String[] args) { BankWaterService bankWaterService = new BankWaterService(); bankWaterService.count(); } }
运行结果:最终结果:4
3、CyclicBarrier与CountDownLatch的区别
CountDownLatch计数器只能使用一次,而CyclicBarrier[b]计数器可以使用reset()方法重置。[/b][b]所以CyclicBarrier可以处理更为复杂的业务场景,如:计算发生错误,可以重置计数器,并让线程重新执行一次。[/b]
CyclicBarrier其他用法及源代码如下,如:
isBroken() :阻塞的线程是否被中断,返回值boolean类型
getNumberWaiting() :获取Cyclic-Barrier阻塞的线程数量
4、控制并发线程数的Semaphore
Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。应用场景
Semaphore可以用于流量控制,特别是公用资源有限的应用场景,比如数据库链接。假如有个需求,要读几万个文件的数据,因为是IO密集型任务,我们可以启动几十个线程并发读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库链接保存数据,否则会报错无法获取数据库链接。这时可以使用Semaphore来做流量控制,代码示例如下:
package com.black.example.mutilThread; import java.util.concurrent.*; public class SemaphoreDemo { private static final int THREAD_COUNT=30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10);//10个并发 public static void main(String[] args) { for (int i=0;i<THREAD_COUNT;i++){ threadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"-----请求资源"); //请求获取资源,如果有空闲资源则会立即获取,进入临界区,否则将会等待,一直等待到获取到临界区资源 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"----获取资源,保存数据!"); TimeUnit.SECONDS.sleep(1); semaphore.release();//释放资源 } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
运行结果:
pool-1-thread-4-----请求资源 pool-1-thread-2-----请求资源 pool-1-thread-4----获取资源,保存数据! pool-1-thread-1-----请求资源 pool-1-thread-5-----请求资源 pool-1-thread-3-----请求资源 pool-1-thread-5----获取资源,保存数据! pool-1-thread-8-----请求资源 pool-1-thread-1----获取资源,保存数据! pool-1-thread-9-----请求资源 pool-1-thread-2----获取资源,保存数据! pool-1-thread-7-----请求资源 pool-1-thread-9----获取资源,保存数据! pool-1-thread-10-----请求资源 pool-1-thread-11-----请求资源 pool-1-thread-8----获取资源,保存数据! pool-1-thread-6-----请求资源 pool-1-thread-3----获取资源,保存数据! pool-1-thread-13-----请求资源 pool-1-thread-11----获取资源,保存数据! pool-1-thread-10----获取资源,保存数据! pool-1-thread-12-----请求资源 pool-1-thread-7----获取资源,保存数据! pool-1-thread-15-----请求资源 pool-1-thread-14-----请求资源 pool-1-thread-16-----请求资源 pool-1-thread-17-----请求资源 pool-1-thread-18-----请求资源 pool-1-thread-19-----请求资源 pool-1-thread-21-----请求资源 pool-1-thread-20-----请求资源 pool-1-thread-22-----请求资源 pool-1-thread-23-----请求资源 pool-1-thread-24-----请求资源 pool-1-thread-25-----请求资源 pool-1-thread-27-----请求资源 pool-1-thread-26-----请求资源 pool-1-thread-28-----请求资源 pool-1-thread-29-----请求资源 pool-1-thread-30-----请求资源 pool-1-thread-6----获取资源,保存数据! pool-1-thread-13----获取资源,保存数据! pool-1-thread-12----获取资源,保存数据! pool-1-thread-15----获取资源,保存数据! pool-1-thread-16----获取资源,保存数据! pool-1-thread-17----获取资源,保存数据! pool-1-thread-14----获取资源,保存数据! pool-1-thread-21----获取资源,保存数据! pool-1-thread-19----获取资源,保存数据! pool-1-thread-18----获取资源,保存数据! pool-1-thread-20----获取资源,保存数据! pool-1-thread-22----获取资源,保存数据! pool-1-thread-23----获取资源,保存数据! pool-1-thread-26----获取资源,保存数据! pool-1-thread-27----获取资源,保存数据! pool-1-thread-25----获取资源,保存数据! pool-1-thread-24----获取资源,保存数据! pool-1-thread-30----获取资源,保存数据! pool-1-thread-29----获取资源,保存数据! pool-1-thread-28----获取资源,保存数据!
Semaphore简单用法如下:
//构造方法,可用的许可证数量,默认使用非公平锁的方式创建 public Semaphore(int permits){...} //尝试获取许可证 public boolean tryAcquire(){...} //尝试获取许可证,在指定时间内若获取不到则返回 public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException{...} //返回信号量中当前可用的许可证数 public int availablePermits(){...} //返回正在等待获取许可证的线程数(估计值) public final int getQueueLength() {...} //查询是否有任何线程等待获取许可证 public final boolean hasQueuedThreads(){...} //减少reduction个许可证,这个方法在使用的子类中很有用跟踪不可用资源的信号量 protected void reducePermits(int reduction) {...} //返回所有等待获取许可证的线程集合 protected Collection<Thread> getQueuedThreads(){...}
上一篇:学习笔记五:线程间的协作与通信
下一篇:待续....
相关文章推荐
- Java线程学习笔记之线程协作(通信)
- 学习笔记五:线程间的协作与通信
- Java并发学习笔记(13)线程之间的协作(Object.wait(),notifu(),notifyAll() Thread.join())
- 学习java多线程的笔记3-使用BlockingQueue阻塞队列来模拟两个线程之间的通信
- iOS学习笔记-104.多线程03——线程间通信图片下载与时间计算
- Java核心知识点学习----多线程并发之线程间的通信,notify,wait
- 【Java并发编程的艺术】【学习笔记】并发工具类
- 张孝祥java.concurrent线程并发学习笔记 - 传统线程回顾
- 黑马程序员—11—java基础:有关线程通信的学习笔记和学习心得体会
- java学习笔记之线程并发库
- python学习笔记:多并发(线程、进程、协程)
- Java并发学习笔记(4)线程的取消,关闭和异常终止
- Java学习笔记(65)---------线程协作
- 11.9-全栈Java笔记: 线程并发协作(生产者/消费者模式)
- java学习——java高级特性,线程,并发 笔记
- java并发编程学习3--线程通信练习
- JavaSE学习笔记--线程并发库
- Java学习笔记46(多线程三:线程之间的通信)
- Java线程学习笔记之并发集合类
- 多线程学习笔记1-概念、创建、互斥锁、线程间通信