关于多线程同步的初步教程--Barrier的设计及使用
2008-03-04 10:12
543 查看
Barrier是一个多线程编程中经常要用到的同步工具,尤其多用于大数据量的计算过程中的同步。本文以广为流程的Doug Lea的concurrent工具包的Barrier实现为例,进行一点探讨。在Doug Lea的concurrent工具包中,Barrier是一个接口,在concurrent包中提供了两个Barrier的实现:CyclicBarrier和Rendezvous。下面是Barrier接口的定义:
public interface Barrier {
/**
* Return the number of parties that must meet per barrier
* point. The number of parties is always at least 1.
**/
public int parties();
/**
* Returns true if the barrier has been compromised
* by threads leaving the barrier before a synchronization
* point (normally due to interruption or timeout).
* Barrier methods in implementation classes throw
* throw BrokenBarrierException upon detection of breakage.
* Implementations may also support some means
* to clear this status.
**/
public boolean broken();
}
Barrier接口中的方法非常简单,parties()返回所有需要在屏障处同步的线程数;broken()返回一个标志,指示释放是否已被破坏。Barrier接口中并没有提供加入屏障的方法,而是在c和Rendezvous的Barrier实现中提供的。你可以会疑问,为什么不在Barrier接口中提供这些方法呢?因为这些实现的差异迥异,以至于很难在这些实现中提炼出一个共用的方法签名。比如,对于CyclicBarrier加入屏障的方法是:barrier(),
// CyclicBarrier.java
public int barrier() throws InterruptedException, BrokenBarrierException {
return doBarrier(false, 0);
}
protected synchronized int doBarrier(boolean timed, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
int index = --count_;
if (broken_) {
throw new BrokenBarrierException(index);
}
else if (Thread.interrupted()) {
broken_ = true;
notifyAll();
throw new InterruptedException();
}
else if (index == 0) { // tripped
count_ = parties_;
++resets_;
notifyAll();
try {
if (barrierCommand_ != null)
barrierCommand_.run();
return 0;
}
catch (RuntimeException ex) {
broken_ = true;
return 0;
}
}
else if (timed && msecs <= 0) {
broken_ = true;
notifyAll();
throw new TimeoutException(msecs);
}
else { // wait until next reset
int r = resets_;
long startTime = (timed)? System.currentTimeMillis() : 0;
long waitTime = msecs;
for (;;) {
try {
wait(waitTime);
}
catch (InterruptedException ex) {
// Only claim that broken if interrupted before reset
if (resets_ == r) {
broken_ = true;
notifyAll();
throw ex;
}
else {
Thread.currentThread().interrupt(); // propagate
}
}
if (broken_)
throw new BrokenBarrierException(index);
else if (r != resets_)
return index;
else if (timed) {
waitTime = msecs - (System.currentTimeMillis() - startTime);
if (waitTime <= 0) {
broken_ = true;
notifyAll();
throw new TimeoutException(msecs);
}
}
}
}
}
而Rendezvous中则是:rendezvous(Object x):
// Rendezvous.java
public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
return doRendezvous(x, false, 0);
}
protected Object doRendezvous(Object x, boolean timed, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
// rely on semaphore to throw interrupt on entry
long startTime;
if (timed) {
startTime = System.currentTimeMillis();
if (!entryGate_.attempt(msecs)) {
throw new TimeoutException(msecs);
}
}
else {
startTime = 0;
entryGate_.acquire();
}
synchronized(this) {
Object y = null;
int index = entries_++;
slots_[index] = x;
try {
// last one in runs function and releases
if (entries_ == parties_) {
departures_ = entries_;
notifyAll();
try {
if (!broken_ && rendezvousFunction_ != null)
rendezvousFunction_.rendezvousFunction(slots_);
}
catch (RuntimeException ex) {
broken_ = true;
}
}
else {
while (!broken_ && departures_ < 1) {
long timeLeft = 0;
if (timed) {
timeLeft = msecs - (System.currentTimeMillis() - startTime);
if (timeLeft <= 0) {
broken_ = true;
departures_ = entries_;
notifyAll();
throw new TimeoutException(msecs);
}
}
try {
wait(timeLeft);
}
catch (InterruptedException ex) {
if (broken_ || departures_ > 0) { // interrupted after release
Thread.currentThread().interrupt();
break;
}
else {
broken_ = true;
departures_ = entries_;
notifyAll();
throw ex;
}
}
}
}
}
finally {
y = slots_[index];
// Last one out cleans up and allows next set of threads in
if (--departures_ <= 0) {
for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
entryGate_.release(entries_);
entries_ = 0;
}
}
// continue if no IE/TO throw
if (broken_)
throw new BrokenBarrierException(index);
else
return y;
}
}
既然这样,那提供一个共用的Barrier接口还有什么意义呢?Doug Lea也觉察出了这个问题。所以在即将在JDK1.5中作为标准并发工具包发布的java.util.concurrent中,就去除了Barrier接口。
最常用的Barrier实现是CyclicBarrier,下面是CyclicBarrier的一个简单使用实例,Rendezvous的使用实例可以参考concurrent包的API文档。
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.barrier();
}
catch (InterruptedException ex) { return; }
catch (BrokenBarrierException ex) { return; }
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N);
barrier.setBarrierCommand(new Runnable() {
public void run() { mergeRows(...); }
});
for (int i = 0; i < N; ++i) {
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
public interface Barrier {
/**
* Return the number of parties that must meet per barrier
* point. The number of parties is always at least 1.
**/
public int parties();
/**
* Returns true if the barrier has been compromised
* by threads leaving the barrier before a synchronization
* point (normally due to interruption or timeout).
* Barrier methods in implementation classes throw
* throw BrokenBarrierException upon detection of breakage.
* Implementations may also support some means
* to clear this status.
**/
public boolean broken();
}
Barrier接口中的方法非常简单,parties()返回所有需要在屏障处同步的线程数;broken()返回一个标志,指示释放是否已被破坏。Barrier接口中并没有提供加入屏障的方法,而是在c和Rendezvous的Barrier实现中提供的。你可以会疑问,为什么不在Barrier接口中提供这些方法呢?因为这些实现的差异迥异,以至于很难在这些实现中提炼出一个共用的方法签名。比如,对于CyclicBarrier加入屏障的方法是:barrier(),
// CyclicBarrier.java
public int barrier() throws InterruptedException, BrokenBarrierException {
return doBarrier(false, 0);
}
protected synchronized int doBarrier(boolean timed, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
int index = --count_;
if (broken_) {
throw new BrokenBarrierException(index);
}
else if (Thread.interrupted()) {
broken_ = true;
notifyAll();
throw new InterruptedException();
}
else if (index == 0) { // tripped
count_ = parties_;
++resets_;
notifyAll();
try {
if (barrierCommand_ != null)
barrierCommand_.run();
return 0;
}
catch (RuntimeException ex) {
broken_ = true;
return 0;
}
}
else if (timed && msecs <= 0) {
broken_ = true;
notifyAll();
throw new TimeoutException(msecs);
}
else { // wait until next reset
int r = resets_;
long startTime = (timed)? System.currentTimeMillis() : 0;
long waitTime = msecs;
for (;;) {
try {
wait(waitTime);
}
catch (InterruptedException ex) {
// Only claim that broken if interrupted before reset
if (resets_ == r) {
broken_ = true;
notifyAll();
throw ex;
}
else {
Thread.currentThread().interrupt(); // propagate
}
}
if (broken_)
throw new BrokenBarrierException(index);
else if (r != resets_)
return index;
else if (timed) {
waitTime = msecs - (System.currentTimeMillis() - startTime);
if (waitTime <= 0) {
broken_ = true;
notifyAll();
throw new TimeoutException(msecs);
}
}
}
}
}
而Rendezvous中则是:rendezvous(Object x):
// Rendezvous.java
public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
return doRendezvous(x, false, 0);
}
protected Object doRendezvous(Object x, boolean timed, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
// rely on semaphore to throw interrupt on entry
long startTime;
if (timed) {
startTime = System.currentTimeMillis();
if (!entryGate_.attempt(msecs)) {
throw new TimeoutException(msecs);
}
}
else {
startTime = 0;
entryGate_.acquire();
}
synchronized(this) {
Object y = null;
int index = entries_++;
slots_[index] = x;
try {
// last one in runs function and releases
if (entries_ == parties_) {
departures_ = entries_;
notifyAll();
try {
if (!broken_ && rendezvousFunction_ != null)
rendezvousFunction_.rendezvousFunction(slots_);
}
catch (RuntimeException ex) {
broken_ = true;
}
}
else {
while (!broken_ && departures_ < 1) {
long timeLeft = 0;
if (timed) {
timeLeft = msecs - (System.currentTimeMillis() - startTime);
if (timeLeft <= 0) {
broken_ = true;
departures_ = entries_;
notifyAll();
throw new TimeoutException(msecs);
}
}
try {
wait(timeLeft);
}
catch (InterruptedException ex) {
if (broken_ || departures_ > 0) { // interrupted after release
Thread.currentThread().interrupt();
break;
}
else {
broken_ = true;
departures_ = entries_;
notifyAll();
throw ex;
}
}
}
}
}
finally {
y = slots_[index];
// Last one out cleans up and allows next set of threads in
if (--departures_ <= 0) {
for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
entryGate_.release(entries_);
entries_ = 0;
}
}
// continue if no IE/TO throw
if (broken_)
throw new BrokenBarrierException(index);
else
return y;
}
}
既然这样,那提供一个共用的Barrier接口还有什么意义呢?Doug Lea也觉察出了这个问题。所以在即将在JDK1.5中作为标准并发工具包发布的java.util.concurrent中,就去除了Barrier接口。
最常用的Barrier实现是CyclicBarrier,下面是CyclicBarrier的一个简单使用实例,Rendezvous的使用实例可以参考concurrent包的API文档。
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.barrier();
}
catch (InterruptedException ex) { return; }
catch (BrokenBarrierException ex) { return; }
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N);
barrier.setBarrierCommand(new Runnable() {
public void run() { mergeRows(...); }
});
for (int i = 0; i < N; ++i) {
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
相关文章推荐
- 关于多线程同步的初步教程--Metux的设计及使用
- 关于多线程同步的初步教程--Simaphore的设计及使用
- 关于多线程同步的初步教程--使用synchronized [推荐]
- 关于多线程同步的初步教程--使用synchronized
- 前端设计中关于外部js文件加载的速度优化及minify使用教程
- 关于使用LoadRunner对Appeon Web应用进行压力测试的初步介绍
- 关于多线程同步中使用的锁对象
- Unity3D游戏设计之旅--页面篇:NGUI使用(二)关于系统通知设计的理念
- Parasoft C++test使用教程:关于测试用例(二)
- 关于腾讯云server使用FTP具体配置教程
- TeeChart For VCL/FMX V2017使用教程:第五章 图例设计
- 关于spring中定时器的使用教程
- Android 关于greenDao的使用教程
- 关于Fiddler的使用教程
- Java中级----多线程同步基本思想,java多线程设计wait、notify、notifyall、synchronized的使用机制(转)
- 关于eclipse的初步使用
- 关于Java开发中设计模式的使用问题
- python安装与初步使用教程
- 也来一篇关于Infragistics WPF Report的使用教程 (一)