java并發(fā):阻塞隊(duì)列之DelayQueue
延時(shí)隊(duì)列
DelayQueue是一個(gè)支持延時(shí)獲取元素的使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界的阻塞隊(duì)列。
在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素,只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
類圖如下:

DelayQueue的定義以及構(gòu)造函數(shù)如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. */ private Thread leader; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ private final Condition available = lock.newCondition(); /** * Creates a new {@code DelayQueue} that is initially empty. */ public DelayQueue() {} /** * Creates a {@code DelayQueue} initially containing the elements of the * given collection of {@link Delayed} instances. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
解讀:
DelayQueue 內(nèi)部使用 PriorityQueue 存放數(shù)據(jù),使用 ReentrantLock 實(shí)現(xiàn)線程同步。
重點(diǎn)解釋一下變量 leader:
其使用基于 Leader-Follower模式的變體,用于減少不必要的線程等待。
當(dāng)一個(gè)線程調(diào)用隊(duì)列的 take 方法變?yōu)?leader 線程后,它會(huì)調(diào)用 available. awaitNanos(delay) 等待 delay 時(shí)間;其他線程(follwer線程) 則調(diào)用 available. await()進(jìn)行無(wú)限等待。
leader 線程延遲時(shí)間過(guò)期后會(huì)退出 take 方法,并通過(guò)調(diào)用 available.signal()方法喚醒一個(gè) follwer線程,被喚醒的 follwer線程被選舉為新的 leader線程。
Note:
隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口和Comparable接口,也就是說(shuō)DelayQueue里面的元素必須有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在。
原因:由于每個(gè)元素都有一個(gè)過(guò)期時(shí)間,所以要實(shí)現(xiàn)獲取當(dāng)前元素還剩下多少時(shí)間就過(guò)期了的接口;由于DelayQueue底層使用優(yōu)先級(jí)隊(duì)列來(lái)實(shí)現(xiàn),所以要實(shí)現(xiàn)元素之間相互比較的接口。
Delayed接口的定義如下:
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
解讀:
此接口繼承自Comparable接口。
Comparable接口的定義如下:
public interface Comparable<T> { public int compareTo(T o); }
添加元素
offer方法的代碼如下:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
解讀:
該方法在獲取獨(dú)占鎖之后調(diào)用優(yōu)先級(jí)隊(duì)列的offer方法實(shí)現(xiàn)入隊(duì)
/** * Inserts the specified element into this priority queue. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be * compared with elements currently in this priority queue * according to the priority queue's ordering * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1); siftUp(i, e); size = i + 1; return true; }
解讀:
如果待插入元素 e 為 null,則拋出 NullPointerException 異常。
由于DelayQueue是無(wú)界隊(duì)列,所以方法一直返回 true。
Note:
前述offer中 q.peek()方法返回的并不一定是當(dāng)前添加的元素。
如果 q.peek()方法返回的是 e,則說(shuō)明當(dāng)前元素 e將是最先過(guò)期的,于是重置 leader線程為 null,進(jìn)而激活 avaliable變量對(duì)應(yīng)的條件隊(duì)列里的一個(gè)線程,告訴它隊(duì)列里面有元素了。
put方法的代碼如下:
/** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */ public void put(E e) { offer(e); }
解讀:
此方法直接調(diào)用offer方法來(lái)實(shí)現(xiàn)。
獲取元素
poll方法的代碼如下:
/** * Retrieves and removes the head of this queue, or returns {@code null} * if this queue has no elements with an expired delay. * * @return the head of this queue, or {@code null} if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : q.poll(); } finally { lock.unlock(); } }
解讀:
如果隊(duì)列里面沒(méi)有過(guò)期元素,則返回null;否則返回隊(duì)首元素。
take方法的代碼如下:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
解讀:
如果隊(duì)列里面沒(méi)有過(guò)期元素,則等待。
示例
可以將延時(shí)隊(duì)列DelayQueue運(yùn)用在以下場(chǎng)景中:
(1)緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素,則表示緩存有效期到了。
(2)定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將要執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行任務(wù),比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
具體示例如下:
(1)Student對(duì)象作為DelayQueue的元素,其必須實(shí)現(xiàn)Delayed接口的兩個(gè)方法
package com.test; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Student implements Delayed {//必須實(shí)現(xiàn)Delayed接口 private String name; private long submitTime;// 交卷時(shí)間 private long workTime;// 考試時(shí)間 public Student(String name, long submitTime) { this.name = name; this.workTime = submitTime; this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime(); System.out.println(this.name + " 交卷,用時(shí)" + workTime); } public String getName() { return this.name + " 交卷,用時(shí)" + workTime; } //必須實(shí)現(xiàn)getDelay方法 public long getDelay(TimeUnit unit) { //返回一個(gè)延遲時(shí)間 return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS); } //必須實(shí)現(xiàn)compareTo方法 public int compareTo(Delayed o) { Student that = (Student) o; return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0); } }
(2)主線程程序
package com.test;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest { public static void main(String[] args) throws Exception { // 新建一個(gè)等待隊(duì)列 final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) { Student student = new Student("學(xué)生"+i,Math.round((Math.random()*10+i))); bq.put(student); // 將數(shù)據(jù)存到隊(duì)列里! } //獲取但不移除此隊(duì)列的頭部;如果此隊(duì)列為空,則返回 null。 System.out.println(bq.peek().getName()); } }
小結(jié):


浙公網(wǎng)安備 33010602011771號(hào)