Java阻塞隊列
Java阻塞隊列
一、阻塞隊列
1. 為什么要用阻塞隊列?
在多線程領域,所謂阻塞,是指在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒。
使用阻塞隊列能夠簡化多線程編程,是實現生產者-消費者模型等常見并發模式的重要工具。它能夠有效地銜接生產者和消費者之間的速度差異,提供一種協調和安全的數據交互方式。
2. 為什么需要 BlockingQueue ?
BlcokingQueue繼承了Queue接口,是隊列的一種,Queue和BlockingQueue都是在Java5中加入的,BlockingQueue是線程安全的,我們在很多場景下都可以利用線程安全的隊列來優雅地解決我們業務自身的線程安全問題。
BlockingQueue代表了一個線程安全的隊列,不僅可以由多個線程并發訪問,還添加了等待/通知機制,以便在隊列為空時阻塞獲取元素的線程,直到隊列變得可用,或者在隊列滿時阻塞插入元素的線程,直到隊列變得可用。因此使用BlockingQueue的好處是我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這些 BlockingQueue 都包辦了。
在 juc 包發布以前,多線程環境下,我們每個程序員都必須自己去實現這些細節,尤其還要兼顧效率和線程安全,這會給我們的程序帶來不小的復雜性。現在有了阻塞隊列,我們的操作就從手動擋換成了自動擋。
3. Java中的阻塞隊列
Java 中常用的阻塞隊列主要在 java.util.concurrent 包中提供,這些阻塞隊列用于在生產者-消費者模式或多線程環境中安全地進行數據交換和同步。
BlockingQueue常見的實現類如下圖:

