ArrayBlockingQueue源码分析
2018-03-25 18:01
507 查看
原文来自:https://blog.csdn.net/xin_jmail/article/details/26157971
ArrayBlockingQueue是一个由数组支持的有界阻塞队列,也是一个循环队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。
ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:
[java] view
plain copy
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue类中定义的变量有:
[java] view
plain copy
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本文主要讲解put()和take()操作,其他方法类似。
put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。
[java] view
plain copy
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
*/
public void put(E e) throws InterruptedException {
//不能存放 null 元素
if (e == null) throw new NullPointerException();
final E[] items = this.items; //数组队列
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
try {
//当队列满时,调用notFull.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notFull.signal()方法激活该线程。
while (count == items.length)
Full.await();
} catch (InterruptedException ie) {
Full.signal(); // propagate to non-interrupted thread
throw ie;
}
//把元素 e 插入到队尾
insert(e);
} finally {
//解锁
lock.unlock();
}
}
insert(E e) 方法如下:
[java] view
plain copy
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
//下标加1或者等于0
putIndex = inc(putIndex);
++count; //计数加1
//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
//若没有take()线程陷入阻塞,则该操作无意义。
Empty.signal();
}
**
* Circularly increment i.
*/
final int inc(int i) {
//此处可以看到使用了循环队列
return (++i == items.length)? 0 : i;
}
take()方法代码如下。take操作和put操作相反,故不作详细介绍。
[java] view
plain copy
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁
try {
try {
//当队列空时,调用notEmpty.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
while (count == 0)
Empty.await();
} catch (InterruptedException ie) {
Empty.signal(); // propagate to non-interrupted thread
throw ie;
}
//取出队头元素
E x = extract();
return x;
} finally {
lock.unlock(); //解锁
}
}
extract() 方法如下:
[java] view
plain copy
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
Full.signal();
return x;
}
小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!两个Condition来进行通信。
ArrayBlockingQueue是一个由数组支持的有界阻塞队列,也是一个循环队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素,队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列检索操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致放入操作受阻塞;试图从空队列中检索元素将导致类似阻塞。
ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)。其中一个构造方法为:
[java] view
plain copy
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue类中定义的变量有:
[java] view
plain copy
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。注:本文主要讲解put()和take()操作,其他方法类似。
put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。
[java] view
plain copy
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
*/
public void put(E e) throws InterruptedException {
//不能存放 null 元素
if (e == null) throw new NullPointerException();
final E[] items = this.items; //数组队列
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
try {
//当队列满时,调用notFull.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notFull.signal()方法激活该线程。
while (count == items.length)
Full.await();
} catch (InterruptedException ie) {
Full.signal(); // propagate to non-interrupted thread
throw ie;
}
//把元素 e 插入到队尾
insert(e);
} finally {
//解锁
lock.unlock();
}
}
insert(E e) 方法如下:
[java] view
plain copy
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
//下标加1或者等于0
putIndex = inc(putIndex);
++count; //计数加1
//若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
//若没有take()线程陷入阻塞,则该操作无意义。
Empty.signal();
}
**
* Circularly increment i.
*/
final int inc(int i) {
//此处可以看到使用了循环队列
return (++i == items.length)? 0 : i;
}
take()方法代码如下。take操作和put操作相反,故不作详细介绍。
[java] view
plain copy
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加锁
try {
try {
//当队列空时,调用notEmpty.await()方法,使该线程阻塞。
//直到take掉某个元素后,调用notEmpty.signal()方法激活该线程。
while (count == 0)
Empty.await();
} catch (InterruptedException ie) {
Empty.signal(); // propagate to non-interrupted thread
throw ie;
}
//取出队头元素
E x = extract();
return x;
} finally {
lock.unlock(); //解锁
}
}
extract() 方法如下:
[java] view
plain copy
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
Full.signal();
return x;
}
小结:进行put和take操作,共用同一个锁对象。也即是说,put和take无法并行执行!两个Condition来进行通信。
相关文章推荐
- 深入理解阻塞队列(二)——ArrayBlockingQueue源码分析
- java阻塞队列ArrayBlockingQueue源码分析
- JDK1.8 ArrayBlockingQueue源码分析
- ArrayBlockingQueue 源码阅读与分析
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- Java concurrent Framework并发容器之ArrayBlockingQueue(1.6)源码分析
- ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析及使用
- Java 容器源码分析之ArrayBlockingQueue和LinkedBlockingQueue
- Java 并发 --- 阻塞队列之ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- JDK源码分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- java多线程系列(九)---ArrayBlockingQueue源码分析
- ArrayBlockingQueue源码分析