美文网首页
通过BlockingQueue实现生产者和消费者功能

通过BlockingQueue实现生产者和消费者功能

作者: 月下饿狼 | 来源:发表于2020-07-08 16:48 被阅读0次

1.使用volatile关键字

2.使用BlockingQueue接口,可赋值对应的7种实现类

3.使用原子类AtomicInteger,在线程中千万别用i++,i-- 会埋雷 用原子类的 incrementAndGet() 或者decrementAndGet() 方法

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 通过 BlockingQueue 实现
 * 生产者消费者案例
 */
public class ProdConsumer_BlockQueueDemo {

    public static void main(String[] args) throws Exception{
        ShareResource shareResource = new ShareResource(new ArrayBlockingQueue(5));
        //生产线程
        new Thread(()->{
            try {
                shareResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"AA").start();
        //消费线程
        new Thread(()->{
            try {
                shareResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"BB").start();

        TimeUnit.SECONDS.sleep(5);

        shareResource.stop();
    }
}

/**
 * 资源类
 */
class ShareResource{
    /**
     * 默认开启,false关闭   volatile其他线程可见性
     */
    private volatile boolean FlAG = true;
    /**
     * 给个接口类,可以赋值任意的实现类   成为高手并经之路
     */
    private BlockingQueue<String> blockingQueue = null;
    /**
     * 原子类  代替 i++
     */
    private AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 给个接口类,可以赋值任意的实现类   成为高手并经之路   通过构造器赋值
     * @param blockingQueue
     */
    public ShareResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void myProd() throws Exception{

        String data = null;
        boolean result;
        while(FlAG){

            data = atomicInteger.incrementAndGet()+"";
            //添加队列  超过2s 就超时失败  返回false
            result = blockingQueue.offer(data,2L, TimeUnit.SECONDS);
            if(result){
                System.out.println(Thread.currentThread().getName()+"\t 添加队列"+data+"成功");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t 添加队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1L);
        }

        System.out.println(Thread.currentThread().getName()+"\t 大老板叫停 不能再生产了");
    }

    public void myConsumer() throws Exception{
        String result = null;
        while(FlAG){
            //消费队列  超过2s 就超时失败  返回false
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(null == result  || "".equalsIgnoreCase(result)){
                //修改
                FlAG = false;
                System.out.println(Thread.currentThread().getName()+"\t 消费队列失败 消费退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t 消费成功"+result);
        }

    }

    public void stop(){
        FlAG = false;
    }
}

结果如下

AA   添加队列1成功
BB   消费成功1
AA   添加队列2成功
BB   消费成功2
AA   添加队列3成功
BB   消费成功3
AA   添加队列4成功
BB   消费成功4
AA   添加队列5成功
BB   消费成功5
AA   大老板叫停 不能再生产了
BB   消费队列失败 消费退出

相关文章

网友评论

      本文标题:通过BlockingQueue实现生产者和消费者功能

      本文链接:https://www.haomeiwen.com/subject/ngpqcktx.html