美文网首页
JUC-CyclicBarrier

JUC-CyclicBarrier

作者: 别拿爱情当饭吃 | 来源:发表于2018-10-22 17:26 被阅读6次

一.什么是CyclicBarrier?
CyclicBarrier就是循环屏障
二.CyclicBarrier的用途?
n个线程相互等待,直到所有线程到达同一个点后,再同时执行
三.CyclicBarrier的应用场景
一起去做某件事,才能完成某件事
四.Cyclicbarrier的测试代码

import java.util.concurrent.*;

/**
 * @author Aaron
 * @date 2018/10/22 下午3:10
 * @function 测试CyclicBarrier怎样使用
 */
public class TestCyclicBarrier {
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());

    private static final CyclicBarrier cb = new CyclicBarrier(4, new Runnable() {
        @Override
        //唤醒所有线程后,第一个执行的线程
        public void run() {
            System.out.println("寝室四兄弟准备一起出发去球场");
        }
    });

    private static class GoThread extends Thread{
        private final String name;
        public GoThread(String name){
            this.name = name;
        }

        public void run(){
            System.out.println(name+"开始从宿舍出发");
            try {
                Thread.sleep(1000);
                cb.await();//拦截线程
                System.out.println(name+"从楼底下出发");
                Thread.sleep(1000);
                System.out.println(name+"到达操场");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        test2();
    }

    public static void test1(){
        String[] str = {"小汤","小赵","小高","小吴"};
        for (int i=0;i<4;i++){
            threadPool.execute(new GoThread(str[i]));
        }
        try {
            Thread.sleep(4000);
            System.out.println("四个人一起到达球场,现在开始打球");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void test2(){
        String[] str = {"小汤","小高","小赵","小吴"};
        String[] str1 = {"大汤","大高","大赵","大吴"};
        for (int i=0;i<4;i++){
            threadPool.execute(new GoThread(str[i]));


        }
        try {
            Thread.sleep(4000);
            System.out.println("四个人一起到达球场,现在开始打球");
            System.out.println("现在对CyclicBarrier进行复用。。。。。。");
            System.out.println("又来了一拨人,看看愿不愿意一起打");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //进行复用
        for (int i=0;i<4;i++){
            threadPool.execute(new GoThread(str1[i]));

        }
        try {
            Thread.sleep(4000);
            System.out.println("四个人一起到达球场,表示愿意一起打球,现在八个人开始打球");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

五.CyclicBarrier的源代码解析
1.内部类

public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
这个Generation类只定义了一个布尔变量
    private static class Generation {
        boolean broken = false;
    }

2.CyclicBarrier的核心代码

/**
 * Main barrier code, covering the various policies.
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;   
//获取独占锁
    lock.lock();
    try {
//获取当前的generation
        final Generation g = generation;
//假如当前的generation被损坏了,就抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
//如果当前线程被中断,调用breakBarrier中断CyclicBarrier,并唤醒所有等待的线程
        if (Thread.interrupted()) {
            breakBarrier();//见下一个方法解析
            throw new InterruptedException();
        }
//将计数器减1
        int index = --count;
//当index等于0时,就意味着parties个线程到达了屏障barrier
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();//唤醒所有线程
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
//如果有parties个线程到达屏障后,或者线程被中断,或者超过最大等待时间,那么就会唤醒线程
        for (;;) {
            try {
//判断调用的方法是“非超时等待”还是“超时等待”
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

3.breakBarrier方法

/**
 * Sets current barrier generation as broken and wakes up everyone.
 * Called only while holding lock.
 */
1.终止线程,并且重新设置count的值为parties
2.唤醒所有线程
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

4.nextGeneration方法

/**
 * Updates state on barrier trip and wakes up everyone.
 * Called only while holding lock.
 */
1、唤醒所有线程
2、重新设置count的值为parties
3、生层新的一代
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}
5.await()方法(本质是调用了doawait方法)
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

六、CyclicBarrier和CountDownLatch的区别
1、CyclicBarrier的屏障可以循环使用;但是CountDownLatch不能循环使用
2、CyclicBarrier是n个线程互相等待;CountDownLatch是m(一个或多个)等待n个
3、CyclicBarrirt可以使用CountDownLatch实现,代码如下

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Aaron
 * @date 2018/10/22 下午3:39
 * @function 用CountDownLatch实现CyclicBarrier
 */
public class TestCyclicBarrierWithCDL {
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(4);
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(4,10,60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());

    private static class GoHead extends Thread{
        private final String name;
        public GoHead(String name){
            this.name = name;
        }
        public void  run(){
            System.out.println(name+"开始出发");
            COUNT_DOWN_LATCH.countDown();

            try {
                Thread.sleep(1000);
                COUNT_DOWN_LATCH.await();
                System.out.println(name+"从楼底下出发");
                Thread.sleep(1000);
                System.out.println(name+"到达操场");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        String[] str = {"A","B","C","D"};
        for (int i=0;i<4;i++){
            THREAD_POOL_EXECUTOR.execute(new GoHead(str[i]));
        }
        try {
            Thread.sleep(4000);
            System.out.println("四个人一起到达球场,现在开始打球");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

相关文章

  • JUC-CyclicBarrier

    一.什么是CyclicBarrier?CyclicBarrier就是循环屏障二.CyclicBarrier的用途?...

  • JUC-CyclicBarrier

      CyclicBarrier:【可重复使用的栅栏】,让所有线程都等待完成后才会继续下一步行动。示例代码如下所示:...

网友评论

      本文标题:JUC-CyclicBarrier

      本文链接:https://www.haomeiwen.com/subject/drggzftx.html