美文网首页
2. AbstractQueuedSynchronizer(二)

2. AbstractQueuedSynchronizer(二)

作者: shallowinggg | 来源:发表于2019-03-15 23:10 被阅读0次

Conditon接口与示例

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象创建出来的,换句话说,Condition是依赖Lock对象的。

Condition的使用方式比较简单,需要主语在调用方法前获取锁,如下所示:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    
    public void conditionAwait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }
    
    public void conditionSignal() throws InterruptedException {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才会从await()方法返回,并且在返回前已经获取了锁。

Condition接口定义的方法如下:

方法名称 描述
void await() throws InterruptedException 当前线程进入等待状态直到被通知或中断,当前线程进入运行状态且从await()方法返回的情况包括:
1.其他线程调用Condition的signal()或signalAll()方法,而当前线程被选中唤醒。
2.其他线程中断当前线程。
如果当前等待线程从await()方法返回,那么表面该线程已经获取了Condition对象所对应的锁
void awaitUninterruptibly() 当前线程进入等待状态直到被通知,从方法名称可以看出该方法对中断不敏感
long awaitNanos(long nanosTimeout) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者超时。返回值表示剩余的时间
boolean await(long time, TimeUnit unit) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者超时,与awaitNanos区别就是可以指定时间单位
boolean awaitUntil(Date deadline) throws InterruptedException 当前线程进行等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,返回true,否则返回false
void signal() 唤醒一个等待在Condition上的线程,该线程从等待方法上返回前必须获得与Condition相关联的锁
void signalAll() 唤醒所有等待在Condition上的线程,能从等待方法上返回的线程必须先获得与Condition相关联的锁

下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作会阻塞插入线程,直接队列出现空位。

package util;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue<T> {
    private Object[] items;
    private int addIndex;
    private int removeIndex;
    private int count;
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public BoundedQueue(int count) {
        items = new Object[count];
    }

    public void add(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count==items.length) {
                notFull.await();
            }
            items[addIndex] = item;
            if(++addIndex == items.length) {
                addIndex=0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count==0) {
                notEmpty.await();
            }
            Object o = items[removeIndex];
            if(++removeIndex==items.length) {
                removeIndex=0;
            }
            --count;
            notFull.signal();
            return (T) o;
        } finally {
            lock.unlock();
        }
    }
}

上述示例中,BoundedQueue通过add(T)方法添加一个元素,通过remove()方法移除一个元素。

以增加方法为例,首先需要获得锁,目的是确保数组修改的可见性和排他性。当数量元素数量等于数组长度时,表示数组已满,调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组元素数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中有新元素可以获取。

Condition实现分析

ConditionObject是AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也比较合理。每个Condition对象都包含一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

等待队列

等待队列是一个FIFO队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义服用了同步器中节点的定义,也就是两个节点类型都是AbstractQueuedSynchronizer.Node

一个Condition中包含了一个等待队列,Condition拥有首节点和尾节点。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。

> line: 1862
public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

等待队列的基本结构如下:

等待

Condition拥有首尾节点的引用,而新增节点只需要将原来的尾节点nextWaiter指向它,并且更新尾节点即可。上述的更新过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

> line: 2007
public final void awaitUninterruptibly() {
    // 增加尾节点
    Node node = addConditionWaiter();

    // 释放同步状态
    int savedState = fullyRelease(node);
    boolean interrupted = false;

    // 阻塞当前线程
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }

    // 线程被唤醒后再次尝试获取同步状态
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

