美文网首页我的学习
02_ReentrantReadWriteLock源码分析

02_ReentrantReadWriteLock源码分析

作者: Mr_Qiang | 来源:发表于2020-12-02 10:50 被阅读0次

类结构

Df2vNV.png

简介

跟ReentrantLock原理一样,都是可以重入,公平/非公平的互斥锁。内部虽然有一个读锁和写锁,但是用的都是同一个Sync。
在原理上跟ReentantLock不同的是,当所有线程去获取读锁的时候,是不会阻塞的。只有当线程持有写锁/读锁,另一个线程再去获取读锁/写锁的时候,才会阻塞。总之,读读不会阻塞,其余都会阻塞。(不包括两次获取锁都是同一线程的情况->锁升级)

锁升级是指当前线程持有读锁的时候,再去获取写锁,反之也是升级。但是需要注意,==只有持有写锁,再去获取读锁的时候,才能升级成功。== 持有读锁,再去获取写锁,必须要先释放读锁,才能去获取写锁。

还有跟ReentantLock不同的是,state不在是0可以获取锁,1获取不到锁。而是用int 的state位标识当前锁的状态。state的高16位代表读锁的状态,0代表可以获取读锁,1代表不能获取读锁。同理低16位没有1就是没有人获取写锁。



        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

加锁流程

场景一:多线程获取读锁

D4xmRS.png

代码

        protected final int tryAcquireShared(int unused) {
   
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
            ....
                return 1;
            }
            return fullTryAcquireShared(current);
        }

如果没有人获取写锁,这时候获取读锁非常的简单,就是state的高位+1。

场景一:多线程获取读锁写锁

加锁

DfqwKx.png
  • 一开始t1去获取锁,因为没有竞争,获取锁成功,setOwner为自己,state的低八位加一。
        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                ...
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
  • 然后t2又来获取读锁。
    1. 先是获取锁,发现写锁已经被占了,且不是当前线程,获取锁失败了。
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    1. 获取锁失败后,去调用doAcquireShared(1)方法。这个跟之前ReenrantLock的阻塞然后排队方法一样,都是创建一个空的Node作为head,然后把当前阻塞的节点跟在head后面。且设置head的waitStatus为-1.然后调用LockSuport.park停住线程。唯一不同的是,t2获取的是读锁,他的节点类型是Node.SHARED
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 同理t3,t4都以此挂在了head后面,根据获取锁的不同分为Node.SHARED和Node.EXCLUSIVE。到此加锁的流程结束了。

解锁

D4TwNQ.png
  • 第一步是t1先释放锁,把state的低八位减一。这时候如果state的低八位为0代表没重入,释放成功,setOwner为null。
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


    protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
  • 第二步,检查head是否不为空,且head的waitstatus是否不等于零,如果是就去unpark其他排队的线程。

唤醒其他线程之前,我们先看下其他获取读锁的线程是在哪儿被park的

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//① 获取前驱节点
                if (p == head) {
                    int r = tryAcquireShared(arg); //② 再次获取锁,state 的高八位+1
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); ③ 设置自己节点为头节点,然后唤醒后继节点如果是shared的,结束。
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //注意注意: t2,t3都是在这里被park的,unpark后,从这里唤醒,回到上面第一步。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以下是如何唤醒其他线程

    1. t2是在doAcquireShared(1)复活,这个时候又一次循环后,进入tryAcquireShared(1)方法获取锁,获取锁后,state的高八位+1。
    1. t2获取锁成功后,第一件事就是setHeadAndPropagate把自己设置成head,然后唤醒下一个如果是shared的节点。同理下个节点唤醒后第一件事就是把自己设置成head,然后唤醒下一个,直到下一个的下一个不是shared或者已经是tail了为止,程序中止。
      ==注意因为被唤醒的是读锁线程,那他是有责任唤醒他后面连续排队的获取读锁线程,即读锁的传播==
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

相关文章

网友评论

    本文标题:02_ReentrantReadWriteLock源码分析

    本文链接:https://www.haomeiwen.com/subject/wjuywktx.html