美文网首页
java数据集合之LinkedBlockingQueue

java数据集合之LinkedBlockingQueue

作者: CODERLIHAO | 来源:发表于2019-07-31 17:33 被阅读0次

1.继承

image.png

LinkedBlockingQueue是一个线程安全的队列,队列满了,生产者会阻塞,队列空了,消费者会阻塞

2.Collection接口

我们先不看代码,仔细想想对于一个容器,大概需要哪些方法,
大小、添加、删除、是否包含、遍历、清除等方法
容器的大小

int size();
boolean isEmpty();

是否包含元素

 boolean contains(Object o);
 boolean containsAll(Collection<?> c);

添加元素

boolean add(E e);
boolean addAll(Collection<? extends E> c);

删除元素

boolean removeAll(Collection<?> c);

3.AbstractCollection

AbstractCollection给出Collection接口的默认实现,但任然是抽象类
大小

    public abstract int size();
    public boolean isEmpty() {
        return size() == 0;
    }

是否包含元素,重点是会调用元素的equals方法比较

 public boolean contains(Object o) {
        Iterator<E> it = iterator();
        if (o==null) {
           //看的出,可以放一个null元素进来
            while (it.hasNext())
                //遍历元素,此时的o是null,只要找到元素是null的,就认为已经包含了
                if (it.next()==null)
                    return true;
        } else {
            //此时元素不为null,重点是会调用元素的equals方法比较
            while (it.hasNext())
                if (o.equals(it.next()))
                    return true;
        }
        return false;
    }

  //如果c中有一个元素不在容器内,就返回false
  public boolean containsAll(Collection<?> c) {
        for (Object e : c)
            if (!contains(e))
                return false;
        return true;
    }

添加元素

public boolean add(E e) {
        throw new UnsupportedOperationException();
    }

 public boolean addAll(Collection<? extends E> c) {
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }

删除元素,删除元素的重点是,先要找到元素,还是遍历调用元素的equals方法

  public boolean remove(Object o) {
        Iterator<E> it = iterator();
        if (o==null) {
            while (it.hasNext()) {
                if (it.next()==null) {
                    it.remove();
                    return true;
                }
            }
        } else {
            while (it.hasNext()) {
                if (o.equals(it.next())) {
                    it.remove();
                    return true;
                }
            }
        }
        return false;
    }

 public boolean removeAll(Collection<?> c) {
        Objects.requireNonNull(c);
        boolean modified = false;
        Iterator<?> it = iterator();
        while (it.hasNext()) {
            if (c.contains(it.next())) {
                it.remove();
                modified = true;
            }
        }
        return modified;
    }

4.Queue队列

我经常读错这个,现在放上音标 [kjuː]
基本上,一个队列就是一个先入先出(FIFO)的数据结构


Queue
//插入元素,新元素插入(offer)到队列的尾部
boolean offer(E e);

//删除并返回头元素,如果队列为空,返回null
E poll();

//删除并返回头元素,如果队列为空,会报错
E remove();

//返回头元素,不会删除,如果队列为空,会报错
E element();
//返回头元素,不会删除,如果队列为空,返回null
E peek();

5.AbstractQueue

AbstractQueue
//重写add方法,调用的还是offer方法,如果队列满了,添加会报错
 public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//remove方法返回的一定不是null元素,删除一个头元素
 public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }


 public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

6.LinkedBlockingQueue

//插入到队列的尾部
 public boolean offer(E e) {
        //不许往队列里offer null元素
        if (e == null) throw new NullPointerException();
        //当前元素的数量
        final AtomicInteger count = this.count;
        //如果当前元素数量已经等于容量,将不再往队列里offer元素
        if (count.get() == capacity)
            return false;
        int c = -1;
        //将元素构建到Node节点里
        Node<E> node = new Node<E>(e);
        //先获取锁,不响应中断
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                //没有满,还可以继续放
                enqueue(node);
                //返回旧值,并将count加1
                c = count.getAndIncrement();
                //没有满,通知那些往队列里放的线程
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        //已经空了,通知队列空了
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

  private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

//插入到队列的尾部,如果满了,会阻塞
public void put(E e) throws InterruptedException {
        //依然不能放null
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //响应中断
        putLock.lockInterruptibly();
        try {
          //如果满了,生产者阻塞等待
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }


public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //可以响应中断 
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //超时时间为0,直接返回
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
//获取头元素,并删除返回
 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //空队列,消费者将会一直阻塞的等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

//出队,获取头元素,这个函数会删除头元素
 private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

//返回头元素,并不会删除
public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

相关文章

网友评论

      本文标题:java数据集合之LinkedBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/nactdctx.html