> line: 1880
private Node addConditionWaiter() {
    // 如果不是持有锁的线程调用,抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // 如果lastWaiter被取消了,则清除它
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    // 创建一个CONDITION节点
    Node node = new Node(Node.CONDITION);

    // 更新字段
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

> line: 1943
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 解除对此节点的引用
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

> line: 1756
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

> line: 1666
final boolean isOnSyncQueue(Node node) {
    // 在addConditionWaiter()方法上创建的新节点都是CONDITION节点
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切的说是AQS)有一个同步队列和多个等待队列:

等待的响应中断版本如下:

> line: 2068
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 检查等待时线程是否被中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被唤醒或者中断后再次尝试获取同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

> line: 2037
private int checkInterruptWhileWaiting(Node node) {
    // 如果线程等待时被中断,尝试将其加入到同步队列中
    // 在被通知前被中断了,返回THROW_IE
    // 在被通知后被中断了,返回REINTERRUPT
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

> line: 2028
/*
 * 对于响应中断的等待,我们需要跟踪是否要抛出InterruptedException
 * 异常,如果在等待队列上被阻塞时被中断了,那么抛出异常,
 * 如果在同步队列上被阻塞时被中断了,那么就再次中断。
*/

/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

> line: 1734
final boolean transferAfterCancelledWait(Node node) {
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

> line: 629
// 此方法与addWaiter(Node)方法几乎一样
private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

> line: 2047
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

等待的超时版本如下:

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 我们不会在这里检查 nanosTimeout <= 0L,使得awaitNanos(0)成为一种释放锁的方式
    // 计算截止时间
    final long deadline = System.nanoTime() + nanosTimeout;
    long initialNanos = nanosTimeout;
    // 增加到等待队列中
    Node node = addConditionWaiter();
    // 释放同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 超时时间已经过去,移到同步队列中,退出循环
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        // 睡眠指定时间
        if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 重新计算超时时间
        nanosTimeout = deadline - System.nanoTime();
    }
    // 尝试获取同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}
> line: 2146
public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    // 获取截止时间
    long abstime = deadline.getTime();
    // 响应中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 增加到等待队列中
    Node node = addConditionWaiter();
    // 释放同步状态
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 到达截止时间,移到同步队列,退出循环
        if (System.currentTimeMillis() >= abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // 阻塞线程
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 尝试获取同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

通知

调用Condition的signal()方法会唤醒在等待队列中等待时间最长的节点(即首节点),在唤醒线程前,将节点移到同步队列中。

> line: 1973
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

> line: 1906
private void doSignal(Node first) {
    // 解除头节点的引用,同时更新头节点
    // 然后尝试将头节点移到同步队列,如果失败表示此节点已经被取消,
    // 唤醒下一个节点
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

> line: 1707
final boolean transferForSignal(Node node) {
    /*
     * 如果无法改变waitStatus,表示这个节点已经被取消了
     */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    // 移到同步队列中,并返回前驱结点
    Node p = enq(node);
    int ws = p.waitStatus;
    // 尝试将前驱结点的waitStatus设置为SIGNAL
    // 如果失败了或者前驱节点被取消了,那么就唤醒线程使其继续尝试获取同步状态
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法开头进行了检查。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node)方法返回true),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

public final void awaitUninterruptibly() {
    // 增加尾节点
    Node node = addConditionWaiter();

    // 释放同步状态
    int savedState = fullyRelease(node);
    boolean interrupted = false;

    // 阻塞当前线程
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }

    // 线程被唤醒后再次尝试获取同步状态
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

final boolean isOnSyncQueue(Node node) {
    // 被移动到同步队列的节点waitStatus被更新为0
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

    // 查看next字段的注释:"The next field of cancelled nodes is set to
    //     point to the node itself instead of null, to make life
    //     easier for isOnSyncQueue."。从此处我们可以发现这个if语句
    // 存在的意义。
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    // 想象这样一种场景,有一个线程调用signal方法并执行到transferForSignal
    // 方法中的enq方法,尝试将此节点插入到同步队列中,查看enq源码可以发现,
    // 不管CAS tail是否成功,此节点的prev节点已经被设置了,所以第一个if的
    // 检查将会通过(prev!=null),此时这个线程被意外唤醒,然后在await方法中的
    // 循环中继续执行,调用此方法,此时就会符合注释描述的这种情况。
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    // We check for node first, since it's likely to be at or near tail.
    // tail is known to be non-null, so we could re-order to "save"
    // one null check, but we leave it this way to help the VM.
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}

signalAll()方法则对等待队列中的每个节点都唤醒,将它们全部移到同步队列中。

> line: 1988
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

> line: 1919
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

关于AQS的分析到此结束。除了AQS以外,java.util.concurrent.lock包中还有一个AbstractQueuedLongSynchronizer类,此类的实现与AQS几乎完全一致,只有同步状态使用long表示而不是int。

> line: 99
/**
 * The synchronization state.
 */
private volatile long state;

相关文章

网友评论

      本文标题:2. AbstractQueuedSynchronizer(二)

      本文链接:https://www.haomeiwen.com/subject/ljbdmqtx.html