独占锁以ReenttrantLock为例,在同一时间有且只有一个线程能够获取到锁,其他线程此时全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。
因为前一篇文章是通过ReentrantLock来介绍AQS的,这里就不再展示代码了,关于独占锁就单独的做一些总结。
首先ReentrantLock分为公平锁和非公平锁,默认是非公平锁,因为线程的唤醒需要开销,所以爱性能上非公平锁要优于公平锁
- 公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序,并依此顺序获得锁,类似于排队做什么做什么;
- 非公平锁:每个线程抢占锁的顺序不变,谁运气好,谁就获得锁,和调用lock方法的先后顺序无关,类似后来的插队。
非公平锁:
lock为入口,通过tryAcquire尝试获取锁,如果获取成功直接执行当前线程的逻辑,如果失败,则通过addWaiter加入到等待队列,然后将此线程挂起,等待队列中的其他线程执行完轮到此线程时,会将当前线程唤醒,然后执行当前线程的逻辑
公平锁:
公平锁的流程跟非公平锁大致一样,只有一处是不同的就是新加入的线程不会立即去尝试获取锁,而是直接加入到队列中,等待其前任线程执行完毕然后被唤醒.
共享锁以CountDownLatch为例
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
countDownLatch = new CountDownLatch(5);
for (int index = 0; index < 5; index++) {
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable, "Thread" + index);
thread.start();
}
try {
Log.i("chuan", "onCreate: 主线程开始阻塞 ");
countDownLatch.await();
Log.i("chuan", "onCreate: 子线程全部执行完毕 主线程开始执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
Log.i("chuan", "run: " + Thread.currentThread().getName()+"获取到锁 并执行" );
countDownLatch.countDown();
// addIndex();
}
}
com.chuan.jun I/chuan: run: Thread0获取到锁 并执行
com.chuan.jun I/chuan: onCreate: 主线程开始阻塞
com.chuan.jun I/chuan: run: Thread2获取到锁 并执行
com.chuan.jun I/chuan: run: Thread4获取到锁 并执行
com.chuan.jun I/chuan: run: Thread3获取到锁 并执行
com.chuan.jun I/chuan: run: Thread1获取到锁 并执行
com.chuan.jun I/chuan: onCreate: 子线程全部执行完毕 主线程开始执行
先来看构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch#Sync
Sync(int count) {
etState(count);
}
AbstractQueuedSynchronizer#Node
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
传入的数值为共享锁最大可以接受的线程数,通过state标识值控制
我们先从await看
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
tryAcquireShared()
用来判断是否阻塞当前线程(示例中为主线程),依据是state标识,从日志中可以看出如果子线程很少的话,比如说一个,因为countDown
会减少state的数量,这是时候主线程将不会去阻塞,返回1;当然大部分情况下是-1,也就是阻塞当前线程,进入到doAcquireSharedInterruptibly
中
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 这里会将线程挂起,线程进入阻塞状态,知道前一节点来唤醒(前一节点为头结点并且执行完的时候)
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
通过addWaiter
将转换成节点node的当前线程(调用await的线程)加入到同步队列中,并返回此节点,然后判断如果当前节点的前任节点是否是头结点,这里先讨论不是的情况
shouldParkAfterFailedAcquire
判断当前线程的前任线程状态是否是SIGNAL
,如果是的话返回true,如果是CANCELLED
的话将前任线程脱离出队列,重新为当前线程匹配前任线程,然后返回false,因为是一个死循环,所以会再次判断,如果前任线程不是SIGNAL
也不是CANCELLED
则将前任线程的状态修改为SIGNAL
,然后在循环;需要注意的是每次循环都是当前线程的节点,并没有修改其他节点,这样shouldParkAfterFailedAcquire
最后会返回true,最后parkAndCheckInterrupt
则是挂起当前线程。
如果当前节点的前任节点是头节点,并且能获取到锁,那么则设置当前线程所代表的节点为头结点然后等待被唤醒,如果头结点为空,则唤醒头结点所代表的后继节点。
再来看countDown
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先尝试通过tryReleaseShared
获取锁,
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
从源码中可以看出只有当state的值为0是才会返回true,如果state为0,则说明可以唤醒主线程了,那么doReleaseShared
应该是唤醒主线的代码
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
首先得到头结点,如果不为空并且状态可以修改为0,则进入unparkSuccessor()去释放头结点的后继节点。(这里释放也是根据同步队列来释放,如果同步队列还有其他的同步线程,最后不一定会及时的唤醒主线程)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
网友评论