Java并發(fā)編程(十)阻塞隊(duì)列
使用非阻塞隊(duì)列的時(shí)候有一個(gè)很大問題就是:它不會(huì)對(duì)當(dāng)前線程產(chǎn)生阻塞,那么在面對(duì)類似消費(fèi)者-生產(chǎn)者的模型時(shí),就必須額外地實(shí)現(xiàn)同步策略以及線程間喚醒策略,這個(gè)實(shí)現(xiàn)起來就非常麻煩。但是有了阻塞隊(duì)列就不一樣了,它會(huì)對(duì)當(dāng)前線程產(chǎn)生阻塞,比如一個(gè)線程從一個(gè)空的阻塞隊(duì)列中取元素,此時(shí)線程會(huì)被阻塞直到阻塞隊(duì)列中有了元素。當(dāng)隊(duì)列中有元素后,被阻塞的線程會(huì)自動(dòng)被喚醒(不需要我們編寫代碼去喚醒)。這樣提供了極大的方便性。
本文先講述一下java.util.concurrent包下提供主要的幾種阻塞隊(duì)列,然后分析了阻塞隊(duì)列和非阻塞隊(duì)列的中的各個(gè)方法,接著分析了阻塞隊(duì)列的實(shí)現(xiàn)原理,最后給出了一個(gè)實(shí)際例子和幾個(gè)使用場(chǎng)景。
一.幾種主要的阻塞隊(duì)列
自從Java 1.5之后,在java.util.concurrent包下提供了若干個(gè)阻塞隊(duì)列,主要有以下幾個(gè): ArrayBlockingQueue:基于數(shù)組實(shí)現(xiàn)的一個(gè)阻塞隊(duì)列,在創(chuàng)建ArrayBlockingQueue對(duì)象時(shí)必須制定容量大小。并且可以指定公平性與非公平性,默認(rèn)情況下為非公平的,即不保證等待時(shí)間最長的隊(duì)列最優(yōu)先能夠訪問隊(duì)列。 LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的一個(gè)阻塞隊(duì)列,在創(chuàng)建LinkedBlockingQueue對(duì)象時(shí)如果不指定容量大小,則默認(rèn)大小為Integer.MAX_VALUE。 PriorityBlockingQueue:以上2種隊(duì)列都是先進(jìn)先出隊(duì)列,而PriorityBlockingQueue卻不是,它會(huì)按照元素的優(yōu)先級(jí)對(duì)元素進(jìn)行排序,按照優(yōu)先級(jí)順序出隊(duì),每次出隊(duì)的元素都是優(yōu)先級(jí)最高的元素。注意,此阻塞隊(duì)列為無界阻塞隊(duì)列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號(hào)標(biāo)志),前面2種都是有界隊(duì)列。 DelayQueue:基于PriorityQueue,一種延時(shí)阻塞隊(duì)列,DelayQueue中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲取到該元素。DelayQueue也是一個(gè)無界隊(duì)列,因此往隊(duì)列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠(yuǎn)不會(huì)被阻塞,而只有獲取數(shù)據(jù)的操作(消費(fèi)者)才會(huì)被阻塞。
二.阻塞隊(duì)列中的方法 VS 非阻塞隊(duì)列中的方法
1.非阻塞隊(duì)列中的幾個(gè)主要方法:
add(E e):將元素e插入到隊(duì)列末尾,如果插入成功,則返回true;如果插入失敗(即隊(duì)列已滿),則會(huì)拋出異常;
remove():移除隊(duì)首元素,若移除成功,則返回true;如果移除失敗(隊(duì)列為空),則會(huì)拋出異常;
offer(E e):將元素e插入到隊(duì)列末尾,如果插入成功,則返回true;如果插入失敗(即隊(duì)列已滿),則返回false;
poll():移除并獲取隊(duì)首元素,若成功,則返回隊(duì)首元素;否則返回null;
peek():獲取隊(duì)首元素,若成功,則返回隊(duì)首元素;否則返回null
對(duì)于非阻塞隊(duì)列,一般情況下建議使用offer、poll和peek三個(gè)方法,不建議使用add和remove方法。因?yàn)槭褂胦ffer、poll和peek三個(gè)方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達(dá)到這樣的效果。注意,非阻塞隊(duì)列中的方法都沒有進(jìn)行同步措施。
2.阻塞隊(duì)列中的幾個(gè)主要方法:
阻塞隊(duì)列包括了非阻塞隊(duì)列中的大部分方法,上面列舉的5個(gè)方法在阻塞隊(duì)列中都存在,但是要注意這5個(gè)方法在阻塞隊(duì)列中都進(jìn)行了同步措施。除此之外,阻塞隊(duì)列提供了另外4個(gè)非常有用的方法: put(E e) take() offer(E e,long timeout, TimeUnit unit) poll(long timeout, TimeUnit unit) put方法用來向隊(duì)尾存入元素,如果隊(duì)列滿,則等待; take方法用來從隊(duì)首取元素,如果隊(duì)列為空,則等待; offer方法用來向隊(duì)尾存入元素,如果隊(duì)列滿,則等待一定的時(shí)間,當(dāng)時(shí)間期限達(dá)到時(shí),如果還沒有插入成功,則返回false;否則返回true; poll方法用來從隊(duì)首取元素,如果隊(duì)列空,則等待一定的時(shí)間,當(dāng)時(shí)間期限達(dá)到時(shí),如果取到,則返回null;否則返回取得的元素;
三.阻塞隊(duì)列的實(shí)現(xiàn)原理
以ArrayBlockingQueue為例
首先看一下ArrayBlockingQueue類中的幾個(gè)成員變量:
1 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 private static final long serialVersionUID = -817911632652898426L; 5 6 /** The queued items */ 7 private final E[] items; 8 /** items index for next take, poll or remove */ 9 private int takeIndex; 10 /** items index for next put, offer, or add. */ 11 private int putIndex; 12 /** Number of items in the queue */ 13 private int count; 14 15 /* 16 * Concurrency control uses the classic two-condition algorithm 17 * found in any textbook. 18 */ 19 20 /** Main lock guarding all access */ 21 private final ReentrantLock lock; 22 /** Condition for waiting takes */ 23 private final Condition notEmpty; 24 /** Condition for waiting puts */ 25 private final Condition notFull; 26 }
可以看出,ArrayBlockingQueue中用來存儲(chǔ)元素的實(shí)際上是一個(gè)數(shù)組,takeIndex和putIndex分別表示隊(duì)首元素和隊(duì)尾元素的下標(biāo),count表示隊(duì)列中元素的個(gè)數(shù)。
lock是一個(gè)可重入鎖,notEmpty和notFull是等待條件。
下面看一下ArrayBlockingQueue的構(gòu)造器,構(gòu)造器有三個(gè)重載版本:
public ArrayBlockingQueue(int capacity) { } public ArrayBlockingQueue(int capacity, boolean fair) { } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { }
第一個(gè)構(gòu)造器只有一個(gè)參數(shù)用來指定容量,第二個(gè)構(gòu)造器可以指定容量和公平性,第三個(gè)構(gòu)造器可以指定容量、公平性以及用另外一個(gè)集合進(jìn)行初始化。
然后看它的兩個(gè)關(guān)鍵方法的實(shí)現(xiàn):put()和take():
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
從put方法的實(shí)現(xiàn)可以看出,它先獲取了鎖,并且獲取的是可中斷鎖,然后判斷當(dāng)前元素個(gè)數(shù)是否等于數(shù)組的長度,如果相等,則調(diào)用notFull.await()進(jìn)行等待,如果捕獲到中斷異常,則喚醒線程并拋出異常。
當(dāng)被其他線程喚醒時(shí),通過insert(e)方法插入元素,最后解鎖。
我們看一下insert方法的實(shí)現(xiàn):
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
它是一個(gè)private方法,插入成功后,通過notEmpty喚醒正在等待取元素的線程。
下面是take()方法的實(shí)現(xiàn):
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
跟put方法實(shí)現(xiàn)很類似,只不過put方法等待的是notFull信號(hào),而take方法等待的是notEmpty信號(hào)。在take方法中,如果可以取元素,則通過extract方法取得元素,下面是extract方法的實(shí)現(xiàn):
private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
跟insert方法也很類似。
其實(shí)從這里大家應(yīng)該明白了阻塞隊(duì)列的實(shí)現(xiàn)原理,事實(shí)它和我們用Object.wait()、Object.notify()和非阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者的思路類似,只不過它把這些工作一起集成到了阻塞隊(duì)列中實(shí)現(xiàn)。
四.示例和使用場(chǎng)景
先使用Object.wait()和Object.notify()、非阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式:
public class Test { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("隊(duì)列空,等待數(shù)據(jù)"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走隊(duì)首元素 queue.notify(); System.out.println("從隊(duì)列取走一個(gè)元素,隊(duì)列剩余"+queue.size()+"個(gè)元素"); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("隊(duì)列滿,等待有空余空間"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一個(gè)元素 queue.notify(); System.out.println("向隊(duì)列取中插入一個(gè)元素,隊(duì)列剩余空間:"+(queueSize-queue.size())); } } } } }
這個(gè)是經(jīng)典的生產(chǎn)者-消費(fèi)者模式,通過阻塞隊(duì)列和Object.wait()和Object.notify()實(shí)現(xiàn),wait()和notify()主要用來實(shí)現(xiàn)線程間通信。
具體的線程間通信方式(wait和notify的使用)在后續(xù)問章中會(huì)講述到。
下面是使用阻塞隊(duì)列實(shí)現(xiàn)的生產(chǎn)者-消費(fèi)者模式:
public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ try { queue.take(); System.out.println("從隊(duì)列取走一個(gè)元素,隊(duì)列剩余"+queue.size()+"個(gè)元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ try { queue.put(1); System.out.println("向隊(duì)列取中插入一個(gè)元素,隊(duì)列剩余空間:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
有沒有發(fā)現(xiàn),使用阻塞隊(duì)列代碼要簡單得多,不需要再單獨(dú)考慮同步和線程間通信的問題。
在并發(fā)編程中,一般推薦使用阻塞隊(duì)列,這樣實(shí)現(xiàn)可以盡量地避免程序出現(xiàn)意外的錯(cuò)誤。
阻塞隊(duì)列使用最經(jīng)典的場(chǎng)景就是socket客戶端數(shù)據(jù)的讀取和解析,讀取數(shù)據(jù)的線程不斷將數(shù)據(jù)放入隊(duì)列,然后解析線程不斷從隊(duì)列取數(shù)據(jù)解析。還有其他類似的場(chǎng)景,只要符合生產(chǎn)者-消費(fèi)者模型的都可以使用阻塞隊(duì)列。
posted on 2016-12-10 21:54 安卓筆記俠 閱讀(314) 評(píng)論(0) 收藏 舉報(bào)
浙公網(wǎng)安備 33010602011771號(hào)