您的位置:首页 > 产品设计 > UI/UE

Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch

2016-09-07 23:51 1221 查看
在前面分析ReentrantLock/ReentrantReadWriteLock的时候,我们已经对AQS进行过分析。在初步了解了AQS之后,本篇试图对其进行一个更为系统性的分析。因为AQS是为整个同步框架的基石,不光是锁,很多其他同步组件,比如Semaphore, CountDownLatch,也都是建立在AQS之上。

-AQS–同步框架的基石

-AQS的3个核心技术原理

AQS源码解析

AQS独占模式与共享模式

Semaphore与CountDownLatch

AQS-同步框架的基石

下图展示了整个JUC包中,继承自AQS的类。可以看到,ReentrantLock/ReentrantReadWriteLock, Semaphone, CountDownLatch, FutureTask等组件,都是建立在AQS之上。



AQS的3个核心技术原理

(1) 一个int型的原子变量state。所有线程对这个变量进行cas访问,来判断自己是应该阻塞,还是进入临界区。

(2)一对park/unpark原语,实现1个线程对另1个线程的精确控制:阻塞,唤醒

(3)用无所链表实现的,所有阻塞的线程,形成的一个阻塞队列

基本思路如下:

acquire的时候,判断state,state条件满足,进入临界区执行代码,不满足,把自己加入阻塞队列,然后阻塞自己;

release的时候,更新state,同时唤醒队列中的继任者,继任者唤醒之后,再次acquire拿锁,走acquire流程。

下面对AQS源码里面的关键函数进行分析;

AQS源码解析

AQS的接口层

AQS的对外提供的接口主要有以下几个:

//独占模式

acquire

acquireInterruptibly

release

//共享模型

acquireShared

acquireSharedInterruptibly

releaseShared

public final void acquire(int arg) {
if (!tryAcquire(arg) &&   //模板方法
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))     //模板方法
doAcquireInterruptibly(arg);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {       //模板方法
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)     //模板方法
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)      //模板方法
doAcquireSharedInterruptibly(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {       //模板方法
doReleaseShared();
return true;
}
return false;
}


在上面的接口中,留了4个模板方法供子类来实现,也就是类图中的各种Sync:

tryAcquire

tryRelease

tryAcquireShared

tryReleaseShared

AQS的几个核心函数

从上面的代码中,可以看到,AQS内部的实现细节,主要一下几个函数:

独占模式下的3个关键函数:

private void acquireQueued() //屏蔽中断,进入队列

private void doAcquireInterruptibly(int arg) //响应中断

private void unparkSuccessor(Node node) //减,唤醒继任者

共享模式下的2个关键函数:

private void doAcquireSharedInterruptibly(int arg) //加

private void doReleaseShared() { //减

下面一一分析:

独占模式核心代码分析

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)); //addWaiter,入队列

final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {    //去拿锁
setHead(node);   //出对列
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())   //阻塞自己
interrupted = true;   //从阻塞中被唤醒,回到上面。继续死循环
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);   //入队列
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {  //去拿锁
setHead(node);  //出对列
p.next = null;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())   //阻塞自己
break;                     //被中断唤醒,直接退出
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);  //被中断唤醒,没拿到锁退出了,取消accquire
throw new InterruptedException();
}

private void unparkSuccessor(Node node) {  //唤醒后继者
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}


这里说2个关键点:

(1)acquireQueued屏蔽中断,doAcquireInterruptibly响应中断。唯一区别就是,前者被中断唤醒之后,继续死循环拿锁,直到拿到为止;

后者被中断唤醒之后,直接break跳出for死循环了。

(2)不管入队列,还是出队列,都是在acquire的时候,发生的。 release的时候,只负责唤醒继任者。所以release的逻辑相当简单,关键acquire的比较复杂,整体思路如下:

step1: tryAcquire,成功,就直接执行后面代码了。失败, 没拿到锁

step2: 调用addWaiter把自己加入阻塞队列(注意,此时并没有阻塞!)

step3: 开始for循环, 尝试再去tryAcquire一次,如果拿到,出队列;

step4: 没有拿到,自己阻塞自己

step5: 被中断或者unparkSuccessor唤醒,唤醒之后;

如果不响应中断,回到步骤3,再尝试拿锁;拿不到,再阻塞;阻塞唤醒,再拿锁;。。。如此死循环,直到拿到锁,函数返回!!

如果响应中断,没拿到锁就返回,返回之前,取消acquire,即cancelAcquire;

以上就是独占模式的代码分析。在继续解析共享模式代码之前,先对“独占模式“与“共享模式“有个清晰的概念:

#独占模式与共享模式

独占模式:acquire的时候,只能有1个线程拿到资源;release的时候,1次也只能唤醒1个线程。ReentrantLock, ReentrantReadWriteLock.WriteLock,都用的是AQS的独占模式;

共享模式:acquire的时候,可以有多个线程同时拿到资源;release的时候,队列中的所有线程都被唤醒,也就是propagate,然后同时再去拿锁。

ReentrantReadWriteLock.ReadLock, Semaphore, CountDownLatch都是用的共享模式。

比如对于Semaphore来说:acquire的时候,只要state > 0,就可拿到锁;

release的时候,不一定只释放1个,可以release(n),也就意味着同时释放n个资源,阻塞队列中所有线程被唤醒,然后有n个线程可以同时拿到锁;

在比如对于CountDownLatch来说,release到countDown = 0 的时候,所有等待countDown = 0 的线程都被唤醒,都同时拿到锁。

共享模式代码分析

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);   //入队列
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);   //去拿锁
if (r >= 0) {
setHeadAndPropagate(node, r);  //拿到锁,出队,同时唤醒后继者
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())   //阻塞自己
break;                     //被中断唤醒
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);                //被中断唤醒,没拿到锁,退出
throw new InterruptedException();
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();   //关键点
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);   //唤醒继任者
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}


这里有个关键点:doReleaseShared会逐个唤醒等待队列中的所有线程,然后这所有线程又再次去争抢锁。对于CountDownLatch来说,所有线程都会争抢到countDown = 0,所以所有线程拿到锁;对于Semaphore.release(1)来说,只释放了1个资源,但唤醒了所有线程,然后只有1个线程可以拿到资源,剩下的n-1个线程,再次进入阻塞状态。

#Semaphore 与 CountDownLatch

搞清楚了AQS的共享模式原理,Semaphore和CountDownLatch就很简单了。2者只是对state变量的判断策略不一样而已,可以说刚好相反:

Semaphore: state > 0,进入;state = 0 阻塞。

CountDownLatch: state > 0, 阻塞;state = 0,进入;

下面比较一下2者的tryAcquireShared就知道了:

//CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;   //state = 0, 返回true
}

//Semaphore.NonFairSync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||   //跟CountDownLatch刚好相反,state >= 0 返回true
compareAndSetState(available, remaining))
return remaining;
}
}

//Semaphore.FairSync
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||    //跟CountDownLatch刚好相反,state > = 0 返回ture
compareAndSetState(available, remaining))
return remaining;
}
}


同时,tryRelaseShared也是刚好相反,一个是++,一个是–

//CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;     //资源--
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

//Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;   //资源++
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}


总结:

共享模式与独占模式,最大的区别在于:共享模式下,一次release会唤醒队列中的所有线程,即使这次release只释放了一个资源,只有一个线程可以拿到资源。所有线程唤醒之后,再次去争夺资源,可能所有线程都能争夺到(比如CountDownLatch),也可能只有一个线程争夺到(比如Semaphore.release(1))。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息