1.官方文档
请参考并发容器BlockingQueue - LinkedTransferQueue官方文档
2.队列结点、域以及构造器
/**
* Queue nodes. Uses Object, not E, for items to allow forgetting
* them after use. Relies heavily on Unsafe mechanics to minimize
* unnecessary ordering constraints: Writes that are intrinsically
* ordered wrt other accesses or CASes use simple relaxed forms.
*/
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null until waiting
重要的域:
/** head of the queue; null until first enqueue */
transient volatile Node head;
/** tail of the queue; null until first append */
private transient volatile Node tail;
/** The number of apparent failures to unsplice removed nodes */
private transient volatile int sweepVotes;
构造器
public LinkedTransferQueue() {
}
3.入队、出队操作
抛出异常版本(因为是无界,所以不会抛出异常):
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean remove(Object o) {
return findAndRemove(o);
}
返回特殊值、超时版本:
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public E poll() {
return xfer(null, false, NOW, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E peek() {
return firstDataItem();
}
阻塞版本(因为是无界队列,所以put不会阻塞):
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
这些入队和出队的方法最后都统一调用xfer
另外还有两个方法transfer和tryTransfer,该入队方法是会阻塞的:
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
4.xfer
参数解释:
- e —— 入队为e,出队为null
- havaData —— 入队为true,出队为false
- how —— NOW, ASYNC, SYNC, or TIMED
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer - nanos —— 在how为TIMED时使用,表示超时时长
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // unmatched
if (isData == haveData) // can't match
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
- 不允许放入null元素,会抛出异常。
- 整体流程分为两大步骤:
1)如果模式不一样,会从头到尾寻找还未匹配的元素进行匹配(使用了松弛队列算法,不是每次都更新head)
2)如果模式一样,或者队列中没有元素,就入队 - 入队分为四种模式(只有超时版本的poll和take才会阻塞线程):
NOW——立即返回,对于无超时的poll(),会立即返回,不入队。
ASYNC——元素入队,但线程不会阻塞,offer, put, add(无限队列)
SYNC——元素入队并阻塞线程,针对take操作,此时队列为空;以及transfer入队操作
TIMED——入队阻塞指定时间,针对超时版本的poll及tryTransfer
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
入队操作tryAppend()使用了松弛队列的算法,不是每一次都更新tail。
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
unsplice(pred, s);
return e;
}
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
awaitMatch入队后自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒。
关于Thread.yield()作用,主要是减轻自旋对繁忙系统的影响,因为自旋会消耗cpu,自旋也会消耗cpu,等于是无故浪费cpu资源:
During spins threads check their interrupt status and generate a thread-local random number to decide to occasionally perform a Thread.yield. While yield has underdefined specs, we assume that it might help, and will not hurt, in limiting impact of spinning on busy systems.
5.实例
参考LinkedTransferQueue原理分析
操作序列:tansfer(a), transfer(b), tansfer(c), take(), take(), take(), transfer(d)/take()。
-
step1.thread1、thread2、thread3分别进行tansfer(a)、tansfer(b)、tansfer(c)
-
step2.线程thread4执行take()时, 找到匹配的节点, 置位null。 并唤醒thread1; thread1还会将item=this, waiter=null
-
step3.线程thread5执行take()时, 找到匹配的节点, 置位null, 并唤醒thread2. 发现当前匹配节点不是head节点, 那么则移动head节点直到松弛距离为2。 thread2还会将item=this, waiter=null
-
step4.线程thread6执行take()时, 找到匹配的节点, 置位null。 并唤醒thread3; thread3还会将item=this, waiter=null
-
step5-1.若此时线程thread7进行了tansfer(d), 仅仅将线程thread7以数据节点添加至队列中
-
step5-2.若此时线程thread7进行了take(), 此时也添加到了末尾, 注意此时队列模式发生了变化。
在thread7进行了take()的基础上, thread8线程进行了put(4)操作:
6.总结
- 使用了ConcurrentLinkedQueue松弛算法,“松弛阀值”为2,不是每次都更新head/tail,减少了CAS次数
- 使用了SynchronousQueue的双重队列,因为是无界队列,因此put是异步的,不会阻塞。
- 对于超时poll()和take(),阻塞前会自旋一会,这点也与SynchronousQueue机制一样











网友评论