- ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列。
- 采用FIFO对节点排序
- 采用CAS实现非阻塞
1. ConcurrentLinkedQueue结构
- 由head和tail节点组成
- 每个节点(Node)由节点元素(item)和指向下一个节点的指针(next)组成
2. 入列
- 入列就是将入列节点添加到队列尾部
- 入列源码:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
- 获取p节点的下一个节点
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
- 设置入队节点为尾节点
casNext(null, n)可以将入队节点置为尾节点的next节点,p为null就表示p是队列尾节点了,如果不为空,说明其他线程更新了尾节点,需要重新获取
3. 出列
获取头节点的元素,然后判断头结点是否为空,为空说明其他线程已经获取了该节点,如果不为空,CAS将头结点设置为null,成功返回头结点,不成功重新获取。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
网友评论