美文网首页
aqs中condition的实现

aqs中condition的实现

作者: justonemoretry | 来源:发表于2022-05-26 09:51 被阅读0次

condition使用场景

condition条件变量的使用,看起来和操作系统中的pthread_cond很像,又类似于object.wait和object.signal的用法。在生产者消费者场景使用较多,之前在阻塞队列中经常看到。
Condition接口的await/signal机制是设计用来代替监视器锁的wait/notify机制的,因此,与监视器锁的wait/notify机制对照着学习有助于我们更好的理解Conditon接口:


image.png

首先,我们通过wait/notify机制来类比await/signal机制:

  1. 调用wait方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之类似,调用await方法的线程首先必须获得lock锁
  2. 调用wait方法的线程会释放已经获得的监视器锁,进入当前监视器锁的等待队列(wait set)中;与之类似,调用await方法的线程会释放已经获得的lock锁,进入到当前Condtion对应的条件队列中。
  3. 调用监视器锁的notify方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与锁竞争,并在获得锁后,从wait方法处恢复执行;与之类似,调用Condtion的signal方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在获得锁后,从await方法处开始恢复执行。

官网示例,典型的生产者消费者场景:

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产者方法,往数组里面写数据
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); //数组已满,没有空间时,挂起等待,直到数组“非满”(notFull)
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            // 因为放入了一个数据,数组肯定不是空的了
            // 此时唤醒等待这notEmpty条件上的线程
            notEmpty.signal(); 
        } finally {
            lock.unlock();
        }
    }

    // 消费者方法,从数组里面拿数据
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 数组是空的,没有数据可拿时,挂起等待,直到数组非空(notEmpty)
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            // 因为拿出了一个数据,数组肯定不是满的了
            // 此时唤醒等待这notFull条件上的线程
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

这是java官方文档提供的例子,是一个典型的生产者-消费者模型。这里在同一个lock锁上,创建了两个条件队列notFull, notEmpty。当数组已满,没有存储空间时,put方法在notFull条件上等待,直到数组“not full”;当数组空了,没有数据可读时,take方法在notEmpty条件上等待,直到数组“not empty”,而notEmpty.signal()和notFull.signal()则用来唤醒等待在这个条件上的线程。

同步队列 vs 条件队列

sync queue

所有等待锁的线程都会被包装成Node扔到一个同步队列中。该同步队列如下:

image.png
sync queue是一个双向链表,我们使用prevnext属性来串联节点。但是在这个同步队列中,我们一直没有用到nextWaiter属性,即使是在共享锁模式下,这一属性也只作为一个标记,指向了一个空节点,因此,在sync queue中,我们不会用它来串联节点。

condtion queue

每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:


image.png

可见,每一个Condition对象对应一个Conditon队列,每个Condtion队列都是独立的,互相不影响的。在上图中,如果我们对当前线程调用了notFull.await(), 则当前线程就会被包装成Node加到notFull队列的末尾。
值得注意的是,condition queue是一个单向链表,在该链表中我们使用nextWaiter属性来串联链表。但是,就像在sync queue中不会使用nextWaiter属性来串联链表一样,在condition queue中,也并不会用到prev, next属性,它们的值都为null。也就是说,在条件队列中,Node节点真正用到的属性只有三个:

  • thread:代表当前正在等待某个条件的线程
  • waitStatus:条件的等待状态
  • nextWaiter:指向条件队列中的下一个节点
    既然这里又提到了waitStatus,我们这里再回顾一下它的取值范围:
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

在条件队列中,我们只需要关注一个值即可——CONDITION。它表示线程处于正常的等待状态,而只要waitStatus不是CONDITION,我们就认为线程不再等待了,此时就要从条件队列中出队。

sync queue 和 conditon queue的联系

一般情况下,等待锁的sync queue和条件队列condition queue是相互独立的,彼此之间并没有任何关系。但是,当我们调用某个条件队列的signal方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样要被加到等待锁的sync queue中去,此时节点就从condition queue中被转移到sync queue中:


image.png

但是,这里尤其要注意的是,node是被一个一个转移过去的,哪怕我们调用的是signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在sync queue的末尾。

同时要注意的是,我们在sync queue中只使用prev、next来串联链表,而不使用nextWaiter;我们在condition queue中只使用nextWaiter来串联链表,而不使用prev、next.事实上,它们就是两个使用了同样的Node数据结构的完全独立的两种链表。因此,将节点从condition queue中转移到sync queue中时,我们需要断开原来的链接(nextWaiter),建立新的链接(prev, next),这某种程度上也是需要将节点一个一个地转移过去的原因之一。

入队时和出队时的锁状态

