您的位置:首页 > 产品设计 > UI/UE

深入分析ArrayBlockingQueue

2016-09-04 00:49 441 查看
ArrayBlockingQueue是固定大小的阻塞队列,其内部采用数组存储元素。ArrayBlockingQueue继承自AbstractQueue,并实现BlockingQueue接口。



图1 ArrayBlockingQueue的公共方法

使用场景:

ArrayBlockingQueue主要用在限制阻塞队列大小的环境中。比如工厂在生产货物的过程中,不会生产过剩的货物。

源码分析:

在ArrayBlockingQueue中,利用以下信息来维护当前的队列。

/** The queued items  */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;


items数组:存储队列中存放的所有数据。

takeIndex:记录当前可提取的元素。在提取操作之后,takeIndex指向数组中的下一个元素。

putIndex:记录当前可添加的位置索引。在添加之后,putIndex指向数组中的下一个可存放的位置。

count:记录队列中存放的元素个数。

ArrayBlockingQueue的构造函数:

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化数组容量
this.items = (E[]) new Object[capacity];
// 创建锁
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();

for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}


可以看到,ArrayBlockingQueue(int capacity, boolean fair)的构造函数传入2个参数,一个是队列的容量大小,根据该值初始化创建数组;还有一个是fair属性,用来创建锁。当fair为false时,由于创建的是非公平锁,所以不能保证数据被加入到队列中的顺序和申请的顺序是一致的;当fair为true时,创建的是公平锁,加入到队列中的顺序和申请的顺序是一致的。

add()方法:

public boolean add(E e) {
return super.add(e);
}

// AbstractQueue中的add()方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}


ArrayBlockingQueue并没有对add()方法进行特殊处理,其调用AbstractQueue中的add()方法。

AbstractQueue中的add()方法通过offer()操作判断是否可以添加元素到队列中,如果失败,则抛出IllegalStateException异常。

offer()方法

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 判断数组是否已满
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}

private void insert(E x) {
items[putIndex] = x;
// 记录下一个可存放元素的位置,
// 如果putIndex已经是最后一个位置了,则执行inc操作之后putIndex等于0
putIndex = inc(putIndex);
++count;
// 唤醒等待队列非空的线程
notEmpty.signal();
}

final int inc(int i) {
return (++i == items.length)? 0 : i;
}


offer()方法增加ReentrantLock 锁来保证插入元素的同步。在插入元素前,会先判断count是否已经到达数组容量的大小。如果数组已满,则不执行插入操作,并返回false;否则执行插入操作。

put()方法:

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
// 如果队列已经满了,则利用notFull阻塞当前线程
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}


对于put方法,ArrayBlockingQueue利用Condition实现当前线程的阻塞。可以看到,ArrayBlockingQueue中使用notFull和notEmpty两个condition来处理添加/获取元素的场景。

offer(E e, long timeout, TimeUnit unit)方法:

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (count != items.length) {
insert(e);
return true;
}
if (nanos <= 0)
return false;
try {
// 利用Condition的超时特性
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}


offer(E e, long timeout, TimeUnit unit)的超时是利用锁的超时实现的。在我们自己写代码过程中,也可以充分利用java的特性实现自己需要的功能。

poll()方法:

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)
return null;
// 从队列中删除首元素,并返回
E x = extract();
return x;
} finally {
lock.unlock();
}
}


poll()方法会立刻返回队列中的首元素,如果首元素不存在,则返回null。

take()方法:

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
// 如果队列为空,则一直等待
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}


take()方法获得队列中的首元素。其中在返回首元素之前,会先判断队列是否为空。如果队列为空,则调用notEmpty的await()方法,阻塞当前线程,直到队列中的元素不为空。

peek()方法:

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();
}
}


可以看到peek方法返回首元素,但不删除首元素。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  concurrent 阻塞队列