kotlin協程——>select 表達式(實驗性的)
select 表達式(實驗性的)
select 表達式可以同時等待多個掛起函數,并 選擇 第?個可?的。
在通道中 select
我們現在有兩個字符串?產者:fizz 和 buzz 。其中 fizz 每 300 毫秒?成?個“Fizz”字符串:
fun CoroutineScope.fizz() = produce<String> {
while (true) { // 每 300 毫秒發送?個 "Fizz"
delay(300)
send("Fizz")
}
}
接著 buzz 每 500 毫秒?成?個“Buzz!”字符串:
fun CoroutineScope.buzz() = produce<String> {
while (true) { // 每 500 毫秒發送?個"Buzz!"
delay(500)
send("Buzz!")
}
}
使? receive 掛起函數,我們可以從兩個通道接收 其中?個 的數據。但是 select 表達式允許我們使? 其 onReceive ?句 同時 從兩者接收:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> 意味著該 select 表達式不返回任何結果
fizz.onReceive { value -> // 這是第?個 select ?句
println("fizz -> '$value'")
}
buzz.onReceive { value -> // 這是第?個 select ?句
println("buzz -> '$value'")
}
}
}
讓我們運?代碼 7 次:
val fizz = fizz()
val buzz = buzz()
repeat(7) {
selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // 取消 fizz 和 buzz 協程
這段代碼的執?結果如下:
fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' fizz -> 'Fizz' buzz -> 'Buzz!' fizz -> 'Fizz' buzz -> 'Buzz!'
通道關閉時 select
select 中的 onReceive ?句在已經關閉的通道執?會發?失敗,并導致相應的 select 拋出異常。我 們可以使? onReceiveOrNull ?句在關閉通道時執?特定操作。以下?例還顯?了 select 是?個返 回其查詢?法結果的表達式:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
"a -> '$value'"
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
"b -> '$value'"
}
}
注意,onReceiveOrNull 是?個僅在?于不可空元素的通道上定義的擴展函數,以使關閉的通道與空值 之間不會出現意外的混亂。 現在有?個?成四次“Hello”字符串的 a 通道,和?個?成四次“World”字符串的 b 通道,我們在這 兩個通道上使?它:
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) { // 打印最早的?個結果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
這段代碼的結果?常有趣,所以我們將在細節中分析它:
a -> 'Hello 0' a -> 'Hello 1' b -> 'World 0' a -> 'Hello 2' a -> 'Hello 3' b -> 'World 1' Channel 'a' is closed Channel 'a' is closed
有?個結果可以通過觀察得出。
?先,select 偏向于 第?個?句,當可以同時選到多個?句時,第?個?句將被選中。在這?,兩個通 道都在不斷地?成字符串,因此 a 通道作為 select 中的第?個?句獲勝。然?因為我們使?的是?緩 沖通道,所以 a 在其調? send 時會不時地被掛起,進? b 也有機會發送。
第?個觀察結果是,當通道已經關閉時,會?即選擇 onReceiveOrNull。
Select 以發送
Select 表達式具有 onSend ?句,可以很好的與選擇的偏向特性結合使?。
我們來編寫?個整數?成器的?例,當主通道上的消費者?法跟上它時,它會將值發送到 side 通道 上:
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
for (num in 1..10) { // ?產從 1 到 10 的 10 個數值
delay(100) // 延遲 100 毫秒
select<Unit> {
onSend(num) {} // 發送到主通道
side.onSend(num) {} // 或者發送到 side 通道
}
}
}
消費者將會?常緩慢,每個數值處理需要 250 毫秒:
val side = Channel<Int>() // 分配 side 通道
launch { // 對于 side 通道來說,這是?個很快的消費者
side.consumeEach { println("Side channel has $it") }
}
produceNumbers(side).consumeEach {
println("Consuming $it")
delay(250) // 不要著急,讓我們正確消化消耗被發送來的數字
}
println("Done consuming")
coroutineContext.cancelChildren()
讓我們看看會發?什么:
Consuming 1 Side channel has 2 Side channel has 3 Consuming 4 Side channel has 5 Side channel has 6 Consuming 7 Side channel has 8 Side channel has 9 Consuming 10 Done consuming
Select 延遲值
延遲值可以使? onAwait ?句查詢。讓我們啟動?個異步函數,它在隨機的延遲后會延遲返回字符串:
fun CoroutineScope.asyncString(time: Int) = async {
delay(time.toLong())
"Waited for $time ms"
}
讓我們隨機啟動?余個異步函數,每個都延遲隨機的時間。
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
val random = Random(3)
return List(12) { asyncString(random.nextInt(1000)) }
}
現在 main 函數在等待第?個函數完成,并統計仍處于激活狀態的延遲值的數量。注意,我們在這?使? select 表達式事實上是作為?種 Kotlin DSL,所以我們可以?任意代碼為它提供?句。在這種情況 下,我們遍歷?個延遲值的隊列,并為每個延遲值提供 onAwait ?句的調?。
val list = asyncStringsList()
val result = select<String> {
list.withIndex().forEach { (index, deferred) ->
deferred.onAwait { answer ->
"Deferred $index produced answer '$answer'"
}
}
}
println(result)
val countActive = list.count { it.isActive }
println("$countActive coroutines are still active")
該輸出如下:
Deferred 4 produced answer 'Waited for 128 ms' 11 coroutines are still active
在延遲值通道上切換
我們現在來編寫?個通道?產者函數,它消費?個產?延遲字符串的通道,并等待每個接收的延遲值, 但它只在下?個延遲值到達或者通道關閉之前處于運?狀態。此?例將 onReceiveOrNull 和 onAwait ?句放在同?個 select 中:
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) =
produce<String> {
var current = input.receive() // 從第?個接收到的延遲值開始
while (isActive) { // 循環直到被取消或關閉
val next = select<Deferred<String>?> { // 從這個 select 中返回下?個延遲值或 null
input.onReceiveOrNull { update ->
update // 替換下?個要等待的值
}
current.onAwait { value ->
send(value) // 發送當前延遲?成的值
input.receiveOrNull() // 然后使?從輸?通道得到的下?個延遲值
}
}
if (next == null) {
println("Channel was closed")
break // 跳出循環
} else {
current = next
}
}
}
為了測試它,我們將??個簡單的異步函數,它在特定的延遲后返回特定的字符串:
fun CoroutineScope.asyncString(str: String, time: Long) = async {
delay(time)
str
}
main 函數只是啟動?個協程來打印 switchMapDeferreds 的結果并向它發送?些測試數據:
val chan = Channel<Deferred<String>>() // 測試使?的通道
launch { // 啟動打印協程
for (s in switchMapDeferreds(chan))
println(s) // 打印每個獲得的字符串
}
chan.send(asyncString("BEGIN", 100))
delay(200) // 充?的時間來?產 "BEGIN"
chan.send(asyncString("Slow", 500))
delay(100) // 不充?的時間來?產 "Slow"
chan.send(asyncString("Replace", 100))
delay(500) // 在最后?個前給它?點時間
chan.send(asyncString("END", 500))
delay(1000) // 給執??段時間
chan.close() // 關閉通道……
delay(500) // 然后等待?段時間來讓它結束
這段代碼的執?結果:
BEGIN Replace END Channel was closed

浙公網安備 33010602011771號