ConcurrentLinkedQueue的实现
2017-09-03 12:01
387 查看
今天我们介绍一下ConcurrentLinkedQueue的内部实现。从名字就可以看出来,其内部使用链表实现。下面介绍一下它的结构:
基本结构
单单从类图看来,结构不算复杂,有两个重要的属性就是head和tail。
方法操作
CAS操作
Node节点里面有几个CAS的操作,介绍一下:
// CAS设置Node的值,cmp 期望值、val 目标值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// CAS设置next的节点,val 目标值
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS设置next的节点,cmp 期望值、val 目标值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}offer()
offer()操作就是入队操作,即在队未添加节点,并且在这一过程中移动head和tail指针。其过程如下所示:
从上图可以看出,网队列中添加元素的时候,tail指针并不是一直在队列末端的,而是隔一个Node改变一次位置。下面看一下代码实现:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 无锁操作
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个节点
if (p.casNext(null, newNode)) {
if (p != t) // 每2次更新tail指针位置
casTail(t, newNode);
return true;
}
// CAS竞争失败,再次尝试
}
else if (p == q)
// 当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。
p = (t != (t = tail)) ? t : head;
else
// 取下一个Node或者最后一个Node
p = (p != t && t != (t = tail)) ? t : q;
}
} 从上面代码可以看出,先从一个空队列开始添加节点。添加第1个节点时,执行到if(q == null)这个模块,然后插入第一个Node,但是此时并没有更新tail指针。
添加第2个节点时,q为第1个节点,所以进入到第3个判断。此时p取的是下一个节点q 或者最后一个节点t。然后再次循环到if(q == null) ,此时插入节点后,执行了tail更新。
程序执行的过程中,还有一种情况是:当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。如下图所示:
注意:下面解释一下这一行代码:
p = (t != (t = tail)) ? t : head; 这里面只有多线程的情况下才会出现:t != (t = tail),程序首先会获取 !=号左边的t值,然后计算右边的值,此时如果tail被其他线程修改了。就有可能造成!=的发生,从而程序“打赌”直接取了t(也就是刚刚更新的tail)。
poll()
poll()操作是从队列头部取出Node,然后更新head指针的位置。下面看一下其具体的流程:
从上面流程图可以看出,当队列取出第1个Node时,head位置更新了,当取出第2个Node时,并没有更新head。这个变更和入队时tail的变更类似。下面看一下具体代码实现:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 设置Node的item为null,此时Node并未移除
if (p != h) // 2次更新head,移除Node
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {// 队列中无Node时
updateHead(h, p);
return null;
}
else if (p == q)// 出现哨兵节点,全部重新开始
continue restartFromHead;
else
p = q; //空Node时,向后移动
}
}
} 从上面的代码,设置一个Node的场景。然后从队列中poll()元素2次,第1次poll()时,会进入到第4个判断,然后p向后移动。再次循环时,进入到第1个判断,此时出队,并且更新head指针。
然后执行第2个poll()时,直接进入到第2个判断,然后更新head指针,返回null。
在多线程操作的时候,会进入到第三个判断。此时Node的next会指向自己,因此会重新遍历链表,节点会出现一下情况:
参考:《java高并发程序编程》
链接:http://moguhu.com/article/detail?articleId=38
基本结构
单单从类图看来,结构不算复杂,有两个重要的属性就是head和tail。
方法操作
CAS操作
Node节点里面有几个CAS的操作,介绍一下:
// CAS设置Node的值,cmp 期望值、val 目标值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// CAS设置next的节点,val 目标值
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// CAS设置next的节点,cmp 期望值、val 目标值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}offer()
offer()操作就是入队操作,即在队未添加节点,并且在这一过程中移动head和tail指针。其过程如下所示:
从上图可以看出,网队列中添加元素的时候,tail指针并不是一直在队列末端的,而是隔一个Node改变一次位置。下面看一下代码实现:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// 无锁操作
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是最后一个节点
if (p.casNext(null, newNode)) {
if (p != t) // 每2次更新tail指针位置
casTail(t, newNode);
return true;
}
// CAS竞争失败,再次尝试
}
else if (p == q)
// 当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。
p = (t != (t = tail)) ? t : head;
else
// 取下一个Node或者最后一个Node
p = (p != t && t != (t = tail)) ? t : q;
}
} 从上面代码可以看出,先从一个空队列开始添加节点。添加第1个节点时,执行到if(q == null)这个模块,然后插入第一个Node,但是此时并没有更新tail指针。
添加第2个节点时,q为第1个节点,所以进入到第3个判断。此时p取的是下一个节点q 或者最后一个节点t。然后再次循环到if(q == null) ,此时插入节点后,执行了tail更新。
程序执行的过程中,还有一种情况是:当p==q时,是由于遇到了哨兵节点(sentinel)导致的。哨兵节点是指next指向自己的节点,此时节点有可能已被删除或者为空节点。因为此时无法从next获取后续节点,所以只能从head从新遍历。tail被其他线程修改的时候,此时程序对这种情况打了一次赌,即从tail开始遍历。如下图所示:
注意:下面解释一下这一行代码:
p = (t != (t = tail)) ? t : head; 这里面只有多线程的情况下才会出现:t != (t = tail),程序首先会获取 !=号左边的t值,然后计算右边的值,此时如果tail被其他线程修改了。就有可能造成!=的发生,从而程序“打赌”直接取了t(也就是刚刚更新的tail)。
poll()
poll()操作是从队列头部取出Node,然后更新head指针的位置。下面看一下其具体的流程:
从上面流程图可以看出,当队列取出第1个Node时,head位置更新了,当取出第2个Node时,并没有更新head。这个变更和入队时tail的变更类似。下面看一下具体代码实现:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// 设置Node的item为null,此时Node并未移除
if (p != h) // 2次更新head,移除Node
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {// 队列中无Node时
updateHead(h, p);
return null;
}
else if (p == q)// 出现哨兵节点,全部重新开始
continue restartFromHead;
else
p = q; //空Node时,向后移动
}
}
} 从上面的代码,设置一个Node的场景。然后从队列中poll()元素2次,第1次poll()时,会进入到第4个判断,然后p向后移动。再次循环时,进入到第1个判断,此时出队,并且更新head指针。
然后执行第2个poll()时,直接进入到第2个判断,然后更新head指针,返回null。
在多线程操作的时候,会进入到第三个判断。此时Node的next会指向自己,因此会重新遍历链表,节点会出现一下情况:
参考:《java高并发程序编程》
链接:http://moguhu.com/article/detail?articleId=38
相关文章推荐
- Java使用ConcurrentLinkedQueue实现简易数据库连接池
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue 的实现原理分析
- ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue的实现原理和源码分析
- 聊聊并发——ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue的实现原理和源码分析
- ConcurrentLinkedQueue的实现原理分析
- Java Thread&Concurrency(13): 深入理解ConcurrentLinkedQueue及其实现原理
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- ConcurrentLinkedQueue的实现原理分析
- 聊聊并发(六)ConcurrentLinkedQueue的实现原理分析
- java 非阻塞算法在并发容器中的实现(ConcurrentLinkedQueue源码)
- 使用zmq,probuf,缓冲池实现序列化和反序列化框架(一)-使用ConcurrentLinkedQueue实现缓冲池