您的位置:首页 > 编程语言 > Java开发

JDK容器与并发—Queue—DelayQueue

2016-08-28 10:29 211 查看

概述

      元素为实现Delayed接口的无界阻塞队列。

数据结构

      基于优先队列,一把锁、一个条件:

// 增删查公用的锁
private transient final ReentrantLock lock = new ReentrantLock();

// 用优先队列存储元素
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 实现Leader-Follower设计模式,以最少等待获取元素
// leader线程用于在队首等待到期元素
// leader线程可以更换,其只需要下一次delay时间即可,而其他Follower线程无限期等待
// leader线程在从take()、poll(...)返回前需要通知其他Follower线程直至其中某个线程成为leader
// 当有更早的过期元素成为队首元素时,通过设置leader为null,使当前leader线程失去leader位置,等待中的某个Follower线程成为leader
// 等待中的Follower线程随时准备好成为leader或失去leader位置
private Thread leader = null;

// 当队首可获取新的元素时或某个Follower线程成为新leader时,会发送available信号
private final Condition available = lock.newCondition();


构造器

// 无参构造
public DelayQueue() {}

// 带Collection参数构造
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}


增删查

步骤:

1)获取锁;

2)将元素委托给优先队列q入队;

3)若入队的元素delay时间最小,则更换leader;

4)返回true。

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();		// 获取lock锁
try {
q.offer(e);     // 委托给优先队列入队
if (q.peek() == e) {	// 若入队的元素delay时间最小,则更换leader
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();  // 释放lock锁
}
}


      步骤:

1)获取锁;

2)若队列为空,则阻塞等待;

3)否则,获取队首元素delay时间;

4)若delay时间已过期,则释放锁,将队首元素出队(注意:take返回前,如果leader为null且队列不为空,则发送available信号);

5)若delay时间未到期且已设置leader,则阻塞等待;

6)若delay时间未到期且未设置leader,则设置当前线程为leader,等待队首元素过期;

7)循环2)——6),返回前,释放锁。

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();      // 委托给优先队列q获取队首元素
if (first == null)		 // 队列q为空,阻塞等待
available.await();
else {					 // 获取队首元素first
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)		 // 队首元素delay时间已到,直接将其出队
return q.poll();
else if (leader != null) // 队首元素delay时间未到,已设置leader线程,则Follower阻塞等待
available.await();
else {					 // 队首元素delay时间未到,还未设置leader线程,设置当前线程为leader,等待队首元素过期
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); 	// leader线程等待delay时间
} finally {
if (leader == thisThread)		// delay时间到,更换leader
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // take返回前,如果leader为null且队列不为空,则发送available信号
available.signal();
lock.unlock();
}
}

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();				// 获取锁
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null; // 队列为空或无过期时间元素,则返回null
else
return q.poll(); // 有过期元素,将其出队
} finally {
lock.unlock();			// 释放锁
}
}

// 有限阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // poll返回前,如果leader为null且队列不为空,则发送available信号
available.signal();
lock.unlock();
}
}


步骤:

1)获取锁;

2)获取队首元素;

3)释放锁。

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}


迭代器

      遍历所有已过期和未到期的元素,不保证元素的顺序。

      弱一致性,基于元素的副本构造:

public Iterator<E> iterator() {
return new Itr(toArray());
}

private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor;           // index of next element to return;
int lastRet;          // index of last element, or -1 if no such

Itr(Object[] array) {
lastRet = -1;
this.array = array;
}

public boolean hasNext() {
return cursor < array.length;
}

@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}

public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
Object x = array[lastRet];
lastRet = -1;
// Traverse underlying queue to find == element,
// not just a .equals element.
lock.lock();
try {
for (Iterator it = q.iterator(); it.hasNext(); ) {
if (it.next() == x) {
it.remove();
return;
}
}
} finally {
lock.unlock();
}
}
}


特性

      用leader—follower设计模式实现leader获取已过期的优先队列队首元素。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息