03.JUC 集合 - ArrayBlockingQueue
2017-02-23 00:00
295 查看
##基本概念
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。
##内部构造
构造函数,ArrayBlockingQueue 内部维护了一个数组。因为它是有界队列,所以数组容量是固定不变的,不会进行扩容操作。并且采用了单锁+双条件队列来解决出入队操作的冲突问题。
出入队位置,ArrayBlockingQueue 依靠出入队位置从而达到循环使用数组。
初始化时,出入队位置都为 0 。
正常情况下,入队位置都是在出队位置前面。因为只有先入队,才能执行出队操作。
当入队位置到达数组末尾后,又从数组初始开始执行入队。此时入队位置在出队位置之前。
##入队操作
add,成功返回 ture,失败抛出异常
offer,成功返回 true,失败返回 false
put,失败则进入阻塞,知道成功。
关键
##出队操作
remove,失败返回 false,成功返回 true。
poll,成功返回 true,失败返回 false。
take,成功返回被操作的值,失败则进入阻塞直至成功。
关键
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。
##内部构造
构造函数,ArrayBlockingQueue 内部维护了一个数组。因为它是有界队列,所以数组容量是固定不变的,不会进行扩容操作。并且采用了单锁+双条件队列来解决出入队操作的冲突问题。
// 内部数组 final Object[] items; // 重入锁 final ReentrantLock lock; // 条件,用于出队操作 private final Condition notEmpty; // 条件,用于入队操作 private final Condition notFull; public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0){ // 抛出异常... } this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
出入队位置,ArrayBlockingQueue 依靠出入队位置从而达到循环使用数组。
// 出队位置 int takeIndex; // 入队位置 int putIndex;
初始化时,出入队位置都为 0 。
正常情况下,入队位置都是在出队位置前面。因为只有先入队,才能执行出队操作。
当入队位置到达数组末尾后,又从数组初始开始执行入队。此时入队位置在出队位置之前。
##入队操作
add,成功返回 ture,失败抛出异常
public boolean add(E e) { return super.add(e); } // BlockingQueue.add public boolean add(E e) { if (offer(e)){ return true; }else{ // 抛出异常... } }
offer,成功返回 true,失败返回 false
public boolean offer(E e) { // e 为空抛出异常 checkNotNull(e); // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 判断队列的元素个数是否满了 if (count == items.length){ return false; }else { insert(e); return true; } } finally { lock.unlock(); } }
put,失败则进入阻塞,知道成功。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列满了,则进入 notFull 条件等待队列 while (count == items.length){ notFull.await(); } insert(e); } finally { lock.unlock(); } }
关键
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; // 唤醒出队时因为空队列而进入 notEmpty 等待队列的线程 notEmpty.signal(); } final int inc(int i) { // 关键 -> 循环增加,一旦到达末尾又重新回到 0 return (++i == items.length) ? 0 : i; }
##出队操作
remove,失败返回 false,成功返回 true。
public boolean remove(Object o) { if (o == null){ return false; } final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { // 遍历内部数组的所有元素,从 takeIndex 开始 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) { // 存在匹配元素,则删除 removeAt(i); return true; } } return false; } finally { lock.unlock(); } } // 删除指定位置的元素 void removeAt(int i) { final Object[] items = this.items; // 判断是否在 takeIndex 上 if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // 关键 -> 若指定位置,不在 takeIndex 上,那么表示它一定在 takeIndex - PutIndex 之间。 // 需要将 [(i+1) - putIndex ] 位置的元素,前移一位,通过覆盖 i 位置的元素,来达到删除的效果。 for (;;) { int nexti = inc(i); if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; putIndex = i; break; } } } --count; // 唤醒入队时因为满队列而进入 notFull 等待队列的线程 notFull.signal(); }
poll,成功返回 true,失败返回 false。
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } }
take,成功返回被操作的值,失败则进入阻塞直至成功。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0){ notEmpty.await(); } return extract(); } finally { lock.unlock(); } }
关键
private E extract() { final Object[] items = this.items; E x = this.<E> cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } // 类型转换 @(JUC 集合) static <E> E cast(Object item) { return (E) item; }
相关文章推荐
- Java集合, ArrayBlockingQueue源码解析(常用于并发编程)
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- Java多线程系列--【JUC集合07】- ArrayBlockingQueue
- JUC集合-07之 ArrayBlockingQueue
- Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理
- Java之集合(十六)ArrayBlockingQueue
- 线程并发集合实现java生成消费模型(ArrayBlockingQueue和ConcurrentMap)
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- Java集合--ArrayBlockingQueue
- 04.JUC 集合 - ArrayBlockingQueue
- Java集合源码学习(16)_BlockingQueue接口的实现ArrayBlockingQueue
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- 详细分析Java并发集合ArrayBlockingQueue的用法
- (十五)java多线程之并发集合ArrayBlockingQueue
- Java多线程系列--“JUC集合”07之 ArrayBlockingQueue
- Java多线程--并发中集合的使用之ArrayBlockingQueue
- Java 集合框架分析:ArrayBlockingQueue java1.8
- 深入Java集合学习系列:ArrayBlockingQueue及其实现原理
- Java集合类(十九)JUC中的集合--ArrayBlockingQueue
- java并发编程(二十四)----(JUC集合)ArrayBlockingQueue和LinkedBlockingQueue介绍