美文网首页
JUC学习笔记之Semaphore

JUC学习笔记之Semaphore

作者: Moine0828 | 来源:发表于2019-08-28 17:34 被阅读0次

Semaphore(信号量) 又是一个并发包里的常用组件。它是一个计数信号量,包含一组许可证,可以用acquire方法阻塞,直到获取一个许可证,可以用release方法释放一个许可证。Semaphore只是对可获取的数量进行维护,并没有真正的创建许可证对象。

简单介绍一下Semaphore的使用,举一个LeetCode的题为例

https://leetcode-cn.com/problems/print-in-order/
我们提供了一个类:
public class Foo {
  public void one() { print("one"); }
  public void two() { print("two"); }
  public void three() { print("three"); }
}
三个不同的线程将会共用一个 Foo 实例。
  线程 A 将会调用 one() 方法
  线程 B 将会调用 two() 方法
  线程 C 将会调用 three() 方法
请设计修改程序,以确保 two() 方法在 one() 方法之后被执行,three() 方法在 two() 方法之后被执行。

这道题有很多种解法,这里用信号量来实现一种解法

public class Foo {
    //既然有三个方法,可以定义一个有3个许可证的信号量,而且必须得是非公平的
    private Semaphore semaphore = new Semaphore(3);

    public void first(Runnable printFirst) throws InterruptedException {

        for(;;) {
            //自旋阻塞,当信号量可用凭证为3时,获取一个凭证
            if (semaphore.availablePermits() == 3) {
                //初始化凭证数量就是3,那么一开始肯定先进入了这个分支中
                printFirst.run();
                //获取一个凭证,可用凭证数量变为2
                semaphore.acquire();
                break;
            }
        }

    }

    public void second(Runnable printSecond) throws InterruptedException {

        for(;;) {
            //当first获取了一个凭证后可用数量减少,
            //那么此时second可以进入条件分支
            if (semaphore.availablePermits() == 2) {
                printSecond.run();
                semaphore.acquire();
                break;
            }
        }
    }

    public void third(Runnable printThird) throws InterruptedException {

        for(;;) {
            //同上
            if (semaphore.availablePermits() == 1) {
                printThird.run();
                semaphore.acquire();
                break;
            }
        }
    }
}

上文代码中所使用的acquire方法是阻塞的,若不想阻塞可以使用tryAcquire

事实上Semaphore对于凭证数量的管理也是借鉴了AQS的status属性实现的。所以熟悉AQS的朋友应该很容易理解它的实现

下面来看看常用的几个方法的源码实现

acquire

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}

直接调用了AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

acquireSharedInterruptibly方法里首先判断线程是不是被中断了,是的话抛出异常。然后调用了tryAcquireShared方法来尝试获取一个信号量,如果返回值小于0,表示获取失败,将会调用doAcquireSharedInterruptibly方法使当前线程进入队列进行等待。由于这一步存在等待操作,所以方法是阻塞的

tryAcquireShared方法是AQS留给子类实现的模板方法,在Semaphore中有公平和非公平的两种实现,以非公平为例,具体的实现是在Semaphore的内部类Sync的nonfairTryAcquireShared方法中

final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
           int available = getState();
           int remaining = available - acquires;
           if (remaining < 0 ||
           compareAndSetState(available, remaining))
               return remaining;
     }
 }

在Semaphore中,AQS的status用来表示当前剩余可用的凭证数量

先把需要获取的数量减下去,然后尝试用CAS的方式去设置。

如果需要获取的数量超过了当前剩余的凭证数量,则方法返回值将小于0,如果设置成功,那么返回值将大于0。在acquireSharedInterruptibly方法分析我们已经提到了,返回值小于0的话会进入doAcquireSharedInterruptibly方法,将当前线程构造成一个node然后进入AQS的队列进行等待。简单的说就是获取失败,则线程阻塞等待。

release

public void release() { sync.releaseShared(1); }

release方法中同样的,直接调用了AQS的releaseShared方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
}

在AQS的releaseShared方法中,首先调用了模板方法tryReleaseShared来对sttaus进行加操作,如果成功,调用doReleaseShared方法做锁释放的后续操作,例如唤醒后续节点等。

tryReleaseShared方法的实现也是在Semaphore的内部类Sync中

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
}

tryReleaseShared方法中,首先将当前的凭证数加上释放的数,然后通过CAS去设置。设置成功则返回true,否则将一直等待直到成功。成功后,如上文所说,会调用doReleaseShared方法进行一些后续的操作。

drainPermits

这个方法的作用是让当前线程获取剩余的所有许可证。实现也很简单,通过CAS将status设置成0

final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
}

tryAcquire

tryAcquire与acquire方法相对应,是acquire的非阻塞实现,调用的方法和acquire一样,唯一不同的是,调用失败以后不会将当前线程加入到队列中等待,而是直接返回

public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
}

相关文章

网友评论

      本文标题:JUC学习笔记之Semaphore

      本文链接:https://www.haomeiwen.com/subject/bhpmectx.html