kotlin協程——>協程上下文與調度器
協程上下?與調度器
協程總是運?在?些以 CoroutineContext 類型為代表的上下?中,它們被定義在了 Kotlin 的標準庫 ?。 協程上下?是各種不同元素的集合。其中主元素是協程中的 Job,我們在前?的?檔中?過它以及它的 調度器,?本?將對它進?介紹。
調度器與線程
協程上下?包含?個 協程調度器(參? CoroutineDispatcher)它確定了哪些線程或與線程相對應的 協程執?。協程調度器可以將協程限制在?個特定的線程執?,或將它分派到?個線程池,亦或是讓它 不受限地運?。 所有的協程構建器諸如 launch 和 async 接收?個可選的 CoroutineContext 參數,它可以被?來顯式 的為?個新協程或其它上下?元素指定?個調度器。
嘗試下?的?例:
launch { // 運?在?協程的上下?中,即 runBlocking 主協程
println("main runBlocking : I'm working in thread
${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——將?作在主線程中
println("Unconfined : I'm working in thread
${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 將會獲取默認調度器
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // 將使它獲得?個新的線程
println("newSingleThreadContext: I'm working in thread
${Thread.currentThread().name}")
}
它執?后得到了如下輸出(也許順序會有所不同):
Unconfined : I'm working in thread main Default : I'm working in thread DefaultDispatcher-worker-1 newSingleThreadContext: I'm working in thread MyOwnThread main runBlocking : I'm working in thread main
當調? launch { …… } 時不傳參數,它從啟動了它的 CoroutineScope 中承襲了上下?(以及調度 器)。在這個案例中,它從 main 線程中的 runBlocking 主協程承襲了上下?。 Dispatchers.Unconfined 是?個特殊的調度器且似乎也運?在 main 線程中,但實際上,它是?種不 同的機制,這會在后?中講到。 當協程在 GlobalScope 中啟動時,使?的是由 Dispatchers.Default 代表的默認調度器。默認調度器使 ?共享的后臺線程池。所以 launch(Dispatchers.Default) { …… } 與 GlobalScope.launch { …… } 使?相同的調度器。 newSingleThreadContext 為協程的運?啟動了?個線程。?個專?的線程是?種?常昂貴的資源。 在真實的應?程序中兩者都必須被釋放,當不再需要的時候,使? close 函數,或存儲在?個頂層變量 中使它在整個應?程序中被重?。
?受限調度器 vs 受限調度器
Dispatchers.Unconfined 協程調度器在調?它的線程啟動了?個協程,但它僅僅只是運?到第?個掛 起點。掛起后,它恢復線程中的協程,?這完全由被調?的掛起函數來決定。?受限的調度器?常適?于 執?不消耗 CPU 時間的任務,以及不更新局限于特定線程的任何共享數據(如UI)的協程。 另???,該調度器默認繼承了外部的 CoroutineScope。runBlocking 協程的默認調度器,特別是,當 它被限制在了調?者線程時,繼承?它將會有效地限制協程在該線程運?并且具有可預測的 FIFO 調 度。
launch(Dispatchers.Unconfined) { // ?受限的——將和主線程?起?作
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(500)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
launch { // ?協程的上下?,主 runBlocking 協程
println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
執?后的輸出:
Unconfined : I'm working in thread main main runBlocking: I'm working in thread main Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor main runBlocking: After delay in thread main
所以,該協程的上下?繼承? runBlocking {...} 協程并在 main 線程中運?,當 delay 函數調 ?的時候,?受限的那個協程在默認的執?者線程中恢復執?。
?受限的調度器是?種?級機制,可以在某些極端情況下提供幫助?不需要調度協程以便稍后執?或產?不希望的副作?,因為某些操作必須?即在協程中執?。?受限調度器不應該在通常的
代碼中使?。
調試協程與線程
協程可以在?個線程上掛起并在其它線程上恢復。甚??個單線程的調度器也是難以弄清楚協程在何 時何地正在做什么事情。使?通常調試應?程序的?法是讓線程在每?個?志?件的?志聲明中打印 線程的名字。這種特性在?志框架中是普遍受?持的。但是在使?協程時,單獨的線程名稱不會給出很 多協程上下?信息,所以 kotlinx.coroutines 包含了調試?具來讓它更簡單。 使? -Dkotlinx.coroutines.debug JVM 參數運?下?的代碼:
val a = async {
log("I'm computing a piece of the answer")
6
}
val b = async {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
這?有三個協程,包括 runBlocking 內的主協程 (#1),以及計算延期的值的另外兩個協程 a (#2) 和 b (#3)。它們都在 runBlocking 上下?中執?并且被限制在了主線程內。這段代碼的輸出如下:
[main @coroutine#2] I'm computing a piece of the answer [main @coroutine#3] I'm computing another piece of the answer [main @coroutine#1] The answer is 42
這個 log 函數在?括號種打印了線程的名字,并且你可以看到它是 main 線程,并且附帶了當前正在 其上執?的協程的標識符。這個標識符在調試模式開啟時,將連續分配給所有創建的協程。
當 JVM 以 -ea 參數配置運?時,調試模式也會開啟。你可以在 DEBUG_PROPERTY_NAME 屬性 的?檔中閱讀有關調試?具的更多信息。
在不同線程間跳轉
使? -Dkotlinx.coroutines.debug JVM 參數運?下?的代碼
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
它演?了?些新技術。其中?個使? runBlocking 來顯式指定了?個上下?,并且另?個使? withContext 函數來改變協程的上下?,?仍然駐留在相同的協程中,正如可以在下?的輸出中所?到的:
[Ctx1 @coroutine#1] Started in ctx1 [Ctx2 @coroutine#1] Working in ctx2 [Ctx1 @coroutine#1] Back to ctx1
注意,在這個例?中,當我們不再需要某個在 newSingleThreadContext 中創建的線程的時候,它使? 了 Kotlin 標準庫中的 use 函數來釋放該線程。
上下?中的作業
協程的 Job 是上下?的?部分,并且可以使? coroutineContext [Job] 表達式在上下?中檢索 它:
println("My job is ${coroutineContext[Job]}")
在調試模式下,它將輸出如下這些信息:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
請注意,CoroutineScope 中的 isActive 只是 coroutineContext[Job]?.isActive == true 的?種?便的快捷?式。
?協程
當?個協程被其它協程在 CoroutineScope 中啟動的時候,它將通過 CoroutineScope.coroutineContext 來承襲上下?,并且這個新協程的 Job 將會成為?協程作業的?作業。當?個?協程被取消的時候,所有它的?協程也會被遞歸的取消。 然?,當使? GlobalScope 來啟動?個協程時,則新協程的作業沒有?作業。因此它與這個啟動的作?域?關且獨?運作。
// 啟動?個協程來處理某種傳?請求(request)
val request = launch {
// 孵化了兩個?作業, 其中?個通過 GlobalScope 啟動
GlobalScope.launch {
println("job1: I run in GlobalScope and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// 另?個則承襲了?協程的上下?
launch {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
}
delay(500)
request.cancel() // 取消請求(request)的執?
delay(1000) // 延遲?秒鐘來看看發?了什么
println("main: Who has survived request cancellation?")
這段代碼的輸出如下:
job1: I run in GlobalScope and execute independently! job2: I am a child of the request coroutine job1: I am not affected by cancellation of the request main: Who has survived request cancellation?
父協程的職責
?個?協程總是等待所有的?協程執?結束。?協程并不顯式的跟蹤所有?協程的啟動,并且不必使? Job.join 在最后的時候等待它們:
// 啟動?個協程來處理某種傳?請求(request)
val request = launch {
repeat(3) { i -> // 啟動少量的?作業
launch {
delay((i + 1) * 200L) // 延遲 200 毫秒、400 毫秒、600 毫秒的時間
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still
active")
}
request.join() // 等待請求的完成,包括其所有?協程
println("Now processing of the request is complete")
結果如下所?:
request: I'm done and I don't explicitly join my children that are still active Coroutine 0 is done Coroutine 1 is done Coroutine 2 is done Now processing of the request is complete
命名協程以用于調試
當協程經常打印?志并且你只需要關聯來?同?個協程的?志記錄時,則?動分配的 id 是?常好的。 然?,當?個協程與特定請求的處理相關聯時或做?些特定的后臺任務,最好將其明確命名以?于調試 ?的。CoroutineName 上下?元素與線程名具有相同的?的。當調試模式開啟時,它被包含在正在執 ?此協程的線程名中。
log("Started main coroutine")
// 運?兩個后臺值計算
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1")
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2")
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
程序執?使?了 -Dkotlinx.coroutines.debug JVM 參數,輸出如下所?:
[main @main#1] Started main coroutine [main @v1coroutine#2] Computing v1 [main @v2coroutine#3] Computing v2 [main @main#1] The answer for v1 / v2 = 42
結合上下文中的元素
有時我們需要在協程上下?中定義多個元素。我們可以使? + 操作符來實現。?如說,我們可以顯式指 定?個調度器來啟動協程并且同時顯式指定?個命名:
launch(Dispatchers.Default + CoroutineName("test")) {
println("I'm working in thread ${Thread.currentThread().name}")
}
這段代碼使?了 -Dkotlinx.coroutines.debug JVM 參數,輸出如下所?:
I'm working in thread DefaultDispatcher-worker-1 @test#2
協程作用域
讓我們將關于上下?,?協程以及作業的知識綜合在?起。假設我們的應?程序擁有?個具有?命周期 的對象,但這個對象并不是?個協程。舉例來說,我們編寫了?個 Android 應?程序并在 Android 的 activity 上下?中啟動了?組協程來使?異步操作拉取并更新數據以及執?動畫等等。所有這些協程必 須在這個 activity 銷毀的時候取消以避免內存泄漏。當然,我們也可以?動操作上下?與作業,以結合 activity 的?命周期與它的協程,但是 kotlinx.coroutines 提供了?個封裝:CoroutineScope 的 抽象。你應該已經熟悉了協程作?域,因為所有的協程構建器都聲明為在它之上的擴展。 我們通過創建?個 CoroutineScope 實例來管理協程的?命周期,并使它與 activit 的?命周期相關 聯。CoroutineScope 可以通過 CoroutineScope() 創建或者通過MainScope() ??函數。前者創建 了?個通?作?域,?后者為使? Dispatchers.Main 作為默認調度器的 UI 應?程序 創建作?域:
class Activity {
private val mainScope = MainScope()
fun destroy() {
mainScope.cancel()
}
// 繼續運?……
// 在 Activity 類中
fun doSomething() {
// 在?例中啟動了 10 個協程,且每個都?作了不同的時?
repeat(10) { i ->
mainScope.launch {
delay((i + 1) * 200L) // 延遲 200 毫秒、400 毫秒、600 毫秒等等不同的時間
println("Coroutine $i is done")
}
}
}
} // Activity 類結束
在 main 函數中我們創建 activity,調?測試函數 doSomething ,并且在 500 毫秒后銷毀這個 activity。這取消了從 doSomething 啟動的所有協程。我們可以觀察到這些是由于在銷毀之后,即使 我們再等?會?,activity 也不再打印消息。
val activity = Activity()
activity.doSomething() // 運?測試函數
println("Launched coroutines")
delay(500L) // 延遲半秒鐘
println("Destroying activity!")
activity.destroy() // 取消所有的協程
delay(1000) // 為了在視覺上確認它們沒有?作
這個?例的輸出如下所?:
Launched coroutines Coroutine 0 is done Coroutine 1 is done Destroying activity!
你可以看到,只有前兩個協程打印了消息,?另?個協程在 Activity.destroy() 中單次調?了 job.cancel() 。
線程局部數據
有時,能夠將?些線程局部數據傳遞到協程與協程之間是很?便的。然?,由于它們不受任何特定線程 的約束,如果?動完成,可能會導致出現樣板代碼。
ThreadLocal,asContextElement 擴展函數在這?會充當救兵。它創建了額外的上下?元素,且保 留給定 ThreadLocal 的值,并在每次協程切換其上下?時恢復它。 它很容易在下?的代碼中演?:
threadLocal.set("main")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
println("Launch start, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")
yield()
println("After yield, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")
}
job.join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value:
'${threadLocal.get()}'")
在這個例?中我們使? Dispatchers.Default 在后臺線程池中啟動了?個新的協程,所以它?作在線程 池中的不同線程中,但它仍然具有線程局部變量的值,我們指定使? threadLocal.asContextElement(value = "launch") ,?論協程執?在什么線程中都是沒有問題的。因此,其輸出如(調試)所?:
Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main' Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch' After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch' Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
這很容易忘記去設置相應的上下?元素。如果運?協程的線程不同,在協程中訪問的線程局部變量則可 能會產?意外的值。為了避免這種情況,建議使? ensurePresent ?法并且在不正確的使?時快速失 敗。 ThreadLocal 具有?流的?持,可以與任何 kotlinx.coroutines 提供的原語?起使?。但它有 ?個關鍵限制,即:當?個線程局部變量變化時,則這個新值不會傳播給協程調?者(因為上下?元素? 法追蹤所有 ThreadLocal 對象訪問),并且下次掛起時更新的值將丟失。使? withContext 在協程 中更新線程局部變量,詳? asContextElement。 另外,?個值可以存儲在?個可變的域中,例如 class Counter(var i: Int) ,是的,反過來,可以 存儲在線程局部的變量中。然?,在這個案例中你完全有責任來進?同步可能的對這個可變的域進?的 并發的修改。 對于?級的使?,例如,那些在內部使?線程局部傳遞數據的?于與?志記錄 MDC 集成,以及事務上下 ?或任何其它庫,請參?需要實現的 ThreadContextElement 接?的?檔。

浙公網安備 33010602011771號