JAVA多线程之——LinkedBlockingQueue
2017-04-04 13:55
288 查看
LinkedBlockingQueue
LinkedBlockingQueue队列跟ArrayBlockingQueue一样都实现了BlockingQueue。因此同样是阻塞队列,有三种删除和三种添加的方法。LinkedBlockingQueue的底层是基于链表实现。ArrayBlockingQueue通过一个锁和锁的两个条件对象保证并发的安全性,LinkedBlockingQueue通过两个锁和每个锁的一个条件队列来控制并发的安全性。基本属性如下:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger(0); private transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
ArrayBlockingQueue只有一个锁,因此不支持添加和删除元素并行执行,LinkedBlockingQueue有一个取锁和放锁,因此支持并行的取和放。但是,取和取,放和放之间是互斥的。每次只能有一个线程拿到锁。
add方法实现
同理add方法是父类AbstractQueue队列实现,其中调用了offer方法。如果因为队列满了,添加失败抛出IllegalStateException异常。
offer实现
public boolean offer(E e) { if (e == null) throw new NullPointerException();//插入元素为空抛出异常 final AtomicInteger count = this.count; if (count.get() == capacity)//队列已满,返回false return false; int c = -1; Node<E> node = new Node(e); //把需要插入的对象封装成队列的一个节点 final ReentrantLock putLock = this.putLock; putLock.lock(); //获取放锁 try { if (count.get() < capacity) { enqueue(node); //队列没满,把节点加入队列 c = count.getAndIncrement(); if (c + 1 < capacity) //添加之后,队列还没满,就通知其它等待添加线程 notFull.signal(); } } finally { putLock.unlock(); //释放锁 } if (c == 0) //如果先前队列为空(开始线程为空,那么可能所有取线程在调用take方法时候,会堵塞起来),通知被堵塞的取线程 signalNotEmpty(); return c >= 0; }
enqueue方法
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
put方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//插入对象为空抛出异常 int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//获取放锁,如果线程被中断抛出异常 try { while (count.get() == capacity) { //如果队列满了,阻塞当前线程,知道队列可以添加元素为止 notFull.await(); } enqueue(node); //队列没满,就插入队列尾部 c = count.getAndIncrement(); if (c + 1 < capacity) //插入元素之后,队列还没满就通知其它放线程 notFull.signal(); } finally { putLock.unlock(); //释放锁 } if (c == 0) //插入之前队列为空,就通知阻塞的取线程 signalNotEmpty(); }
poll方法
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) //如果队列为空,返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //队列不为空,获取取锁 4000 try { if (count.get() > 0) { //再次判断队列是否有元素 x = dequeue(); //获取头节点的值,并且删除 c = count.getAndDecrement(); if (c > 1) //队列如果还有元素,通知其它取线程 notEmpty.signal(); } } finally { takeLock.unlock(); //释放锁 } if (c == capacity) //这里看上去c == capacity看上去是队列满了,就通知放线程,但是这里的c返回的是count取元素之前的值。之后,这个count减1.所以这个时候count的值可能是在一直变化的。 signalNotFull(); return x; }
take方法
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//获取取锁。如果线程中断,抛出异常 try { while (count.get() == 0) {//队列为空,就一直阻塞,知道队列有数据 notEmpty.await(); } x = dequeue();//获取头元素,并删除 c = count.getAndDecrement(); //元素个数减1,并返回原来的个数 if (c > 1) //如果队列没不为空,通知其它取线程 notEmpty.signal(); } finally { takeLock.unlock();//释放锁 } if (c == capacity)//还清其它放线程。 signalNotFull(); return x; }
remove方法
public boolean remove(Object o) { if (o == null) return false;//对象为空,返回false fullyLock();//删除元素,此元素的位置任意,所以取锁和放锁都锁定 try { for (Node<E> trail = head, p = trail.next; //遍历整个链表 p != null; trail = p, p = p.next) { if (o.equals(p.item)) { //判断是否找到元素 unlink(p, trail); //找到元素,调用unlink方法 return true; //删除成功返回true } } return false; } finally { fullyUnlock();//释放锁 } }
unlink
void unlink(Node<E> p, Node<E> trail) { p.item = null; //将找到的元素置空 trail.next = p.next;//把删除元素的上一个节点和它下一个节点链接(等于把自己从链表中断开出来) if (last == p)//如果元素是最后一个,就把last节点指向它的上一个节点 last = trail; if (count.getAndDecrement() == capacity)//唤醒放节点 notFull.signal(); }
LinkedBlockingQueue队列还有一个获取头部的方法,但那是该方法并不移除元素。这就是peek方法
public E peek() { if (count.get() == 0)//队列为空,返回null return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock();//获取锁 try { Node<E> first = head.next; if (first == null) //头为空返回null return null; else return first.item;//不为空返回其值 } finally { takeLock.unlock(); } }
相关文章推荐
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- Java多线程-BlockingQueue-ArrayBlockingQueue-LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- java多线程之LinkedBlockingQueue
- Java多线程系列--【JUC集合08】- LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- JAVA多线程之——LinkedBlockingQueue
- Java多线程系列--“JUC集合”08之 LinkedBlockingQueue
- Java集合源码学习(17)_BlockingQueue接口的实现LinkedBlockingQueue