JDK容器与并发—Queue—LinkedTransferQueue
2016-08-28 10:26
501 查看
概述
基于单链表的无界传输队列,线程安全。1)FIFO;
2)size()非固定时间,由于异步特性,若遍历过程中有修改,则结果可能不准确;批量操作addAll、removeAll、retainAll、containsAll、equals、toArray不保证原子性;
3)遵守内存一致性原则。
数据结构
基于单链表:
/**
 * Queue node of the singly linked list.
 *
 * <p>Relies on Unsafe mechanics to keep ordering constraints to a minimum:
 * writes that are already ordered by some other access or CAS are issued as
 * plain {@code putObject} stores instead of volatile writes.
 */
static final class Node {
    final boolean isData;   // true: data node; false: request node
    volatile Object item;   // declared Object; non-null while a data node is unmatched
    volatile Node next;
    volatile Thread waiter; // null until waiting

    /**
     * Atomically replaces {@code next} with {@code val} if it currently equals {@code cmp}.
     *
     * @return true if the CAS succeeded
     */
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * Atomically replaces {@code item} with {@code val} if it currently equals {@code cmp}.
     *
     * @return true if the CAS succeeded
     */
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    /**
     * Creates a node holding {@code item} with the given mode.
     *
     * @param item   the initial item
     * @param isData true for a data node, false for a request node
     */
    Node(Object item, boolean isData) {
        // Plain (non-volatile) store: per the author's note the item only
        // becomes visible to others after the node is published via casNext.
        UNSAFE.putObject(this, itemOffset, item);
        this.isData = isData;
    }

    /**
     * Links this node to itself. Called after a successful casHead, to help GC.
     */
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }

    /**
     * Sets {@code item} to a self-reference and {@code waiter} to null, using plain
     * stores. Used after matching or cancellation, to help GC.
     */
    final void forgetContents() {
        // only reached after the item was obtained through a volatile/atomic access
        UNSAFE.putObject(this, itemOffset, this);
        // only reached after a CAS or after returning from park
        UNSAFE.putObject(this, waiterOffset, null);
    }

    /**
     * Returns true if this node has been matched: its item is self-linked, or the
     * item's null-ness no longer agrees with the node mode (a data node whose item
     * is null, or a request node whose item is non-null).
     */
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    /**
     * Returns true if this is a request node whose item is still null,
     * i.e. an unmatched request.
     */
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    /**
     * Returns true if a node of mode {@code haveData} must not be appended after
     * this node, because this node is still unmatched and has the opposite mode.
     *
     * @param haveData mode of the node that is about to be appended
     */
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    /**
     * Manually matches a data node: CASes a live item to null and, on success,
     * unparks the waiter.
     *
     * @return true if this call performed the match
     */
    final boolean tryMatchData() {
        // assert isData;
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);
            return true;
        }
        return false;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class; // wildcard instead of a raw Class
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

/** head of the queue; null until first enqueue */
transient volatile Node head;

