<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kotlin協(xié)程——>通道

      通道:延期的值提供了?種便捷的?法使單個值在多個協(xié)程之間進(jìn)?相互傳輸。通道提供了?種在流中傳輸 值的?法。

       

      通道基礎(chǔ):

        ?個 Channel 是?個和 BlockingQueue ?常相似的概念。其中?個不同是它代替了阻塞的 put 操 作并提供了掛起的 send,還替代了阻塞的 take 操作并提供了掛起的 receive。

      val channel = Channel<Int>()
      launch {
      // 這?可能是消耗?量 CPU 運(yùn)算的異步邏輯,我們將僅僅做 5 次整數(shù)的平?并發(fā)送
          for (x in 1..5) channel.send(x * x)
      }
      // 這?我們打印了 5 次被接收的整數(shù):
      repeat(5) { println(channel.receive()) }
      println("Done!")
      

        這段代碼的輸出如下:

      1
      4
      9
      16
      25
      Done!
      

        

      關(guān)閉與迭代通道

        和隊列不同,?個通道可以通過被關(guān)閉來表明沒有更多的元素將會進(jìn)?通道。在接收者中可以定期的使 ? for 循環(huán)來從通道中接收元素。 從概念上來說,?個 close 操作就像向通道發(fā)送了?個特殊的關(guān)閉指令。這個迭代停?就說明關(guān)閉指令 已經(jīng)被接收了。所以這?保證所有先前發(fā)送出去的元素都在通道關(guān)閉前被接收到。

      val channel = Channel<Int>()
      launch {
          for (x in 1..5) channel.send(x * x)
          channel.close() // 我們結(jié)束發(fā)送
      }
      // 這?我們使? `for` 循環(huán)來打印所有被接收到的元素(直到通道被關(guān)閉)
      for (y in channel) println(y)
      println("Done!")
      

        

      構(gòu)建通道?產(chǎn)者

        協(xié)程?成?系列元素的模式很常?。這是 ?產(chǎn)者?消費(fèi)者 模式的?部分,并且經(jīng)常能在并發(fā)的代碼中 看到它。你可以將?產(chǎn)者抽象成?個函數(shù),并且使通道作為它的參數(shù),但這與必須從函數(shù)中返回結(jié)果的 常識相違悖。 這?有?個名為 produce 的便捷的協(xié)程構(gòu)建器,可以很容易的在?產(chǎn)者端正確?作,并且我們使?擴(kuò) 展函數(shù) consumeEach 在消費(fèi)者端替代 for 循環(huán):

      val squares = produceSquares()
      squares.consumeEach { println(it) }
      println("Done!")
      

        

      管道:管道是?種?個協(xié)程在流中開始?產(chǎn)可能?窮多個元素的模式:

      fun CoroutineScope.produceNumbers() = produce<Int> {
          var x = 1
          while (true) send(x++) // 在流中開始從 1 ?產(chǎn)?窮多個整數(shù)
      }
      

        并且另?個或多個協(xié)程開始消費(fèi)這些流,做?些操作,并?產(chǎn)了?些額外的結(jié)果。在下?的例?中,對這 些數(shù)字僅僅做了平?操作:

      fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
          for (x in numbers) send(x * x)
      }
      

        主要的代碼啟動并連接了整個管道:

      val numbers = produceNumbers() // 從 1 開始?成整數(shù)
      val squares = square(numbers) // 整數(shù)求平?
      repeat(5) {
          println(squares.receive()) // 輸出前五個
      }
      println("Done!") // ?此已完成
      coroutineContext.cancelChildren() // 取消?協(xié)程
      

        所有創(chuàng)建了協(xié)程的函數(shù)被定義在了 CoroutineScope 的擴(kuò)展上,所以我們可以依靠結(jié)構(gòu)化并發(fā)來 確保沒有常駐在我們的應(yīng)?程序中的全局協(xié)程。

       

      使用管道的素數(shù):讓我們來展??個極端的例??在協(xié)程中使??個管道來?成素數(shù)。我們開啟了?個數(shù)字的?限序列。

      fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
          var x = start
          while (true) send(x++) // 開啟了?個?限的整數(shù)流
      }
      

        在下?的管道階段中過濾了來源于流中的數(shù)字,刪除了所有可以被給定素數(shù)整除的數(shù)字。

      fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
          for (x in numbers) if (x % prime != 0) send(x)
      }
      

        現(xiàn)在我們開啟了?個從 2 開始的數(shù)字流管道,從當(dāng)前的通道中取?個素數(shù),并為每?個我們發(fā)現(xiàn)的素數(shù) 啟動?個流?線階段:

      numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ……
      

        下?的例?打印了前?個素數(shù),在主線程的上下?中運(yùn)?整個管道。直到所有的協(xié)程在該主協(xié)程 runBlocking 的作?域中被啟動完成。我們不必使??個顯式的列表來保存所有被我們已經(jīng)啟動的協(xié) 程。我們使? cancelChildren 擴(kuò)展函數(shù)在我們打印了前?個素數(shù)以后來取消所有的?協(xié)程。

      var cur = numbersFrom(2)
      repeat(10) {
          val prime = cur.receive()
          println(prime)
          cur = filter(cur, prime)
      }
      coroutineContext.cancelChildren() // 取消所有的?協(xié)程來讓主協(xié)程結(jié)束
      

        這段代碼的輸出如下:

      2
      3
      5
      7
      11
      13
      17
      19
      23
      29
      

        注意,你可以在標(biāo)準(zhǔn)庫中使? iterator 協(xié)程構(gòu)建器來構(gòu)建?個相似的管道。使? iterator 替換 produce 、yield 替換 send 、next 替換 receive 、Iterator 替換 ReceiveChannel 來擺 脫協(xié)程作?域,你將不再需要 runBlocking 。然?,如上所?,如果你在 Dispatchers.Default 上下? 中運(yùn)?它,使?通道的管道的好處在于它可以充分利?多核? CPU。

        不過,這是?種?常不切實(shí)際的尋找素數(shù)的?法。在實(shí)踐中,管道調(diào)?了另外的?些掛起中的調(diào)?(就像 異步調(diào)?遠(yuǎn)程服務(wù))并且這些管道不能內(nèi)置使? sequence / iterator ,因?yàn)樗鼈儾槐辉试S隨意的掛 起,不像 produce 是完全異步的。

       

      扇出:

        多個協(xié)程也許會接收相同的管道,在它們之間進(jìn)?分布式?作。讓我們啟動?個定期產(chǎn)?整數(shù)的?產(chǎn)者 協(xié)程(每秒?個數(shù)字):

      fun CoroutineScope.produceNumbers() = produce<Int> {
          var x = 1 // 從 1 開始
          while (true) {
              send(x++) // 產(chǎn)?下?個數(shù)字
              delay(100) // 等待 0.1 秒
          }
      }
      

        現(xiàn)在讓我們啟動五個處理器協(xié)程并讓它們?作將近?秒。看看發(fā)?了什么:

      val producer = produceNumbers()
      repeat(5) { launchProcessor(it, producer) }
      delay(950)
      producer.cancel() // 取消協(xié)程?產(chǎn)者從?將它們?nèi)繗⑺?

        該輸出將類似于如下所?,盡管接收每個特定整數(shù)的處理器 id 可能會不同:

      Processor #2 received 1
      Processor #4 received 2
      Processor #0 received 3
      Processor #1 received 4
      Processor #3 received 5
      Processor #2 received 6
      Processor #4 received 7
      Processor #0 received 8
      Processor #1 received 9
      Processor #3 received 10
      

        

      扇入

        多個協(xié)程可以發(fā)送到同?個通道。?如說,讓我們創(chuàng)建?個字符串的通道,和?個在這個通道中以指定 的延遲反復(fù)發(fā)送?個指定字符串的掛起函數(shù):

      suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
          while (true) {
              delay(time)
              channel.send(s)
          }
      }
      

        現(xiàn)在,我們啟動了?個發(fā)送字符串的協(xié)程,讓我們看看會發(fā)?什么(在?例中,我們在主線程的上下?中 作為主協(xié)程的?協(xié)程來啟動它們):

      val channel = Channel<String>()
      launch { sendString(channel, "foo", 200L) }
      launch { sendString(channel, "BAR!", 500L) }
      repeat(6) { // 接收前六個
          println(channel.receive())
      }
      coroutineContext.cancelChildren() // 取消所有?協(xié)程來讓主協(xié)程結(jié)束
      

        輸出如下:

      foo
      foo
      BAR!
      foo
      foo
      BAR!
      

        

      帶緩沖的通道

        到?前為?展?的通道都是沒有緩沖區(qū)的。?緩沖的通道在發(fā)送者和接收者相遇時傳輸元素(也稱“對 接”)。如果發(fā)送先被調(diào)?,則它將被掛起直到接收被調(diào)?,如果接收先被調(diào)?,它將被掛起直到發(fā)送被調(diào) ?。 Channel() ??函數(shù)與 produce 建造器通過?個可選的參數(shù) capacity 來指定 緩沖區(qū)?? 。緩沖允 許發(fā)送者在被掛起前發(fā)送多個元素,就像 BlockingQueue 有指定的容量?樣,當(dāng)緩沖區(qū)被占滿的時 候?qū)鹱枞?/p>

      val channel = Channel<Int>(4) // 啟動帶緩沖的通道
      val sender = launch { // 啟動發(fā)送者協(xié)程
          repeat(10) {
              println("Sending $it") // 在每?個元素發(fā)送前打印它們
              channel.send(it) // 將在緩沖區(qū)被占滿時掛起
          }
      }
      // 沒有接收到東西……只是等待……
      delay(1000)
      sender.cancel() // 取消發(fā)送者協(xié)程
      

        使?緩沖通道并給 capacity 參數(shù)傳? 四 它將打印“sending”五 次:

      Sending 0
      Sending 1
      Sending 2
      Sending 3
      Sending 4
      

        前四個元素被加?到了緩沖區(qū)并且發(fā)送者在試圖發(fā)送第五個元素的時候被掛起。

       

      通道是公平的

        發(fā)送和接收操作是 公平的 并且尊重調(diào)?它們的多個協(xié)程。它們遵守先進(jìn)先出原則,可以看到第?個協(xié) 程調(diào)? receive 并得到了元素。在下?的例?中兩個協(xié)程“乒”和“乓”都從共享的“桌?”通道接收到 這個“球”元素

      data class Ball(var hits: Int)
      fun main() = runBlocking {
          val table = Channel<Ball>() // ?個共享的 table(桌?)
          launch { player("ping", table) }
          launch { player("pong", table) }
          table.send(Ball(0)) // 乒乓球
          delay(1000) // 延遲 1 秒鐘
          coroutineContext.cancelChildren() // 游戲結(jié)束,取消它們
      }
      suspend fun player(name: String, table: Channel<Ball>) {
          for (ball in table) { // 在循環(huán)中接收球
              ball.hits++
              println("$name $ball")
              delay(300) // 等待?段時間
              table.send(ball) // 將球發(fā)送回去
          }
      }
      

        “乒”協(xié)程?先被啟動,所以它?先接收到了球。甚?雖然“乒”協(xié)程在將球發(fā)送會桌?以后?即開始接 收,但是球還是被“乓”協(xié)程接收了,因?yàn)樗?直在等待著接收球:

      ping Ball(hits=1)
      pong Ball(hits=2)
      ping Ball(hits=3)
      pong Ball(hits=4)
      

        注意,有時候通道執(zhí)?時由于執(zhí)?者的性質(zhì)?看起來不那么公平。點(diǎn)擊這個提案來查看更多細(xì)節(jié)。

       

      計數(shù)器通道:

        計時器通道是?種特別的會合通道,每次經(jīng)過特定的延遲都會從該通道進(jìn)?消費(fèi)并產(chǎn)? Unit 。雖然它 看起來似乎沒?,它被?來構(gòu)建分段來創(chuàng)建復(fù)雜的基于時間的 produce 管道和進(jìn)?窗?化操作以及其 它時間相關(guān)的處理。可以在 select 中使?計時器通道來進(jìn)?“打勾”操作。 使????法 ticker 來創(chuàng)建這些通道。為了表明不需要其它元素,請使? ReceiveChannel.cancel ? 法。 現(xiàn)在讓我們看看它是如何在實(shí)踐中?作的:

      import kotlinx.coroutines.*
      import kotlinx.coroutines.channels.*
      fun main() = runBlocking<Unit> {
          val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //創(chuàng)建計時器通道
          var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
          println("Initial element is available immediately: $nextElement") // no initial delay
          nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent
          elements have 100ms delay
          println("Next element is not ready in 50 ms: $nextElement")
          nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
          println("Next element is ready in 100 ms: $nextElement")
      // 模擬?量消費(fèi)延遲
          println("Consumer pauses for 150ms")
          delay(150)
      // 下?個元素?即可?
          nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
          println("Next element is available immediately after large consumer delay:
                  $nextElement")
      // 請注意,`receive` 調(diào)?之間的暫停被考慮在內(nèi),下?個元素的到達(dá)速度更快
          nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
          println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
          tickerChannel.cancel() // 表明不再需要更多的元素
      }
      

        它的打印如下:

      Initial element is available immediately: kotlin.Unit
      Next element is not ready in 50 ms: null
      Next element is ready in 100 ms: kotlin.Unit
      Consumer pauses for 150ms
      Next element is available immediately after large consumer delay: kotlin.Unit
      Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
      

        請注意,ticker 知道可能的消費(fèi)者暫停,并且默認(rèn)情況下會調(diào)整下?個?成的元素如果發(fā)?暫停則延遲, 試圖保持固定的?成元素率。 給可選的 mode 參數(shù)傳? TickerMode.FIXED_DELAY 可以保持固定元素之間的延遲。

      posted @ 2021-03-17 23:00  王世楨  閱讀(346)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久天天躁夜夜躁狠狠| 中文字幕人妻丝袜美腿乱| 久久久精品人妻一区二区三区| 人人玩人人添人人澡超碰| 成人亚欧欧美激情在线观看| 日韩精品无码人妻一区二区三区| 无遮挡高潮国产免费观看| 自拍偷区亚洲综合第二区| 极品vpswindows少妇| 日本福利一区二区精品| 国产AV无码专区亚洲AV漫画| 免费网站看V片在线毛| 国产爆乳无码av在线播放| 伦理片午夜视频在线观看| 亚洲国产精品一区二区久| 精品三级在线| 亚洲综合天堂一区二区三区| 一本大道av人久久综合| 日本欧美一区二区免费视频| 少妇又爽又刺激视频| 国产精品中文字幕第一页| 亚洲一区二区偷拍精品| 午夜dv内射一区二区| 久久欧洲精品成av人片| 国产成人亚洲精品在线看| 三级国产在线观看| 2018年亚洲欧美在线v| 九九热免费在线观看视频| 成全高清在线播放电视剧 | 日韩精品国产另类专区| 国产精品伦人一久二久三久| 无码专区视频精品老司机| 毛片亚洲AV无码精品国产午夜| 九九热热久久这里只有精品| 国产精品亚洲а∨天堂2021| 实拍女处破www免费看| 国产精品一区二区三区专区| 男人猛躁进女人免费播放| 起碰免费公开97在线视频| 国产精品99一区二区三区| 国产精品综合av一区二区国产馆|