JAVA深化篇_33——線程并發(fā)協(xié)作(生產(chǎn)者/消費(fèi)者模式)
線程并發(fā)協(xié)作(生產(chǎn)者/消費(fèi)者模式)
多線程環(huán)境下,我們經(jīng)常需要多個線程的并發(fā)和協(xié)作。這個時候,就需要了解一個重要的多線程并發(fā)協(xié)作模型“生產(chǎn)者/消費(fèi)者模式”。
角色介紹
-
什么是生產(chǎn)者?
生產(chǎn)者指的是負(fù)責(zé)生產(chǎn)數(shù)據(jù)的模塊(這里模塊可能是:方法、對象、線程、進(jìn)程)。
-
什么是消費(fèi)者?
消費(fèi)者指的是負(fù)責(zé)處理數(shù)據(jù)的模塊(這里模塊可能是:方法、對象、線程、進(jìn)程)。
-
什么是緩沖區(qū)?
消費(fèi)者不能直接使用生產(chǎn)者的數(shù)據(jù),它們之間有個“緩沖區(qū)”。生產(chǎn)者將生產(chǎn)好的數(shù)據(jù)放入“緩沖區(qū)”,消費(fèi)者從“緩沖區(qū)”拿要處理的數(shù)據(jù)。
緩沖區(qū)是實現(xiàn)并發(fā)的核心,緩沖區(qū)的設(shè)置有兩個好處:
-
實現(xiàn)線程的并發(fā)協(xié)作
有了緩沖區(qū)以后,生產(chǎn)者線程只需要往緩沖區(qū)里面放置數(shù)據(jù),而不需要管消費(fèi)者消費(fèi)的情況;同樣,消費(fèi)者只需要從緩沖區(qū)拿數(shù)據(jù)處理即可,也不需要管生產(chǎn)者生產(chǎn)的情況。 這樣,就從邏輯上實現(xiàn)了“生產(chǎn)者線程”和“消費(fèi)者線程”的分離,解除了生產(chǎn)者與消費(fèi)者之間的耦合。
-
解決忙閑不均,提高效率
生產(chǎn)者生產(chǎn)數(shù)據(jù)慢時,緩沖區(qū)仍有數(shù)據(jù),不影響消費(fèi)者消費(fèi);消費(fèi)者處理數(shù)據(jù)慢時,生產(chǎn)者仍然可以繼續(xù)往緩沖區(qū)里面放置數(shù)據(jù) 。
實現(xiàn)生產(chǎn)者與消費(fèi)者模式
創(chuàng)建緩沖區(qū)
/**
* 定義饅頭類
*/
class ManTou{
private int id;
public ManTou(int id){
this.id = id;
}
public int getId(){
return this.id;
}
}
/**
* 定義緩沖區(qū)類
*/
class SyncStack{
//定義存放饅頭的盒子
private ManTou[] mt = new ManTou[10];
//定義操作盒子的索引
private int index;
/**
* 放饅頭
*/
public synchronized void push(ManTou manTou){
//判斷盒子是否已滿
while(this.index == this.mt.length){
try {
/**
* 語法:wait(),該方法必須要在synchronized塊中調(diào)用。
* wait執(zhí)行后,線程會將持有的對象鎖釋放,并進(jìn)入阻塞狀態(tài),
* 其他需要該對象鎖的線程就可以繼續(xù)運(yùn)行了。
*/
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//喚醒取饅頭的線程
/**
* 語法:該方法必須要在synchronized塊中調(diào)用。
* 該方法會喚醒處于等待狀態(tài)隊列中的一個線程。
*/
this.notify();
this.mt[this.index] = manTou;
this.index++;
}
/**
* 取饅頭
*/
public synchronized ManTou pop(){
while(this.index == 0){
try {
/**
* 語法:wait(),該方法必須要在synchronized塊中調(diào)用。
* wait執(zhí)行后,線程會將持有的對象鎖釋放,并進(jìn)入阻塞狀態(tài),
* 其他需要該對象鎖的線程就可以繼續(xù)運(yùn)行了。
*/
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
this.index--;
return this.mt[this.index];
}
}
public class TestProduceThread {
public static void main(String[] args) {
}
}
創(chuàng)建生產(chǎn)者消費(fèi)者線程
/**
* 定義饅頭類
*/
class ManTou{
private int id;
public ManTou(int id){
this.id = id;
}
public int getId(){
return this.id;
}
}
/**
* 定義緩沖區(qū)類
*/
class SyncStack{
//定義存放饅頭的盒子
private ManTou[] mt = new ManTou[10];
//定義操作盒子的索引
private int index;
/**
* 放饅頭
*/
public synchronized void push(ManTou manTou){
//判斷盒子是否已滿
while(this.index == this.mt.length){
try {
/**
* 語法:wait(),該方法必須要在synchronized塊中調(diào)用。
* wait執(zhí)行后,線程會將持有的對象鎖釋放,并進(jìn)入阻塞狀態(tài),
* 其他需要該對象鎖的線程就可以繼續(xù)運(yùn)行了。
*/
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//喚醒取饅頭的線程
/**
* 語法:該方法必須要在synchronized塊中調(diào)用。
* 該方法會喚醒處于等待狀態(tài)隊列中的一個線程。
*/
this.notify();
this.mt[this.index] = manTou;
this.index++;
}
/**
* 取饅頭
*/
public synchronized ManTou pop(){
while(this.index == 0){
try {
/**
* 語法:wait(),該方法必須要在synchronized塊中調(diào)用。
* wait執(zhí)行后,線程會將持有的對象鎖釋放,并進(jìn)入阻塞狀態(tài),
* 其他需要該對象鎖的線程就可以繼續(xù)運(yùn)行了。
*/
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
this.index--;
return this.mt[this.index];
}
}
/**
* 定義生產(chǎn)者線程類
*/
class ShengChan extends Thread{
private SyncStack ss;
public ShengChan(SyncStack ss){
this.ss = ss;
}
@Override
public void run() {
for(int i=0;i<10;i++){
System.out.println("生產(chǎn)饅頭:"+i);
ManTou manTou = new ManTou(i);
this.ss.push(manTou);
}
}
}
/**
* 定義消費(fèi)者線程類
*/
class XiaoFei extends Thread{
private SyncStack ss;
public XiaoFei(SyncStack ss){
this.ss = ss;
}
@Override
public void run() {
for(int i=0;i<10;i++){
ManTou manTou = this.ss.pop();
System.out.println("消費(fèi)饅頭:"+i);
}
}
}
public class ProduceThread {
public static void main(String[] args) {
SyncStack ss = new SyncStack();
new ShengChan(ss).start();
new XiaoFei(ss).start();
}
}
線程并發(fā)協(xié)作總結(jié)
線程并發(fā)協(xié)作(也叫線程通信)
生產(chǎn)者消費(fèi)者模式:
-
生產(chǎn)者和消費(fèi)者共享同一個資源,并且生產(chǎn)者和消費(fèi)者之間相互依賴,互為條件。
-
對于生產(chǎn)者,沒有生產(chǎn)產(chǎn)品之前,消費(fèi)者要進(jìn)入等待狀態(tài)。而生產(chǎn)了產(chǎn)品之后,又需要馬上通知消費(fèi)者消費(fèi)。
-
對于消費(fèi)者,在消費(fèi)之后,要通知生產(chǎn)者已經(jīng)消費(fèi)結(jié)束,需要繼續(xù)生產(chǎn)新產(chǎn)品以供消費(fèi)。
-
在生產(chǎn)者消費(fèi)者問題中,僅有synchronized是不夠的。synchronized可阻止并發(fā)更新同一個共享資源,實現(xiàn)了同步但是synchronized不能用來實現(xiàn)不同線程之間的消息傳遞(通信)。
-
那線程是通過哪些方法來進(jìn)行消息傳遞(通信)的呢?見如下總結(jié):
方法名 作 用 final void wait() 表示線程一直等待,直到得到其它線程通知 void wait(long timeout) 線程等待指定毫秒?yún)?shù)的時間 final void wait(long timeout,int nanos) 線程等待指定毫秒、微秒的時間 final void notify() 喚醒一個處于等待狀態(tài)的線程 final void notifyAll() 喚醒同一個對象上所有調(diào)用wait()方法的線程,優(yōu)先級別高的線程優(yōu)先運(yùn)行 -
以上方法均是java.lang.Object類的方法;
都只能在同步方法或者同步代碼塊中使用,否則會拋出異常。
建議
在實際開發(fā)中,尤其是“架構(gòu)設(shè)計”中,會大量使用這個模式。 對于初學(xué)者了解即可,如果晉升到中高級開發(fā)人員,這就是必須掌握的內(nèi)容。
浙公網(wǎng)安備 33010602011771號