您的位置:首页 > 编程语言 > Java开发

关于Java并发CountDownLatch与CyclicBarrier

2013-03-26 09:24 176 查看
最近在工作中有一个需求,要求N个子模块线程同时启动,等这N个子模块线程都完成后,再启动一个线程进行一些收尾工作,Java并发包中CountDownLatch与CyclicBarrier都可以达到此要求。
CountDownLatch使用场景如下:

public class Test {
public static void main(String[] args) throws Exception{
Room r = new Room();
new Thread(r).start();
r.startLatch.countDown();

}

}

class Room implements Runnable {

final CountDownLatch startLatch = new CountDownLatch(1);

public void run() {
try {
startLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("123");
}
}


这里只有当countDown()方法执行后,Room线程才会执行下去并打印123,CyclicBarrier使用场景如下:

public class Test {

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(4);
final CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("it is ok!");
}
});

for (int i = 0; i < 4; i++) {
exec.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+ "执行完毕!");
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
exec.shutdown();
}
}

 
两个类实现的功能基本是差不多的,那么接下来研究一下CountDownLatch的内部实现,

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}


构造方法实现了一个类Sync,而Sync实现了Java并发包的核心类AQS(AbstractQueuedSynchronizer),

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}

public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}


重写了tryAcquireShared与tryReleaseShared方法,所以CountDownLatch是共享锁的一个实现,所谓共享锁是说所有共享锁的线程共享同一个资源,一旦任意一个线程拿到共享资源,那么所有线程就都拥有的同一份资源。也就是通常情况下共享锁只是一个标志,所有线程都等待这个标识是否满足,一旦满足所有线程都被激活,在CountDownLatch的await方法中,最终就会调用到AQS的doAcquireSharedInterruptibly方法

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}

 
该方法首先会将该线程以结点的形式加到AQS的FIFO任务队列,然后将会调用到parkAndCheckInterrupt方法直接调用了LockSupport的底层代码将该线程阻塞住,而CountDownLatch的countDown方法则很简单,将其状态位每次减1,减到为0时调用到了AQS的doReleaseShared方法

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

 
在这里unparkSuccessor方法会将AQS FIFO队列头结点后的第一个结点(await方法阻塞住的结点)解锁

而CyclicBarrier的实现较CountDownLatch简单,没有用到复杂的AQS,CyclicBarrier的实现只用到了锁及其条件变量Condition

其构造方法将其变量传入

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}


调用期await方法时则调用了doawait方法,并在加锁的情况下进行

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) {  // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

 
总体来说,每个线程调用则用条件变量Condition将该线程阻塞住(BlockingQuere中也用到了类似的机制)并将状态位减1,当減到为0时运行传入的线程,最后调用条件变量的signalAll方法将阻塞住的线程全部释放

private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: