【實戰分享】有哪幾種實現生產者消費者模式的方法?
什么是生產者消費者模式
生產者消費者模式其實是一種設計模式,在生活中四處可見,比如我們在排隊買奶茶,奶茶店里面的店員去生產奶茶,然后給消費者消費,在這里,店里面的店員就是一個生產者,顧客就是一個消費者。并且在生產者消費者的概念中,生產者和消費者是一一對應的,也就是說,奶茶店里面的店員生產了一杯奶茶只能供一個顧客,顧客想和第二杯也不行,就需要重新排隊。但是如果當天奶茶店的老板打了雞血給店員們發了獎金說今天要做1000杯奶茶,導致做奶茶太快,老板發現排隊的人消費不了這么多的奶茶,俗稱“產能過剩”,這個時候就需要老板來調度店員先停一停,等店里做好的奶茶先消費完,同時讓店員去店門口吆客。那其實在這里就其實以阻塞隊列的形式形成的生產者消費者模型。

可以看到上面這個圖,生產者去生產奶茶,生產到了1000杯的時候停止生產,此時消費者看到奶茶店有奶茶就會過來消費,當1000杯奶茶都杯消費完了之后,店員就會告訴消費者,讓他稍微等等,消費者也會告訴生產者你們的奶茶不夠1000杯啦可以繼續生產。
相信通過奶茶店賣奶茶的例子大家能夠對生產者消費者模式有一定的了解,總結的來說,生產者消費者模式起到的最重要的作用就是能夠讓生產者生產的東西可以有緩存起來讓消費者慢慢消費。
那么在我們工作當中,如何使用技術來實現這樣的一種生產者消費者模式呢?
實現方式
使用 BlockingQueue 實現生產者消費者模式
public class BlockingQMain {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(()->{
int i = 0;
while (true){
try {
//奶茶店每隔5秒生產一杯奶茶
TimeUnit.SECONDS.sleep(5);
queue.put(++i);
System.out.println("生產者生產了第"+i+"杯奶茶,當前門店還有"+queue.size()+"杯奶茶待消費");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(()->{
while (true){
try {
//消費者隨機排隊,隨機1到9秒出現一個消費者
Random random = new Random();
int seconds = random.nextInt(10)+1;
TimeUnit.SECONDS.sleep(seconds);
System.out.println("過了"+seconds+"秒之后來了一位消費者");
Integer msg = queue.take();
System.out.println("消費者拿到了第"+msg+"杯奶茶");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
先簡單的帶大家了解一下BlockingQueue, BlockingQueue是一個阻塞隊列,其作用就是在隊列滿的時候,生產者會阻塞不再生產,等隊列有空閑位置的時候才去生產。消費等隊列有數據時消費,如果沒有數據則會阻塞等待。而且他也是一種先進先出的隊列
在上面的例子中我們實現了奶茶店的場景,奶茶店會每隔五秒鐘生產一杯奶茶,而出現消費者去排隊會隨機1-9秒出現一個。并起了兩個線程去實現它,從上面的代碼來看,貌似是一個很簡單的實現,但實際上BlockingQueue在里面起到了很重要的作用,如果隊列滿了就去阻塞生產者的線程,隊列有空就去喚醒生產者的線程。
使用 Condition 實現生產者消費者模式
關于Condition 可以先看下這篇文章簡單入門一下
Condition類的介紹與使用
其實使用Condition來實現生產者消費模式原理跟上面BlockingQueue來實現的原理差不多,區別在于BlockingQueue這個阻塞隊列需要我們自己去用Condition來實現他的功能,那么BlockingQueue會有哪些功能是需要我們用到的呢?
- 可以緩存消息
- 保證消息先入先出
- 可以設置隊列最大限制
- 隊列為空時,消費方法阻塞。
- 隊列到達最大值時,生產者阻塞
- 隊列有值時通知消費者消費
- 隊列沒有到達最大值時通知生產者生產者生產
實現代碼如下
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionQueue {
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
private Queue<String> queue;
private int max;
public ConditionQueue(int size) {
max = size;
queue = new LinkedList<String>();
}
public void put(String msg){
try {
lock.lock();
//如果隊列滿了則阻塞生產
while (queue.size() == max){
System.out.println("生產者停止生產了");
producer.await();
System.out.println("生產者開始生產了");
}
queue.add(msg);
//隊列有數據了,喚醒消費
consumer.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try {
//如果隊列空了則阻塞消費
while (queue.isEmpty()) {
System.out.println("消費者停止消費了");
consumer.await();
System.out.println("消費者開始消費了");
}
String msg = queue.remove();
//隊列消費有空位了,喚醒生產者生產
producer.signal();
return msg;
} finally {
lock.unlock();
}
}
public int size(){
return queue.size();
}
public static void main(String[] args) {
ConditionQueue queue = new ConditionQueue(10);
Thread producer = new Thread(()->{
int i = 0;
while (true){
try {
//奶茶店每隔1秒生產一杯奶茶
TimeUnit.SECONDS.sleep(1);
queue.put(String.valueOf(++i));
System.out.println("生產者生產了第"+i+"杯奶茶,當前門店還有"+queue.size()+"杯奶茶待消費");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(()->{
while (true){
try {
//消費者隨機排隊,隨機1到9秒出現一個消費者
Random random = new Random();
int seconds = random.nextInt(10)+1;
TimeUnit.SECONDS.sleep(seconds);
System.out.println("過了"+seconds+"秒之后來了一位消費者");
String msg = queue.take();
System.out.println("消費者拿到了第"+msg+"杯奶茶");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
可能大家看到代碼后會對隊列中while (queue.isEmpty())和while (queue.size() == max)產生疑問,感覺這里可以使用if語句來替換while來使用。其實如果用if來判斷的話,可以適用于單線程的場景下,但是用到多線程的場景下就不適合了,如果有兩個線程來充當消費者,當隊列中沒有數據之后就會發現,需要進行等待,此時可能會發生兩個線程同時會進入到if判斷中,如果第一個消費者消費到了數據,此時隊列又變空了,并且生產者也沒有生產數據。同時第二個線程也被喚醒了也去消費數據,但是隊列中的數據變成了空的,再去取數據就會拋出NoSuchElementException異常。單如果換成while的好處就是,我被喚醒,但是我還是會再去走一遍循環,去判斷隊列中是否還有數據,從而就避免了上面拋異常的這種情況。說白了,while比if更加謹慎,就算你喚醒了我,我還是會再去檢查一遍有沒有問題,如果沒有問題我再走下面的邏輯。
用 wait/notify 實現生產者消費者模式
如果你已經理解了上面Condition的方式去實現生產者消費者模式,那你應該就可以很容易發現,用wait/notify和的方式大同小異,他兩實現的方式其實就是一個兄弟關系。其實這個也說的通Condition的發明就是為了替代wait/notify方式的。我們接下來看使用wait/notify去如何實現。不啰嗦,直接貼代碼
public class WaitQueue {
private LinkedList<String> queue;
private int max;
public WaitQueue(int size) {
max = size;
queue = new LinkedList<String>();
}
public synchronized void put(String msg) throws InterruptedException {
//如果隊列滿了則阻塞生產
while (queue.size() == max){
System.out.println("生產者停止生產了");
wait();
System.out.println("生產者開始生產了");
}
queue.add(msg);
//隊列有數據了,喚醒消費
notifyAll();
}
public synchronized String take() throws InterruptedException {
//如果隊列空了則阻塞消費
while (queue.isEmpty()) {
System.out.println("消費者停止消費了");
wait();
System.out.println("消費者開始消費了");
}
String msg = queue.remove();
//隊列消費有空位了,喚醒生產者生產
notifyAll();
return msg;
}
public int size(){
return queue.size();
}
public static void main(String[] args) {
ConditionQueue queue = new ConditionQueue(10);
Thread producer = new Thread(()->{
int i = 0;
while (true){
try {
//奶茶店每隔5秒生產一杯奶茶
TimeUnit.SECONDS.sleep(1);
queue.put(String.valueOf(++i));
System.out.println("生產者生產了第"+i+"杯奶茶,當前門店還有"+queue.size()+"杯奶茶待消費");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumer = new Thread(()->{
while (true){
try {
//消費者隨機排隊,隨機1到9秒出現一個消費者
Random random = new Random();
int seconds = random.nextInt(10)+1;
TimeUnit.SECONDS.sleep(seconds);
System.out.println("過了"+seconds+"秒之后來了一位消費者");
String msg = queue.take();
System.out.println("消費者拿到了第"+msg+"杯奶茶");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
這里的代碼我就不多解釋了,其實邏輯跟Condition去實現一個阻塞隊列的邏輯一樣,只是換成了用wait和notify的方式。
結束語
如果有看不懂的地方或者有不對的地方,歡迎下方留言評論。一起進步~

浙公網安備 33010602011771號