Java多线程 -- JUC包源码分析15 -- SynchronousQueue与CachedThreadPool
2016-09-11 16:19
531 查看
在前面分析工具类Executors的时候,提到了CachedThreadPool:其线程数会无限增大,每来一个新请求,就会new一个Thread,其maxPoolSize = Integer.MAX。
其构造函数如下:
至所以会达到这个效果:就是因为SynchronousQueue本身没有容量,上面的wokerQueue.offer(command)函数,永远返回空,所以就会一直走到下面的addIfUnderMaximuxPoolSize里面。
下面就来详细分析SynchronousQueue
先调put,线程会阻塞在那;直到另外一个线程调用了take,2个线程才同时解锁。反之亦然。
对于多个线程,比如3个,调用3次put,3个都会阻塞在那;直到等另外的线程,调用3次take,大家才同时解锁。反之亦然。
这里就会有1个问题:先调用了3次put,那调用take的时候,是首先唤醒哪一个put线程呢?第1个,还是最后一个呢?
这就涉及到2种不同的模式:公平模式(队列模式) 和 非公平模式(栈模式)。
队列模式:最先调用put的线程,最先被take唤醒
栈模式:最后调用put的线程,最先被take唤醒。
从代码可以看出,无论是put/take/offer/poll,都是调用的transfer.transfer函数,只是传进去的参数不一样而已。
那这个Transfer是什么呢?看代码知道,Transfer是个接口,有TransferQueue/TransferStack 2种实现,这2个实现,都是SynchronousQueue的内部类,其结果如下:
从上面代码可以看出,2者结构都是一个单向链表。对于栈,只需要维护head结点;对于队列,维护head + tail结点。
put的时候,new一个item != null的结点;take的时候,new一个item = null的结点。从head开始遍历,遇到的结点和自己同类型的,说明没有匹配者,自己也加入链表;遇到和自己不同类型的,尝试匹配。
加上链表的时候,如果是栈模式,加在头部;如果是队列模式,加在尾部。如下图所示:
![](http://img.blog.csdn.net/20160911160146573)
这里面有一个关键点:
无论是栈的实现,还是队列的实现,链表本身是没有加锁的。因此在多线程访问下,就会有弱一致性问题,inconsistent read问题。
但这个不会出问题,因为匹配上,那是最好;匹配不上,出现inconsistent read,for循环回来再重新读,直到匹配上了,线程才会解锁。
也正因为如此,上面的代码中,有诸多的double check逻辑。
其构造函数如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //ThreadPoolExecutor的execute函数: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
至所以会达到这个效果:就是因为SynchronousQueue本身没有容量,上面的wokerQueue.offer(command)函数,永远返回空,所以就会一直走到下面的addIfUnderMaximuxPoolSize里面。
下面就来详细分析SynchronousQueue
SynchronousQueue使用方式
SynchronousQueue最大的特点就是put/take是成对调用的:先调put,线程会阻塞在那;直到另外一个线程调用了take,2个线程才同时解锁。反之亦然。
对于多个线程,比如3个,调用3次put,3个都会阻塞在那;直到等另外的线程,调用3次take,大家才同时解锁。反之亦然。
这里就会有1个问题:先调用了3次put,那调用take的时候,是首先唤醒哪一个put线程呢?第1个,还是最后一个呢?
这就涉及到2种不同的模式:公平模式(队列模式) 和 非公平模式(栈模式)。
队列模式:最先调用put的线程,最先被take唤醒
栈模式:最后调用put的线程,最先被take唤醒。
SynchronousQueue实现
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private transient volatile Transferer transferer; public void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); if (transferer.transfer(o, false, 0) == null) { //第一个参数为put进去的obejct Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { Object e = transferer.transfer(null, false, 0); //第1个参数为空,返回值是上面put进去的object if (e != null) return (E)e; Thread.interrupted(); throw new InterruptedException(); } public E poll() { return (E)transferer.transfer(null, true, 0); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; } 。。。 }
从代码可以看出,无论是put/take/offer/poll,都是调用的transfer.transfer函数,只是传进去的参数不一样而已。
那这个Transfer是什么呢?看代码知道,Transfer是个接口,有TransferQueue/TransferStack 2种实现,这2个实现,都是SynchronousQueue的内部类,其结果如下:
static final class TransferStack extends Transferer { static final class SNode { volatile SNode next; // next node in stack volatile SNode match; // the node matched to this volatile Thread waiter; // to control park/unpark Object item; // data; or null for REQUESTs int mode; ... } volatile SNode head; ... } static final class TransferQueue extends Transferer { static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; ... } transient volatile QNode head; transient volatile QNode tail; }
从上面代码可以看出,2者结构都是一个单向链表。对于栈,只需要维护head结点;对于队列,维护head + tail结点。
实现思路
不管是栈,还是队列,其基本实现思路类似:put的时候,new一个item != null的结点;take的时候,new一个item = null的结点。从head开始遍历,遇到的结点和自己同类型的,说明没有匹配者,自己也加入链表;遇到和自己不同类型的,尝试匹配。
加上链表的时候,如果是栈模式,加在头部;如果是队列模式,加在尾部。如下图所示:
transfer代码
//TransferQueue.transfer Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { //队列为空,或者head结点和自己同类型 QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null)? x : e; } else { //队列不为空,head和自己不同类型 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null)? x : e; } } } //TransferStack.transfer Object transfer(Object e, boolean timed, long nanos) { SNode s = null; int mode = (e == null)? REQUEST : DATA; for (;;) { //for循环里面3大分支 SNode h = head; if (h == null || h.mode == mode) { //case1:栈为空,或者栈顶元素和自己同类型 if (timed && nanos <= 0) { // 关键点:offer函数就走的这个逻辑,offer(e, ture, 0) 一直会返回null if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return mode == REQUEST? m.item : s.item; } } else if (!isFulfilling(h.mode)) { //case 2: 和自己不同类型,并且没有其他线程fulfilling这个结点,进入 if (h.isCancelled()) casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST)? m.item : s.item; } else // lost match s.casNext(m, mn); // help unlink } } } else { //case 3: 和自己不同类型,但现在有其他线程正在fulfilling此结点 SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
这里面有一个关键点:
无论是栈的实现,还是队列的实现,链表本身是没有加锁的。因此在多线程访问下,就会有弱一致性问题,inconsistent read问题。
但这个不会出问题,因为匹配上,那是最好;匹配不上,出现inconsistent read,for循环回来再重新读,直到匹配上了,线程才会解锁。
也正因为如此,上面的代码中,有诸多的double check逻辑。
相关文章推荐
- Java多线程 -- JUC包源码分析19 -- ForkJoinPool/ForkJoinTask
- Java多线程 -- JUC包源码分析9 -- AbstractQueuedSynchronizer深入分析-- Semaphore与CountDownLatch
- Java多线程 -- JUC包源码分析14 -- ScheduledThreadPoolExecutor与DelayQueue源码分析
- java再复习——多线程之初识线程,并从源码角度分析start与run方法,Thread类与Runnable接口
- Java多线程 -- JUC包源码分析10 -- ConcurrentLinkedQueue源码分析
- Java多线程 -- JUC包源码分析3-- volatile/final语义
- Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueu
- Java多线程 -- JUC包源码分析4 -- 各种锁与无锁
- Java多线程 -- JUC包源码分析3-- volatile/final语义
- Java多线程 -- JUC包源码分析17 -- 弱一致性与无锁队列
- Java多线程 -- JUC包源码分析7 -- 对Interrupt的深刻理解
- Java多线程 -- JUC包源码分析13 -- Callable/FutureTask源码分析
- Java多线程 -- JUC包源码分析16 -- Exchanger源码分析
- Java多线程 -- JUC包源码分析11 -- CyclicBarrier源码分析
- Java多线程 -- JUC包源码分析4 -- 各种锁与无锁
- Java多线程 -- JUC包源码分析1 -- CAS/乐观锁
- Java多线程 -- JUC包源码分析2 -- Copy On Write/CopyOnWriteArrayList/CopyOnWriteArraySet
- Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表
- Java多线程 -- JUC包源码分析12 -- ThreadPoolExecutor源码分析
- Java多线程 -- JUC包源码分析8 -- 对happen before的深刻理解