美文网首页
AQS源码分析

AQS源码分析

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-04 16:26 被阅读0次

简介

AbstractQueuedSynchronizer面向的是锁的实现者,它通过模板方法简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待、唤醒等的底层操作。用户可以通过实现AQS提供的模板方法中的指定方法去实现自定义同步组件,例如:ReentrantLock、ReentrantReadWriteLock、CountDownLatch等


image.png

结构

试想一下假如让我们自己去实现一个安全的高效的通用的同步器我们应该怎么做?

  1. 首先需要一个成员变量记录同步状态,考虑到要满足重入锁场景,那么这个状态字段应该是int而非boolean,考虑到安全性应该使用volatile修饰
  2. 其次既然是同步器就肯定会遇到获取不到锁的线程阻塞的情况,那么这些阻塞的线程需要有一个容器来存储;考虑到FIFO可以使用ConcurrentLinkedQueue(Doug Lea大师的实现是用了一个内部类Node,更加简洁)
  3. 考虑到高效的线程间通信,使用CAS自旋+park/unpark实现(可以想一下为啥不用wait/notify?)
  4. 考虑到通用性可以使用模板方法设计模式,把不变的流程设计成final方法,变化的细节设计成空方法让使用者去实现
AQS.png
  • state:同步状态,表示当前有多少个线程同时持有该锁
  • Node:双向队列用于保存阻塞的线程
  • 模板方法:实现自定义同步组件时会调用同步器提供的模板方法
  • 重写方法:重写这些方法实现自定义同步器的个性化逻辑

核心源码解读

独占式

/**
* 独占式同步状态获取
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        //当前线程设置中断标志
        selfInterrupt();
    }
}
  1. 首先调用自定义同步器实现的tryAcquire方法,该方法保证线程安全的获取同步状态
  2. 获取同步状态失败,则构造同步节点并通过addWaiter方法插入到同步队列队尾
  3. 调用acquireQueued方法,使该节点自旋的方式获取同步状态
private Node addWaiter(Node mode) {
    //创建当前线程的队列节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        //当前节点前驱节点设置为原tail节点
        node.prev = pred;
        //cas设置新的tail节点,成功则返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //cas失败,将并发添加节点的请求通过自旋变得串行化
    enq(node);
    return node;
}

private Node enq(final Node node) {
    //自旋
    for (;;) {
        //tail节点会变化(其他线程节点先插入队列)
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            ////不断尝试cas操作,直到成功为止
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
* 节点进入同步队列之后,进入自旋过程,每个线程节点都在自省得观察,
* 当获取到同步状态,就可以从自旋中退出
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋
        for (;;) {
            //当前节点前驱节点
            final Node p = node.predecessor();
            //前驱节点为head,再次尝试调用自定义tryAcquire方法获取同步状态
            if (p == head && tryAcquire(arg)) {
                //获取到同步状态(前驱节点已经释放锁),把当前节点设置为head
                setHead(node);
                //前驱节点置空,将没有任何引用指向该Node对象,让GC进行回收
                p.next = null;
                failed = false;
                //获取到锁返回继续执行
                return interrupted;
            }
            //shouldParkAfterFailedAcquire判断是否需要park当前线程,parkAndCheckInterrupt:park当前线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {
        //前驱节点状态为-1,需要阻塞当前线程,等待前驱节点线程release时唤醒当前节点
        //这一次自旋没有走该逻辑的话,下一次自旋时如果当前线程仍没获取到同步状态,则一定会走这段逻辑
        return true;
    }
    if (ws > 0) {
        //前驱节点状态为 1(CANCELLED), 即前驱节点已经超时或被中断,
        //则找到最近一个不等于 1 的节点把当前节点接在后面,并且不需要park当前线程
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //前驱节点状态为0 或 -2 或 -3,cas设置前驱节点状态为 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    //阻塞当前线程,释放cpu资源,等待被其他线程调用unpark唤醒
    LockSupport.park(this);
    //返回当前线程中断状态
    return Thread.interrupted();
}
/**
* 当前线程获取到同步状态并执行了响应逻辑后,需要释放同步状态
* 独占式的释放同步状态,释放后会唤醒同步队列中头节点中线程
*/
public final boolean release(int arg) {
    //调用自定义tryRelease方法,决定是否要释放锁
    if (tryRelease(arg)) {
        Node h = head;
        //队列不为空,且头节点状态不是初始状态
        if (h != null && h.waitStatus != 0) {
            //唤醒头节点
            unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) {
        //头节点状态不是CANCELLED,cas设置为0,?
        compareAndSetWaitStatus(node, ws, 0);
    }
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //头节点后继节点为空或CANCELLED,向后遍历找到第一个不为CANCELLED的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) {
        //unpark节点s的线程
        LockSupport.unpark(s.thread);
    }
}

共享式

共享式获取于独占式获取最重要的区别在于同一时刻能否有多个线程同时获取到同步状态。以读写锁为例:同一时刻一个线程持有某把读锁,那么其他线程获取读锁可以同时进行,获取写锁则被阻塞(放到阻塞队列排队)

/**
* 同独占式相比,代码主要多了设置头节点后唤醒后继共享节点的逻辑
*/
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    //把当前节点设置为头节点(同独占式)
    setHead(node);
    /*
     * 这里有三种情况执行唤醒操作:
     * 1. propagate > 0,表示调用方指明了后继节点需要被唤醒
     * 2. 头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) {
            //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
            doReleaseShared();
        }
    }
}

private void doReleaseShared() {
    //死循环,直到h == head才退出
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //SIGNAL表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
                    continue;      
                } 
                //执行唤醒操作
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
                continue;    
            }
        }
        //退出条件,h不变,head可能被刚唤醒的线程改变
        if (h == head)  
            break;
    }
}
/**
* 共享锁释放
*/
public final boolean releaseShared(int arg) {
    //调用自定义tryReleaseShared方法释放共享锁
    if (tryReleaseShared(arg)) {
        ////唤醒过程,详情见上面分析
        doReleaseShared();
        return true;
    }
    return false;
}

相关文章

网友评论

      本文标题:AQS源码分析

      本文链接:https://www.haomeiwen.com/subject/swhnuktx.html