您的位置:首页 > 产品设计 > UI/UE

ArrayBlockingQueue源码分析

2018-01-08 16:12 399 查看
在Java1.5以后增加了阻塞队列BlockingQueue,主要提供了如下的常用方法。

boolean add(E e);往队列里面添加元素,如果可以添加进去,返回true,不能则抛出异常。IllegalStateException

boolean offer(E e, long timeout, TimeUnit unit)往队列里面添加元素,如果可以添加返回true,不能则等待一定的时间,如果还不能,则返回false。

void put(E e)往对列里面添加一个元素,如果不能添加,则进入阻塞状态,直到有空间可以添加。

E take()从队列中取出一个元素,如果没有元素取出进入则塞状态,直到可以取出元素。

E poll(long timeout, TimeUnit unit)从队列取出一个元素,可以等待一定额时间。

这几个是我们常用的,记住就行。

BlockingQueue主要有四个实现类。

ArrayBlockingQueue

LinkedBlockingQueue

PriorityBlockingQueue

SynchronousQueue

我们为了最后为线程池的分析做准备,我们将的按照一定的知识体系分析。

我们先来看看类中的一些属性。

//存在数据的数组
final Object[] items;
//取出元素的下标
int takeIndex;
//添加元素的下标
int putIndex;
//元素的个数
int count;
//可从入锁
final ReentrantLock lock;
//不为空的信号
private final Condition notEmpty;
private final Condition notFull;
**构造方法**
public ArrayBlockingQueue(int capacity, boolean fair) {
//初始化一系列的参数属性
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}


我们来看看常用的方法

put(E e)

public void put(E e) throws InterruptedException {
//检查参数的合法性
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果count==item.length
while (count == items.length)
//进入阻塞
//从这个我们就可以知道,在我们刚才上面的分析,为什么插入的时候,如果满了就进行阻塞,而不是跟别的一样。放弃插入或者进行等待一定的时间。
notFull.await();
//没有满,则进入插入方法
insert(e);
} finally {
//最后释放锁
lock.unlock();
}


检查参数的合法性

private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}


insert(e)

private void insert(E x) {
//把插入的元素放在数组里面
items[putIndex] = x;
//返回下一次插入的位置
putIndex = inc(putIndex);
//数组里面的数增加一个
++count;
//如果有阻塞的取元素的动作,告诉它有元素可以取了
//一定要看清楚锁notEmpty
notEmpty.signal();
}
final int inc(int i) {
//一个数组容量的判断
return (++i == items.length) ? 0 : i;
}


我们在分析一个offer()方法,看看是不是跟我们刚说的一样,不能插入等待一定时间,而不是我们刚分析的阻塞。

offer(E e, long timeout, TimeUnit unit)

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//下面的几句都擦不都
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//看这个代码,这个就是等待一定的时间,而不是直接返回false
nanos = notFull.awaitNanos(nanos);
}
//这个方法上面讲过。不在分析
insert(e);
return true;
} finally {
lock.unlock();
}
}


我们在讲讲常用的取出的方法。

take()

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
//上面不解释
try {
while (count == 0)
//这个时候我们回到上面的insert()去看看,我们可以发现有一个 notEmpty.signal(),这个是在插入了一个元素以后,唤醒了取元素的时候,没有元素进行阻塞的情况,告诉它我们已经插入了一个元素,你可去取了,然后进入extract()方法,跟进去看看。
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}


extract()

private E extract() {
//拿到这个数组
final Object[] items = this.items;
//取到第takeIndex位置的元素,遵循先入先出的原则
E x = this.<E>cast(items[takeIndex]);
//当前位置置位null
items[takeIndex] = null;
//takeIndex的下标加一
takeIndex = inc(takeIndex);
//数组中元素的个数-1
--count;
//这个就比较有意思了,我们可以看到又唤醒了一个等待的线程,我们可以在前面的put()方法中找到这么一段代码。 notFull.await(),这个就是唤醒put()方法,告诉它,我已经拿走了一个元素了,你可以往里面放数据了。然后put()方法接着调用insert()的方法。这就回到了上面的take()方法了,他们之间不停的互相唤醒,insert()进去数据以后又告诉take(),有数数据了,你可有拿了。
notFull.signal();
return x;
}


总结:

通过上面的分析我们可以看到为什么有的方法是进行阻塞,有的直接报异常。通过上面我们还可以看到。其实notEmpty,notFull,是同一个锁来的,忘了的同学可以看看构造方法,那么久意味着take(),put()方法是不可并行执行的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: