您的位置:首页 > 产品设计 > UI/UE

JAVA多线程之——LinkedBlockingQueue

2017-04-04 13:55 288 查看

LinkedBlockingQueue

LinkedBlockingQueue队列跟ArrayBlockingQueue一样都实现了BlockingQueue。因此同样是阻塞队列,有三种删除和三种添加的方法。LinkedBlockingQueue的底层是基于链表实现。ArrayBlockingQueue通过一个锁和锁的两个条件对象保证并发的安全性,LinkedBlockingQueue通过两个锁和每个锁的一个条件队列来控制并发的安全性。

基本属性如下:

static class Node<E> {
E item;
Node<E> next;

Node(E x) { item = x; }
}

private final int capacity;

private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();


ArrayBlockingQueue只有一个锁,因此不支持添加和删除元素并行执行,LinkedBlockingQueue有一个取锁和放锁,因此支持并行的取和放。但是,取和取,放和放之间是互斥的。每次只能有一个线程拿到锁。

add方法实现

同理add方法是父类AbstractQueue队列实现,其中调用了offer方法。如果因为队列满了,添加失败抛出IllegalStateException异常。

offer实现

public boolean offer(E e) {
if (e == null) throw new NullPointerException();//插入元素为空抛出异常
final AtomicInteger count = this.count;
if (count.get() == capacity)//队列已满,返回false
return false;
int c = -1;
Node<E> node = new Node(e);  //把需要插入的对象封装成队列的一个节点
final ReentrantLock putLock = this.putLock;
putLock.lock();         //获取放锁
try {
if (count.get() < capacity) {
enqueue(node);       //队列没满,把节点加入队列
c = count.getAndIncrement();
if (c + 1 < capacity)     //添加之后,队列还没满,就通知其它等待添加线程
notFull.signal();
}
} finally {
putLock.unlock(); //释放锁
}
if (c == 0)     //如果先前队列为空(开始线程为空,那么可能所有取线程在调用take方法时候,会堵塞起来),通知被堵塞的取线程
signalNotEmpty();
return c >= 0;
}


enqueue方法

private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}


put方法

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//插入对象为空抛出异常
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//获取放锁,如果线程被中断抛出异常
try {
while (count.get() == capacity) { //如果队列满了,阻塞当前线程,知道队列可以添加元素为止
notFull.await();
}
enqueue(node);  //队列没满,就插入队列尾部
c = count.getAndIncrement();
if (c + 1 < capacity) //插入元素之后,队列还没满就通知其它放线程
notFull.signal();
} finally {
putLock.unlock(); //释放锁
}
if (c == 0) //插入之前队列为空,就通知阻塞的取线程
signalNotEmpty();
}


poll方法

public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)  //如果队列为空,返回null
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();   //队列不为空,获取取锁
4000

try {
if (count.get() > 0) {   //再次判断队列是否有元素
x = dequeue();     //获取头节点的值,并且删除
c = count.getAndDecrement();
if (c > 1)  //队列如果还有元素,通知其它取线程
notEmpty.signal();
}
} finally {
takeLock.unlock(); //释放锁
}
if (c == capacity)  //这里看上去c == capacity看上去是队列满了,就通知放线程,但是这里的c返回的是count取元素之前的值。之后,这个count减1.所以这个时候count的值可能是在一直变化的。
signalNotFull();
return x;
}


take方法

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//获取取锁。如果线程中断,抛出异常
try {
while (count.get() == 0) {//队列为空,就一直阻塞,知道队列有数据
notEmpty.await();
}
x = dequeue();//获取头元素,并删除
c = count.getAndDecrement(); //元素个数减1,并返回原来的个数
if (c > 1) //如果队列没不为空,通知其它取线程
notEmpty.signal();
} finally {
takeLock.unlock();//释放锁
}
if (c == capacity)//还清其它放线程。
signalNotFull();
return x;
}


remove方法

public boolean remove(Object o) {
if (o == null) return false;//对象为空,返回false
fullyLock();//删除元素,此元素的位置任意,所以取锁和放锁都锁定
try {
for (Node<E> trail = head, p = trail.next;  //遍历整个链表
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {  //判断是否找到元素
unlink(p, trail);  //找到元素,调用unlink方法
return true;  //删除成功返回true
}
}
return false;
} finally {
fullyUnlock();//释放锁
}
}


unlink

void unlink(Node<E> p, Node<E> trail) {
p.item = null;  //将找到的元素置空
trail.next = p.next;//把删除元素的上一个节点和它下一个节点链接(等于把自己从链表中断开出来)
if (last == p)//如果元素是最后一个,就把last节点指向它的上一个节点
last = trail;
if (count.getAndDecrement() == capacity)//唤醒放节点
notFull.signal();
}


LinkedBlockingQueue队列还有一个获取头部的方法,但那是该方法并不移除元素。这就是peek方法

public E peek() {
if (count.get() == 0)//队列为空,返回null
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//获取锁
try {
Node<E> first = head.next;
if (first == null) //头为空返回null
return null;
else
return first.item;//不为空返回其值
} finally {
takeLock.unlock();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发 多线程