美文网首页
【BlockingQueue】DelayQueue

【BlockingQueue】DelayQueue

作者: 有章 | 来源:发表于2018-08-16 13:54 被阅读0次

DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最短。注意:不能将null元素放置到这种队列

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    private final Condition available = lock.newCondition();

代码示例:

public class NewCachedPoolDemo {
    @Data
    public static class Work implements Delayed {
        long workTime;
        long submitTime;

        public Work(long workTime) {
            this.workTime = workTime;
            this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS) + System.nanoTime();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.submitTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == null || !(o instanceof Work)) return 1;
            Work work = (Work) o;
            if (this.getDelay(TimeUnit.NANOSECONDS) > work.getDelay(TimeUnit.NANOSECONDS)) return 1;
            if (this.workTime < work.workTime) return -1;
            return 0;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newScheduledThreadPool(1);
        DelayQueue<Work> delayQueue = new DelayQueue<>();
        Random random = new Random();
        for (int i = 0; i < 20; i++) {
            delayQueue.add(new Work(i + random.nextInt(200)));
        }
        Work work = null;
        do {
            work = delayQueue.poll();
            System.out.println("submit time:"+work.getSubmitTime());
        } while (work != null);
    }
}

由于delayQueue中维护了priorityQueue队列,实际delayQueue的put、add、offer都会调用priorityQueue的对应方法。而priorityQueue队列的put、add方法最终都会调用其offer方法,故重点看下offer方法:

private final PriorityQueue<E> q = new PriorityQueue<E>();

内部的优先级队列PriorityQueue(按到期时间正序排),实际利用Work中实现Comparable中的compareTo方法进行比较,优先级队列的队首位置是即将到期的元素。priorityQueue方法可以参考链接 https://www.jianshu.com/p/49726e0e5167

offer方法
public boolean offer(E e) {
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 判断是否添加成功
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
1.add内部调用了offer方法,将元素放置在优先队列中
2.如果队首元素等于新添加的元素,则标识队列中只有一个元素,则唤醒一个消费线程

take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
1.获取可中断重入锁
2.自旋的执行,获取队列中第一个元素first=q.peek(),如果队列为空则进入线程等待队列
3.获取头元素的到期时间first.getDelay(),如果到期,则出队并删除优先队列的第一个元素
4.如果还没到期,则释放头元素的引用。当leader不为空,说明被其他线程持有了,则当前线程直接进入等待队列available.await()。
5.否则设置header为当前线程,当前线程等待头元素的到期时间,等待超时后,如果当前线程还持有leader,则释放leader,并重新自旋重复以上逻辑
6.最终,没有任何线程持有leader且队列不为空时,在唤醒一个消费线程
leader线程的作用:
高并发下,多个线程在执行队列的出队操作,如果一个线程持有了Leader,则直接让其他线程进入等待队列,避免无谓的性能消耗
first置为null的作用类似,也是避免多个线程持有队列头元素的引用,导致无法gc
peek方法
只返回Null或队列头元素
    public E peek() {
        return (size == 0) ? null : (E) queue[0];
    }
poll方法
1.返回队列头元素,并将队列最后一个位置s的元素置空
2.当队列元素个数大于1时,调用siftDown方法,比较头元素的左右子元素和x的大小,选出最小的元素m放在队列头部。如果x最小,将x放在队列头部后结束。如果x
不为最小值,则继续比较x和m的子元素大小,依次
    public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }

相关文章

网友评论

      本文标题:【BlockingQueue】DelayQueue

      本文链接:https://www.haomeiwen.com/subject/kdanbftx.html