DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最短。注意:不能将null元素放置到这种队列
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
代码示例:
public class NewCachedPoolDemo {
@Data
public static class Work implements Delayed {
long workTime;
long submitTime;
public Work(long workTime) {
this.workTime = workTime;
this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS) + System.nanoTime();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.submitTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null || !(o instanceof Work)) return 1;
Work work = (Work) o;
if (this.getDelay(TimeUnit.NANOSECONDS) > work.getDelay(TimeUnit.NANOSECONDS)) return 1;
if (this.workTime < work.workTime) return -1;
return 0;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newScheduledThreadPool(1);
DelayQueue<Work> delayQueue = new DelayQueue<>();
Random random = new Random();
for (int i = 0; i < 20; i++) {
delayQueue.add(new Work(i + random.nextInt(200)));
}
Work work = null;
do {
work = delayQueue.poll();
System.out.println("submit time:"+work.getSubmitTime());
} while (work != null);
}
}
由于delayQueue中维护了priorityQueue队列,实际delayQueue的put、add、offer都会调用priorityQueue的对应方法。而priorityQueue队列的put、add方法最终都会调用其offer方法,故重点看下offer方法:
private final PriorityQueue<E> q = new PriorityQueue<E>();
内部的优先级队列PriorityQueue(按到期时间正序排),实际利用Work中实现Comparable中的compareTo方法进行比较,优先级队列的队首位置是即将到期的元素。priorityQueue方法可以参考链接 https://www.jianshu.com/p/49726e0e5167
offer方法
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 判断是否添加成功
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
1.add内部调用了offer方法,将元素放置在优先队列中
2.如果队首元素等于新添加的元素,则标识队列中只有一个元素,则唤醒一个消费线程
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1.获取可中断重入锁
2.自旋的执行,获取队列中第一个元素first=q.peek(),如果队列为空则进入线程等待队列
3.获取头元素的到期时间first.getDelay(),如果到期,则出队并删除优先队列的第一个元素
4.如果还没到期,则释放头元素的引用。当leader不为空,说明被其他线程持有了,则当前线程直接进入等待队列available.await()。
5.否则设置header为当前线程,当前线程等待头元素的到期时间,等待超时后,如果当前线程还持有leader,则释放leader,并重新自旋重复以上逻辑
6.最终,没有任何线程持有leader且队列不为空时,在唤醒一个消费线程
leader线程的作用:
高并发下,多个线程在执行队列的出队操作,如果一个线程持有了Leader,则直接让其他线程进入等待队列,避免无谓的性能消耗
first置为null的作用类似,也是避免多个线程持有队列头元素的引用,导致无法gc
peek方法
只返回Null或队列头元素
public E peek() {
return (size == 0) ? null : (E) queue[0];
}
poll方法
1.返回队列头元素,并将队列最后一个位置s的元素置空
2.当队列元素个数大于1时,调用siftDown方法,比较头元素的左右子元素和x的大小,选出最小的元素m放在队列头部。如果x最小,将x放在队列头部后结束。如果x
不为最小值,则继续比较x和m的子元素大小,依次
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}








网友评论