Java的優先級任務隊列的實踐
隊列的基本理解
在說隊列之前說兩個名詞:Task是任務,TaskExecutor是任務執行器
而我們今天要說的隊列就完全符合某機構這個情況,隊列在有Task進來的時候TaskExecutor就立刻開始執行Task,當沒有Task的時候TaskExecutor就處于一個阻塞狀態,當有很多Task的時候Task也需要排隊,TaskExecutor也可以是多個,并且可以指定某幾個Task優先執行或者滯后執行。
綜上所說我們得出一個這樣的關系:隊列相當于某機構,TaskExecutor相當于窗口,辦事者就是Task。
普通隊列
當然很多機構也沒有設置什么軍人優先的窗口,所以隊列也有不帶優先級的隊列,因此我們先來實現一個非優先級的隊列。
和上述某機構不一樣,某機構可以先有機構,再有窗口,再有辦事者。但是我們寫代碼的時候,要想寫一個隊列,那么務必要在隊列中寫TaskExecutor,那么就得先寫好TaskExecutor類,以此類推就得先有Task類。
因此我們先寫一個Task的接口,也就是辦事的人,我把它設計為接口,方便辦各種不同事的人進來:
// 辦事的人。
public interface ITask {
// 辦事,我們把辦事的方法給辦事的人,也就是你要辦什么事,由你自己決定。
void run();
}
接下來再寫一個TaskExecutor的類,也就是窗口,用來執行Task,認真看注釋,非常有助于理解:
// 窗口
public class TaskExecutor extends Thread {
// 在窗口拍的隊,這個隊里面是辦事的人。
private BlockingQueue<ITask> taskQueue;
// 這個辦事窗口是否在等待著辦事。
private boolean isRunning = true;
public TaskExecutor(BlockingQueue<ITask> taskQueue) {
this.taskQueue = taskQueue;
}
// 下班。
public void quit() {
isRunning = false;
interrupt();
}
@Override
public void run() {
while (isRunning) { // 如果是上班狀態就待著。
ITask iTask;
try {
iTask = taskQueue.take(); // 叫下一個辦事的人進來,沒有人就等著。
} catch (InterruptedException e) {
if (!isRunning) {
// 發生意外了,是下班狀態的話就把窗口關閉。
interrupt();
break; // 如果執行到break,后面的代碼就無效了。
}
// 發生意外了,不是下班狀態,那么窗口繼續等待。
continue;
}
// 為這個辦事的人辦事。
iTask.run();
}
}
}
這里要稍微解釋下BlockingQueue<T>#take()方法,這個方法當隊列里面的item為空的時候,它會一直處于阻塞狀態,當隊列中進入item的時候它會立刻有一個返回值,它就和ServerSocket.accept()方法一樣,所以我們把它放入一個Thread中,以免阻塞調用它的線程(Android中可能是主線程)。
辦事的人和窗口都有了,下面我們封裝一個隊列,也就是某機構,用來管理這些窗口:
// 某機構。
public class TaskQueue {
// 某機構排的隊,隊里面是辦事的人。
private BlockingQueue<ITask> mTaskQueue;
// 好多窗口。
private TaskExecutor[] mTaskExecutors;
// 在開發者new隊列的時候,要指定窗口數量。
public TaskQueue(int size) {
mTaskQueue = new LinkedBlockingQueue<>();
mTaskExecutors = new TaskExecutor[size];
}
// 開始上班。
public void start() {
stop();
// 把各個窗口都打開,讓窗口開始上班。
for (int i = 0; i < mTaskExecutors.length; i++) {
mTaskExecutors[i] = new TaskExecutor(mTaskQueue);
mTaskExecutors[i].start();
}
}
// 統一各個窗口下班。
public void stop() {
if (mTaskExecutors != null)
for (TaskExecutor taskExecutor : mTaskExecutors) {
if (taskExecutor != null) taskExecutor.quit();
}
}
// 開一個門,讓辦事的人能進來。
public <T extends ITask> int add(T task) {
if (!mTaskQueue.contains(task)) {
mTaskQueue.add(task);
}
// 返回排的隊的人數,公開透明,讓外面的人看的有多少人在等著辦事。
return mTaskQueue.size();
}
}
某機構、窗口、辦事的人都有了,下面我們就派一個人去一件具體的事,但是上面我的Task是一個接口,所以我們需要用一個類來實現這個接口,來做某一件事:
// 做一件打印自己的id的事。
public class PrintTask implements ITask {
private int id;
public PrintTask(int id) {
this.id = id;
}
@Override
public void run() {
// 為了盡量模擬窗口辦事的速度,我們這里停頓兩秒。
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
}
System.out.println("我的id是:" + id);
}
}
下面就讓我們模擬的虛擬世界運行一次:
public class Main {
public static void main(String... args) {
// 這里暫時只開一個窗口。
TaskQueue taskQueue = new TaskQueue(1);
taskQueue.start();
for (int i = 0; i < 10; i++) {
PrintTask task = new PrintTask(i);
taskQueue.add(task);
}
}
}
沒錯,隊列按照我們理想的狀況打印出來了:
我的id是:0 我的id是:1 我的id是:2 我的id是:3 我的id是:4 我的id是:5 我的id是:6 我的id是:7 我的id是:8 我的id是:9
上面我門只開了一個窗口,下面我多開幾個窗口:
public class Main {
public static void main(String... args) {
// 開三個窗口。
TaskQueue taskQueue = new TaskQueue(3);
taskQueue.start(); // 某機構開始工作。
for (int i = 0; i < 10; i++) {
// new 10 個需要辦事的人,并且進入某機構辦事。
PrintTask task = new PrintTask(i);
taskQueue.add(task);
}
}
}
這里要說明一下,在初始化的時候我們開了3個窗口,內部的順序應該是這樣的:
當某機構的大門開了以后,第一個辦事的人進去到了第一個窗口,第二個辦事的人進去到了第二個窗口,第三個辦事的人進去到了第三個窗口,第四個辦事的人進去排隊在第一位,當第一、第二、第三個窗口中不論哪一個窗口的事辦完了,第四個人就去哪一個窗口繼續辦事,第五個人等待,一次類推。這樣子就達到了隊列同事并發三個任務的效果。
這就是一個普通的隊列,其它的一些特性也是基于此再次封裝的,那么下面我就基于此再把人物的優先級加上,也就是我們上面說的特殊窗口->軍人優先!
優先級隊列
我們排隊等待辦事的時候,來了一個辦事的人,那么如何判斷這個辦事人是否可以優先辦理呢?那就要判斷它是否具有優先的權限甚至他可以優先到什么程度。
所以我們需要讓這個Task有一標志,那就是優先級,所以我用一個枚舉類標記優先級:
public enum Priority {
LOW, // 最低。
DEFAULT, // 默認級別。
HIGH, // 高于默認級別。
Immediately // 立刻執行。
}
這里我把分了四個等級:最低、默認、高、立刻,這個等級肯定要給到我們的辦事的人,也就是Task
public interface ITask {
void run();
void setPriority(Priority priority);
Priority getPriority();
}
可以設置優先級和可以拿到優先級。
下面我們要把上面的LinkedBlockingQueue替換成PriorityBlockingQueue<E>,因為它可以自動做到優先級的比較,它要求泛型<E>,也就是我們的Task必須實現Comparable<E>接口,而Comparable<E>有一個compareTo(E)方法可以對兩個<T>做比較,因此我們的隊列需要改一下實現的方法:
// 某機構。
public class TaskQueue {
// 某機構排的隊,隊里面是辦事的人。
private BlockingQueue<ITask> mTaskQueue;
// 好多窗口。
private TaskExecutor[] mTaskExecutors;
// 在開發者new隊列的時候,要指定窗口數量。
public TaskQueue(int size) {
mTaskQueue = new PriorityBlockingQueue<>();
mTaskExecutors = new TaskExecutor[size];
}
...
然后ITask接口繼承Comparable<E>接口:
public interface ITask extends Comparable<ITask> {
void run();
void setPriority(Priority priority);
Priority getPriority();
}
因為有setPriority(Priority)方法和getPriority()方法和Comparable<E>的compareTo(E)方法,所以我們的每一個Task都需要實現這幾個方法,這樣就會很麻煩,所以我們封裝一個BasicTask:
public abstract class BasicTask implements ITask {
// 默認優先級。
private Priority priority = Priority.DEFAULT;
@Override
public void setPriority(Priority priority) {
this.priority = priority;
}
@Override
public Priority getPriority() {
return priority;
}
// 做優先級比較。
@Override
public int compareTo(ITask another) {
final Priority me = this.getPriority();
final Priority it = another.getPriority();
return me == it ? [...] : it.ordinal() - me.ordinal();
}
}
其它都好說,我們看到compareTo(E)方法就不太理解了,這里說一下這個方法:
compareTo(E)中傳進來的E是另一個Task,如果當前Task比另一個Task更靠前就返回負數,如果比另一個Task靠后,那就返回正數,如果優先級相等,那就返回0。
這里要特別注意,我們看到上面當兩個Task優先級不一樣的時候調用了Priority.orinal()方法,并有后面的orinal減去了當前的orinal,怎么理解呢?首先要理解Priority.orinal()方法,在Java中每一個枚舉值都有這個方法,這個枚舉的值是它的下標+1,也就是[index + 1],所以我們寫的Priority類其實可以這樣理解:
public enum Priority {
1,
2,
3,
4
}
繼續,如果給當前Task比較低,給compareTo(E)中的Task設置的優先級別比較高,那么Priority不一樣,那么返回的值就是整數,因此當前Task就會被PriorityBlockingQueue<E>排到后面,如果調換那么返回結果也就調換了。
但是我們注意到me == it ? [...] : it.ordinal() - me.ordinal();中的[...]是什么鬼啊?因為這里缺一段代碼呀哈哈哈(這個作者怎么傻乎乎的),這一段代碼的意思是當優先級別一樣的時候怎么辦,那就是誰先加入隊列誰排到前面唄,那么怎樣返回值呢,我們怎么知道哪個Task先加入隊列呢?這個時候可愛的我就出現了,我給它給一個序列標記它什么時候加入隊列的不久完事了,于是我們可以修改下ITask接口,增加兩個方法:
public interface ITask extends Comparable<ITask> {
void run();
void setPriority(Priority priority);
Priority getPriority();
void setSequence(int sequence);
int getSequence();
}
我們用setSequence(int)標記它加入隊列的順序,然后因為setSequence(int)和getSequence()是所有Task都需要實現的,所以我們在BasicTask中實現這兩個方法:
public abstract class BasicTask implements ITask {
// 默認優先級。
private Priority priority = Priority.DEFAULT;
private int sequence;
@Override
public void setPriority(Priority priority) {
this.priority = priority;
}
@Override
public Priority getPriority() {
return priority;
}
@Override
public void setSequence(int sequence) {
this.sequence = sequence;
}
@Override
public int getSequence() {
return sequence;
}
// 做優先級比較。
@Override
public int compareTo(ITask another) {
final Priority me = this.getPriority();
final Priority it = another.getPriority();
return me == it ? this.getSequence() - another.getSequence() :
it.ordinal() - me.ordinal();
}
}
看到了吧,剛才的[...]已經變成了this.getSequence() - another.getSequence(),這里需要和上面的it.ordinal() - me.ordinal();的邏輯對應,上面說到如果給當前Task比較低,給compareTo(E)中的Task設置的優先級別比較高,那么Priority不一樣,那么返回的值就是整數,因此當前Task就會被PriorityBlockingQueue<E>排到后面,如果調換那么返回結果也就調換了。
這里的邏輯和上面對應就是和上面的邏輯相反,因為這里是當兩個優先級一樣時的返回,上面是兩個優先級不一樣時的返回,所以當優先級別一樣時,返回負數表示當前Task在前,返回正數表示當前Task在后,正好上面上的邏輯對應。
接下來就是給Task設置序列了,于是我們在TaskQueue中的T void add(T)方法做個手腳:
public class TaskQueue {
private AtomicInteger mAtomicInteger = new AtomicInteger();
...
public TaskQueue(int size) {
...
}
public void start() {
...
}
public void stop() {
...
}
public <T extends ITask> int add(T task) {
if (!mTaskQueue.contains(task)) {
task.setSequence(mAtomicInteger.incrementAndGet()); // 注意這行。
mTaskQueue.add(task);
}
return mTaskQueue.size();
}
}
這里我們使用了AtomicInteger類,它的incrementAndGet()方法會每次遞增1,其實它相當于:
mAtomicInteger.addAndGet(1);
其它具體用法請自行搜索,這里不再贅述。
到此為止,我們的優先級別的隊列就實現完畢了,我們來做下測試:
public static void main(String... args) {
// 開一個窗口,這樣會讓優先級更加明顯。
TaskQueue taskQueue = new TaskQueue(1);
taskQueue.start(); // // 某機構開始工作。
// 為了顯示出優先級效果,我們預添加3個在前面堵著,讓后面的優先級效果更明顯。
taskQueue.add(new PrintTask(110));
taskQueue.add(new PrintTask(112));
taskQueue.add(new PrintTask(122));
for (int i = 0; i < 10; i++) { // 從第0個人開始。
PrintTask task = new PrintTask(i);
if (1 == i) {
task.setPriority(Priority.LOW); // 讓第2個進入的人最后辦事。
} else if (8 == i) {
task.setPriority(Priority.HIGH); // 讓第9個進入的人第二個辦事。
} else if (9 == i) {
task.setPriority(Priority.Immediately); // 讓第10個進入的人第一個辦事。
}
// ... 其它進入的人,按照進入順序辦事。
taskQueue.add(task);
}
沒錯這就是我們看到的效果:
我的id是:9 我的id是:8 我的id是:110 我的id是:112 我的id是:122 我的id是:0 我的id是:2 我的id是:3 我的id是:4 我的id是:5 我的id是:6 我的id是:7 我的id是:1
浙公網安備 33010602011771號