关于Java并发CountDownLatch与CyclicBarrier
2013-03-26 09:24
176 查看
最近在工作中有一个需求,要求N个子模块线程同时启动,等这N个子模块线程都完成后,再启动一个线程进行一些收尾工作,Java并发包中CountDownLatch与CyclicBarrier都可以达到此要求。
CountDownLatch使用场景如下:
这里只有当countDown()方法执行后,Room线程才会执行下去并打印123,CyclicBarrier使用场景如下:
两个类实现的功能基本是差不多的,那么接下来研究一下CountDownLatch的内部实现,
构造方法实现了一个类Sync,而Sync实现了Java并发包的核心类AQS(AbstractQueuedSynchronizer),
重写了tryAcquireShared与tryReleaseShared方法,所以CountDownLatch是共享锁的一个实现,所谓共享锁是说所有共享锁的线程共享同一个资源,一旦任意一个线程拿到共享资源,那么所有线程就都拥有的同一份资源。也就是通常情况下共享锁只是一个标志,所有线程都等待这个标识是否满足,一旦满足所有线程都被激活,在CountDownLatch的await方法中,最终就会调用到AQS的doAcquireSharedInterruptibly方法
该方法首先会将该线程以结点的形式加到AQS的FIFO任务队列,然后将会调用到parkAndCheckInterrupt方法直接调用了LockSupport的底层代码将该线程阻塞住,而CountDownLatch的countDown方法则很简单,将其状态位每次减1,减到为0时调用到了AQS的doReleaseShared方法
在这里unparkSuccessor方法会将AQS FIFO队列头结点后的第一个结点(await方法阻塞住的结点)解锁
而CyclicBarrier的实现较CountDownLatch简单,没有用到复杂的AQS,CyclicBarrier的实现只用到了锁及其条件变量Condition
其构造方法将其变量传入
调用期await方法时则调用了doawait方法,并在加锁的情况下进行
总体来说,每个线程调用则用条件变量Condition将该线程阻塞住(BlockingQuere中也用到了类似的机制)并将状态位减1,当減到为0时运行传入的线程,最后调用条件变量的signalAll方法将阻塞住的线程全部释放
CountDownLatch使用场景如下:
public class Test { public static void main(String[] args) throws Exception{ Room r = new Room(); new Thread(r).start(); r.startLatch.countDown(); } } class Room implements Runnable { final CountDownLatch startLatch = new CountDownLatch(1); public void run() { try { startLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("123"); } }
这里只有当countDown()方法执行后,Room线程才会执行下去并打印123,CyclicBarrier使用场景如下:
public class Test { public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(4); final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { System.out.println("it is ok!"); } }); for (int i = 0; i < 4; i++) { exec.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+ "执行完毕!"); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } exec.shutdown(); } }
两个类实现的功能基本是差不多的,那么接下来研究一下CountDownLatch的内部实现,
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
构造方法实现了一个类Sync,而Sync实现了Java并发包的核心类AQS(AbstractQueuedSynchronizer),
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } public int tryAcquireShared(int acquires) { return getState() == 0? 1 : -1; } public boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
重写了tryAcquireShared与tryReleaseShared方法,所以CountDownLatch是共享锁的一个实现,所谓共享锁是说所有共享锁的线程共享同一个资源,一旦任意一个线程拿到共享资源,那么所有线程就都拥有的同一份资源。也就是通常情况下共享锁只是一个标志,所有线程都等待这个标识是否满足,一旦满足所有线程都被激活,在CountDownLatch的await方法中,最终就会调用到AQS的doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
该方法首先会将该线程以结点的形式加到AQS的FIFO任务队列,然后将会调用到parkAndCheckInterrupt方法直接调用了LockSupport的底层代码将该线程阻塞住,而CountDownLatch的countDown方法则很简单,将其状态位每次减1,减到为0时调用到了AQS的doReleaseShared方法
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
在这里unparkSuccessor方法会将AQS FIFO队列头结点后的第一个结点(await方法阻塞住的结点)解锁
而CyclicBarrier的实现较CountDownLatch简单,没有用到复杂的AQS,CyclicBarrier的实现只用到了锁及其条件变量Condition
其构造方法将其变量传入
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
调用期await方法时则调用了doawait方法,并在加锁的情况下进行
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
总体来说,每个线程调用则用条件变量Condition将该线程阻塞住(BlockingQuere中也用到了类似的机制)并将状态位减1,当減到为0时运行传入的线程,最后调用条件变量的signalAll方法将阻塞住的线程全部释放
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
相关文章推荐
- Java并发工具类CountDownLatch和CyclicBarrier
- 面试问题:关于java并发方面的
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 关于java集合的遍历以及ConcurrentModificationException(并发操作异常)
- Java中关于同步,异步,多线程,多线程同步,并发,并行的一些总结
- 关于JAVA中的static方法、并发问题以及JAVA运行时内存模型
- java关于并发的总结之二
- Java 并发编程之 CountDownLatch
- <关于并发框架>Java原生线程池原理及Guava与之的补充
- 关于java高并发的一些感悟和经验已经我自己找的资料
- 【死磕Java并发】-----J.U.C之并发工具类:CountDownLatch
- [Java并发包学习五]CountDownLatch和CyclicBarrier介绍
- Java并发工具类CountDownLatch
- 【Java8源码分析】并发包-CyclicBarrier
- java 多线程并发之-- CountDownLatch
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- java并发编程之--CountDownLatch
- java编程思想笔记-并发之CountDownLatch
- Java并发工具类之同步屏障CyclicBarrier
- Java线程总结(十二):并发包------CountDownLatch