美文网首页
AbstractQueuedSynchronizer源码浅析(二

AbstractQueuedSynchronizer源码浅析(二

作者: 小川君 | 来源:发表于2018-09-25 17:49 被阅读0次

独占锁以ReenttrantLock为例,在同一时间有且只有一个线程能够获取到锁,其他线程此时全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。
因为前一篇文章是通过ReentrantLock来介绍AQS的,这里就不再展示代码了,关于独占锁就单独的做一些总结。
首先ReentrantLock分为公平锁和非公平锁,默认是非公平锁,因为线程的唤醒需要开销,所以爱性能上非公平锁要优于公平锁

  • 公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序,并依此顺序获得锁,类似于排队做什么做什么;
  • 非公平锁:每个线程抢占锁的顺序不变,谁运气好,谁就获得锁,和调用lock方法的先后顺序无关,类似后来的插队。

非公平锁:
lock为入口,通过tryAcquire尝试获取锁,如果获取成功直接执行当前线程的逻辑,如果失败,则通过addWaiter加入到等待队列,然后将此线程挂起,等待队列中的其他线程执行完轮到此线程时,会将当前线程唤醒,然后执行当前线程的逻辑
公平锁:
公平锁的流程跟非公平锁大致一样,只有一处是不同的就是新加入的线程不会立即去尝试获取锁,而是直接加入到队列中,等待其前任线程执行完毕然后被唤醒.

共享锁以CountDownLatch为例

   @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        countDownLatch = new CountDownLatch(5);
        for (int index = 0; index < 5; index++) {
            MyRunnable runnable = new MyRunnable();
            Thread thread = new Thread(runnable, "Thread" + index);
            thread.start();
        }

        try {
            Log.i("chuan", "onCreate: 主线程开始阻塞 ");
            countDownLatch.await();
            Log.i("chuan", "onCreate: 子线程全部执行完毕   主线程开始执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            Log.i("chuan", "run: " + Thread.currentThread().getName()+"获取到锁 并执行"   );
            countDownLatch.countDown();
//            addIndex();
        }
    }
    
    
    

com.chuan.jun I/chuan: run: Thread0获取到锁 并执行
com.chuan.jun I/chuan: onCreate: 主线程开始阻塞 
com.chuan.jun I/chuan: run: Thread2获取到锁 并执行
com.chuan.jun I/chuan: run: Thread4获取到锁 并执行
com.chuan.jun I/chuan: run: Thread3获取到锁 并执行
com.chuan.jun I/chuan: run: Thread1获取到锁 并执行
com.chuan.jun I/chuan: onCreate: 子线程全部执行完毕   主线程开始执行    

先来看构造方法

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
CountDownLatch#Sync    
    Sync(int count) {
        etState(count);
    }
        
AbstractQueuedSynchronizer#Node    
    private volatile int state;        
    protected final void setState(int newState) {
        state = newState;
    }        

传入的数值为共享锁最大可以接受的线程数,通过state标识值控制
我们先从await看

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }  
    
    protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
    }    

tryAcquireShared()用来判断是否阻塞当前线程(示例中为主线程),依据是state标识,从日志中可以看出如果子线程很少的话,比如说一个,因为countDown会减少state的数量,这是时候主线程将不会去阻塞,返回1;当然大部分情况下是-1,也就是阻塞当前线程,进入到doAcquireSharedInterruptibly

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 这里会将线程挂起,线程进入阻塞状态,知道前一节点来唤醒(前一节点为头结点并且执行完的时候)
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

通过addWaiter将转换成节点node的当前线程(调用await的线程)加入到同步队列中,并返回此节点,然后判断如果当前节点的前任节点是否是头结点,这里先讨论不是的情况
shouldParkAfterFailedAcquire判断当前线程的前任线程状态是否是SIGNAL,如果是的话返回true,如果是CANCELLED的话将前任线程脱离出队列,重新为当前线程匹配前任线程,然后返回false,因为是一个死循环,所以会再次判断,如果前任线程不是SIGNAL也不是CANCELLED则将前任线程的状态修改为SIGNAL,然后在循环;需要注意的是每次循环都是当前线程的节点,并没有修改其他节点,这样shouldParkAfterFailedAcquire最后会返回true,最后parkAndCheckInterrupt则是挂起当前线程。

如果当前节点的前任节点是头节点,并且能获取到锁,那么则设置当前线程所代表的节点为头结点然后等待被唤醒,如果头结点为空,则唤醒头结点所代表的后继节点。

再来看countDown

    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }    

首先尝试通过tryReleaseShared获取锁,

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

从源码中可以看出只有当state的值为0是才会返回true,如果state为0,则说明可以唤醒主线程了,那么doReleaseShared应该是唤醒主线的代码

    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

首先得到头结点,如果不为空并且状态可以修改为0,则进入unparkSuccessor()去释放头结点的后继节点。(这里释放也是根据同步队列来释放,如果同步队列还有其他的同步线程,最后不一定会及时的唤醒主线程)

    private void unparkSuccessor(Node node) {
   
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

相关文章

网友评论

      本文标题:AbstractQueuedSynchronizer源码浅析(二

      本文链接:https://www.haomeiwen.com/subject/jsiloftx.html