OkHttp3源碼詳解(六) Okhttp任務隊列工作原理
1 概述
1.1 引言
android完成非阻塞式的異步請求的時候都是通過啟動子線程的方式來解決,子線程執行完任務的之后通過handler的方式來和主線程來完成通信。無限制的創建線程,會給系統帶來大量的開銷。如果在高并發的任務下,啟用個線程池,可以不斷的復用里面不再使用和有效的管理線程的調度和數量的管理。就可以節省系統的成本,有效的提高執行效率。
1.2 線程池ThreadPoolExecutor
okhttp的線程池對象存在于Dispatcher類中。實例過程如下
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
1.2 Call對象
了解源碼或使用過okhttp的都知道。 okttp的操作元是Call對象。異步的實現是RealCall.AsyncCall。而 AsyncCall是實現的一個Runnable接口。
final class AsyncCall extends NamedRunnable {}
所以Call本質就是一個Runable線程操作元肯定是放進excutorService中直接啟動的。
2 線程池的復用和管理
2.1 圖解
為了完成調度和復用,定義了兩個隊列分別用作等待隊列和執行任務的隊列。這兩個隊列都是Dispatcher 成員變量。Dispatcher是一個控制執行,控制所有Call的分發和任務的調度、通信、清理等操作。這里只介紹異步調度任務。
/** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
在 《okhttp連接池復用機制》 文章中我們在緩存Connection連接的時候也是使用的Deque雙端隊列。這里同樣的方式,可以方便在隊列頭添加元素,移除尾部的元素。

2.2 過程分析
Call代用equeue方法的時候
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
方法中滿足執行隊列里面不足最大線程數maxRequests并且Call對應的host數目不超過maxRequestsPerHost 的時候直接把call對象直接推入到執行隊列里,并啟動線程任務(Call本質是一個Runnable)。否則,當前線程數過多,就把他推入到等待隊列中。Call執行完肯定需要在runningAsyncCalls 隊列中移除這個線程。那么readyAsyncCalls隊列中的線程在什么時候才會被執行呢。
追溯下AsyncCall 線程的執行方法
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(forWebSocket); if (canceled) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }
這里做了核心request的動作,并把失敗和回復數據的結果通過responseCallback 回調到Dispatcher。執行操作完畢了之后不管有無異常都會進入到dispactcher的finished方法。
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } }
在這里call在runningAsyncCalls隊列中被移除了,重新計算了目前正在執行的線程數量。并且調用了promoteCalls() 看來是來調整任務隊列的,跟進去看下
private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
原來實在這里對readyAsyncCalls 進行調度的。最終會在readyAsyncCalls 中通過remove操作把元素迭代取出并移除之后加入到runningAsyncCalls的執行隊列中執行操作。ArrayDeque 是非線程安全的所以finished在調用promoteCalls 的時候都在synchronized塊中執行的。執行等待隊列線程當然的前提是runningAsyncCalls 線程數沒有超上線,而且等待隊列里面有等待的任務。
以上完成了線程線程池的復用和線程的管理工作。
小結,Call在執行任務通過Dispatcher把單元任務優先推到執行隊列里進行操作,如果操作完成再執行等待隊列的任務。
浙公網安備 33010602011771號