java阻塞隊列
對消息的處理有些麻煩,要保證各種確認。為了確保消息的100%發送成功,筆者在之前的基礎上做了一些改進。其中要用到多線程,用于重復發送信息。
所以查了很多關于線程安全的東西,也看到了阻塞隊列,發現這個模式很不錯,可惜我目前用不到。
關于這個的講解已經很多了,阻塞這個,就是當隊列中沒有數據的時候,線程讀取的話會等待。當隊列中的數據滿的時候,線程添加數據的時候,也會等待。
有個例子很生動形象,往盤子里面放雞蛋,只能放固定數目的。盤子里面沒有雞蛋,無法從中拿出來。當盤子里滿了,也放不進去。直到被拿出去才能在放。
代碼如下,這里設置的是一個盤子最多放10個雞蛋:
package com.thread.two; import java.util.ArrayList; import java.util.List; public class Plate { List<Object> eggs=new ArrayList<Object>();public synchronized Object getEgg(){while(eggs.size()==0){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } Object egg=null; for (int i = 0; i < 10; i++) { egg=eggs.get(i); System.out.println("拿到雞蛋........."); } //Object egg=eggs.get(0); eggs.clear(); notify(); //System.out.println("拿到雞蛋........."); return egg; } public synchronized void putEgg(Object egg){ while(eggs.size()>9){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } eggs.add(egg); notify(); System.out.println("放入雞蛋........."); } static class AddThread extends Thread{ private Plate plate; private Object egg=new Object(); public AddThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.putEgg(egg); } } } static class GetThread extends Thread{ private Plate plate; public GetThread(Plate plate){ this.plate=plate; } public void run(){ for (int i = 0; i < 1000; i++) { plate.getEgg(); } } } public static void main(String[] args) throws InterruptedException { Plate plate=new Plate(); Thread add=new Thread(new AddThread(plate)); Thread get=new Thread(new GetThread(plate)); add.start(); get.start(); add.join(); get.join(); System.out.println("測試結束"); } }
這個例子很形象,用線程實現了上面所說的。
java現在有concurrent包,里面有很多現成的可以用的類,很多是線程安全的,這樣,像上面寫的put或者get,都不需要自己寫同步方法了,這些類已經包裝好了。
這里有一個ArrayBlockingQueue的例子,和上面實現的差不多。
首先是兩個線程,分別是put和get。
ThreadPut:
package com.thread.three; import java.util.concurrent.ArrayBlockingQueue; public class ThreadPut implements Runnable{ private ArrayBlockingQueue<String> abq=null; public ThreadPut(ArrayBlockingQueue<String> abq){ this.abq=abq; } public void run() { // TODO Auto-generated method stub while(true){ System.out.println("要向隊列中存數據了"); try { Thread.sleep(1000); abq.put("hi"); System.out.println("存入后,數據一共為:"+abq.size()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
ThreadGet:
package com.thread.three; import java.util.concurrent.ArrayBlockingQueue; public class ThreadGet extends Thread { ArrayBlockingQueue<String> abq=null; public ThreadGet(ArrayBlockingQueue<String> abq){ this.abq=abq; } @Override public void run() { // TODO Auto-generated method stub while(true){ try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("我要從隊列中取數據了"); String msg=null; if (abq.size()>0) { msg=abq.remove(); } System.out.println("隊列中取得的數據為:"+msg+",隊列中還有一共:"+abq.size()); } } }
測試類:
public class ArrayBlockQueueApp { public static void main(String[] args) { ExecutorService es=Executors.newCachedThreadPool(); ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(10); ThreadGet tGet=new ThreadGet(abq); Thread tPut=new Thread(new ThreadPut(abq)); es.execute(tGet); es.execute(tPut); } }
這些隊列放消息的話挺不錯的。
除非注明轉載,其他文章均為作者原創,可以自由轉載,但請注明轉載的本文的地址,請尊重作者的勞動成果。

浙公網安備 33010602011771號