美文网首页
并发容器BlockingQueue - LinkedTransf

并发容器BlockingQueue - LinkedTransf

作者: 王侦 | 来源:发表于2019-07-13 10:37 被阅读0次

1.官方文档

请参考并发容器BlockingQueue - LinkedTransferQueue官方文档

2.队列结点、域以及构造器

    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use.  Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting

重要的域:

    /** head of the queue; null until first enqueue */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;

构造器

    public LinkedTransferQueue() {
    }

3.入队、出队操作

抛出异常版本(因为是无界,所以不会抛出异常):

    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public boolean remove(Object o) {
        return findAndRemove(o);
    }

返回特殊值、超时版本:

    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E peek() {
        return firstDataItem();
    }

阻塞版本(因为是无界队列,所以put不会阻塞):

    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

这些入队和出队的方法最后都统一调用xfer

另外还有两个方法transfer和tryTransfer,该入队方法是会阻塞的:

    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

4.xfer

参数解释:

  • e —— 入队为e,出队为null
  • havaData —— 入队为true,出队为false
  • how —— NOW, ASYNC, SYNC, or TIMED
    private static final int NOW = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer
  • nanos —— 在how为TIMED时使用,表示超时时长
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
  • 不允许放入null元素,会抛出异常。
  • 整体流程分为两大步骤:
    1)如果模式不一样,会从头到尾寻找还未匹配的元素进行匹配(使用了松弛队列算法,不是每次都更新head)
    2)如果模式一样,或者队列中没有元素,就入队
  • 入队分为四种模式(只有超时版本的poll和take才会阻塞线程):
    NOW——立即返回,对于无超时的poll(),会立即返回,不入队。
    ASYNC——元素入队,但线程不会阻塞,offer, put, add(无限队列)
    SYNC——元素入队并阻塞线程,针对take操作,此时队列为空;以及transfer入队操作
    TIMED——入队阻塞指定时间,针对超时版本的poll及tryTransfer
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }

入队操作tryAppend()使用了松弛队列的算法,不是每一次都更新tail。

    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }

            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            else {
                LockSupport.park(this);
            }
        }
    }

awaitMatch入队后自旋或阻塞,阻塞了就等待被其它线程匹配到并唤醒。

关于Thread.yield()作用,主要是减轻自旋对繁忙系统的影响,因为自旋会消耗cpu,自旋也会消耗cpu,等于是无故浪费cpu资源:
During spins threads check their interrupt status and generate a thread-local random number to decide to occasionally perform a Thread.yield. While yield has underdefined specs, we assume that it might help, and will not hurt, in limiting impact of spinning on busy systems.

5.实例

参考LinkedTransferQueue原理分析
操作序列:tansfer(a), transfer(b), tansfer(c), take(), take(), take(), transfer(d)/take()。

  • step1.thread1、thread2、thread3分别进行tansfer(a)、tansfer(b)、tansfer(c)


  • step2.线程thread4执行take()时, 找到匹配的节点, 置位null。 并唤醒thread1; thread1还会将item=this, waiter=null


  • step3.线程thread5执行take()时, 找到匹配的节点, 置位null, 并唤醒thread2. 发现当前匹配节点不是head节点, 那么则移动head节点直到松弛距离为2。 thread2还会将item=this, waiter=null


  • step4.线程thread6执行take()时, 找到匹配的节点, 置位null。 并唤醒thread3; thread3还会将item=this, waiter=null


  • step5-1.若此时线程thread7进行了tansfer(d), 仅仅将线程thread7以数据节点添加至队列中


  • step5-2.若此时线程thread7进行了take(), 此时也添加到了末尾, 注意此时队列模式发生了变化。



    在thread7进行了take()的基础上, thread8线程进行了put(4)操作:


6.总结

  • 使用了ConcurrentLinkedQueue松弛算法,“松弛阀值”为2,不是每次都更新head/tail,减少了CAS次数
  • 使用了SynchronousQueue的双重队列,因为是无界队列,因此put是异步的,不会阻塞。
  • 对于超时poll()和take(),阻塞前会自旋一会,这点也与SynchronousQueue机制一样

相关文章

网友评论

      本文标题:并发容器BlockingQueue - LinkedTransf

      本文链接:https://www.haomeiwen.com/subject/ecfpkctx.html