/** tail of the queue; null until first append */
private transient volatile Node tail;
构造器
// 无参构造 public LinkedTransferQueue() { } // 带Collection参数构造 public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); }
增删查
基础方法
/** * 入队、出队的基础方法 * * @param e :入队item;出队null * @param haveData :入队true;出队false * @param how:NOW, ASYNC, SYNC, or TIMED * @param nanos :有限阻塞超时,纳米级 * @return an item if matched, else e * @throws NullPointerException if haveData mode but e is null */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 元素不能为null throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race for (Node h = head, p = h; p != null;) { // 从head节点开始,查找、匹配第一个未匹配节点 boolean isData = p.isData; Object item = p.item; if (item != p && (item != null) == isData) { // p为未匹配节点 if (isData == haveData) // 相同类型节点,不能匹配 break; if (p.casItem(item, e)) { // 进行匹配操作 for (Node q = p; q != h;) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { // 其他线程有维护head h.forgetNext(); // 将原来的head节点出队 break; } // 向前推进,重试 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return this.<E>cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { // 为匹配到 if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); if (pred == null) continue retry; // lost race vs opposite mode if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // 非阻塞 } } // 将节点附加为tail节点 // @param s :待附加节点 // @param haveData true:数据节点 // @return null :与不同类型模式节点附加或其前驱节点或其自身(没有前驱节点)竞争失败; private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // 从tail开始 Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { // 队列为空 if (casHead(null, s)) return s; // initialize } else if (p.cannotPrecede(haveData)) return null; // 有相反类型的节点先附加 else if ((n = p.next) != null) // 非最后阶段,向前推进 p = p != t && t != (u = tail) ? (t = u) : // 其他线程有维护tail,t为过时tail (p != n) ? 
n : null; // 向前推进,如果p off-list,重新开始 else if (!p.casNext(null, s)) p = p.next; // 有其他线程附加节点进来 else { if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } } /** * Spins/yields/blocks until node s is matched or caller gives up. * * @param s the waiting node * @param pred the predecessor of s, or s itself if it has no * predecessor, or null if unknown (the null case does not occur * in any current calls but may in possible future extensions) * @param e the comparison value for checking match * @param timed if true, wait only until timeout elapses * @param nanos timeout in nanosecs, used only if timed is true * @return matched item, or e if unmatched on interrupt or timeout */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { long lastTime = timed ? System.nanoTime() : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return this.<E>cast(item); } if ((w.isInterrupted() || (ti 4000 med && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s); return e; } if (spins < 0) { // establish spins at/near front if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } else if (timed) { long now = System.nanoTime(); if ((nanos -= now - lastTime) > 0) LockSupport.parkNanos(this, nanos); lastTime = now; } else { LockSupport.park(this); } } }
增
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
删
/**
 * Dequeues in SYNC mode.
 *
 * @return the item obtained from xfer
 * @throws InterruptedException if xfer returned null; the interrupt
 *         status is cleared first
 */
public E take() throws InterruptedException {
    E taken = xfer(null, false, SYNC, 0);
    if (taken == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
    return taken;
}

/**
 * Dequeues in TIMED mode, waiting up to the given timeout.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit    the time unit of {@code timeout}
 * @return the item obtained from xfer, or null if none was obtained and
 *         the thread was not interrupted
 * @throws InterruptedException if no item was obtained and the thread's
 *         interrupt status was set (the status is cleared)
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E polled = xfer(null, false, TIMED, unit.toNanos(timeout));
    // Interrupt status is only consulted (and cleared) when nothing was obtained.
    if (polled == null && Thread.interrupted()) {
        throw new InterruptedException();
    }
    return polled;
}

/**
 * Dequeues in NOW mode.
 *
 * @return whatever xfer returns for an immediate dequeue attempt
 */
public E poll() {
    return xfer(null, false, NOW, 0);
}
查
/**
 * Returns the first live data item without removing it.
 *
 * @return the result of {@code firstDataItem()}
 */
public E peek() {
    return firstDataItem();
}

/**
 * Walks the list from head and returns the item of the first data node whose
 * item is neither null nor self-linked. The walk stops with null as soon as
 * a request node with a null item is encountered, or when the list ends.
 *
 * @return the first live data item, or null if there is none
 */
private E firstDataItem() {
    Node node = head;
    while (node != null) {
        Object item = node.item;
        if (node.isData) {
            if (item != null && item != node) {
                return this.<E>cast(item);
            }
        } else if (item == null) {
            break;
        }
        node = succ(node);
    }
    return null;
}
迭代器
/**
 * Iterator over the queue's data items. It caches the next node and its item
 * so that hasNext()/next() agree, and remembers the last returned node and a
 * predecessor so that remove() can unlink it.
 */
final class Itr implements Iterator<E> {
    private Node nextNode;   // next node to return item for
    private E nextItem;      // the corresponding item
    private Node lastRet;    // last returned node, to support remove
    private Node lastPred;   // predecessor to unlink lastRet

    /**
     * Moves to next node after prev, or first node if prev null.
     */
    private void advance(Node prev) {
        /*
         * To track and avoid buildup of deleted nodes in the face
         * of calls to both Queue.remove and Itr.remove, we must
         * include variants of unsplice and sweep upon each
         * advance: Upon Itr.remove, we may need to catch up links
         * from lastPred, and upon other removes, we might need to
         * skip ahead from stale nodes and unsplice deleted ones
         * found while advancing.
         */

        Node r, b; // reset lastPred upon possible deletion of lastRet
        if ((r = lastRet) != null && !r.isMatched())
            lastPred = r;    // next lastPred is old lastRet
        else if ((b = lastPred) == null || b.isMatched())
            lastPred = null; // at start of list
        else {
            Node s, n;       // help with removal of lastPred.next
            while ((s = b.next) != null &&
                   s != b && s.isMatched() &&
                   (n = s.next) != null && n != s)
                b.casNext(s, n);
        }

        this.lastRet = prev;

        // Scan forward for the next live data node, unlinking matched nodes on the way.
        for (Node p = prev, s, n;;) {
            s = (p == null) ? head : p.next;
            if (s == null)
                break;
            else if (s == p) {
                // self-linked node: restart from head
                p = null;
                continue;
            }
            Object item = s.item;
            if (s.isData) {
                if (item != null && item != s) {
                    // live data node found: cache it and stop
                    nextItem = LinkedTransferQueue.<E>cast(item);
                    nextNode = s;
                    return;
                }
            }
            else if (item == null)
                break; // request node with null item: stop scanning
            // assert s.isMatched();
            if (p == null)
                p = s;
            else if ((n = s.next) == null)
                break;
            else if (s == n)
                p = null;
            else
                p.casNext(s, n);
        }
        // nothing further to return
        nextNode = null;
        nextItem = null;
    }

    /** Creates the iterator and positions it on the first live data node, if any. */
    Itr() {
        advance(null);
    }

    /** Returns true if a next node has been cached by advance. */
    public final boolean hasNext() {
        return nextNode != null;
    }

    /**
     * Returns the cached item and advances past its node.
     *
     * @throws NoSuchElementException if no next node is cached
     */
    public final E next() {
        Node p = nextNode;
        if (p == null) throw new NoSuchElementException();
        E e = nextItem;
        advance(p);
        return e;
    }

    /**
     * Removes the node last returned by next(): matches it via tryMatchData and,
     * if that succeeds, unsplices it using the remembered predecessor.
     *
     * @throws IllegalStateException if there is no last returned node
     */
    public final void remove() {
        final Node lastRet = this.lastRet;
        if (lastRet == null)
            throw new IllegalStateException();
        this.lastRet = null; // clear the field so a second remove() throws
        if (lastRet.tryMatchData())
            unsplice(lastPred, lastRet);
    }
}
特性
lock-free,传输队列。
相关文章推荐
- JDK容器与并发—Queue—LinkedBlockingQueue
- JDK容器与并发—Queue—ConcurrentLinkedQueue
- Java 并发容器和框架--ConcurrentLinkedQueue
- JDK并发工具类源码学习系列——LinkedBlockingQueue
- JDK容器与并发—Queue—SynchronousQueue
- 并发基础_11_并发_容器_ConcurrentLinkedQueue
- JDK容器与并发—List—LinkedList
- JDK容器与并发—Queue—DelayQueue
- Java并发容器——ConcurrentLinkedQueue
- JDK容器与并发—Queue—PriorityBlockingQueue
- JDK容器与并发—Queue—PriorityQueue
- 【死磕Java并发】-----J.U.C之阻塞队列:LinkedTransferQueue
- 从并发容器ConcurrentLinkedQueue看解决并发问题的设计思路
- Java并发容器之非阻塞队列ConcurrentLinkedQueue
- 【死磕Java并发】-----J.U.C之Java并发容器:ConcurrentLinkedQueue
- Java并发容器ConcurrentHashMap、ConcurrentLinkedQueue、BlockingQueue等
- 高并发第十三弹:J.U.C 队列 SynchronousQueue.ArrayBlockingQueue.LinkedBlockingQueue.LinkedTransferQueue
- Java多线程--并发中集合的使用之LinkedTransferQueue
- Java 并发 --- 阻塞队列之LinkedTransferQueue源码分析
- Java并发容器:ConcurrentLinkedQueue