[TOC]
ConcurrentlinkedQueue 还是一个基于链表的,×××的,线程安全的单端队列,它采用先进先出(FIFO)的规则对节点进行排序,当我们加入一个元素时,它会插入队列的尾部,当我们获取元素时,会从队列的首部获取元素。它没有使用锁来保证线程安全,使用的是“wait-free”算法来保证整个队列的线程安全。
// 存储的数据
volatile E item;
// 下一个节点引用
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 构造一个node节点
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// 修改节点的item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 懒修改节点的next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// cas修改节点的next节点
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
构造节点是其实就是构造了一个node的item为null的节点,然后head和tail指向这个节点,如下图所示:
public boolean add(E e) {
return offer(e);
}
我们可以看出其实调用的是offer方法,具体参考offer方法的讲解。
源码解析:
public boolean offer(E e) {
// 入队元素不能为null
checkNotNull(e);
// 创建新的节点
final Node<E> newNode = new Node<E>(e);
// 死循环,设置节点
// p获取尾节点
for (Node<E> t = tail, p = t;;) {
// q是p的next节点
Node<E> q = p.next;
// 获取尾节点的next节点
// 尾节点没有下一个节点
if (q == null) {
// p is last node
// 这一步说明p是尾节点,新的节点设置为尾节点的next节点
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 设置尾节点,当之前的尾节点和现在插入的节点之间有一个节点时
// 并不是每一次都cas设置尾节点(优化手段,是怎么想到这种优化的??)
if (p != t) // hop two nodes at a time
// cas设置尾节点,可能会失败
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
// 所以这里需要重新找新的head
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// 寻找尾节点
// Check for tail updates after two hops.
// p!=t
p = (p != t && t != (t = tail)) ? t : q;
}
}
分析插入过程,我们插入使用3个线程来调用offer 方法,ThreadA,ThreadB同时运行,ThreadC最后插入,分析下offer方法的流程。
第一步,队列属于初始化状态,ThreadA,ThreadB同时调用offer方法;创建节点,死循环设置节点,获取尾节点的next节点,此时q== null,两个线程都同时可能看见,然后cas设置尾节点的next节点(队列状态如图A所示),我们假设是ThreadA线程cas设置成功了,然后p==t此时的尾节点其实没有发生变化;此时我们来看ThreadB由于A成功了,所以ThreadB cas失败了,重新循环,此时q != null了,p == q显然不等于,再看下一个else判断p!=t,此时显然p == t,所以才是p = q,然后再次循环,此时的q==null,我们假设没有线程来和ThreadB竞争,所以cas设置成功,然后p!=t吗,显然满足所以设置尾节点,此时的设置尾节点的节点和之前的尾节点之间刚刚好有一个节点(如图B所示)。
第二步,ThreadC插入,此时的尾节点是ThreadB插入的节点假设是B,获取B的next节点,q == null,然后cas设置节点,完成,p==t,所以不用更新尾节点(如图C所示)。
注意:不会删除元素,要和poll方法区别
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
// 更新头结点
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
public E poll() {
restartFromHead:
// 循环
for (;;) {
// 获取头结点
for (Node<E> h = head, p = h, q;;) {
// 获取节点的内容
E item = p.item;
// item不为null ,使用cas设置item为空
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 更新头结点,和尾节点一样,不是每次都更新
// 头结点item为null是,下个节点就必须更新头结点
// 头结点item不为null时,规则和更新尾节点一样
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q,说明别的线程调用了updateHead,
// 自己的next 指向了自己,重新循环,获取最新的头结点
else if (p == q)
continue restartFromHead;
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
else
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
分析我们按照offer时候元素来执行poll方法,ThreadD和ThreadE同时执行来分析下队列的变化(主要分析p==q的产生)。
初始状态(如图C所示)
第一步,ThreadD和ThreadE执行poll操作,item等于null,所以执行执行下面的操作(q = p.next) == null不等于,p == q不等于,所以p = q,其实就是上图的ThreadA插入的节点,此时的item已经不为null了,所以执行cas设置item为null的操作,假设ThreadD执行成功了,那么此时p!=h就满足了,所以此时要更新头结点调用updateHead,这个方法会更新头结点,并且把原来的头节点的next设置为自己)(如图D所示);接下我们分析ThreadE,cas失败了需要重新执行,此时的item已经不为null,所以执行执行下面的操作(q = p.next) == null不等于,p == q这使其实已经是等于了,因为ThreadD改变了了以前头结点的next节点为自己,所以需要重新遍历,获取最新的头结点,此时的头结点其实就是ThreadA插入的节点,然后item为null,接着执行下面的判断,最终p就是p.next节点也就是ThreadB节点,然后cas设置item为null,由于p=p.next,所以p发生了变化,所以需要设置ThreadB为头结点(如图E所示)。
看到上面的执行流程可能就有人有疑问了,这不是每次都更新头结点吗,没有优化啊,只看poll方法确实是这样,那什么时候会产生不是每次都更新头节点了,那就是当头节点的item不为null的时候,但是如果按初始化的状况来看,头结点的item一直是null,但是当我看了peek方法之后才发现,peek可以改变这个情况,可以设置item不为null的头结点,其实我们可以在poll方法前调用下peek方法,其实就启动了优化策略,不是每次更新头结点,不知道作者是不是这么设计的,反正就是牛皮。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
我们可以发现size没有加锁,就是遍历了整个队列,但是遍历的同时可能在发生poll或者offer,所以size不是特别的精确,用的时候要注意。
ConcurrentLinkedQueue是×××的队列,所以使用时一定要注意内存溢出的问题,还有在执行size方法时一定要注意这个是不准确的值;在学poll和offer方法时,一定要理解更新head和tail节点的时机,这种优化手段值得我们去学习,我觉得这就是学习源码的作用,就是学习作者的源码思想。
参考《Java 并发编程的艺术》
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。