condition使用场景
condition条件变量的使用,看起来和操作系统中的pthread_cond很像,又类似于object.wait和object.signal的用法。在生产者消费者场景使用较多,之前在阻塞队列中经常看到。
Condition接口的await/signal机制是设计用来代替监视器锁的wait/notify机制的,因此,与监视器锁的wait/notify机制对照着学习有助于我们更好的理解Conditon接口:
image.png
首先,我们通过wait/notify机制来类比await/signal机制:
- 调用wait方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之类似,调用await方法的线程首先必须获得lock锁
- 调用wait方法的线程会释放已经获得的监视器锁,进入当前监视器锁的等待队列(wait set)中;与之类似,调用await方法的线程会释放已经获得的lock锁,进入到当前Condtion对应的条件队列中。
- 调用监视器锁的notify方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与锁竞争,并在获得锁后,从wait方法处恢复执行;与之类似,调用Condtion的signal方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在获得锁后,从await方法处开始恢复执行。
官网示例,典型的生产者消费者场景:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生产者方法,往数组里面写数据
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); //数组已满,没有空间时,挂起等待,直到数组“非满”(notFull)
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// 因为放入了一个数据,数组肯定不是空的了
// 此时唤醒等待这notEmpty条件上的线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 消费者方法,从数组里面拿数据
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 数组是空的,没有数据可拿时,挂起等待,直到数组非空(notEmpty)
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// 因为拿出了一个数据,数组肯定不是满的了
// 此时唤醒等待这notFull条件上的线程
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
这是java官方文档提供的例子,是一个典型的生产者-消费者模型。这里在同一个lock锁上,创建了两个条件队列notFull, notEmpty。当数组已满,没有存储空间时,put方法在notFull条件上等待,直到数组“not full”;当数组空了,没有数据可读时,take方法在notEmpty条件上等待,直到数组“not empty”,而notEmpty.signal()和notFull.signal()则用来唤醒等待在这个条件上的线程。
同步队列 vs 条件队列
sync queue
所有等待锁的线程都会被包装成Node扔到一个同步队列中。该同步队列如下:
image.png
sync queue是一个双向链表,我们使用prev、next属性来串联节点。但是在这个同步队列中,我们一直没有用到nextWaiter属性,即使是在共享锁模式下,这一属性也只作为一个标记,指向了一个空节点,因此,在sync queue中,我们不会用它来串联节点。
condtion queue
每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:
image.png
可见,每一个Condition对象对应一个Conditon队列,每个Condtion队列都是独立的,互相不影响的。在上图中,如果我们对当前线程调用了notFull.await(), 则当前线程就会被包装成Node加到notFull队列的末尾。
值得注意的是,condition queue是一个单向链表,在该链表中我们使用nextWaiter属性来串联链表。但是,就像在sync queue中不会使用nextWaiter属性来串联链表一样,在condition queue中,也并不会用到prev, next属性,它们的值都为null。也就是说,在条件队列中,Node节点真正用到的属性只有三个:
- thread:代表当前正在等待某个条件的线程
- waitStatus:条件的等待状态
- nextWaiter:指向条件队列中的下一个节点
既然这里又提到了waitStatus,我们这里再回顾一下它的取值范围:
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
在条件队列中,我们只需要关注一个值即可——CONDITION。它表示线程处于正常的等待状态,而只要waitStatus不是CONDITION,我们就认为线程不再等待了,此时就要从条件队列中出队。
sync queue 和 conditon queue的联系
一般情况下,等待锁的sync queue和条件队列condition queue是相互独立的,彼此之间并没有任何关系。但是,当我们调用某个条件队列的signal方法时,会将某个或所有等待在这个条件队列中的线程唤醒,被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样要被加到等待锁的sync queue中去,此时节点就从condition queue中被转移到sync queue中:
image.png
但是,这里尤其要注意的是,node是被一个一个转移过去的,哪怕我们调用的是signalAll()方法也是一个一个转移过去的,而不是将整个条件队列接在sync queue的末尾。
同时要注意的是,我们在sync queue中只使用prev、next来串联链表,而不使用nextWaiter;我们在condition queue中只使用nextWaiter来串联链表,而不使用prev、next.事实上,它们就是两个使用了同样的Node数据结构的完全独立的两种链表。因此,将节点从condition queue中转移到sync queue中时,我们需要断开原来的链接(nextWaiter),建立新的链接(prev, next),这某种程度上也是需要将节点一个一个地转移过去的原因之一。
入队时和出队时的锁状态
sync queue是等待锁的队列,当一个线程被包装成Node加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除(事实上移除操作是将获取到锁的节点设为新的dummy head,并将thread属性置为null)。
condition队列是等待在特定条件下的队列,因为调用await方法时,必然是已经获得了lock锁,所以在进入condtion队列前线程必然是已经获取了锁;在被包装成Node扔进条件队列中后,线程将释放锁,然后挂起;当处于该队列中的线程被signal方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,所以必须先去再次的竞争锁,因此,该节点会被添加到sync queue中。因此,条件队列在出队时,线程并不持有锁。
所以事实上,这两个队列的锁状态正好相反:
- condition queue:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到sync queue
- sync queue:入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁
源码分析
AQS对Condition这个接口的实现主要是通过ConditionObject,上面已经说个,它的核心实现就是是一个条件队列,每一个在某个condition上等待的线程都会被封装成Node对象扔进这个条件队列。
核心属性
它的核心属性只有两个:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
这两个属性分别代表了条件队列的队头和队尾,每当我们新建一个conditionObject对象,都会对应一个条件队列。
构造函数
public ConditionObject() { }
构造函数啥也没干,可见,条件队列是延时初始化的,在真正用到的时候才会初始化。
Condition接口方法实现
await方法分析
public final void await() throws InterruptedException {
// 如果当前线程在调动await()方法前已经被中断了,则直接抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装成Node添加到条件队列
Node node = addConditionWaiter();
// 释放当前线程所占用的锁,保存当前的锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果当前队列不在同步队列中,说明刚刚被await, 还没有人调用signal方法,则直接将当前线程挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 线程将在这里被挂起,停止运行
// 能执行到这里说明要么是signal方法被调用了,要么是线程被中断了
// 所以检查下线程被唤醒的原因,如果是因为中断被唤醒,则跳出while循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 放到队列中获锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
里面涉及到的方法我们逐个看下。
首先是将当前线程封装成Node扔进条件队列中的addConditionWaiter方法:
addConditionWaiter
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 加到队列尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先我们要思考的是,存在两个不同的线程同时入队的情况吗?不存在。为什么呢?因为前面说过了,能调用await方法的线程必然是已经获得了锁,而获得了锁的线程只有一个,所以这里不存在并发,因此不需要CAS操作。
在这个方法中,我们就是简单的将当前线程封装成Node加到条件队列的末尾。这和将一个线程封装成Node加入等待队列略有不同:
- 节点加入sync queue时waitStatus的值为0,但节点加入condition queue时waitStatus的值为Node.CONDTION。
- sync queue的头节点为dummy节点,如果队列为空,则会先创建一个dummy节点,再创建一个代表当前节点的Node添加在dummy节点的后面;而condtion queue 没有dummy节点,初始化时,直接将firstWaiter和lastWaiter直接指向新建的节点就行了。
- sync queue是一个双向队列,在节点入队后,要同时修改当前节点的前驱和前驱节点的后继;而在condtion queue中,我们只修改了前驱节点的nextWaiter,也就是说,condtion queue是作为单向队列来使用的。
如果入队时发现尾节点已经取消等待了,那么我们就不应该接在它的后面,此时需要调用unlinkCancelledWaiters来剔除那些已经取消等待的线程:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 起到前置节点的作用
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 不是condition状态进行删除操作
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
该方法将从头节点开始遍历整个队列,剔除其中waitStatus不为Node.CONDTION的节点,这里使用了两个指针firstWaiter和trail来分别记录第一个和最后一个waitStatus不为Node.CONDTION的节点,这些都是基础的链表操作,很容易理解,这里不再赘述了。
** fullyRelease**
在节点被成功添加到队列的末尾后,我们将调用fullyRelease来释放当前线程所占用的锁:
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
值得注意的是,这是一次性释放了所有的锁,即对于可重入锁而言,无论重入了几次,这里是一次性释放完的,这也就是为什么该方法的名字叫fullyRelease。但这里尤其要注意的是release(savedState)方法是有可能抛出IllegalMonitorStateException的,这是因为当前线程可能并不是持有锁的线程。但是咱前面不是说,只有持有锁的线程才能调用await方法吗?既然fullyRelease方法在await方法中,为啥当前线程还有可能并不是持有锁的线程呢?
虽然话是这么说,但是在调用await方法时,我们其实并没有检测Thread.currentThread() == getExclusiveOwnerThread(),换句话说,也就是执行到fullyRelease这一步,我们才会检测这一点,而这一点检测是由AQS子类实现tryRelease方法来保证的,例如,ReentrantLock对tryRelease方法的实现如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
当发现当前线程不是持有锁的线程时,我们就会进入finally块,将当前Node的状态设为Node.CANCELLED,这也就是为什么上面的addConditionWaiter在添加新节点前每次都会检查尾节点是否已经被取消了。












网友评论