- LinkedBlockingQueue实现BlockingQueue接口,是一个有界阻塞队列
- 内部数据结构是单向链表
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
- 是一个有容量限制的队列,默认int最大值
- 两个可重入锁,因为队列是一头入一头出,所以两边加锁
- 对应take锁,条件是队列空。队列为空时,不能take,当前take的线程进入notEmpty条件等待池并释放锁
- 对应put锁,条件是队列满。队列满时,不能put,当前put的线程进入notFull条件等待池并释放锁。
private final int capacity; //最大容量限制
private final AtomicInteger count = new AtomicInteger(); //当前占用容量
transient Node<E> head; //头指针
private transient Node<E> last; //尾指针
private final ReentrantLock takeLock = new ReentrantLock(); //take的可重入锁
private final Condition notEmpty = takeLock.newCondition(); //队列为空时的线程等待池
private final ReentrantLock putLock = new ReentrantLock(); //put的可重入锁
private final Condition notFull = putLock.newCondition(); //队列满时的队列等待池
- 初始化头尾指针时,是有一个空node,头指针永远指向一个空node
public LinkedBlockingQueue(int capacity) { //初始化最大容量
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null); //初始化头尾
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) { //队列满了阻塞
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); //唤醒put线程
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty(); //唤醒take线程
}
网友评论