什么是生产者和消费者
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

一、wait() 和 notify() 方法实现
缓冲区满和为空时都调用 wait() 方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。
public class WaitAndNotify {
private static final Object LOCK = new Object();
private static Integer count = 0;
private static final Integer FULL = 20;
public static void main(String[] args) {
WaitAndNotify wn = new WaitAndNotify();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Comsumer()).start();
}
class Comsumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
synchronized (LOCK) {
while (count == 0) {
LOCK.wait();
}
count--;
System.out.println(Thread.currentThread().getName() + " -消费者消费,总共" + count);
LOCK.notifyAll();
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
synchronized (LOCK) {
while (count == FULL) {
LOCK.wait();
}
count++;
System.out.println(Thread.currentThread().getName() + " -生产者生成,总共" + count);
LOCK.notifyAll();
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
}
二、ReentrantLock 方法实现
ReentrantLock
类实现了 Lock
,它拥有与 synchronized
相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。
public class LockAndCondition {
private static Integer count = 0;
private static final Integer FULL = 20;
ReentrantLock producerLock = new ReentrantLock();
ReentrantLock comsumerLock = new ReentrantLock();
Condition not_full = producerLock.newCondition();
Condition not_empty = comsumerLock.newCondition();
public static void main(String[] args) {
WaitAndNotify wn = new WaitAndNotify();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Comsumer()).start();
}
class Comsumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
comsumerLock.lockInterruptibly();
while (count == 0) {
not_empty.await();
}
count--;
System.out.println(Thread.currentThread().getName() + " -消费者消费,总共" + count);
not_full.signalAll();
} catch (Exception e1) {
e1.printStackTrace();
} finally {
comsumerLock.unlock();
}
}
}
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
producerLock.lockInterruptibly();
while (count == FULL) {
not_full.await();
}
count++;
System.out.println(Thread.currentThread().getName() + " -生产者生成,总共" + count);
not_empty.signalAll();
} catch (Exception e1) {
e1.printStackTrace();
} finally {
producerLock.unlock();
}
}
}
}
}
三、BlockingQueue 方法实现
阻塞队列实现的生产者消费者模型,这里我们使用 take() 和 put() 方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。
public class BlockingQueueTest {
private static final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
private static final Integer FULL = 20;
private static Integer count = 0;
public static void main(String[] args) {
BlockingQueueTest wn = new BlockingQueueTest();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Producer()).start();
new Thread(wn.new Comsumer()).start();
new Thread(wn.new Comsumer()).start();
}
class Comsumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
blockingQueue.take();
count--;
System.out.println(Thread.currentThread().getName() + " -消费者消费,总共" + count);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < FULL; i++) {
try {
Thread.sleep(1000);
blockingQueue.put(i);
count++;
System.out.println(Thread.currentThread().getName() + " -生产者生成,总共" + count);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
}
网友评论