您的位置:首页 > 产品设计 > UI/UE

Java多线程 -- JUC包源码分析15 -- SynchronousQueue与CachedThreadPool

2016-09-11 16:19 531 查看
在前面分析工具类Executors的时候,提到了CachedThreadPool:其线程数会无限增大,每来一个新请求,就会new一个Thread,其maxPoolSize = Integer.MAX。

其构造函数如下:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//ThreadPoolExecutor的execute函数:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}


至所以会达到这个效果:就是因为SynchronousQueue本身没有容量,上面的wokerQueue.offer(command)函数,永远返回空,所以就会一直走到下面的addIfUnderMaximuxPoolSize里面。

下面就来详细分析SynchronousQueue

SynchronousQueue使用方式

SynchronousQueue最大的特点就是put/take是成对调用的:

先调put,线程会阻塞在那;直到另外一个线程调用了take,2个线程才同时解锁。反之亦然。

对于多个线程,比如3个,调用3次put,3个都会阻塞在那;直到等另外的线程,调用3次take,大家才同时解锁。反之亦然。

这里就会有1个问题:先调用了3次put,那调用take的时候,是首先唤醒哪一个put线程呢?第1个,还是最后一个呢?

这就涉及到2种不同的模式:公平模式(队列模式) 和 非公平模式(栈模式)。

队列模式:最先调用put的线程,最先被take唤醒

栈模式:最后调用put的线程,最先被take唤醒。

SynchronousQueue实现

public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private transient volatile Transferer transferer;

public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) {  //第一个参数为put进去的obejct
Thread.interrupted();
throw new InterruptedException();
}
}

public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0);  //第1个参数为空,返回值是上面put进去的object
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}

public E poll() {
return (E)transferer.transfer(null, true, 0);
}

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
。。。
}


从代码可以看出,无论是put/take/offer/poll,都是调用的transfer.transfer函数,只是传进去的参数不一样而已。

那这个Transfer是什么呢?看代码知道,Transfer是个接口,有TransferQueue/TransferStack 2种实现,这2个实现,都是SynchronousQueue的内部类,其结果如下:

static final class TransferStack extends Transferer {
static final class SNode {
volatile SNode next;        // next node in stack
volatile SNode match;       // the node matched to this
volatile Thread waiter;     // to control park/unpark
Object item;                // data; or null for REQUESTs
int mode;
...
}

volatile SNode head;

...
}

static final class TransferQueue extends Transferer {
static final class QNode {
volatile QNode next;          // next node in queue
volatile Object item;         // CAS'ed to or from null
volatile Thread waiter;       // to control park/unpark
final boolean isData;
...
}

transient volatile QNode head;
transient volatile QNode tail;
}


从上面代码可以看出,2者结构都是一个单向链表。对于栈,只需要维护head结点;对于队列,维护head + tail结点。

实现思路

不管是栈,还是队列,其基本实现思路类似:

put的时候,new一个item != null的结点;take的时候,new一个item = null的结点。从head开始遍历,遇到的结点和自己同类型的,说明没有匹配者,自己也加入链表;遇到和自己不同类型的,尝试匹配。

加上链表的时候,如果是栈模式,加在头部;如果是队列模式,加在尾部。如下图所示:



transfer代码

//TransferQueue.transfer
Object transfer(Object e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)         // saw uninitialized value
continue;                       // spin

if (h == t || t.isData == isData) { //队列为空,或者head结点和自己同类型
QNode tn = t.next;
if (t != tail)            // inconsistent read
continue;
if (tn != null) {               // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0)        // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s))        // failed to link in
continue;

advanceTail(t, s);              // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) {                   // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) {           // not already unlinked
advanceHead(t, s);          // unlink if head
if (x != null)              // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null)? x : e;

} else {                            //队列不为空,head和自己不同类型
QNode m = h.next;               // node to fulfill
if (t != tail || m == null || h != head)
continue;                   // inconsistent read

Object x = m.item;
if (isData == (x != null) ||    // m already fulfilled
x == m ||                   // m cancelled
!m.casItem(x, e)) {         // lost CAS
advanceHead(h, m);          // dequeue and retry
continue;
}

advanceHead(h, m);              // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null)? x : e;
}
}
}

//TransferStack.transfer
Object transfer(Object e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null)? REQUEST : DATA;

for (;;) {  //for循环里面3大分支
SNode h = head;
if (h == null || h.mode == mode) {  //case1:栈为空,或者栈顶元素和自己同类型
if (timed && nanos <= 0) {      // 关键点:offer函数就走的这个逻辑,offer(e, ture, 0) 一直会返回null
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) {               // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next);     // help s's fulfiller
return mode == REQUEST? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) { //case 2: 和自己不同类型,并且没有其他线程fulfilling这个结点,进入
if (h.isCancelled())
casHead(h, h.next);         // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next;       // m is s's match
if (m == null) {        // all waiters are gone
casHead(s, null);   // pop fulfill node
s = null;           // use new node next time
break;              // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn);     // pop both s and m
return (mode == REQUEST)? m.item : s.item;
} else                  // lost match
s.casNext(m, mn);   // help unlink
}
}
} else {    //case 3: 和自己不同类型,但现在有其他线程正在fulfilling此结点
SNode m = h.next;               // m is h's match
if (m == null)                  // waiter is gone
casHead(h, null);           // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h))          // help match
casHead(h, mn);         // pop both h and m
else                        // lost match
h.casNext(m, mn);       // help unlink
}
}
}
}


这里面有一个关键点:

无论是栈的实现,还是队列的实现,链表本身是没有加锁的。因此在多线程访问下,就会有弱一致性问题,inconsistent read问题。

但这个不会出问题,因为匹配上,那是最好;匹配不上,出现inconsistent read,for循环回来再重新读,直到匹配上了,线程才会解锁。

也正因为如此,上面的代码中,有诸多的double check逻辑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