《程序員修煉之道》-讀書筆記七-線程安全的Queue
Java中有些多線程編程模式在很大程度上都依賴于Queue實現的線程安全性,所以很有必要充分認識它。
隊列經常用來在線程之間傳遞工作單元,這個模式通常適合用Queue最簡單的并發擴展BlockingQueue來實現。
1.BlockingQueue
BlockingQueue還有兩個特性。
- 在向隊列中put()時,如果隊列己滿,它會讓放人線程等待隊列騰出空間。
- 在從隊列中take()時,如果隊列為空,會導致取出線程阻塞。
BlockingQueue定義的常用方法如下:
| 拋出異常 | 特殊值 | 阻塞 | 超時 | |
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
2.一個冰激凌店的小例子
顧客類
/** * 顧客類 */ public class Customer { private String name; public Customer(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
冰激凌店類
/** * 冰激凌店以及測試方法 */ public class IceCreamShop extends Thread{ /** * 等待購買冰激凌的隊伍 */ protected final BlockingQueue<Customer> queue; /** * 制作冰激凌的時間 */ protected final int restTime; /** * 故障標記 */ private boolean shutdown = false; public IceCreamShop(BlockingQueue queue, int restTime) { this.queue = queue; this.restTime = restTime; } @Override public void run() { while (!shutdown) { try { Thread.sleep(restTime); Customer customer = queue.take(); System.out.println(customer.getName() + "已取餐-------------"); } catch (InterruptedException e) { shutdown = true; } } } public static void main(String[] args) { // 設置隊列中有三個顧客 BlockingQueue<Customer> customers = new LinkedBlockingQueue<>(); Customer customer1 = new Customer("zhangsan"); Customer customer2 = new Customer("lisi"); Customer customer3 = new Customer("ww"); customers.add(customer1); customers.add(customer2); customers.add(customer3); IceCreamShop iceCreamShop = new IceCreamShop(customers, 1000); iceCreamShop.start(); } }
在IceCreamShop類的run()方法中,顧客需要先等待冰激凌店員制作冰激凌,然后再取餐,當沒有顧客后,店員會一直等待顧客,進入阻塞狀態。
3. TransferQueue — Java7中的新貴
Java7引人了TransferQueue。它本質上是多了一項transfer()操作的BlockingQueue。
LinkedTransferQueue采用的一種預占模式。意思就是消費者線程取元素時,如果隊列為空,那就生成一個節點(節點元素為null)入隊,然后消費者線程被等待在這個節點上,后面生產者線程入隊時發現有一個元素為null的節點,生產者線程就不入隊了,直接就將元素填充到該節點,喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用的方法返回。即找到匹配的節點不入隊,找不到根據how參數入隊。(簡單說就是 進一個,出一個)
冰激凌店類
import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicInteger; public abstract class IceCreamShop extends Thread{ /** * 等待購買冰激凌的隊伍 */ protected final TransferQueue<Customer> queue; /** * 制作冰激凌的時間 */ protected final int restTime; /** * 顧客號 */ protected static AtomicInteger number = new AtomicInteger(1); /** * 故障標記 */ private boolean shutdown = false; public IceCreamShop(TransferQueue<Customer> queue, int restTime) { this.queue = queue; this.restTime = restTime; } @Override public void run() { while (!shutdown) { try { Thread.sleep(restTime); } catch (InterruptedException e) { shutdown = true; } doAction(); } } public abstract void doAction(); }
測試方法類
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TransferQueue; public class IceCreamShopMain { public static void main(String[] args) { final TransferQueue<Customer> customers = new LinkedTransferQueue<Customer>(); /** * 店員制作冰激凌 */ IceCreamShop t1 = new IceCreamShop(customers, 100) { @Override public void doAction() { Customer customer = new Customer("顧客:" + number); boolean handed = false; try { handed = customers.tryTransfer(customer, 100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } if (!handed) { System.out.println("這個店員做的很快,趁著顧客取餐的時間,去吃了根薯條"); } else { System.out.println(customer.getName() + "的冰激凌已制作完成---------請取餐"); number.getAndIncrement(); } } }; /** * 顧客取冰激凌 */ IceCreamShop t2 = new IceCreamShop(customers, 100) { @Override public void doAction() { Customer customer = null; try { customer = customers.take(); Thread.sleep(900); System.out.println(customer.getName() + "已取餐----------------"); } catch (InterruptedException e) { return; } } }; t1.start(); t2.start(); } }

浙公網安備 33010602011771號