java中的并发工具类
2017-03-01 11:18
369 查看
概述
CountDownLatch:允许一个或多个线程 等待其他线程完成操作CyclicBarrier:当一组线程达到同步点时被阻塞,直到最后一个线程到达同步点时,所有被拦截的线程才会继续运行
Semaphore:信号量,用来控制同时访问特定资源的线程数量,它通过协调各线程以保证合理的使用公共资源
Exchanger:用于进行线程间的数据交换
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程执行完成。CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。构造函数:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
CountDownLatch的构造函数可以传入一个int类型的参数N,表示需要等待N个点完成。调用CountDownLatch的countDown()方法时,N会减1。CountDownLatch的await方法会阻塞当前线程,直到N变为0.
CountDownLatch实例:
/** * 十个人分成两组比赛,每组5个,当发令枪响时,开始比赛 * @author Administrator * */ public class CountDownLatchDemo { private static final int GROUP_SIZE=5; private static final Random RANDOM = new Random(); private static void processOneGroup(final String groupName){ //等待所以选手准备就绪 final CountDownLatch preCountDownLatch = new CountDownLatch(GROUP_SIZE); //等待比赛开始 final CountDownLatch startCountDownLatch = new CountDownLatch(1); //等待比赛结束 final CountDownLatch endCountDownLatch = new CountDownLatch(GROUP_SIZE); System.out.println(groupName+",比赛开始:"); for (int i = 0; i < GROUP_SIZE; i++) { new Thread(String.valueOf(i)){ public void run() { preCountDownLatch.countDown();//准备完毕 System.out.println("第"+GROUP_SIZE+"组,第"+this.getName()+"号选手,已准备完毕!~"); try { //等待裁判发出开始指令 startCountDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { //运行一个随机时间,表示选手实际的比赛时间 Thread.sleep(Math.abs(RANDOM.nextInt())%1000); } catch (InterruptedException e) { e.printStackTrace(); } //完成比赛 endCountDownLatch.countDown(); }; }.start(); } try { preCountDownLatch.await();//等待所有的选手准备完毕 } catch (InterruptedException e) { e.printStackTrace(); } startCountDownLatch.countDown();//开始比赛 try { endCountDownLatch.await();//等待所有的选手结束比赛 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(groupName+" 比赛结束!"); } public static void main(String[] args) { processOneGroup("分组1"); processOneGroup("分组2"); } }
输出结果:
分组1,比赛开始:
第5组,第0号选手,已准备完毕!~
第5组,第2号选手,已准备完毕!~
第5组,第1号选手,已准备完毕!~
第5组,第3号选手,已准备完毕!~
第5组,第4号选手,已准备完毕!~
分组1 比赛结束!
分组2,比赛开始:
第5组,第0号选手,已准备完毕!~
第5组,第1号选手,已准备完毕!~
第5组,第2号选手,已准备完毕!~
第5组,第3号选手,已准备完毕!~
第5组,第4号选手,已准备完毕!~
分组2 比赛结束!
countDown方法的实现如下:
/** * 减少锁存器的计数,如果计数达到零,则释放所有等待的线程。 * 如果当前计数大于零,那么它被递减。如果新计数为零,则所有等待的线程被重新启用用于线程调度目的。如果当前计数等于零,则什么也不发生。 */ public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // 递减计数; 直到信号转换为零时 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
/** * 确保释放传播,即使有其他进行中的获取/释放。 如果它需要信号,则以通常的方式尝试解除头的处理器。 但如果不是,状态设置为PROPAGATE以确保在释放后,传播继续。 另外,我们必须循环,以便在我们这样做的时候添加一个新的节点。 此外,与unparkSuccessor的其他用法不同,我们需要知道CAS是否重置状态失败,如果这样重新检查。 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
CountDownLatch与Thread.join()的区别;
调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕.
CyclicBarrier
CyclicBarrier可以让一组线程到达一个同步点时被阻塞,直到最后一个线程到达同步点时,所有被阻塞的线程才会继续运行。默认构造方法:parties表示阻塞在同步点的线程数。
public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties;//表示仍在等待的线程数,只要一个线程到达同步点,count就减1,直到0为止 this.barrierCommand = barrierAction;//当线程全部到达同步点时,优先执行barrierAction }
每个线程调用await方法,告诉CyclicBarrier已经到达同步点,然后当前线程被阻塞.
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier实例如下:
/** * 用一个Excel保存用户所有银行流水,每个Sheet保存一个账号近一年的每笔银行 * 流水,现在需要统计用户的日军银行流水,先用多线程处理每个sheet,都执行完 * 之后,得到每个sheet的日均银行流水,最后用barrierAction技术整个Excel的 * 日均银行流水 * @author Administrator * */ public class CyclicBarrierDemo implements Runnable{ // private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this); //假设只有4个sheet,启动4个线程 private Executor executor = Executors.newFixedThreadPool(4); //保存每个sheet计算结果 private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>(); private void count(){ for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { //业务操作,计算sheet的银行流水数据,假设结果为1,代码省略 sheetBankWaterCount.put(Thread.currentThread().getName(), 1); try { cyclicBarrier.await(); } catch (Exception e) { } } }); } } //汇总操作 @Override public void run() { int result = 0; //汇总每个sheet计算的结果 for (Entry<String,Integer> entry : sheetBankWaterCount.entrySet()) { result+=entry.getValue(); } //将结果输出 System.out.println(result); } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.count(); } }
结果输出:
4
CyclicBarrier和CountDownLatch的区别:
CountDownLatch到达计算器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所有CyclicBarrier可以使用更复杂的业务场景。
CyclicBarrier还提供了getNumberWaiting方法可以获得阻塞的线程数量。isBroken可以获得阻塞的线程是否被中断
需要注意的是,如果CyclicBarrier的parriers数比时间的线程数大时,那么这组线程将会被一直阻塞
Semaphore
Semaphore是用来控制同时访问特定资源 的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore可以用于做流量控制,特别是公有资源有限的应用场景,比如数据库链接。
Semaphore的构造方法Semaphore(int permits)接受一个整型的数组,表示最大的并发数。线程使用Semaphore的acquire()方法获得一个许可,使用之后调用release()方法归还许可。
Semaphore示例:
/** * 假设有10个线程要访问某个资源,但是我们的并发量控制为3 * @author Administrator * */ public class SemaphoreDemo { private static final int THREAD_COUNT=10; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(3); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println("线程"+Thread.currentThread().getName()+"获得访问资源的权利!"); Thread.sleep(TimeUnit.SECONDS.toSeconds(5)); System.out.println("线程"+Thread.currentThread().getName()+"访问结束!"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
输出结果如下:
线程pool-1-thread-1获得访问资源的权利!
线程pool-1-thread-2获得访问资源的权利!
线程pool-1-thread-3获得访问资源的权利!
线程pool-1-thread-2访问结束!
线程pool-1-thread-3访问结束!
线程pool-1-thread-4获得访问资源的权利!
线程pool-1-thread-1访问结束!
线程pool-1-thread-5获得访问资源的权利!
线程pool-1-thread-6获得访问资源的权利!
线程pool-1-thread-5访问结束!
线程pool-1-thread-6访问结束!
线程pool-1-thread-4访问结束!
线程pool-1-thread-8获得访问资源的权利!
线程pool-1-thread-9获得访问资源的权利!
线程pool-1-thread-7获得访问资源的权利!
线程pool-1-thread-7访问结束!
线程pool-1-thread-10获得访问资源的权利!
线程pool-1-thread-8访问结束!
线程pool-1-thread-9访问结束!
线程pool-1-thread-10访问结束!
通过打印结果我们可以看出,当已经有三个线程获取了资源访问的权利后,后面的线程只能等这三个线程中的一个释放了访问权利,才能接着访问资源。
Exchanger
Exchanger用于线程间的协作,进行数据交互。它提供一个同步点,在这个同步点,两个线程可用交互彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,他会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将生产出来的数据传递给对方。Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需要将纸质银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗,两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否一致。
代码实例:
public class ExchangerDemo { /** * * 生产者 * */ static class Producer implements Runnable{ private List<String> producerBuffer; private final Exchanger<List<String>> exchanger; public Producer(List<String> producerBuffer,Exchanger<List<String>> exchanger){ this.producerBuffer=producerBuffer; this.exchanger=exchanger; } @Override public void run() { for (int i = 0; i < 5; i++) { producerBuffer.add("生产者"+i); System.out.println("第"+i+"次生产者在等待与消费者交换数据"); try { producerBuffer=exchanger.exchange(producerBuffer); System.out.println("第"+i+"次生产者与消费者交换后的数据为:"+producerBuffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** *消费者 */ static class Consumer implements Runnable{ private List<String> consumerBuffer; private final Exchanger<List<String>> exchanger; public Consumer(List<String> consumerBuffer,Exchanger<List<String>> exchanger) { this.consumerBuffer=consumerBuffer; this.exchanger=exchanger; } @Override public void run() { for (int i = 0; i < 5; i++) { consumerBuffer.add("消费者"+i); try { System.out.println("第"+i+"次消费者正在等待与生产者交换数据!"); consumerBuffer=exchanger.exchange(consumerBuffer); System.out.println("第"+i+"次消费者交换后的数据为:"+consumerBuffer.get(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { List<String> producerBuffer = new ArrayList<>(); List<String> consumerBuffer = new ArrayList<>(); Exchanger<List<String>> exchanger = new Exchanger<>(); ExchangerDemo.Producer producer = new Producer(producerBuffer, exchanger); ExchangerDemo.Consumer consumer = new Consumer(consumerBuffer, exchanger); Thread producerThread = new Thread(producer); Thread consumerThread = new Thread(consumer); producerThread.start(); consumerThread.start(); } }
输出结果为
第0次生产者在等待与消费者交换数据
第0次消费者正在等待与生产者交换数据!
第0次消费者交换后的数据为:生产者0
第0次生产者与消费者交换后的数据为:消费者0
第1次消费者正在等待与生产者交换数据!
第1次生产者在等待与消费者交换数据
第1次消费者交换后的数据为:生产者1
第1次生产者与消费者交换后的数据为:消费者1
第2次消费者正在等待与生产者交换数据!
第2次生产者在等待与消费者交换数据
第2次生产者与消费者交换后的数据为:消费者2
第2次消费者交换后的数据为:生产者2
第3次生产者在等待与消费者交换数据
第3次消费者正在等待与生产者交换数据!
第3次生产者与消费者交换后的数据为:消费者3
第3次消费者交换后的数据为:生产者3
第4次生产者在等待与消费者交换数据
第4次消费者正在等待与生产者交换数据!
第4次消费者交换后的数据为:生产者4
第4次生产者与消费者交换后的数据为:消费者4
如果两个线程,有一个线程没有调用exchange()方法,那么则会一直等待,因此,为避免一直等待,可以使用exchange(V x,long timeout,TimeUnit unit)设置等待时长
相关文章推荐
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【Java并发编程的艺术】【学习笔记】并发工具类
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- Java中的并发工具类
- Java并发工具类详解
- java并发——同步工具类
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- Java并发系列7--LockSupport线程阻塞工具类
- java并发编程的艺术笔记第八章——java中的并发工具类
- Java并发工具类(四):线程间交换数据的Exchanger
- Java并发工具类Semaphore应用实例
- Java并发工具类
- java多线程解说【拾伍】_并发工具类:CountDownLatch
- java并发之同步工具类一之闭锁Latch
- java并发之同步工具类三之栅栏Barrier
- Java并发工具类之CountDownLatch
- Java并发工具类之同步屏障CyclicBarrier
- 【死磕Java并发】—– J.U.C之并发工具类:Semaphore
- 【死磕Java并发】-----J.U.C之并发工具类:Exchanger