您的位置:首页 > 产品设计 > UI/UE

java.util.concurrent.BlockingQueue ArrayBlockingQueue

2012-02-28 11:35 465 查看

一、简介

BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

二、具体实现

ArrayBlockingQueue底层定义如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

// 使用循环数组来实现queue,初始时takeIndex和putIndex均为0
private final E[] items;
private transient int takeIndex;
private transient int putIndex;
private int count;

// 用于并发的锁和条件
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

/**
* 循环数组
* Circularly increment i.
*/
final int inc(int i) {
return (++i == items.length)? 0 : i;
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
// 分配锁及该锁上的condition
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}

...
}


ArrayBlockingQueue的取操作:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
// 激发notFull条件
notFull.signal();
return x;
}

/**
* condition的await的语义如下:
* 与condition相关的锁以原子方式释放,并禁用该线程
* 方法返回时,线程必须获得与该condition相关的锁
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
// 等待notEmpty的条件
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}

...
}


ArrayBlockingQueue的写操作:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
// 等待notFull条件
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(o);
} finally {
lock.unlock();
}
}

...
}


注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: