前言
最近写一个简单版的网络框架,遇到一个这样的场景,如果有多个请求,那么我们需要把请求都放入一个队列当中。队列正常来说是FIFO(先进先出),在java中也定义了一个Queue接口,但是一看JAVA API文档发现,有一堆的实现类,那么我们该使用哪个类来维护用户的请求呢?
结构图

以上是我对照JAVA API文档画的一个类图。把常用的类和接口都标了出来。首先BlockingQueue和Deque接口都继承了Queue,它们一个是阻塞队列,一个是双端队列。接着就是它们各自的实现类,我们看看每一个的特点。
BlockingQueue
BlockingQueue从字面意思就知道它是阻塞队列,它在以下两种情况会造成阻塞:
1.当队列满了的时候进行入队操作。
2.当队列空了的时候进行出队操作。
也就是说,当一个线程对已经满了的队列进行入队操作时,会被阻塞,除非另外一个线程进行了出队操作。或者当一个线程对一个空的队列进行出队操作的时候,会被阻塞,除非另外一个线程进行了入队的操作。
阻塞队列是线程安全的,主要用于生产者/消费者的场景。比如一个线程在队尾进行put操作,另外一个线程在队头进行take操作。需要注意的是BlockingQueue不能插入Null值,否则会报NullPointerException异常。
LinkedBlockingQueue
LinkedBlockingQueue阻塞队列的大小如果不指定默认为Integer的最大值。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
内部采用链表实现
/**
* Linked list node class.
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
从入队出队的代码中我们可以发现,LinkedBlockingQueue是采用先进先出的方式存储数据,也就是在队尾入队,在队头出队。
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
ArrayBlockingQueue
ArrayBlockingQueue阻塞队列必须在创建的时候指定队列的大小,内部采用数组的实现方式,并且跟LinkedBlockingQueue一样,都是采用先进先出的方式。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
PriorityBlockingQueue
PriorityBlockingQueue默认的初始化大小只有11,允许分配的最大size为integer的最大值-8,内部采用数组实现,可以插入NULL值,所有插入PriorityBlockingQueue的值都必须实现Comparable接口,因为队列的优先顺序就是按照这个规则来实现的。
/**
* Default array capacity.
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
SynchronousQueue
SynchronousQueue是一种没有数据缓冲的BlockingQueue,生产者的put操作必须等待消费者的take操作,反过来也一样。(
producer waits until consumer is ready, consumer waits until producer is ready。)
SynchronousQueue使用了一种无锁算法“Dual stack and Dual queue“来实现阻塞操作,在创建SynchronousQueue对象的时候可以选择竞争机制,是公平竞争还是非公平竞争,如果不指定默认为非公平竞争。
/**
* Creates a {@code SynchronousQueue} with nonfair access policy.
*/
public SynchronousQueue() {
this(false);
}
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
非公平竞争使用的是先进后出,公平竞争使用的是先进先出。但是两种模式内部都是使用链表来实现。
/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
U.compareAndSwapObject(this, NEXT, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
U.compareAndSwapObject(this, ITEM, cmp, val);
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
void tryCancel(Object cmp) {
U.compareAndSwapObject(this, ITEM, cmp, this);
}
boolean isCancelled() {
return item == this;
}
/**
* Returns true if this node is known to be off the queue
* because its next pointer has been forgotten due to
* an advanceHead operation.
*/
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long ITEM;
private static final long NEXT;
static {
try {
ITEM = U.objectFieldOffset
(QNode.class.getDeclaredField("item"));
NEXT = U.objectFieldOffset
(QNode.class.getDeclaredField("next"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
/** Node class for TransferStacks. */
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
U.compareAndSwapObject(this, NEXT, cmp, val);
}
/**
* Tries to match node s to this node, if so, waking up thread.
* Fulfillers call tryMatch to identify their waiters.
* Waiters block until they have been matched.
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
U.compareAndSwapObject(this, MATCH, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
/**
* Tries to cancel a wait by matching node to itself.
*/
void tryCancel() {
U.compareAndSwapObject(this, MATCH, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long MATCH;
private static final long NEXT;
static {
try {
MATCH = U.objectFieldOffset
(SNode.class.getDeclaredField("match"));
NEXT = U.objectFieldOffset
(SNode.class.getDeclaredField("next"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
DelayQueue
DelayQueue=BlockingQueue+PriorityQueue+Delayed。从这个等式中可以看出所有DelayQueue的特性:阻塞,有序,延迟。DelayQueue中的元素必须实现java.util.concurrent.Delayed这个接口。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
这个接口的定义非常简单,getDelay返回值就是队列中元素延迟释放的时间。如果返回值是0或者是一个负值,那么就说明该元素到了要释放的时间,就会通过take方法释放该元素。
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
DelayQueue的使用场景很多,例如:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
Deque
Deque的含义是”double ended queue“,即双端队列。
双端队列是一种具有队列和栈性质的数据结构。元素可以从队列的两端进行插入,弹出和删除操作。在java的util包中定义为接口。
LinkedList
LinkedList同时实现了List和Deque接口。所以它既可以看作是一个顺序容器,又可以看作是一个队列,还可以看作是一个栈。当你想使用栈的时候,可以考虑一下LinkedList,因为官方已经不建议使用Stack。
private static class Node<E> {
E item;
Node<E> next;
Node<E> prev;
Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}
LinkedList内部使用双向链表来实现。
ArrayDeque
ArrayDeque是用数组实现的双端队列,创建的时候可以指定队列的容量,队列最小容量为8。ArrayDeque的效率比LinkedList的效率高,所以优先考虑使用。
总结
看了这么多类的介绍,那么我们在网络框架中到底使用哪一个来维护用户的请求比较好呢?窃以为如果用户的请求有优先级的话,那么可以考虑使用PriorityBlockingQueue,如果有过期的需求可以考虑DelayQueue。如果想更灵活的话可以使用双端队列中的ArrayDeque也是很高效的。
如果你觉得本篇文章帮助到了你,希望大爷能够给瓶买水钱。
本文为原创文章,转载请注明出处!
网友评论