sync queue是等待锁的队列,当一个线程被包装成Node加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操作是将获取到锁的节点设为新的dummy head,并将thread属性置为null)。
condition队列是等待在特定条件下的队列,因为调用await方法时,必然是已经获得了lock锁,所以在进入condtion队列前线程必然是已经获取了锁;在被包装成Node扔进条件队列中后,线程将释放锁,然后挂起;当处于该队列中的线程被signal方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去再次的竞争锁,因此,该节点会被添加到sync queue中。因此,条件队列在出队时,线程并不持有锁。
所以事实上,这两个队列的锁状态正好相反:

  • condition queue:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到sync queue
  • sync queue:入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

源码分析

AQS对Condition这个接口的实现主要是通过ConditionObject,上面已经说个,它的核心实现就是是一个条件队列,每一个在某个condition上等待的线程都会被封装成Node对象扔进这个条件队列。

核心属性

它的核心属性只有两个:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

这两个属性分别代表了条件队列的队头和队尾,每当我们新建一个conditionObject对象,都会对应一个条件队列。

构造函数

public ConditionObject() { }

构造函数啥也没干,可见,条件队列是延时初始化的,在真正用到的时候才会初始化。

Condition接口方法实现

await方法分析

public final void await() throws InterruptedException {
    // 如果当前线程在调动await()方法前已经被中断了,则直接抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程封装成Node添加到条件队列
    Node node = addConditionWaiter();
    // 释放当前线程所占用的锁,保存当前的锁状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法,则直接将当前线程挂起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 线程将在这里被挂起,停止运行
        // 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
        // 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 放到队列中获锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

里面涉及到的方法我们逐个看下。
首先是将当前线程封装成Node扔进条件队列中的addConditionWaiter方法:
addConditionWaiter

       /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 加到队列尾部
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

首先我们要思考的是,存在两个不同的线程同时入队的情况吗?不存在。为什么呢?因为前面说过了,能调用await方法的线程必然是已经获得了锁,而获得了锁的线程只有一个,所以这里不存在并发,因此不需要CAS操作。
在这个方法中,我们就是简单的将当前线程封装成Node加到条件队列的末尾。这和将一个线程封装成Node加入等待队列略有不同:

  1. 节点加入sync queue时waitStatus的值为0,但节点加入condition queue时waitStatus的值为Node.CONDTION。
  2. sync queue的头节点为dummy节点,如果队列为空,则会先创建一个dummy节点,再创建一个代表当前节点的Node添加在dummy节点的后面;而condtion queue 没有dummy节点,初始化时,直接将firstWaiter和lastWaiter直接指向新建的节点就行了。
  3. sync queue是一个双向队列,在节点入队后,要同时修改当前节点的前驱和前驱节点的后继;而在condtion queue中,我们只修改了前驱节点的nextWaiter,也就是说,condtion queue是作为单向队列来使用的。

如果入队时发现尾节点已经取消等待了,那么我们就不应该接在它的后面,此时需要调用unlinkCancelledWaiters来剔除那些已经取消等待的线程:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 起到前置节点的作用
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 不是condition状态进行删除操作
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

该方法将从头节点开始遍历整个队列,剔除其中waitStatus不为Node.CONDTION的节点,这里使用了两个指针firstWaiter和trail来分别记录第一个和最后一个waitStatus不为Node.CONDTION的节点,这些都是基础的链表操作,很容易理解,这里不再赘述了。
** fullyRelease**
在节点被成功添加到队列的末尾后,我们将调用fullyRelease来释放当前线程所占用的锁:

/**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

值得注意的是,这是一次性释放了所有的锁,即对于可重入锁而言,无论重入了几次,这里是一次性释放完的,这也就是为什么该方法的名字叫fullyRelease。但这里尤其要注意的是release(savedState)方法是有可能抛出IllegalMonitorStateException的,这是因为当前线程可能并不是持有锁的线程。但是咱前面不是说,只有持有锁的线程才能调用await方法吗?既然fullyRelease方法在await方法中,为啥当前线程还有可能并不是持有锁的线程呢?
虽然话是这么说,但是在调用await方法时,我们其实并没有检测Thread.currentThread() == getExclusiveOwnerThread(),换句话说,也就是执行到fullyRelease这一步,我们才会检测这一点,而这一点检测是由AQS子类实现tryRelease方法来保证的,例如,ReentrantLock对tryRelease方法的实现如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

当发现当前线程不是持有锁的线程时,我们就会进入finally块,将当前Node的状态设为Node.CANCELLED,这也就是为什么上面的addConditionWaiter在添加新节点前每次都会检查尾节点是否已经被取消了。

参考文章

逐行分析AQS源码(4)——Condition接口实现

相关文章

网友评论

      本文标题:aqs中condition的实现

      本文链接:https://www.haomeiwen.com/subject/buduprtx.html