JDK容器与并发—Queue—DelayQueue
2016-08-28 10:29
211 查看
概述
元素为实现Delayed接口的无界阻塞队列。数据结构
基于优先队列,一把锁、一个条件:// 增删查公用的锁 private transient final ReentrantLock lock = new ReentrantLock(); // 用优先队列存储元素 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 实现Leader-Follower设计模式,以最少等待获取元素 // leader线程用于在队首等待到期元素 // leader线程可以更换,其只需要下一次delay时间即可,而其他Follower线程无限期等待 // leader线程在从take()、poll(...)返回前需要通知其他Follower线程直至其中某个线程成为leader // 当有更早的过期元素成为队首元素时,通过设置leader为null,使当前leader线程失去leader位置,等待中的某个Follower线程成为leader // 等待中的Follower线程随时准备好成为leader或失去leader位置 private Thread leader = null; // 当队首可获取新的元素时或某个Follower线程成为新leader时,会发送available信号 private final Condition available = lock.newCondition();
构造器
// 无参构造 public DelayQueue() {} // 带Collection参数构造 public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
增删查
增
步骤:1)获取锁;
2)将元素委托给优先队列q入队;
3)若入队的元素delay时间最小,则更换leader;
4)返回true。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); // 获取lock锁 try { q.offer(e); // 委托给优先队列入队 if (q.peek() == e) { // 若入队的元素delay时间最小,则更换leader leader = null; available.signal(); } return true; } finally { lock.unlock(); // 释放lock锁 } }
删
步骤:1)获取锁;
2)若队列为空,则阻塞等待;
3)否则,获取队首元素delay时间;
4)若delay时间已过期,则释放锁,将队首元素出队(注意:take返回前,如果leader为null且队列不为空,则发送available信号);
5)若delay时间未到期且已设置leader,则阻塞等待;
6)若delay时间未到期且未设置leader,则设置当前线程为leader,等待队首元素过期;
7)循环2)——6),返回前,释放锁。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); // 委托给优先队列q获取队首元素 if (first == null) // 队列q为空,阻塞等待 available.await(); else { // 获取队首元素first long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) // 队首元素delay时间已到,直接将其出队 return q.poll(); else if (leader != null) // 队首元素delay时间未到,已设置leader线程,则Follower阻塞等待 available.await(); else { // 队首元素delay时间未到,还未设置leader线程,设置当前线程为leader,等待队首元素过期 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); // leader线程等待delay时间 } finally { if (leader == thisThread) // delay时间到,更换leader leader = null; } } } } } finally { if (leader == null && q.peek() != null) // take返回前,如果leader为null且队列不为空,则发送available信号 available.signal(); lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); // 获取锁 try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; // 队列为空或无过期时间元素,则返回null else return q.poll(); // 有过期元素,将其出队 } finally { lock.unlock(); // 释放锁 } } // 有限阻塞 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // poll返回前,如果leader为null且队列不为空,则发送available信号 available.signal(); lock.unlock(); } }
查
步骤:1)获取锁;
2)获取队首元素;
3)释放锁。
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
迭代器
遍历所有已过期和未到期的元素,不保证元素的顺序。弱一致性,基于元素的副本构造:
public Iterator<E> iterator() { return new Itr(toArray()); } private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return; int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); Object x = array[lastRet]; lastRet = -1; // Traverse underlying queue to find == element, // not just a .equals element. lock.lock(); try { for (Iterator it = q.iterator(); it.hasNext(); ) { if (it.next() == x) { it.remove(); return; } } } finally { lock.unlock(); } } }
特性
用leader—follower设计模式实现leader获取已过期的优先队列队首元素。相关文章推荐
- JDK容器与并发—Queue
- JDK容器与并发—Queue—PriorityBlockingQueue
- JDK容器与并发—Queue—PriorityQueue
- JDK容器与并发—Queue—LinkedBlockingQueue
- JDK容器与并发—Queue—SynchronousQueue
- JDK容器与并发—Queue—LinkedTransferQueue
- JDK容器与并发—Queue—ConcurrentLinkedQueue
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- java并发编程-同步类容器-ArrayBlockingQueue
- Java高并发程序-Chapter3 JDK并发包(第二十一讲)JDK 并发容器
- Java concurrent Framework并发容器之ConcurrentHashMap(Doug Lea 非JDK版)源码分析
- JDK并发工具类源码学习系列——PriorityBlockingQueue
- JDK容器与并发—数据结构
- Java 并发容器和框架--ConcurrentLinkedQueue
- JDK并发工具类源码学习系列——PriorityBlockingQueue
- JDK源码分析之DelayQueue无边界阻塞队列类
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- JDK并发包 - 集合容器
- JDK容器与并发—JDK容器框架
- Java并发容器之非阻塞队列ConcurrentLinkedQueue