在阻塞隊列中有很多方法,而且都非常相似,這里我把常用的8個方法總結了一下以添加、刪除為主。主要分為三類:
- 拋出異常:add、remove、element
- 返回結果但是不拋出異常:offer、poll、peek
- 阻塞:take、put
阻塞隊列主要方法及其作用:
# 插入元素方法 put(E e):用于向隊尾插入元素,如果隊列已滿,則等待空位。 offer(E e):嘗試向隊尾插入元素,如果隊列已滿,則立即返回 false。 offer(E e, long timeout, TimeUnit unit):在指定時間內嘗試向隊尾插入元素,如果超時且隊列仍滿,則返回 false。 add(E e):立即插入元素,如果隊列已滿,則拋出 IllegalStateException。 # 移除元素方法 take():用于從隊頭移除元素,如果隊列為空,則等待新元素加入。 poll():嘗試從隊頭移除元素,如果隊列為空,則返回 null。 poll(long timeout, TimeUnit unit):在指定時間內嘗試從隊頭移除元素,如果超時且隊列仍為空,則返回 null。 remove(Object o):從隊列中移除指定的元素,成功移除返回 true,否則返回 false。 # 檢查隊列狀態方法 peek():查看隊頭元素但不移除它,如果隊列為空,則返回 null。
element():查看隊頭元素但不移除它,如果隊列為空,則拋出 NoSuchElementException。
二、ArrayBlockingQueue
1. 基本介紹
ArrayBlockingQueue 是一個使用數組實現的有界阻塞隊列。在創建時需要指定容量大小,并支持公平和非公平兩種方式的鎖訪問機制。
ArrayBlockingQueue中的鎖是沒有分離的,即生產和消費用的是同一個鎖。
適合在容量有限、入隊和出隊較為頻繁的場景下使用。
2. 使用場景:生產者-消費者模式
1 public class ArrayBlockingQueueTest { 2 public static void main(String[] args) { 3 // 初始化一個容量為 10 的 ArrayBlockingQueue 4 ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); 5 6 // 生產者線程 7 Thread producer = new Thread(() -> { 8 try { 9 for (int i = 0; i < 50; i++) { 10 queue.put(i); 11 System.out.println("Produced: " + i); 12 } 13 } catch (InterruptedException e) { 14 Thread.currentThread().interrupt(); 15 } 16 }); 17 18 // 消費者線程 19 Thread consumer = new Thread(() -> { 20 try { 21 while (true) { 22 Integer item = queue.take(); 23 System.out.println("Consumed: " + item); 24 } 25 } catch (InterruptedException e) { 26 Thread.currentThread().interrupt(); 27 } 28 }); 29 30 producer.start(); 31 consumer.start(); 32 33 } 34 }
線程池中的任務隊列:
1 public class ArrayBlockingQueueTest { 2 public static void main(String[] args) { 3 // 創建一個容量為 5 的 ArrayBlockingQueue 4 ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5); 5 6 // 創建 ThreadPoolExecutor 7 ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 60L, TimeUnit.SECONDS, taskQueue); 8 9 // 提交任務到線程池 10 for (int i = 0; i < 20; i++) { 11 final int taskId = i; 12 13 try { 14 executor.submit(() -> { 15 System.out.println("Executing task " + taskId); 16 try { 17 // 模擬任務執行 18 System.out.println("Executing Task " + taskId); 19 TimeUnit.SECONDS.sleep(2); // 模擬耗時操作 20 } catch (InterruptedException e) { 21 // 處理中斷異常 22 System.err.println("Task " + taskId + " was interrupted"); 23 Thread.currentThread().interrupt(); // 重新設置中斷狀態 24 } 25 }); 26 } catch (RejectedExecutionException e) { 27 System.out.println("task queue size:" + taskQueue.size()); 28 System.out.println("Task " + taskId + " rejected due to task queue overflow."); 29 } 30 } 31 32 33 // 關閉線程池 34 executor.shutdown(); 35 try { 36 // 等待所有任務完成 37 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { 38 // 超時則強制關閉 39 executor.shutdownNow(); 40 } 41 } catch (InterruptedException e) { 42 executor.shutdownNow(); 43 } 44 45 System.out.println("All tasks completed."); 46 } 47 }
任務數量固定且可預測的情況下,且希望獲得更好的性能,可以考慮使用 ArrayBlockingQueue來實現工作隊列。
3. 實現原理
ArrayBlockingQueue是最典型的有界隊列,其內部是用數組存儲元素的,利用ReentrantLock實現線程安全。部分源碼如下:
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false); 3 } 4 5 public ArrayBlockingQueue(int capacity, boolean fair) { 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.items = new Object[capacity]; 9 lock = new ReentrantLock(fair); 10 notEmpty = lock.newCondition(); 11 notFull = lock.newCondition(); 12 }
在創建時就需要指定容量,之后就不可以在擴容了。在構造函數中我們同樣可以指定是否是公平的。如果ArrayBlockingQueue被設置為非公平的,那么就存在插隊的可能;如果設置為公平的,那么等待了最長時間的線程會優先被處理,其它線程不允許插隊。
三、LinkedBlockingQueue
1. 基本介紹
使用單向鏈表實現的可選有界/無界阻塞隊列。在創建時可以指定容量大小,如果不指定則默認為Integer.MAX_VALUE(無界隊列)。和ArrayBlockingQueue不同的是, 它僅支持非公平的鎖訪問機制。
LinkedBlockingQueue中的鎖是分離的,即生產用的是putLock,消費是takeLock,這樣可以防止生產者和消費者線程之間的鎖爭奪。因此生產與消費是可以同時進行的。
部分源碼如下:
1 public class LinkedBlockingQueue<E> extends AbstractQueue<E> 2 implements BlockingQueue<E>, java.io.Serializable { 3 4 5 //確保出隊操作的線程安全性 6 private final ReentrantLock takeLock = new ReentrantLock(); 7 8 //用于管理"隊列非空"條件,控制消費者的等待和喚醒 9 private final Condition notEmpty = takeLock.newCondition(); 10 11 //確保入隊操作的線程安全性 12 private final ReentrantLock putLock = new ReentrantLock(); 13 14 //用于管理"隊列未滿"條件,控制生產者的等待和喚醒 15 private final Condition notFull = putLock.newCondition(); 16 }
注意:不指隊列容量大小也是會有風險的,一旦數據生產速度大于消費速度,系統內存將有可能被消耗殆盡,因此要謹慎操作。
2. 典型使用場景:線程池中的任務隊列
線程池在執行任務時會用到 LinkedBlockingQueue 來存儲待處理任務,隊列為空時線程會阻塞等待新任務,隊列滿時新任務將阻塞等待空位。
1 public class LinkedBlockingQueueTest { 2 public static void main(String[] args) { 3 // 創建一個線程池,核心線程數為2,最大線程數為4,隊列容量為10 4 BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(10); 5 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, taskQueue); 6 7 // 提交任務到線程池 8 for (int i = 0; i < 20; i++) { 9 final int taskId = i; 10 try { 11 executor.submit(() -> { 12 System.out.println("Executing task " + taskId); 13 try { 14 Thread.sleep(1000); // 模擬任務執行時間 15 } catch (InterruptedException e) { 16 Thread.currentThread().interrupt(); 17 } 18 }); 19 } catch (RejectedExecutionException e) { 20 System.out.println("task queue size:" + taskQueue.size()); 21 System.out.println("Task " + taskId + " rejected due to task queue overflow."); 22 } 23 } 24 25 executor.shutdown(); // 關閉線程池 26 } 27 }
任務數量動態變化的情況下,且不易預測,可以考慮使用 LinkedBlockingQueue 來實現工作隊列。
四、PriorityBlockingQueue
1. 基本介紹
支持優先級排序的無界阻塞隊列,內部使用數組存儲數據,達到容量時,會自動進行擴容。放入的元素必須實現Comparable接口或者在構造函數中傳入Comparator對象,并且不能插入 null 元素。
2. 典型使用場景:任務調度系統
在任務調度系統中,PriorityBlockingQueue 可用于存儲帶有優先級的任務。優先級越高的任務會優先處理,保證關鍵任務不被延遲。舉個簡單例子:
1 public class PriorityBlockingQueueTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(); 5 6 // 添加不同優先級的任務 7 queue.put(new Task("Low priority task", 5)); 8 queue.put(new Task("Medium priority task", 3)); 9 queue.put(new Task("High priority task", 1)); 10 11 // 處理任務 12 while (!queue.isEmpty()) { 13 // 優先處理優先級高的任務 14 System.out.println("Processing " + queue.take()); 15 } 16 } 17 18 private static class Task implements Comparable<Task> { 19 private final String name; 20 private final int priority; // 數字越小優先級越高 21 22 public Task(String name, int priority) { 23 this.name = name; 24 this.priority = priority; 25 } 26 27 @Override 28 public int compareTo(Task other) { 29 //優先級數字越小的任務優先級越高 30 return Integer.compare(this.priority, other.priority); 31 } 32 33 @Override 34 public String toString() { 35 return "Task{name='" + name + "', priority=" + priority + "}"; 36 } 37 } 38 }
優先級隊列放入元素的時候,會進行排序,所以我們需要指定排序規則,有2種方式:
1)創建PriorityBlockingQueue指定比較器Comparator
2)放入的元素需要實現Comparable接口
上面2種方式必須選一個,如果2個都有,則走第一個規則排序。
五、DelayQueue
1. 基本介紹
Delay這個隊列比較特殊,具有延遲的功能,我們可以設定在隊列中的任務延遲多久之后執行。它是無界隊列,但是放入的元素必須實現Delayed接口,而Delayed接口又繼承了Comparable接口,所以自然就擁有了比較和排序的能力。
2. 典型使用場景:定時任務調度
假設我們需要實現一個簡單的定時任務調度系統,任務將在指定的延遲后執行。下面是一個使用 DelayQueue 的示例:
1 public class DelayQueueTest { 2 public static void main(String[] args) { 3 DelayQueue<DelayedTask> queue = new DelayQueue<>(); 4 5 // 添加任務,延遲5秒和10秒執行 6 queue.put(new DelayedTask("Task 1", 5, TimeUnit.SECONDS)); 7 queue.put(new DelayedTask("Task 2", 10, TimeUnit.SECONDS)); 8 9 10 // 處理任務 11 try { 12 while (!queue.isEmpty()) { 13 // 阻塞,直到有任務可以處理 14 DelayedTask task = queue.take(); 15 System.out.println("Executing: " + task.getTaskName()); 16 } 17 } catch (InterruptedException e) { 18 Thread.currentThread().interrupt(); // 重新設置線程的中斷狀態 19 System.out.println("Task execution was interrupted"); 20 } 21 } 22 23 static class DelayedTask implements Delayed { 24 private final String taskName; 25 private final long delayTime; 26 private final long triggerTime; 27 28 public DelayedTask(String taskName, long delay, TimeUnit unit) { 29 this.taskName = taskName; 30 this.delayTime = delay; 31 this.triggerTime = System.currentTimeMillis() + unit.toMillis(delay); 32 } 33 34 @Override 35 public long getDelay(TimeUnit unit) { 36 long diff = triggerTime - System.currentTimeMillis(); 37 return unit.convert(diff, TimeUnit.MILLISECONDS); 38 } 39 40 @Override 41 public int compareTo(Delayed o) { 42 if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { 43 return -1; 44 } 45 if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { 46 return 1; 47 } 48 return 0; 49 } 50 51 public String getTaskName() { 52 return taskName; 53 } 54 } 55 }
3. 實現原理
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
可以看出這個Delayed接口繼承自Comparable,里面需要實現getDelay方法。這里的getDelay()返回的是還剩下多長的延遲時間才會被執行。如果返回0或者負數則代表任務已過期。元素會根據延遲時間的長短被放到隊列的不同位置,越靠近隊列頭代表越早過期。
六、SynchronousQueue
一種不存儲元素的阻塞隊列,每次插入操作必須等待另一個線程的移除操作,反之亦然。
七、LinkedBlockingDeque
LinkedBlockingDeque 是雙端隊列,允許在隊列兩端插入和移除元素。適合需要從兩端操作的場景,如任務管理中從隊頭和隊尾分別處理任務。
特點:雙端,線程安全,有界或無界,適合雙向處理需求的場景。
八、LinkedTransferQueue
基于鏈表的無界阻塞隊列,允許生產者將元素直接傳輸給消費者。提供 transfer 方法,如果當前沒有消費者,會阻塞直到有消費者消費。
九、總結
在任務數量較大、并發度較高的場景下,LinkedBlockingQueue 往往會比 ArrayBlockingQueue 更具優勢,因為它的鎖機制允許更高效的并行處理。而在任務數量可控且對內存占用有較高要求的場景下,ArrayBlockingQueue 由于其數組基礎結構,可能會更為合適。因此,選擇哪種阻塞隊列需要根據具體的應用場景和需求進行評估。
參考鏈接:
https://juejin.cn/post/7149883540165885966

浙公網安備 33010602011771號