kotlin協(xié)程——>通道
通道:延期的值提供了?種便捷的?法使單個值在多個協(xié)程之間進(jìn)?相互傳輸。通道提供了?種在流中傳輸 值的?法。
通道基礎(chǔ):
?個 Channel 是?個和 BlockingQueue ?常相似的概念。其中?個不同是它代替了阻塞的 put 操 作并提供了掛起的 send,還替代了阻塞的 take 操作并提供了掛起的 receive。
val channel = Channel<Int>()
launch {
// 這?可能是消耗?量 CPU 運(yùn)算的異步邏輯,我們將僅僅做 5 次整數(shù)的平?并發(fā)送
for (x in 1..5) channel.send(x * x)
}
// 這?我們打印了 5 次被接收的整數(shù):
repeat(5) { println(channel.receive()) }
println("Done!")
這段代碼的輸出如下:
1 4 9 16 25 Done!
關(guān)閉與迭代通道
和隊列不同,?個通道可以通過被關(guān)閉來表明沒有更多的元素將會進(jìn)?通道。在接收者中可以定期的使 ? for 循環(huán)來從通道中接收元素。 從概念上來說,?個 close 操作就像向通道發(fā)送了?個特殊的關(guān)閉指令。這個迭代停?就說明關(guān)閉指令 已經(jīng)被接收了。所以這?保證所有先前發(fā)送出去的元素都在通道關(guān)閉前被接收到。
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 我們結(jié)束發(fā)送
}
// 這?我們使? `for` 循環(huán)來打印所有被接收到的元素(直到通道被關(guān)閉)
for (y in channel) println(y)
println("Done!")
構(gòu)建通道?產(chǎn)者
協(xié)程?成?系列元素的模式很常?。這是 ?產(chǎn)者?消費(fèi)者 模式的?部分,并且經(jīng)常能在并發(fā)的代碼中 看到它。你可以將?產(chǎn)者抽象成?個函數(shù),并且使通道作為它的參數(shù),但這與必須從函數(shù)中返回結(jié)果的 常識相違悖。 這?有?個名為 produce 的便捷的協(xié)程構(gòu)建器,可以很容易的在?產(chǎn)者端正確?作,并且我們使?擴(kuò) 展函數(shù) consumeEach 在消費(fèi)者端替代 for 循環(huán):
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
管道:管道是?種?個協(xié)程在流中開始?產(chǎn)可能?窮多個元素的模式:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 在流中開始從 1 ?產(chǎn)?窮多個整數(shù)
}
并且另?個或多個協(xié)程開始消費(fèi)這些流,做?些操作,并?產(chǎn)了?些額外的結(jié)果。在下?的例?中,對這 些數(shù)字僅僅做了平?操作:
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
主要的代碼啟動并連接了整個管道:
val numbers = produceNumbers() // 從 1 開始?成整數(shù)
val squares = square(numbers) // 整數(shù)求平?
repeat(5) {
println(squares.receive()) // 輸出前五個
}
println("Done!") // ?此已完成
coroutineContext.cancelChildren() // 取消?協(xié)程
所有創(chuàng)建了協(xié)程的函數(shù)被定義在了 CoroutineScope 的擴(kuò)展上,所以我們可以依靠結(jié)構(gòu)化并發(fā)來 確保沒有常駐在我們的應(yīng)?程序中的全局協(xié)程。
使用管道的素數(shù):讓我們來展??個極端的例??在協(xié)程中使??個管道來?成素數(shù)。我們開啟了?個數(shù)字的?限序列。
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 開啟了?個?限的整數(shù)流
}
在下?的管道階段中過濾了來源于流中的數(shù)字,刪除了所有可以被給定素數(shù)整除的數(shù)字。
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
現(xiàn)在我們開啟了?個從 2 開始的數(shù)字流管道,從當(dāng)前的通道中取?個素數(shù),并為每?個我們發(fā)現(xiàn)的素數(shù) 啟動?個流?線階段:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ……
下?的例?打印了前?個素數(shù),在主線程的上下?中運(yùn)?整個管道。直到所有的協(xié)程在該主協(xié)程 runBlocking 的作?域中被啟動完成。我們不必使??個顯式的列表來保存所有被我們已經(jīng)啟動的協(xié) 程。我們使? cancelChildren 擴(kuò)展函數(shù)在我們打印了前?個素數(shù)以后來取消所有的?協(xié)程。
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有的?協(xié)程來讓主協(xié)程結(jié)束
這段代碼的輸出如下:
2 3 5 7 11 13 17 19 23 29
注意,你可以在標(biāo)準(zhǔn)庫中使? iterator 協(xié)程構(gòu)建器來構(gòu)建?個相似的管道。使? iterator 替換 produce 、yield 替換 send 、next 替換 receive 、Iterator 替換 ReceiveChannel 來擺 脫協(xié)程作?域,你將不再需要 runBlocking 。然?,如上所?,如果你在 Dispatchers.Default 上下? 中運(yùn)?它,使?通道的管道的好處在于它可以充分利?多核? CPU。
不過,這是?種?常不切實(shí)際的尋找素數(shù)的?法。在實(shí)踐中,管道調(diào)?了另外的?些掛起中的調(diào)?(就像 異步調(diào)?遠(yuǎn)程服務(wù))并且這些管道不能內(nèi)置使? sequence / iterator ,因?yàn)樗鼈儾槐辉试S隨意的掛 起,不像 produce 是完全異步的。
扇出:
多個協(xié)程也許會接收相同的管道,在它們之間進(jìn)?分布式?作。讓我們啟動?個定期產(chǎn)?整數(shù)的?產(chǎn)者 協(xié)程(每秒?個數(shù)字):
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 從 1 開始
while (true) {
send(x++) // 產(chǎn)?下?個數(shù)字
delay(100) // 等待 0.1 秒
}
}
現(xiàn)在讓我們啟動五個處理器協(xié)程并讓它們?作將近?秒。看看發(fā)?了什么:
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 取消協(xié)程?產(chǎn)者從?將它們?nèi)繗⑺?
該輸出將類似于如下所?,盡管接收每個特定整數(shù)的處理器 id 可能會不同:
Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10
扇入
多個協(xié)程可以發(fā)送到同?個通道。?如說,讓我們創(chuàng)建?個字符串的通道,和?個在這個通道中以指定 的延遲反復(fù)發(fā)送?個指定字符串的掛起函數(shù):
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現(xiàn)在,我們啟動了?個發(fā)送字符串的協(xié)程,讓我們看看會發(fā)?什么(在?例中,我們在主線程的上下?中 作為主協(xié)程的?協(xié)程來啟動它們):
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 接收前六個
println(channel.receive())
}
coroutineContext.cancelChildren() // 取消所有?協(xié)程來讓主協(xié)程結(jié)束
輸出如下:
foo foo BAR! foo foo BAR!
帶緩沖的通道
到?前為?展?的通道都是沒有緩沖區(qū)的。?緩沖的通道在發(fā)送者和接收者相遇時傳輸元素(也稱“對 接”)。如果發(fā)送先被調(diào)?,則它將被掛起直到接收被調(diào)?,如果接收先被調(diào)?,它將被掛起直到發(fā)送被調(diào) ?。 Channel() ??函數(shù)與 produce 建造器通過?個可選的參數(shù) capacity 來指定 緩沖區(qū)?? 。緩沖允 許發(fā)送者在被掛起前發(fā)送多個元素,就像 BlockingQueue 有指定的容量?樣,當(dāng)緩沖區(qū)被占滿的時 候?qū)鹱枞?/p>
val channel = Channel<Int>(4) // 啟動帶緩沖的通道
val sender = launch { // 啟動發(fā)送者協(xié)程
repeat(10) {
println("Sending $it") // 在每?個元素發(fā)送前打印它們
channel.send(it) // 將在緩沖區(qū)被占滿時掛起
}
}
// 沒有接收到東西……只是等待……
delay(1000)
sender.cancel() // 取消發(fā)送者協(xié)程
使?緩沖通道并給 capacity 參數(shù)傳? 四 它將打印“sending”五 次:
Sending 0 Sending 1 Sending 2 Sending 3 Sending 4
前四個元素被加?到了緩沖區(qū)并且發(fā)送者在試圖發(fā)送第五個元素的時候被掛起。
通道是公平的
發(fā)送和接收操作是 公平的 并且尊重調(diào)?它們的多個協(xié)程。它們遵守先進(jìn)先出原則,可以看到第?個協(xié) 程調(diào)? receive 并得到了元素。在下?的例?中兩個協(xié)程“乒”和“乓”都從共享的“桌?”通道接收到 這個“球”元素
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // ?個共享的 table(桌?)
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 乒乓球
delay(1000) // 延遲 1 秒鐘
coroutineContext.cancelChildren() // 游戲結(jié)束,取消它們
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 在循環(huán)中接收球
ball.hits++
println("$name $ball")
delay(300) // 等待?段時間
table.send(ball) // 將球發(fā)送回去
}
}
“乒”協(xié)程?先被啟動,所以它?先接收到了球。甚?雖然“乒”協(xié)程在將球發(fā)送會桌?以后?即開始接 收,但是球還是被“乓”協(xié)程接收了,因?yàn)樗?直在等待著接收球:
ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4)
注意,有時候通道執(zhí)?時由于執(zhí)?者的性質(zhì)?看起來不那么公平。點(diǎn)擊這個提案來查看更多細(xì)節(jié)。
計數(shù)器通道:
計時器通道是?種特別的會合通道,每次經(jīng)過特定的延遲都會從該通道進(jìn)?消費(fèi)并產(chǎn)? Unit 。雖然它 看起來似乎沒?,它被?來構(gòu)建分段來創(chuàng)建復(fù)雜的基于時間的 produce 管道和進(jìn)?窗?化操作以及其 它時間相關(guān)的處理。可以在 select 中使?計時器通道來進(jìn)?“打勾”操作。 使????法 ticker 來創(chuàng)建這些通道。為了表明不需要其它元素,請使? ReceiveChannel.cancel ? 法。 現(xiàn)在讓我們看看它是如何在實(shí)踐中?作的:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //創(chuàng)建計時器通道
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // no initial delay
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent
elements have 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// 模擬?量消費(fèi)延遲
println("Consumer pauses for 150ms")
delay(150)
// 下?個元素?即可?
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay:
$nextElement")
// 請注意,`receive` 調(diào)?之間的暫停被考慮在內(nèi),下?個元素的到達(dá)速度更快
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // 表明不再需要更多的元素
}
它的打印如下:
Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
請注意,ticker 知道可能的消費(fèi)者暫停,并且默認(rèn)情況下會調(diào)整下?個?成的元素如果發(fā)?暫停則延遲, 試圖保持固定的?成元素率。 給可選的 mode 參數(shù)傳? TickerMode.FIXED_DELAY 可以保持固定元素之間的延遲。

浙公網(wǎng)安備 33010602011771號