詳解Java中的ArrayBlockingQueue
ArrayBlockingQueue 是 Java 中 java.util.concurrent 包下的一個(gè)阻塞隊(duì)列實(shí)現(xiàn),底層基于數(shù)組,是線程安全的。它是一個(gè) 有界隊(duì)列,需要在創(chuàng)建時(shí)指定容量大小。此類的主要特性包括:
-
線程安全:
-
使用 獨(dú)占鎖(ReentrantLock) 和 條件變量 來(lái)實(shí)現(xiàn)線程安全的操作。
-
-
先進(jìn)先出(FIFO):
-
元素按插入順序排列,頭部是最先插入的元素,尾部是最后插入的元素。
-
-
-
隊(duì)列容量固定,不能超過(guò)初始化時(shí)指定的大小。
-
核心構(gòu)造方法
public ArrayBlockingQueue(int capacity)
-
創(chuàng)建一個(gè)指定容量的隊(duì)列。
public ArrayBlockingQueue(int capacity, boolean fair)
-
fair: 決定鎖的公平性,true表示公平鎖,false為非公平鎖(性能更高)。
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
-
初始化隊(duì)列,并將給定集合中的所有元素按迭代順序添加到隊(duì)列。
核心字段
private final Object[] items; // 存儲(chǔ)隊(duì)列元素的數(shù)組 private int takeIndex; // 指向隊(duì)列頭部(下一個(gè)被取出的元素) private int putIndex; // 指向隊(duì)列尾部(下一個(gè)插入的位置) private int count; // 隊(duì)列中當(dāng)前的元素個(gè)數(shù) ? private final ReentrantLock lock; // 獨(dú)占鎖,保證線程安全 private final Condition notEmpty; // 條件變量:隊(duì)列不為空的等待條件 private final Condition notFull; // 條件變量:隊(duì)列未滿的等待條件
關(guān)鍵方法解讀
1. offer(E e)
嘗試向隊(duì)列中插入元素(非阻塞方法)。如果隊(duì)列已滿,返回 false。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock(); // 獲取鎖
try {
if (count == items.length) // 隊(duì)列已滿
return false;
enqueue(e); // 插入元素
return true;
} finally {
lock.unlock(); // 釋放鎖
}
}
2. put(E e)
向隊(duì)列中插入元素,如果隊(duì)列已滿,當(dāng)前線程阻塞直到隊(duì)列有空間。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中斷獲取鎖
try {
while (count == items.length) // 隊(duì)列已滿,等待 notFull 信號(hào)
notFull.await();
enqueue(e); // 插入元素
} finally {
lock.unlock(); // 釋放鎖
}
}
3. take()
從隊(duì)列中取出一個(gè)元素,如果隊(duì)列為空,當(dāng)前線程阻塞直到有可用元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中斷獲取鎖
try {
while (count == 0) // 隊(duì)列為空,等待 notEmpty 信號(hào)
notEmpty.await();
return dequeue(); // 獲取隊(duì)頭元素
} finally {
lock.unlock(); // 釋放鎖
}
}
4. peek()
非阻塞方法,返回隊(duì)列頭部元素但不移除。如果隊(duì)列為空,返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock(); // 獲取鎖
try {
return (count == 0) ? null : itemAt(takeIndex); // 獲取頭部元素
} finally {
lock.unlock(); // 釋放鎖
}
}
底層核心方法
1. enqueue(E e)
向隊(duì)列尾部插入元素。
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; // 在 putIndex 位置插入元素
if (++putIndex == items.length) // 如果到達(dá)數(shù)組尾部,回到起始位置(循環(huán)數(shù)組)
putIndex = 0;
count++; // 更新隊(duì)列元素個(gè)數(shù)
notEmpty.signal(); // 喚醒等待 notEmpty 的線程
}
2. dequeue()
從隊(duì)列頭部取出元素。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; // 獲取隊(duì)列頭部的元素
items[takeIndex] = null; // 釋放隊(duì)列頭部的引用
if (++takeIndex == items.length) // 如果到達(dá)數(shù)組尾部,回到起始位置
takeIndex = 0;
count--; // 更新隊(duì)列元素個(gè)數(shù)
notFull.signal(); // 喚醒等待 notFull 的線程
return x;
}
線程安全性
-
使用 獨(dú)占鎖 來(lái)控制對(duì)隊(duì)列的并發(fā)訪問(wèn)。
-
條件變量
notEmpty和notFull分別控制取元素和插入元素的等待邏輯。 -
鎖的公平性(可選)決定線程獲取鎖的順序,公平鎖更公平但性能稍差。
應(yīng)用場(chǎng)景
-
生產(chǎn)者-消費(fèi)者模型
:
-
生產(chǎn)者線程向隊(duì)列插入元素,消費(fèi)者線程從隊(duì)列取出元素。
-
-
任務(wù)調(diào)度
:
-
用于線程池中存儲(chǔ)和分發(fā)任務(wù)。
-
-
流量控制
:
-
通過(guò)有界性限制隊(duì)列中的元素?cái)?shù)量,防止內(nèi)存溢出。
-
示例
public static void main(String[] args) {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
?
// 生產(chǎn)者線程
new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
?
// 消費(fèi)者線程
new Thread(() -> {
try {
while (true) {
Integer value = queue.take();
System.out.println("Consumed: " + value);
Thread.sleep(200);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

浙公網(wǎng)安備 33010602011771號(hào)