美文网首页
AbstractQueuedSynchronizer分析

AbstractQueuedSynchronizer分析

作者: YocnZhao | 来源:发表于2019-03-06 17:55 被阅读0次

相信大家看过AQS的代码的话大家对它应该会有基本的认识。它保存了两个Node,head跟tail。先看Node,Node是个双向链表,存储了pre跟next,两个有参构造方法存储Thread。

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        static final int CANCELLED = 1;
        static final int SIGNAL = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //前任
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

如果硬看AQS的话就很晦涩了,我们可以先写一个最简单的AQS的例子,允许线程获取锁,释放锁。

public class Mutex implements Lock {
    private Sync sync = new Sync();

    class Sync extends AbstractQueuedSynchronizer {
        public boolean tryAcquire(int acquire) {
            assert acquire == 1;
            if (compareAndSetState(0, 1)) {//如果是0 -> 变成1
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int release) {
            assert release == 1;
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        Condition getNewCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    public int getQueueLength() {
        return sync.getQueueLength();
    }

    @NotNull
    @Override
    public Condition newCondition() {
        return sync.getNewCondition();
    }
}

我们用这个自定义的锁来对AQS初窥门径,Mutex就相当于synchronized包裹下的notify跟wait,AQS其实看来就相当于object 的锁池。可以看出来Mutex这个类的所有操作其实都是用Sync来做的操作。相当于是AQS的代理类。
直接看lock

sync.acquire(1);

acquire Sync并没有重写acquire,我们看AQS的acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们看Sync实现了tryAcquire,看上面Mutex的代码 CAS 尝试获取锁,如果能获取到就把当前线程设置为当前线程,否则返回false。
我们继续回去看acquire,如果没有获取到线程的话会继续走acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先看addWaiter

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure 快速设置尾节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;//新节点的pre设置为当前的tail节点
            if (compareAndSetTail(pred, node)) {//尝试设置到tail的尾部
                pred.next = node;//成功的话把当前tail的next设置尾新节点,完成尾节点的add
                return node;
            }
        }
        enq(node);
        return node;
    }

看相关代码:首先是

private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

如果tailOffset位置的Node等于expect,就把update改为tail,这个操作是原子的,如果两个线程进来只有一个会成功,第二个比较的时候tailOffest就不是expect了。然后就是enq

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

循环把node放到尾部,返回尾部之前的node,也就是predecessor。

相关文章

网友评论

      本文标题:AbstractQueuedSynchronizer分析

      本文链接:https://www.haomeiwen.com/subject/mpynuqtx.html