美文网首页
Java并发——并发容器

Java并发——并发容器

作者: 坠尘_ae94 | 来源:发表于2020-08-07 13:56 被阅读0次

BlockingQueue

A {@link Queue} that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

ArrayBlockingQueue

A bounded backed by an array. This queue orders elements FIFO (first-in-first-out). The <em>head</em> of the queue is that element that has been on the queue the longest time. The <em>tail</em> of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

成员变量

//    序列化ID
    private static final long serialVersionUID = -817911632652898426L;
//    存放元素
    final Object[] items;
//    下一个取出元素的坐标
    int takeIndex;
//    下一个存放元素的坐标
    int putIndex;
//    队列中的元素计数
    int count;
//    可重入锁
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
//  不知道啥玩意儿
    transient Itrs itrs;

构造方法

共三个:

public ArrayBlockingQueue(int capacity) {}
public ArrayBlockingQueue(int capacity, boolean fair) {}
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {}

需要一个capacity参数的构造方法调用需要两个参数的构造方法:

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

API

  • add

        @Test(expected = IllegalStateException.class)
        public void testAddMethod(){
            ArrayBlockingQueue<String > queue = example.create(5);
            assertThat(queue.add("Hello1"), CoreMatchers.equalTo(true));
            assertThat(queue.add("Hello2"), CoreMatchers.equalTo(true));
            assertThat(queue.add("Hello3"), CoreMatchers.equalTo(true));
            assertThat(queue.add("Hello4"), CoreMatchers.equalTo(true));
            assertThat(queue.add("Hello5"), CoreMatchers.equalTo(true));
            assertThat(queue.add("Hello6"), CoreMatchers.equalTo(true));
            assertThat(queue.size(),CoreMatchers.equalTo(5));
            fail("FAILED TO ADD");
        }
    
  • offer

    与add类似,只是offer不抛异常,而是返回false

    @Test
        public void tesOfferMethod(){
            ArrayBlockingQueue<String > queue = example.create(5);
            assertThat(queue.offer("Hello1"), CoreMatchers.equalTo(true));
            assertThat(queue.offer("Hello2"), CoreMatchers.equalTo(true));
            assertThat(queue.offer("Hello3"), CoreMatchers.equalTo(true));
            assertThat(queue.offer("Hello4"), CoreMatchers.equalTo(true));
            assertThat(queue.offer("Hello5"), CoreMatchers.equalTo(true));
            assertThat(queue.offer("Hello6"), CoreMatchers.equalTo(false));
        }
    
  • put

    如果队列已满,那么调用put方法会进入阻塞,可能抛出InterruptedException

        @Test
        public void tesPutMethod() throws InterruptedException {
            ArrayBlockingQueue<String > queue = example.create(5);
            queue.put("Hello1");
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(()-> {
                try {
                    assertThat(queue.take(),CoreMatchers.equalTo("Hello1"));
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            queue.put("Hello2");
            queue.put("Hello3");
            queue.put("Hello4");
            queue.put("Hello5");
            queue.put("Hello6");
        }
    
  • poll

    把值取出来

        public void testPoll(){
            ArrayBlockingQueue<String> queue = example.create(2);
            queue.add("Hello1");
            queue.add("Hello2");
            assertThat(queue.poll(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.poll(),CoreMatchers.equalTo("Hello2"));
            assertThat(queue.poll(),CoreMatchers.nullValue());
        }
    
  • peak

    不管取几次,每次取得都是头部数据,也就是它不会删除队列中的数据

            ArrayBlockingQueue<String> queue = example.create(2);
            queue.add("Hello1");
            queue.add("Hello2");
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.peek(),CoreMatchers.equalTo("Hello1"));
    
  • clear

    取出队列所有元素,返回void

  • element

    实际内部调用peek方法,但是可能抛出NoSuchElementException

        @Test(expected = NoSuchElementException.class)
        public void testElement(){
            ArrayBlockingQueue<String> queue = example.create(2);
            queue.add("Hello1");
            queue.add("Hello2");
            assertThat(queue.element(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.element(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.element(),CoreMatchers.equalTo("Hello1"));
            queue.clear();
            assertThat(queue.element(),CoreMatchers.equalTo("Hello1"));
        }
    
  • remove

    实际内部调用poll,没有元素时会抛出异常

        @Test(expected = NoSuchElementException.class)
        public void testRemove(){
            ArrayBlockingQueue<String> queue = example.create(2);
            queue.add("Hello1");
            queue.add("Hello2");
            assertThat(queue.remove(),CoreMatchers.equalTo("Hello1"));
            assertThat(queue.remove(),CoreMatchers.equalTo("Hello2"));
            assertThat(queue.remove(),CoreMatchers.equalTo("Hello1"));
        }
    

PriorityBlockingQueue

An unbounded {@linkplain BlockingQueue blocking queue} that uses the same ordering rules as class {@link PriorityQueue} and suppliesblocking retrieval operations.

无边界队列,不允许NULL值(要排序),不允许插入不可排序的对象.

要插入到该队列的元素需要实现java.lang.Comparable接口,重写compareTo方法。

构造方法

public PriorityBlockingQueue() {}
public PriorityBlockingQueue(int initialCapacity) {}
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {}
public PriorityBlockingQueue(int initialCapacity) {}

API

  • add
  • offer
  • put

这仨都一样,底层调用offer方法。

    public void put(E e) {
        offer(e); // never need to block
    }
    public boolean add(E e) {
        return offer(e);
    }

测试下:


public class PriorityBlockingQueueTest {
    public static void main(String[] args) {
        PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue();
        priorityBlockingQueue.add(new Person(4,"Amy"));
        priorityBlockingQueue.add(new Person(2,"Mike"));
        priorityBlockingQueue.add(new Person(3,"Jake"));
        priorityBlockingQueue.add(new Person(5,"Jake3"));
        System.out.println(priorityBlockingQueue);
    }
}

class Person implements Comparable<Person> {
    int id;
    String name;

    public Person(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public int compareTo(Person o) {
        return this.id > o.id ? 1 : -1;
    }

结果:

[Person{id=2, name='Mike'}, Person{id=4, name='Amy'}, Person{id=3, name='Jake'}, Person{id=5, name='Jake3'}]

结果和预期不太一样啊。

看这里:

Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: For each node n in the heap and each descendant d of n, n <= d. The element with the
lowest value is in queue[0], assuming the queue is nonempty.

看不懂?没事儿,我也看不懂,贴出来显得我很厉害的样子。

反正大致就是,它的存放和你想的不太一样,采用了二叉堆,二叉堆一般用数组表示,如果父节点的节点位置在n处,那么其左孩子节点为:2 * n + 1 ,其右孩子节点为2 * (n + 1),其父节点为(n - 1) / 2 处。详细点的可以看参考的第三篇博客,算了,看这里

二叉堆是一种特殊的堆,就结构性而言就是完全二叉树或者是近似完全二叉树,满足树结构性和堆序性。树机构特性就是完全二叉树应该有的结构,堆序性则是:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。它有两种表现形式:最大堆、最小堆。

最大堆:父节点的键值总是大于或等于任何一个子节点的键值

最小堆:父节点的键值总是小于或等于任何一个子节点的键值

  • element

    读取数据,但是不会删除数据,实际调用peek方法

        @Test
        public void testGetElement(){
            PriorityBlockingQueue<String> queue = example.create(3);
            assertThat(queue.add("hello1"),equalTo(true));
            assertThat(queue.add("hello2"),equalTo(true));
            assertThat(queue.add("hello3"),equalTo(true));
            assertThat(queue.add("hello0"),equalTo(true));
            assertThat(queue.element(), CoreMatchers.equalTo("hello0"));
            assertThat(queue.element(), CoreMatchers.equalTo("hello0"));
        }
    
  • peek

    同上

  • poll

    这个会弹数据

  • take

    如果没有数据则会阻塞,可抛出中断异常

    @Test
    public void testAdd(){
        PriorityBlockingQueue<Hello> queue = example.create(3,(o1,o2)->o1.hashCode()-o2.hashCode());
        queue.offer(new Hello());
    }

LinkedBlockingQueue

An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
linked nodes. This queue orders elements FIFO (first-in-first-out). The <em>head</em> of the queue is that element that has been on the queue the longest time. The <em>tail</em> of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

LinkedBlockingQueue是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

构造方法

public LinkedBlockingQueue() {}
public LinkedBlockingQueue(int capacity) {}
public LinkedBlockingQueue(Collection<? extends E> c) {}

API

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。
LinkedBlockingQueue()
// 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
LinkedBlockingQueue(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingQueue。
LinkedBlockingQueue(int capacity)
// 从队列彻底移除所有元素。
void clear()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
boolean offer(E e)
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。void put(E e)
// 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
int remainingCapacity()
// 从此队列移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回队列中的元素个数。
int size()
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
E take()
// 返回按适当顺序包含此队列中所有元素的数组。
Object[] toArray()
// 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

这里的内容从参考第五篇博客复制。

  • add

    向队列中添加元素还有一个add方法,并且它可以针对有界队列抛异常,底层仍旧调用offer方法:

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

这里贴一个另一个博主的示例:

import java.util.*;
import java.util.concurrent.*;
/** LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。 
* 下面是“多个线程同时操作并且遍历queue”的示例 
* (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。 
* (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 
*  @author skywang 
*/
public class LinkedBlockingQueueDemo1 { 
  // TODO: queue是LinkedList对象时,程序会出错。
  //private static Queue<String> queue = new LinkedList<String>();
  private static Queue<String> queue = new LinkedBlockingQueue<String>(); 
  public static void main(String[] args) { 
    // 同时启动两个线程对queue进行操作! 
    new MyThread("ta").start(); 
    new MyThread("tb").start(); 
  } 

  private static void printAll() { 
    String value; 
    Iterator iter = queue.iterator(); 
    while(iter.hasNext()) { 
      value = (String)iter.next(); 
      System.out.print(value+", "); 
    } 
    System.out.println(); 
  } 

  private static class MyThread extends Thread { 
    MyThread(String name) { 
      super(name); 
    } 

    @Override 
    public void run() { 
      int i = 0; 
      while (i++ < 6) { 
        // “线程名” + "-" + "序号" 
        String val = Thread.currentThread().getName()+i;
        queue.add(val); 
        // 通过“Iterator”遍历queue。 
        printAll(); 
      } 
    } 
}

SynchronousQueue

A {@linkplain BlockingQueue blocking queue} in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot {@code peek} at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The <em>head</em> of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and {@code poll()} will return {@code null}. For purposes of other {@code Collection} methods (for example {@code contains}), a {@code SynchronousQueue} acts as an empty collection. This queue
does not permit {@code null} elements.

SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。

被应用于Executors.newCachedThreadPool。

构造方法

public SynchronousQueue() {}
public SynchronousQueue(boolean fair){}

关于fair参数:if true, waiting threads contend in FIFO order for access; otherwise the order is unspecified.

应用

        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread(()->{
            System.out.println(Thread.currentThread().getName() + " Begin add date: HI");
            try {
                TimeUnit.SECONDS.sleep(3);
                queue.put("HI");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            System.out.println("Wait for taking , get data.");
            try {
                TimeUnit.SECONDS.sleep(4);
               System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

结果:

Wait for taking , get data.
Thread-0 Begin add date: HI
HI

参考

相关文章

网友评论

      本文标题:Java并发——并发容器

      本文链接:https://www.haomeiwen.com/subject/elyfdktx.html