java.util.concurrent.BlockingQueue ArrayBlockingQueue
2012-02-28 11:35
465 查看
一、简介
BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。二、具体实现
ArrayBlockingQueue底层定义如下:public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 使用循环数组来实现queue,初始时takeIndex和putIndex均为0 private final E[] items; private transient int takeIndex; private transient int putIndex; private int count; // 用于并发的锁和条件 private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; /** * 循环数组 * Circularly increment i. */ final int inc(int i) { return (++i == items.length)? 0 : i; } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; // 分配锁及该锁上的condition lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } ... }
ArrayBlockingQueue的取操作:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; // 激发notFull条件 notFull.signal(); return x; } /** * condition的await的语义如下: * 与condition相关的锁以原子方式释放,并禁用该线程 * 方法返回时,线程必须获得与该condition相关的锁 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { // 等待notEmpty的条件 while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } ... }
ArrayBlockingQueue的写操作:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { // 等待notFull条件 while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(o); } finally { lock.unlock(); } } ... }
注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。
相关文章推荐
- java.util.concurrent系列之--ArrayBlockingQueue
- Java多线程工具包java.util.concurrent---ArrayBlockingQueue
- 《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue
- java.util.concurrent.ArrayBlockingQueue
- java.util.concurrent.ArrayBlockingQueue
- (译)Java Concurrent系列--ArrayBlockingQueue
- java.util.concurrent.LinkedBlockingQueue
- java.util.Concurrent.BlockingQueue
- java.util.concurrent BlockingQueue
- ClassNotFound: edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue
- java.util.concurrent.LinkedBlockingQueue
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- 《java.util.concurrent 包源码阅读》07 LinkedBlockingQueue
- java.util.concurrent.BlockingQueue
- 《java.util.concurrent 包源码阅读》19 PriorityBlockingQueue
- java.util.concurrent系列之--LinkedBlockingQueue
- Java中的阻塞队列ArrayBlockingQueue的使用
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- Java多线程系列--【JUC集合07】- ArrayBlockingQueue
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理