Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch
2016-09-07 23:51
1221 查看
在前面分析ReentrantLock/ReentrantReadWriteLock的时候,我们已经对AQS进行过分析。在初步了解了AQS之后,本篇试图对其进行一个更为系统性的分析。因为AQS是为整个同步框架的基石,不光是锁,很多其他同步组件,比如Semaphore, CountDownLatch,也都是建立在AQS之上。
-AQS–同步框架的基石
-AQS的3个核心技术原理
–AQS源码解析
–AQS独占模式与共享模式
–Semaphore与CountDownLatch
(2)一对park/unpark原语,实现1个线程对另1个线程的精确控制:阻塞,唤醒
(3)用无所链表实现的,所有阻塞的线程,形成的一个阻塞队列
基本思路如下:
acquire的时候,判断state,state条件满足,进入临界区执行代码,不满足,把自己加入阻塞队列,然后阻塞自己;
release的时候,更新state,同时唤醒队列中的继任者,继任者唤醒之后,再次acquire拿锁,走acquire流程。
下面对AQS源码里面的关键函数进行分析;
//独占模式
acquire
acquireInterruptibly
release
//共享模型
acquireShared
acquireSharedInterruptibly
releaseShared
在上面的接口中,留了4个模板方法供子类来实现,也就是类图中的各种Sync:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
独占模式下的3个关键函数:
private void acquireQueued() //屏蔽中断,进入队列
private void doAcquireInterruptibly(int arg) //响应中断
private void unparkSuccessor(Node node) //减,唤醒继任者
共享模式下的2个关键函数:
private void doAcquireSharedInterruptibly(int arg) //加
private void doReleaseShared() { //减
下面一一分析:
这里说2个关键点:
(1)acquireQueued屏蔽中断,doAcquireInterruptibly响应中断。唯一区别就是,前者被中断唤醒之后,继续死循环拿锁,直到拿到为止;
后者被中断唤醒之后,直接break跳出for死循环了。
(2)不管入队列,还是出队列,都是在acquire的时候,发生的。 release的时候,只负责唤醒继任者。所以release的逻辑相当简单,关键acquire的比较复杂,整体思路如下:
step1: tryAcquire,成功,就直接执行后面代码了。失败, 没拿到锁
step2: 调用addWaiter把自己加入阻塞队列(注意,此时并没有阻塞!)
step3: 开始for循环, 尝试再去tryAcquire一次,如果拿到,出队列;
step4: 没有拿到,自己阻塞自己
step5: 被中断或者unparkSuccessor唤醒,唤醒之后;
如果不响应中断,回到步骤3,再尝试拿锁;拿不到,再阻塞;阻塞唤醒,再拿锁;。。。如此死循环,直到拿到锁,函数返回!!
如果响应中断,没拿到锁就返回,返回之前,取消acquire,即cancelAcquire;
以上就是独占模式的代码分析。在继续解析共享模式代码之前,先对“独占模式“与“共享模式“有个清晰的概念:
#独占模式与共享模式
独占模式:acquire的时候,只能有1个线程拿到资源;release的时候,1次也只能唤醒1个线程。ReentrantLock, ReentrantReadWriteLock.WriteLock,都用的是AQS的独占模式;
共享模式:acquire的时候,可以有多个线程同时拿到资源;release的时候,队列中的所有线程都被唤醒,也就是propagate,然后同时再去拿锁。
ReentrantReadWriteLock.ReadLock, Semaphore, CountDownLatch都是用的共享模式。
比如对于Semaphore来说:acquire的时候,只要state > 0,就可拿到锁;
release的时候,不一定只释放1个,可以release(n),也就意味着同时释放n个资源,阻塞队列中所有线程被唤醒,然后有n个线程可以同时拿到锁;
在比如对于CountDownLatch来说,release到countDown = 0 的时候,所有等待countDown = 0 的线程都被唤醒,都同时拿到锁。
这里有个关键点:doReleaseShared会逐个唤醒等待队列中的所有线程,然后这所有线程又再次去争抢锁。对于CountDownLatch来说,所有线程都会争抢到countDown = 0,所以所有线程拿到锁;对于Semaphore.release(1)来说,只释放了1个资源,但唤醒了所有线程,然后只有1个线程可以拿到资源,剩下的n-1个线程,再次进入阻塞状态。
#Semaphore 与 CountDownLatch
搞清楚了AQS的共享模式原理,Semaphore和CountDownLatch就很简单了。2者只是对state变量的判断策略不一样而已,可以说刚好相反:
Semaphore: state > 0,进入;state = 0 阻塞。
CountDownLatch: state > 0, 阻塞;state = 0,进入;
下面比较一下2者的tryAcquireShared就知道了:
同时,tryRelaseShared也是刚好相反,一个是++,一个是–
总结:
共享模式与独占模式,最大的区别在于:共享模式下,一次release会唤醒队列中的所有线程,即使这次release只释放了一个资源,只有一个线程可以拿到资源。所有线程唤醒之后,再次去争夺资源,可能所有线程都能争夺到(比如CountDownLatch),也可能只有一个线程争夺到(比如Semaphore.release(1))。
-AQS–同步框架的基石
-AQS的3个核心技术原理
–AQS源码解析
–AQS独占模式与共享模式
–Semaphore与CountDownLatch
AQS-同步框架的基石
下图展示了整个JUC包中,继承自AQS的类。可以看到,ReentrantLock/ReentrantReadWriteLock, Semaphone, CountDownLatch, FutureTask等组件,都是建立在AQS之上。AQS的3个核心技术原理
(1) 一个int型的原子变量state。所有线程对这个变量进行cas访问,来判断自己是应该阻塞,还是进入临界区。(2)一对park/unpark原语,实现1个线程对另1个线程的精确控制:阻塞,唤醒
(3)用无所链表实现的,所有阻塞的线程,形成的一个阻塞队列
基本思路如下:
acquire的时候,判断state,state条件满足,进入临界区执行代码,不满足,把自己加入阻塞队列,然后阻塞自己;
release的时候,更新state,同时唤醒队列中的继任者,继任者唤醒之后,再次acquire拿锁,走acquire流程。
下面对AQS源码里面的关键函数进行分析;
AQS源码解析
AQS的接口层
AQS的对外提供的接口主要有以下几个://独占模式
acquire
acquireInterruptibly
release
//共享模型
acquireShared
acquireSharedInterruptibly
releaseShared
public final void acquire(int arg) { if (!tryAcquire(arg) && //模板方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) //模板方法 doAcquireInterruptibly(arg); } public final boolean release(int arg) { if (tryRelease(arg)) { //模板方法 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //模板方法 doAcquireShared(arg); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //模板方法 doAcquireSharedInterruptibly(arg); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //模板方法 doReleaseShared(); return true; } return false; }
在上面的接口中,留了4个模板方法供子类来实现,也就是类图中的各种Sync:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
AQS的几个核心函数
从上面的代码中,可以看到,AQS内部的实现细节,主要一下几个函数:独占模式下的3个关键函数:
private void acquireQueued() //屏蔽中断,进入队列
private void doAcquireInterruptibly(int arg) //响应中断
private void unparkSuccessor(Node node) //减,唤醒继任者
共享模式下的2个关键函数:
private void doAcquireSharedInterruptibly(int arg) //加
private void doReleaseShared() { //减
下面一一分析:
独占模式核心代码分析
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)); //addWaiter,入队列 final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //去拿锁 setHead(node); //出对列 p.next = null; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞自己 interrupted = true; //从阻塞中被唤醒,回到上面。继续死循环 } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } } private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); //入队列 try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //去拿锁 setHead(node); //出对列 p.next = null; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞自己 break; //被中断唤醒,直接退出 } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); //被中断唤醒,没拿到锁退出了,取消accquire throw new InterruptedException(); } private void unparkSuccessor(Node node) { //唤醒后继者 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
这里说2个关键点:
(1)acquireQueued屏蔽中断,doAcquireInterruptibly响应中断。唯一区别就是,前者被中断唤醒之后,继续死循环拿锁,直到拿到为止;
后者被中断唤醒之后,直接break跳出for死循环了。
(2)不管入队列,还是出队列,都是在acquire的时候,发生的。 release的时候,只负责唤醒继任者。所以release的逻辑相当简单,关键acquire的比较复杂,整体思路如下:
step1: tryAcquire,成功,就直接执行后面代码了。失败, 没拿到锁
step2: 调用addWaiter把自己加入阻塞队列(注意,此时并没有阻塞!)
step3: 开始for循环, 尝试再去tryAcquire一次,如果拿到,出队列;
step4: 没有拿到,自己阻塞自己
step5: 被中断或者unparkSuccessor唤醒,唤醒之后;
如果不响应中断,回到步骤3,再尝试拿锁;拿不到,再阻塞;阻塞唤醒,再拿锁;。。。如此死循环,直到拿到锁,函数返回!!
如果响应中断,没拿到锁就返回,返回之前,取消acquire,即cancelAcquire;
以上就是独占模式的代码分析。在继续解析共享模式代码之前,先对“独占模式“与“共享模式“有个清晰的概念:
#独占模式与共享模式
独占模式:acquire的时候,只能有1个线程拿到资源;release的时候,1次也只能唤醒1个线程。ReentrantLock, ReentrantReadWriteLock.WriteLock,都用的是AQS的独占模式;
共享模式:acquire的时候,可以有多个线程同时拿到资源;release的时候,队列中的所有线程都被唤醒,也就是propagate,然后同时再去拿锁。
ReentrantReadWriteLock.ReadLock, Semaphore, CountDownLatch都是用的共享模式。
比如对于Semaphore来说:acquire的时候,只要state > 0,就可拿到锁;
release的时候,不一定只释放1个,可以release(n),也就意味着同时释放n个资源,阻塞队列中所有线程被唤醒,然后有n个线程可以同时拿到锁;
在比如对于CountDownLatch来说,release到countDown = 0 的时候,所有等待countDown = 0 的线程都被唤醒,都同时拿到锁。
共享模式代码分析
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //入队列 try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //去拿锁 if (r >= 0) { setHeadAndPropagate(node, r); //拿到锁,出队,同时唤醒后继者 p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞自己 break; //被中断唤醒 } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); //被中断唤醒,没拿到锁,退出 throw new InterruptedException(); } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); //关键点 } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒继任者 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
这里有个关键点:doReleaseShared会逐个唤醒等待队列中的所有线程,然后这所有线程又再次去争抢锁。对于CountDownLatch来说,所有线程都会争抢到countDown = 0,所以所有线程拿到锁;对于Semaphore.release(1)来说,只释放了1个资源,但唤醒了所有线程,然后只有1个线程可以拿到资源,剩下的n-1个线程,再次进入阻塞状态。
#Semaphore 与 CountDownLatch
搞清楚了AQS的共享模式原理,Semaphore和CountDownLatch就很简单了。2者只是对state变量的判断策略不一样而已,可以说刚好相反:
Semaphore: state > 0,进入;state = 0 阻塞。
CountDownLatch: state > 0, 阻塞;state = 0,进入;
下面比较一下2者的tryAcquireShared就知道了:
//CountDownLatch.Sync protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; //state = 0, 返回true } //Semaphore.NonFairSync final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || //跟CountDownLatch刚好相反,state >= 0 返回true compareAndSetState(available, remaining)) return remaining; } } //Semaphore.FairSync protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || //跟CountDownLatch刚好相反,state > = 0 返回ture compareAndSetState(available, remaining)) return remaining; } }
同时,tryRelaseShared也是刚好相反,一个是++,一个是–
//CountDownLatch.Sync protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; //资源-- if (compareAndSetState(c, nextc)) return nextc == 0; } } //Semaphore.Sync protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; //资源++ if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
总结:
共享模式与独占模式,最大的区别在于:共享模式下,一次release会唤醒队列中的所有线程,即使这次release只释放了一个资源,只有一个线程可以拿到资源。所有线程唤醒之后,再次去争夺资源,可能所有线程都能争夺到(比如CountDownLatch),也可能只有一个线程争夺到(比如Semaphore.release(1))。
相关文章推荐
- Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)
- 【Java8源码分析】locks包-AbstractQueuedSynchronizer同步器
- Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)
- JUC-AbstractQueuedSynchronizer(AQS)源码分析
- 深入分析AbstractQueuedSynchronizer共享锁的实现原理:CountDownLatch
- Java并发包源码学习之AQS框架(四)AbstractQueuedSynchronizer源码分析
- JUC - AbstractQueuedSynchronizer(AQS) 源码分析
- Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)
- Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)
- Java并发编程-AbstractQueuedSynchronizer源码分析
- Java锁及AbstractQueuedSynchronizer源码分析
- JAVA FutureTask之AbstractQueuedSynchronizer 源码分析
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(上)
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(上)
- AQS源码分析(AbstractQueuedSynchronizer)
- Java多线程 -- JUC包源码分析1 -- CAS/乐观锁
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(上)
- 深度解析Java8 – AbstractQueuedSynchronizer的实现分析(下)
- java并发:AbstractQueuedSynchronizer的介绍和原理分析
- JUC包之AbstractQueuedSynchronizer源码学习