美文网首页程序员
Java - LinkedBlockingQueue的阻塞实现

Java - LinkedBlockingQueue的阻塞实现

作者: 夹胡碰 | 来源:发表于2021-01-01 21:09 被阅读0次

LinkedBlockingQueueBlockingQueue的链表实现,他的阻塞体现在puttake方法上,下面将通过源码介绍如何LinkedBlockingQueue是如何实现的阻塞队列。

1. ReentrantLock+Condition

通过AQS构建的ReentrantLockCondition实现了puttake的锁与线程挂起/唤醒模型

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
2. put阻塞

1)首先加put
2)当容量满时(count.get() == capacity),执行notFull.await();挂起当前线程并释放锁,相当于Object.wait
3)当容量从0变成1时(if c == 0),执行signalNotEmpty();唤醒一个take挂起线程,相当于Object.notify

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // put锁
    try {
        while (count.get() == capacity) {
            notFull.await(); // 容量满时,挂起线程释放锁,并加入等待队列
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty(); // 当容量从0变成1时,唤醒一个take挂起的线程
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();// 唤醒一个take挂起的线程
    } finally {
        takeLock.unlock();
    }
}
3. take阻塞

1)首先加take
2)当容量为0时(count.get() == 0),执行notEmpty.await();挂起当前线程并释放锁,相当于Object.wait
3)当容量从满容量减少1时(if c == capacity),执行signalNotFull();唤醒一个put挂起线程,相当于Object.notify

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // take锁
    try {
        while (count.get() == 0) {
            notEmpty.await(); // 当队列中没有对象时,挂起线程释放锁,并加入等待队列
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull(); // 当容量从满容量减少1时,唤醒一个put挂起的线程
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal(); // 唤醒一个put挂起的线程
    } finally {
        putLock.unlock();
    }
}
4. 拓展: offer(timeout) 与 poll(timeout)实现原理

实现与put/take基本一样,只不过底层调用了LockSupport.parkNanos/LockSupport.unparkNanos

  • wait
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 基础park
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • wait timeout
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout); // timeout park
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

相关文章

网友评论

    本文标题:Java - LinkedBlockingQueue的阻塞实现

    本文链接:https://www.haomeiwen.com/subject/sooqoktx.html