kotlin協(xié)程——>共享的可變狀態(tài)與并發(fā)
共享的可變狀態(tài)與并發(fā)
協(xié)程可?多線程調(diào)度器(?如默認的 Dispatchers.Default)并發(fā)執(zhí)?。這樣就可以提出所有常?的并發(fā) 問題。主要的問題是同步訪問共享的可變狀態(tài)。協(xié)程領域?qū)@個問題的?些解決?案類似于多線程領域 中的解決?案,但其它解決?案則是獨???的。
問題
我們啟動?百個協(xié)程,它們都做?千次相同的操作。我們同時會測量它們的完成時間以便進?步的?較
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協(xié)程數(shù)量
val k = 1000 // 每個協(xié)程重復執(zhí)?同?動作的次數(shù)
val time = measureTimeMillis {
coroutineScope { // 協(xié)程的作?域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
我們從?個?常簡單的動作開始:使?多線程的 Dispatchers.Default 來遞增?個共享的可變變量
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
這段代碼最后打印出什么結果?它不太可能打印出“Counter = 100000”,因為?百個協(xié)程在多個線程中 同時遞增計數(shù)器但沒有做并發(fā)處理。
volatile ?濟于事
有?種常?的誤解:volatile 可以解決并發(fā)問題。讓我們嘗試?下:
@Volatile // 在 Kotlin 中 `volatile` 是?個注解
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
這段代碼運?速度更慢了,但我們最后仍然沒有得到“Counter = 100000”這個結果,因為 volatile 變量 保證可線性化(這是“原?”的技術術語)讀取和寫?變量,但在?量動作(在我們的?例中即“遞增”操 作)發(fā)?時并不提供原?性。
線程安全的數(shù)據(jù)結構
?種對線程、協(xié)程都有效的常規(guī)解決?法,就是使?線程安全(也稱為同步的、可線性化、原?)的數(shù)據(jù)結 構,它為需要在共享狀態(tài)上執(zhí)?的相應操作提供所有必需的同步處理。在簡單的計數(shù)器場景中,我們可 以使?具有 incrementAndGet 原?操作的 AtomicInteger 類:
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
這是針對此類特定問題的最快解決?案。它適?于普通計數(shù)器、集合、隊列和其他標準數(shù)據(jù)結構以及它 們的基本操作。然?,它并不容易被擴展來應對復雜狀態(tài)、或?些沒有現(xiàn)成的線程安全實現(xiàn)的復雜操作
以細粒度限制線程
限制線程 是解決共享可變狀態(tài)問題的?種?案:對特定共享狀態(tài)的所有訪問權都限制在單個線程中。它 通常應?于 UI 程序中:所有 UI 狀態(tài)都局限于單個事件分發(fā)線程或應?主線程中。這在協(xié)程中很容易實 現(xiàn),通過使??個單線程上下?:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 將每次?增限制在單線程上下?中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
這段代碼運??常緩慢,因為它進?了 細粒度 的線程限制。每個增量操作都得使? [withContext(counterContext)] 塊從多線程 Dispatchers.Default 上下?切換到單線程上下?。
以粗粒度限制線程
在實踐中,線程限制是在?段代碼中執(zhí)?的,例如:狀態(tài)更新類業(yè)務邏輯中?部分都是限于單線程中。下 ?的?例演?了這種情況,在單線程上下?中運?每個協(xié)程。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 將?切都限制在單線程上下?中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
這段代碼運?更快?且打印出了正確的結果。
互斥
該問題的互斥解決?案:使?永遠不會同時執(zhí)?的 關鍵代碼塊 來保護共享狀態(tài)的所有修改。在阻塞的 世界中,你通常會為此?的使? synchronized 或者 ReentrantLock 。在協(xié)程中的替代品叫做 Mutex 。它具有 lock 和 unlock ?法,可以隔離關鍵的部分。關鍵的區(qū)別在于 Mutex.lock() 是?個 掛起函數(shù),它不會阻塞線程。 還有 withLock 擴展函數(shù),可以?便的替代常?的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式:
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// ?鎖保護每次?增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
此?例中鎖是細粒度的,因此會付出?些代價。但是對于某些必須定期修改共享狀態(tài)的場景,它是?個 不錯的選擇,但是沒有?然線程可以限制此狀態(tài)。
Actors
?個 actor 是由協(xié)程、被限制并封裝到該協(xié)程中的狀態(tài)以及?個與其它協(xié)程通信的 通道 組合?成的? 個實體。?個簡單的 actor 可以簡單的寫成?個函數(shù),但是?個擁有復雜狀態(tài)的 actor 更適合由類來表 ?。
有?個 actor 協(xié)程構建器,它可以?便地將 actor 的郵箱通道組合到其作?域中(?來接收消息)、組合 發(fā)送 channel 與結果集對象,這樣對 actor 的單個引?就可以作為其句柄持有。
使? actor 的第?步是定義?個 actor 要處理的消息類。Kotlin 的密封類很適合這種場景。我們使? IncCounter 消息(?來遞增計數(shù)器)和 GetCounter 消息(?來獲取值)來定義 CounterMsg 密 封類。后者需要發(fā)送回復。CompletableDeferred 通信原語表?未來可知(可傳達)的單個值,這?被? 于此?的。
// 計數(shù)器 Actor 的各種類型 sealed class CounterMsg object IncCounter : CounterMsg() // 遞增計數(shù)器的單向消息 class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 攜帶回復的請求
接下來我們定義?個函數(shù),使? actor 協(xié)程構建器來啟動?個 actor:
// 這個函數(shù)啟動?個新的計數(shù)器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 狀態(tài)
for (msg in channel) { // 即將到來消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
main 函數(shù)代碼很簡單:
fun main() = runBlocking<Unit> {
val counter = counterActor() // 創(chuàng)建該 actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// 發(fā)送?條消息以?來從?個 actor 中獲取計數(shù)值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 關閉該actor
}
actor 本?執(zhí)?時所處上下?(就正確性??)?關緊要。?個 actor 是?個協(xié)程,??個協(xié)程是按順序 執(zhí)?的,因此將狀態(tài)限制到特定協(xié)程可以解決共享可變狀態(tài)的問題。實際上,actor 可以修改??的私有 狀態(tài),但只能通過消息互相影響(避免任何鎖定)。
actor 在?負載下?鎖更有效,因為在這種情況下它總是有?作要做,?且根本不需要切換到不同的上下?。
注意,actor 協(xié)程構建器是?個雙重的 produce 協(xié)程構建器。?個 actor 與它接收消息的通道相關 聯(lián),??個 producer 與它發(fā)送元素的通道相關聯(lián)。

浙公網(wǎng)安備 33010602011771號