美文网首页
JAVA中的队列

JAVA中的队列

作者: 娃娃要从孩子抓起 | 来源:发表于2017-09-27 15:13 被阅读498次

前言

最近写一个简单版的网络框架,遇到一个这样的场景,如果有多个请求,那么我们需要把请求都放入一个队列当中。队列正常来说是FIFO(先进先出),在java中也定义了一个Queue接口,但是一看JAVA API文档发现,有一堆的实现类,那么我们该使用哪个类来维护用户的请求呢?

结构图

队列.png

以上是我对照JAVA API文档画的一个类图。把常用的类和接口都标了出来。首先BlockingQueue和Deque接口都继承了Queue,它们一个是阻塞队列,一个是双端队列。接着就是它们各自的实现类,我们看看每一个的特点。

BlockingQueue

BlockingQueue从字面意思就知道它是阻塞队列,它在以下两种情况会造成阻塞:
1.当队列满了的时候进行入队操作。
2.当队列空了的时候进行出队操作。
也就是说,当一个线程对已经满了的队列进行入队操作时,会被阻塞,除非另外一个线程进行了出队操作。或者当一个线程对一个空的队列进行出队操作的时候,会被阻塞,除非另外一个线程进行了入队的操作。

阻塞队列是线程安全的,主要用于生产者/消费者的场景。比如一个线程在队尾进行put操作,另外一个线程在队头进行take操作。需要注意的是BlockingQueue不能插入Null值,否则会报NullPointerException异常。

LinkedBlockingQueue

LinkedBlockingQueue阻塞队列的大小如果不指定默认为Integer的最大值。

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

内部采用链表实现

  /**
     * Linked list node class.
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

从入队出队的代码中我们可以发现,LinkedBlockingQueue是采用先进先出的方式存储数据,也就是在队尾入队,在队头出队。

  /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

ArrayBlockingQueue

ArrayBlockingQueue阻塞队列必须在创建的时候指定队列的大小,内部采用数组的实现方式,并且跟LinkedBlockingQueue一样,都是采用先进先出的方式。

 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c)
                    items[i++] = Objects.requireNonNull(e);
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

PriorityBlockingQueue

PriorityBlockingQueue默认的初始化大小只有11,允许分配的最大size为integer的最大值-8,内部采用数组实现,可以插入NULL值,所有插入PriorityBlockingQueue的值都必须实现Comparable接口,因为队列的优先顺序就是按照这个规则来实现的。

  /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

SynchronousQueue

SynchronousQueue是一种没有数据缓冲的BlockingQueue,生产者的put操作必须等待消费者的take操作,反过来也一样。(
producer waits until consumer is ready, consumer waits until producer is ready。)
SynchronousQueue使用了一种无锁算法“Dual stack and Dual queue“来实现阻塞操作,在创建SynchronousQueue对象的时候可以选择竞争机制,是公平竞争还是非公平竞争,如果不指定默认为非公平竞争。

 /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

非公平竞争使用的是先进后出,公平竞争使用的是先进先出。但是两种模式内部都是使用链表来实现。

 /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    U.compareAndSwapObject(this, NEXT, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    U.compareAndSwapObject(this, ITEM, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                U.compareAndSwapObject(this, ITEM, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
            private static final long ITEM;
            private static final long NEXT;

            static {
                try {
                    ITEM = U.objectFieldOffset
                        (QNode.class.getDeclaredField("item"));
                    NEXT = U.objectFieldOffset
                        (QNode.class.getDeclaredField("next"));
                } catch (ReflectiveOperationException e) {
                    throw new Error(e);
                }
            }
        }


/** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item and mode fields don't need to be volatile
            // since they are always written before, and read after,
            // other volatile/atomic operations.

            SNode(Object item) {
                this.item = item;
            }

            boolean casNext(SNode cmp, SNode val) {
                return cmp == next &&
                    U.compareAndSwapObject(this, NEXT, cmp, val);
            }

            /**
             * Tries to match node s to this node, if so, waking up thread.
             * Fulfillers call tryMatch to identify their waiters.
             * Waiters block until they have been matched.
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    U.compareAndSwapObject(this, MATCH, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                return match == s;
            }

            /**
             * Tries to cancel a wait by matching node to itself.
             */
            void tryCancel() {
                U.compareAndSwapObject(this, MATCH, null, this);
            }

            boolean isCancelled() {
                return match == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
            private static final long MATCH;
            private static final long NEXT;

            static {
                try {
                    MATCH = U.objectFieldOffset
                        (SNode.class.getDeclaredField("match"));
                    NEXT = U.objectFieldOffset
                        (SNode.class.getDeclaredField("next"));
                } catch (ReflectiveOperationException e) {
                    throw new Error(e);
                }
            }
        }

DelayQueue

DelayQueue=BlockingQueue+PriorityQueue+Delayed。从这个等式中可以看出所有DelayQueue的特性:阻塞,有序,延迟。DelayQueue中的元素必须实现java.util.concurrent.Delayed这个接口。

public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

这个接口的定义非常简单,getDelay返回值就是队列中元素延迟释放的时间。如果返回值是0或者是一个负值,那么就说明该元素到了要释放的时间,就会通过take方法释放该元素。

 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

DelayQueue的使用场景很多,例如:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

Deque

Deque的含义是”double ended queue“,即双端队列。
双端队列是一种具有队列和栈性质的数据结构。元素可以从队列的两端进行插入,弹出和删除操作。在java的util包中定义为接口。

LinkedList

LinkedList同时实现了List和Deque接口。所以它既可以看作是一个顺序容器,又可以看作是一个队列,还可以看作是一个栈。当你想使用栈的时候,可以考虑一下LinkedList,因为官方已经不建议使用Stack。

private static class Node<E> {
        E item;
        Node<E> next;
        Node<E> prev;

        Node(Node<E> prev, E element, Node<E> next) {
            this.item = element;
            this.next = next;
            this.prev = prev;
        }
    }

LinkedList内部使用双向链表来实现。

ArrayDeque

ArrayDeque是用数组实现的双端队列,创建的时候可以指定队列的容量,队列最小容量为8。ArrayDeque的效率比LinkedList的效率高,所以优先考虑使用。

总结

看了这么多类的介绍,那么我们在网络框架中到底使用哪一个来维护用户的请求比较好呢?窃以为如果用户的请求有优先级的话,那么可以考虑使用PriorityBlockingQueue,如果有过期的需求可以考虑DelayQueue。如果想更灵活的话可以使用双端队列中的ArrayDeque也是很高效的。

如果你觉得本篇文章帮助到了你,希望大爷能够给瓶买水钱。
本文为原创文章,转载请注明出处!

相关文章

  • 【二】优先队列和堆

    堆 ----待补充--- java中的优先队列 PriorityQueue为java中的优先队列((a,b)->b...

  • Java阻塞队列四组API介绍

    Java阻塞队列四组API介绍 通过前面几篇文章的学习,我们已经知道了Java中的队列分为阻塞队列和非阻塞队列以及...

  • 以LinkedBlockingQueue为例浅谈阻塞队列的实现

    目录 阻塞队列简介阻塞队列的定义Java中的阻塞队列 LinkedBlockingQueue单链表定义锁和等待队列...

  • 数据结构 | 其三 栈和队列

    栈 java中栈继承了Vector 队列 2.1 普通队列 2.2 循环队列 2.3 优先级队列 2.4 阻塞队列...

  • 队列

    文章结构 什么是队列 实现队列顺序队列链式队列循环队列 Java中的队列 1. 什么是队列 队列也是一种操作受限的...

  • Java 多线程、Queue学习,CAS学习

    主题一:Queue: Java并发(10)- 简单聊聊JDK中的七大阻塞队列解读 Java 并发队列 Blocki...

  • JAVA中的队列

    前言 最近写一个简单版的网络框架,遇到一个这样的场景,如果有多个请求,那么我们需要把请求都放入一个队列当中。队列正...

  • java中的队列

    queue 在java1.5被引入,它和list 、set一样继承自collection接口。其中LinkedLi...

  • java中的队列

    一、队列是什么 队列是一种先进先出的数据结构。 二、队列的接口定义 方法名称作用队列满boolean add(E ...

  • java中的队列

    队列的定义和数据类型 定义:队列,也叫先进先出表,是一种运算受限的线性表,其限制是仅允许在表的一端进行插入,而在表...

网友评论

      本文标题:JAVA中的队列

      本文链接:https://www.haomeiwen.com/subject/beddextx.html