您的位置:首页 > 产品设计 > UI/UE

ConcurrentLinkedQueue的实现

2017-09-03 12:01 387 查看
        今天我们介绍一下ConcurrentLinkedQueue的内部实现。从名字就可以看出来,其内部使用链表实现。下面介绍一下它的结构:

基本结构



        单单从类图看来,结构不算复杂,有两个重要的属性就是head和tail。
方法操作
CAS操作

        Node节点里面有几个CAS的操作,介绍一下:

// CAS设置Node的值,cmp 期望值、val 目标值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// CAS设置next的节点,val 目标值
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS设置next的节点,cmp 期望值、val 目标值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}offer()

        offer()操作就是入队操作,即在队未添加节点,并且在这一过程中移动head和tail指针。其过程如下所示:



        从上图可以看出,网队列中添加元素的时候,tail指针并不是一直在队列末端的,而是隔一个Node改变一次位置。下面看一下代码实现:

public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 无锁操作
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个节点
if (p.casNext(null, newNode)) {
if (p != t) // 每2次更新tail指针位置
casTail(t, newNode);
return true;
}
// CAS竞争失败,再次尝试
}
else if (p == q)
// 当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。
p = (t != (t = tail)) ? t : head;
else
// 取下一个Node或者最后一个Node
p = (p != t && t != (t = tail)) ? t : q;
}
}        从上面代码可以看出,先从一个空队列开始添加节点。添加第1个节点时,执行到if(q == null)这个模块,然后插入第一个Node,但是此时并没有更新tail指针。

        添加第2个节点时,q为第1个节点,所以进入到第3个判断。此时p取的是下一个节点q 或者最后一个节点t。然后再次循环到if(q == null) ,此时插入节点后,执行了tail更新。

        程序执行的过程中,还有一种情况是:当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。如下图所示:



        注意:下面解释一下这一行代码:

p = (t != (t = tail)) ? t : head;        这里面只有多线程的情况下才会出现:t != (t = tail),程序首先会获取 !=号左边的t值,然后计算右边的值,此时如果tail被其他线程修改了。就有可能造成!=的发生,从而程序“打赌”直接取了t(也就是刚刚更新的tail)。
poll()

        poll()操作是从队列头部取出Node,然后更新head指针的位置。下面看一下其具体的流程:



         从上面流程图可以看出,当队列取出第1个Node时,head位置更新了,当取出第2个Node时,并没有更新head。这个变更和入队时tail的变更类似。下面看一下具体代码实现:

public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

if (item != null && p.casItem(item, null)) {
// 设置Node的item为null,此时Node并未移除
if (p != h) // 2次更新head,移除Node
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {// 队列中无Node时
updateHead(h, p);
return null;
}
else if (p == q)// 出现哨兵节点,全部重新开始
continue restartFromHead;
else
p = q; //空Node时,向后移动
}
}
}        从上面的代码,设置一个Node的场景。然后从队列中poll()元素2次,第1次poll()时,会进入到第4个判断,然后p向后移动。再次循环时,进入到第1个判断,此时出队,并且更新head指针。

        然后执行第2个poll()时,直接进入到第2个判断,然后更新head指针,返回null。

        在多线程操作的时候,会进入到第三个判断。此时Node的next会指向自己,因此会重新遍历链表,节点会出现一下情况:



参考:《java高并发程序编程》

链接:http://moguhu.com/article/detail?articleId=38
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息