美文网首页
线程并发工具之Semaphore

线程并发工具之Semaphore

作者: 传达室马大爷 | 来源:发表于2020-04-11 13:27 被阅读0次

Semaphore

信号量,用于控制同时访问某一特定资源的个数,Semaphore可以用于流量控制

比如去食堂打饭,食堂有10个窗口,那么食堂窗口就是信号量,最大能够满足10个人同时打饭,超过10个人的时候只能等待,当有一个人打饭完成后,空出一个位置,则等待的其中一个人可以进来打饭,其他人的继续等待
构造函数
Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore
Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的Semaphore
  • fair:false表示非公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序无关,也就是线程先启动了并不代表先获得许可。
  • fair:true表示公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序有关,也就是先启动的线程优先获得许可。但不代表100%获得信号量,仅仅是在概率上能保证,而非公平信号量就是无关的。
常用方法
acquire() // 获取一个许可证,只有获得了许可证,才可以对资源进行操作,对应一个release()
acquire(int n) // 获取N个许可证,对应N个release()
release() // 释放一个许可证
release(int n) // 释放多个许可证
availablePermits() // 返回Semaphore对象中当前可以用的许可数
drainPermits() // 获取并返回所有的许可个数,并且将可用的许可重置为0
getQueueLength() // 取得等待的许可线程个数
hasQueueThreads() // 判断有没有线程在等待这个许可
tryAcquire() // 尝试获取1个许可。如果获取不到则返回false,通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使不至于在同步处于一直持续等待的状态。
tryAcquire(long timeout,TimeUnit unit) // 在指定的时间内尝试获取1个许可,如果获取不到则返回false
实例1:模拟学生在食堂取餐
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

import com.shawntime.enjoy.architect.concurrency.SleepUtils;

/**
 *  Semaphore信号量模拟学生去食堂取餐
 */
public class SemaphoreTest {

    public static void main(String[] args) {
        // 定义餐厅有10个窗口可同时取餐
        int windowNum = 10;
        DiningRoom diningRoom = new DiningRoom(windowNum);

        int studentNum = 100;
        CountDownLatch countDownLatch = new CountDownLatch(studentNum);
        long startTime = System.currentTimeMillis();
        // 定义100个学生取餐
        for (int i = 1; i <= studentNum; ++i) {
            Student student = new Student("student-" + i, diningRoom, countDownLatch);
            student.start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(studentNum + "个同学在" + windowNum + "个窗口取餐耗时:" + (System.currentTimeMillis() - startTime));
    }

    private static class Student extends Thread {

        private DiningRoom diningRoom;

        private CountDownLatch countDownLatch;

        public Student(String name, DiningRoom diningRoom, CountDownLatch countDownLatch) {
            super(name);
            this.diningRoom = diningRoom;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                diningRoom.takeFood();
            } finally {
                countDownLatch.countDown();
            }
        }
    }

    private static class DiningRoom {

        private Semaphore semaphore;

        public DiningRoom(int windowNum) {
            this.semaphore = new Semaphore(windowNum, false);
        }

        public void takeFood() {

            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + "进入食堂,等待取餐");
            try {
                semaphore.acquire();
                System.out.println(threadName + "进入窗口,正在取餐");
                int random = ThreadLocalRandom.current().nextInt(10);
                SleepUtils.sleepBySeconds(random);
                System.out.println(threadName + "取餐完毕,释放窗口,耗时:" + random + "秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }
}
实现不可重入互斥锁
import java.util.concurrent.Semaphore;

/**
 * semaphore实现互斥锁
 */
public class Mutex {

    private Semaphore semaphore = new Semaphore(1, true);

    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }

    public void release() {
        semaphore.release();
    }

    public boolean attempt(int ms) throws InterruptedException {
        return semaphore.tryAcquire(ms);
    }
}

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;

import com.shawntime.enjoy.architect.concurrency.SleepUtils;

public class MutexTest {

    public static void main(String[] args) {
        Mutex mutex = new Mutex();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        for (int i = 0; i < 10; ++i) {
            MyThread myThread = new MyThread("thread-" + i, mutex, cyclicBarrier);
            myThread.start();
        }

    }

    private static class MyThread extends Thread {

        private Mutex mutex;

        private CyclicBarrier cyclicBarrier;

        public MyThread(String name, Mutex mutex, CyclicBarrier cyclicBarrier) {
            super(name);
            this.mutex = mutex;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "准备就绪...");
                cyclicBarrier.await();
                mutex.acquire();
                System.out.println(Thread.currentThread().getName() + "工作开始...");
                int random = ThreadLocalRandom.current().nextInt(1000);
                SleepUtils.sleepByMilliSeconds(random);
                System.out.println(Thread.currentThread().getName() + "工作结束...");
                mutex.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

输出结果:
thread-1工作开始...
thread-1工作结束...
thread-0工作开始...
thread-0工作结束...
thread-4工作开始...
thread-4工作结束...
thread-3工作开始...
thread-3工作结束...
thread-2工作开始...
thread-2工作结束...

结论:每个线程都是独占式,必须等到每个线程处理完之后其他线程才会机会竞争

相关文章

网友评论

      本文标题:线程并发工具之Semaphore

      本文链接:https://www.haomeiwen.com/subject/drdymhtx.html