您的位置:首页 > 其它

关于多线程同步的初步教程--Barrier的设计及使用

2008-03-04 10:12 543 查看
Barrier是一个多线程编程中经常要用到的同步工具,尤其多用于大数据量的计算过程中的同步。本文以广为流程的Doug Lea的concurrent工具包的Barrier实现为例,进行一点探讨。在Doug Lea的concurrent工具包中,Barrier是一个接口,在concurrent包中提供了两个Barrier的实现:CyclicBarrier和Rendezvous。下面是Barrier接口的定义:

public interface Barrier {

/**

* Return the number of parties that must meet per barrier

* point. The number of parties is always at least 1.

**/

public int parties();

/**

* Returns true if the barrier has been compromised

* by threads leaving the barrier before a synchronization

* point (normally due to interruption or timeout).

* Barrier methods in implementation classes throw

* throw BrokenBarrierException upon detection of breakage.

* Implementations may also support some means

* to clear this status.

**/

public boolean broken();

}

Barrier接口中的方法非常简单,parties()返回所有需要在屏障处同步的线程数;broken()返回一个标志,指示释放是否已被破坏。Barrier接口中并没有提供加入屏障的方法,而是在c和Rendezvous的Barrier实现中提供的。你可以会疑问,为什么不在Barrier接口中提供这些方法呢?因为这些实现的差异迥异,以至于很难在这些实现中提炼出一个共用的方法签名。比如,对于CyclicBarrier加入屏障的方法是:barrier(),

// CyclicBarrier.java

public int barrier() throws InterruptedException, BrokenBarrierException {

return doBarrier(false, 0);

}

protected synchronized int doBarrier(boolean timed, long msecs)

throws InterruptedException, TimeoutException, BrokenBarrierException {

int index = --count_;

if (broken_) {

throw new BrokenBarrierException(index);

}

else if (Thread.interrupted()) {

broken_ = true;

notifyAll();

throw new InterruptedException();

}

else if (index == 0) { // tripped

count_ = parties_;

++resets_;

notifyAll();

try {

if (barrierCommand_ != null)

barrierCommand_.run();

return 0;

}

catch (RuntimeException ex) {

broken_ = true;

return 0;

}

}

else if (timed && msecs <= 0) {

broken_ = true;

notifyAll();

throw new TimeoutException(msecs);

}

else { // wait until next reset

int r = resets_;

long startTime = (timed)? System.currentTimeMillis() : 0;

long waitTime = msecs;

for (;;) {

try {

wait(waitTime);

}

catch (InterruptedException ex) {

// Only claim that broken if interrupted before reset

if (resets_ == r) {

broken_ = true;

notifyAll();

throw ex;

}

else {

Thread.currentThread().interrupt(); // propagate

}

}

if (broken_)

throw new BrokenBarrierException(index);

else if (r != resets_)

return index;

else if (timed) {

waitTime = msecs - (System.currentTimeMillis() - startTime);

if (waitTime <= 0) {

broken_ = true;

notifyAll();

throw new TimeoutException(msecs);

}

}

}

}

}

而Rendezvous中则是:rendezvous(Object x):

// Rendezvous.java

public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {

return doRendezvous(x, false, 0);

}

protected Object doRendezvous(Object x, boolean timed, long msecs)

throws InterruptedException, TimeoutException, BrokenBarrierException {

// rely on semaphore to throw interrupt on entry

long startTime;

if (timed) {

startTime = System.currentTimeMillis();

if (!entryGate_.attempt(msecs)) {

throw new TimeoutException(msecs);

}

}

else {

startTime = 0;

entryGate_.acquire();

}

synchronized(this) {

Object y = null;

int index = entries_++;

slots_[index] = x;

try {

// last one in runs function and releases

if (entries_ == parties_) {

departures_ = entries_;

notifyAll();

try {

if (!broken_ && rendezvousFunction_ != null)

rendezvousFunction_.rendezvousFunction(slots_);

}

catch (RuntimeException ex) {

broken_ = true;

}

}

else {

while (!broken_ && departures_ < 1) {

long timeLeft = 0;

if (timed) {

timeLeft = msecs - (System.currentTimeMillis() - startTime);

if (timeLeft <= 0) {

broken_ = true;

departures_ = entries_;

notifyAll();

throw new TimeoutException(msecs);

}

}

try {

wait(timeLeft);

}

catch (InterruptedException ex) {

if (broken_ || departures_ > 0) { // interrupted after release

Thread.currentThread().interrupt();

break;

}

else {

broken_ = true;

departures_ = entries_;

notifyAll();

throw ex;

}

}

}

}

}

finally {

y = slots_[index];

// Last one out cleans up and allows next set of threads in

if (--departures_ <= 0) {

for (int i = 0; i < slots_.length; ++i) slots_[i] = null;

entryGate_.release(entries_);

entries_ = 0;

}

}

// continue if no IE/TO throw

if (broken_)

throw new BrokenBarrierException(index);

else

return y;

}

}

既然这样,那提供一个共用的Barrier接口还有什么意义呢?Doug Lea也觉察出了这个问题。所以在即将在JDK1.5中作为标准并发工具包发布的java.util.concurrent中,就去除了Barrier接口。

最常用的Barrier实现是CyclicBarrier,下面是CyclicBarrier的一个简单使用实例,Rendezvous的使用实例可以参考concurrent包的API文档。

class Solver {

final int N;

final float[][] data;

final CyclicBarrier barrier;

class Worker implements Runnable {

int myRow;

Worker(int row) { myRow = row; }

public void run() {

while (!done()) {

processRow(myRow);

try {

barrier.barrier();

}

catch (InterruptedException ex) { return; }

catch (BrokenBarrierException ex) { return; }

}

}

}

public Solver(float[][] matrix) {

data = matrix;

N = matrix.length;

barrier = new CyclicBarrier(N);

barrier.setBarrierCommand(new Runnable() {

public void run() { mergeRows(...); }

});

for (int i = 0; i < N; ++i) {

new Thread(new Worker(i)).start();

waitUntilDone();

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: