关于java多线程浅析七:CountDownLatch的原理分析和使用
2017-06-19 19:05
369 查看
什么是CountDownLatch
CountDownLatch与CyclicBarrier一样,也是一个用与同步的辅助类,它的使用场景是:在一个或者一组其他线程没有执行完毕之前,使当前线程进行等待,只有其他的线程全部完成执行完毕后,当前线程才能继续进行。前一篇我们介绍了CyclicBarrier,在这里说一下CountDownLatch与CyclicBarrier的区别。CyclicBarrier是执行了CyclicBarrier.await( )方法的几个线程之间互相等待,当所有的其他线程都执行了await方法时(除了主线程),所有线程继续执行;而CountDownLatch是一个线程等待一个或者多个其他线程都执行完毕,并且调用了CountDownLatch.countDown( )方法后,才能继续执行。
CountDownLatch和CyclicBarrier都有一个计数器的概念,但是CyclicBarrier的计数器可以重置继续使用(有一个 Generation 的概念,可以参考上一篇博客);而CountDownLatch则没有这个概念,计数器不能被重置,所以不能重复使用。
CountDownLatch使用示例
下面通过一个示例来了解一下CountDownLatch的使用。/** * Created by fei on 2017/6/9. */ public class CountDownLatchDemo { public static final int INIT_SIZE = 3; private static CountDownLatch countDownLatch; public static void main(String[] args) { try { System.out.println("我想结婚"); countDownLatch = new CountDownLatch(INIT_SIZE); new ThreadDemo("求婚成功").start(); new ThreadDemo("双方父母点头了").start(); new ThreadDemo("房子也买好了").start(); countDownLatch.await(); Thread.sleep(1000); System.out.println("好了,可以去领证了"); } catch (InterruptedException e) { e.printStackTrace(); } } static class ThreadDemo extends Thread { public ThreadDemo(String name) { super(name); } @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
哈哈,举了一个小例子,正常操作下,只有这几个条件都满足了,也就是这几个线程(求婚成功、双方父母点头了、房子也买好了)都执行完了才可能去执行领证操作。
我想结婚 求婚成功 房子也买好了 双方父母点头了 好了,可以去领证了
从示例的代码上可以看出来,这个CountDownLatch的使用和CyclicBarrier有异曲同工之妙。具体的,我们再放张图片来加强理解:
图中的示例和代码表达的意思是一样的,就是利用CountDownLatch来实现不同线程之间的协同。其中,这个 cnt 变量,也就是代码中的 INIT_SIZE int类型变量是关键,它代表的意思就是执行了CountDownLatch.await( )方法的线程需要另外的线程执行几次 CountDownLatch.countDown( )方法。下面,就根据具体方法的源码来看一下CountDownLatch是怎样实现的。
CountDownLatch源码解析
CountDownLatch的源码不是很长,我就将去掉注释的全部源码展示在这里:package java.util.concurrent; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; 4000 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
可以看出,CountDownLatch的源码不是很难,我们从主要的await方法和countDown方法讲起。
await方法
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
可以看出,await实际上调用的是AQS的acquireSharedInterruptibly(1)方法,acquireSharedInterruptibly()方法如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//如果线程是中断状态,则抛出中断异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取锁操作 //如果尝试获取锁失败,则调用这个方法,会使线程一直不断的获取锁,直到获取到锁, //或者该线程中断 doAcquireSharedInterruptibly(arg); }
在上面的全部的源码中可以看到, tryAcquireShared方法已经被重写了:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
getState() = 0 就是表示此时的锁是没有被占用的,是可以获取的状态。 否则,执行doAcquireSharedInterruptibly( )方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建包裹当前线程的节点,初始化为“共享锁”,将Node节点添加到AQS维护的队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获取前一个节点 final Node p = node.predecessor(); //如果前一个节点是头节点,就进行获取锁操作 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; //获取锁成功,返回 } } //到这里就是进行等待,一直不断的获取锁 //这两个方法下面拿出来讲 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire方法
// 根据名字就能猜到这个方法的意思 //获取锁失败后当前结点(线程)是否应该足阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前一个节点的状态 int ws = pred.waitStatus; // 如果前一个节点是SIGNAL状态,就意味这当前线程应该被unpark唤醒。 //并且这里返回true。 if (ws == Node.SIGNAL) return true; // 如果前一个节点的状态 > 0 //(这里代表的是线程已取消状态,有关Node节点,将在下一个章节中讲解) if (ws > 0) { //向前寻找,一直找到不为取消状态的节点(节点pred) do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;//把当前结点设置为节点pred的后继节点 } else { // 如果前一个节点 < 0,则设置前继节点为SIGNAL状态。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
可能看到这里会对waitStatus的状态有一些不理解,再详细讲解Node节点之前,我们先说一下,这里的waitStatus都有哪些状态。
//下面是AQS类中,Node节点的源码片段,针对的是waitStatus的状态值 // 线程已被取消 static final int CANCELLED = 1; // 当前结点(线程)被释放掉或者取消掉时,它的后继节点(线程)需要被唤醒 // 这个SIGNAL就可以理解成“我要通知下一个节点可以被唤醒了” static final int SIGNAL = -1; // 线程在等待Condition唤醒 static final int CONDITION = -2; // 其它线程获取到“共享锁” static final int PROPAGATE = -3; // 值得注意的是,当 waitStatus=0时,意味着当前线程不属于上面的任何一种状态。 volatile int waitStatus;
看完Node中关于节点状态的的解释,上面的代码应该容易理解多了。
parkAndCheckInterrupt方法
和shouldParkAfterFailedAcquire方法一样,parkAndCheckInterrupt也是定义在AQS类中的方法。private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
这个方法很简单,就是调用LockSupport的park方法,将当前线程进行阻塞(顺便提一嘴,JUC包中的线程的阻塞和唤醒,都是调用的LockSupport这个类),然后返回线程的中断状态。
countDown方法
public void countDown() { sync.releaseShared(1); }
countDown方法调用的是sync的releaseShared()方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
可以看出,releaseShared方法先是调用tryReleaseShared方法获取锁,如果获取失败,则调用doReleaseShared方法再进行获取锁操作。(大神不愧是大神!其实doReleaseShared方法中调用的还是tryReleaseShared方法,为啥要这样写?让我写可能就直接硬生生的去做获取锁操作了)
这里的tryReleaseShared被CountDownLatch覆盖了:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; // 调用CAS函数进行赋值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }
这个方法也很好理解,就是针对 state 进行减操作。
源码分析总结
其实看完countDown方法就应该可以看出CountDownLatch具体的设计与使用思路了,就是使用CountDownLatch的时候,先进行”锁计数器”(也就是 INIT_SIZE 参数,或者更简单的说是CountDownLatch这个类的“Count”),这个数值很关键,它决定了调用CountDownLatch.await()方法的线程想要继续运行下去,则需要多少次调用CountDownLatch.countDown()方法。当调countDown方法时,锁计数器减 1 ,知道所计数器为 0 时,执行CountDownLatch.await()的线程才能获取到锁,从而继续执行。相关文章推荐
- CyclicBarrier、CountDownLatch以及Semaphore使用及其原理分析
- 关于Java的数据结构HashMap,ArrayList的使用总结及使用场景和原理分析
- java.util.concurrent 下的Semaphore CyclicBarrier CountDownLatch 分析使用
- CountDownLatch原理分析
- 分析CountDownLatch的实现原理
- Java 线程同步组件 CountDownLatch 与 CyclicBarrier 原理分析
- 关于前端惰性加载(jquery_lazyload)的使用和原理分析
- 关于前端惰性加载(jquery_lazyload)的使用和原理分析
- 关于CCTexure2D使用opengl实现绘制的原理分析
- 关于java多线程浅析六: CyclicBarrier的原理分析和使用
- Java并发Concurrent包的锁(六)——CountDownLatch源码分析及使用
- 关于使用synchronized实现线程安全的原理分析
- Web端服务器推送技术原理分析及dwr框架简单的使用
- 关于函数strtok和strtok_r的使用要点和实现原理(二)
- AIDL原理分析与使用
- Java NIO使用及原理分析(三)
- Java NIO使用及原理分析(三)
- spark 性能调优(一) 性能调优的本质、spark资源使用原理、调优要点分析
- SpringMVC关于json、xml自动转换的原理研究[附带源码分析]
- 使用ToolRunner运行Hadoop程序基本原理分析