<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kotlin協(xié)程——>異步流

      異步流

        掛起函數(shù)可以異步的返回單個值,但是該如何異步返回多個計算好的值呢?這正是 Kotlin 流(Flow)的 ?武之地。

      表示多個值

        在 Kotlin 中可以使?集合來表?多個值。?如說,我們可以擁有?個函數(shù) foo() ,它返回?個包含三 個數(shù)字的 List,然后使? forEach 打印它們:

      fun foo(): List<Int> = listOf(1, 2, 3)
      fun main() {
          foo().forEach { value -> println(value) }
      }
      

        這段代碼輸出如下:

      1
      2
      3
      

        序列:如果使??些消耗 CPU 資源的阻塞代碼計算數(shù)字(每次計算需要 100 毫秒)那么我們可以使? Sequence 來表?數(shù)字:

      fun foo(): Sequence<Int> = sequence { // 序列構(gòu)建器
          for (i in 1..3) {
              Thread.sleep(100) // 假裝我們正在計算
              yield(i) // 產(chǎn)?下?個值
          }
      }
      fun main() {
          foo().forEach { value -> println(value) }
      }
      

        這段代碼輸出相同的數(shù)字,但在打印每個數(shù)字之前等待 100 毫秒。

       

        掛起函數(shù):然?,計算過程阻塞運?該代碼的主線程。當這些值由異步代碼計算時,我們可以使? suspend 修飾 符標記函數(shù) foo ,這樣它就可以在不阻塞的情況下執(zhí)?其?作并將結(jié)果作為列表返回:

      suspend fun foo(): List<Int> {
          delay(1000) // 假裝我們在這?做了?些異步的事情
          return listOf(1, 2, 3)
      }
      fun main() = runBlocking<Unit> {
          foo().forEach { value -> println(value) }
      }
      

        這段代碼將會在等待?秒之后打印數(shù)字。

        

        流:使? List 結(jié)果類型,意味著我們只能?次返回所有值。為了表?異步計算的值流(stream),我們可以使 ? Flow 類型(正如同步計算值會使? Sequence 類型):

         foo(): Flow<Int> = flow { // 流構(gòu)建器
          for (i in 1..3) {
              delay(100) // 假裝我們在這?做了?些有?的事情
              emit(i) // 發(fā)送下?個值
          }
          main() = runBlocking<Unit> {
        // 啟動并發(fā)的協(xié)程以驗證主線程并未阻塞
          launch {
                  for (k in 1..3) {
                      println("I'm not blocked $k")
                      delay(100)
                  }
           }
      // 收集這個流
           foo().collect { value -> println(value) }
      

        這段代碼在不阻塞主線程的情況下每等待 100 毫秒打印?個數(shù)字。在主線程中運??個單獨的協(xié)程每 100 毫秒打印?次“I'm not blocked”已經(jīng)經(jīng)過了驗證。

      I'm not blocked 1
      1
      I'm not blocked 2
      2
      I'm not blocked 3
      3
      

        注意使? Flow 的代碼與先前?例的下述區(qū)別:

      名為 flow 的 Flow 類型構(gòu)建器函數(shù)。
      flow { ... } 構(gòu)建塊中的代碼可以掛起。
      函數(shù) foo() 不再標有 suspend 修飾符。
      流使? emit 函數(shù) 發(fā)射 值。
      流使? collect 函數(shù) 收集 值。
      

        

      流是冷的

        Flow 是?種類似于序列的冷流 — 這段 flow 構(gòu)建器中的代碼直到流被收集的時候才運?。這在以下的 ?例中?常明顯:

      foo(): Flow<Int> = flow {
      println("Flow started")
      for (i in 1..3) {
          delay(100)
          emit(i)
      }
      
      main() = runBlocking<Unit> {
      println("Calling foo...")
      val flow = foo()
      println("Calling collect...")
      flow.collect { value -> println(value) }
      println("Calling collect again...")
      flow.collect { value -> println(value) }
      

        打印如下:

      Calling foo...
      Calling collect...
      Flow started
      1
      2
      3
      Calling collect again...
      Flow started
      1
      2
      3
      

        這是返回?個流的 foo() 函數(shù)沒有標記 suspend 修飾符的主要原因。通過它??,foo() 會盡快 返回且不會進?任何等待。該流在每次收集的時候啟動,這就是為什么當我們再次調(diào)? collect 時我 們會看到“Flow started”。

       

      流取消

        流采?與協(xié)程同樣的協(xié)作取消。然?,流的基礎(chǔ)設(shè)施未引?其他取消點。取消完全透明。像往常?樣,流 的收集可以在當流在?個可取消的掛起函數(shù)(例如 delay)中掛起的時候取消,否則不能取消。 下?的?例展?了當 withTimeoutOrNull 塊中代碼在運?的時候流是如何在超時的情況下取消并停 ?執(zhí)?其代碼的:

      foo(): Flow<Int> = flow {
      for (i in 1..3) {
          delay(100)
          println("Emitting $i")
          emit(i)
      }
      main() = runBlocking<Unit> {
      withTimeoutOrNull(250) { // 在 250 毫秒后超時
          foo().collect { value -> println(value) }
      }
      println("Done")
      

        注意,在 foo() 函數(shù)中流僅發(fā)射兩個數(shù)字,產(chǎn)?以下輸出:

      Emitting 1
      1
      Emitting 2
      2
      Done
      

        

      流的構(gòu)建起

        先前?例中的 flow { ... } 構(gòu)建器是最基礎(chǔ)的?個。還有其他構(gòu)建器使流的聲明更簡單:

      flowOf 構(gòu)建器定義了?個發(fā)射固定值集的流。
      使? .asFlow() 擴展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流。
      

        因此,從流中打印從 1 到 3 的數(shù)字的?例可以寫成:

      // 將?個整數(shù)區(qū)間轉(zhuǎn)化為流
      (1..3).asFlow().collect { value -> println(value) }
      

        

      過渡流操作符

        可以使?操作符轉(zhuǎn)換流,就像使?集合與序列?樣。過渡操作符應(yīng)?于上游流,并返回下游流。這些操 作符也是冷操作符,就像流?樣。這類操作符本?不是掛起函數(shù)。它運?的速度很快,返回新的轉(zhuǎn)換流的 定義。

        基礎(chǔ)的操作符擁有相似的名字,?如 map 與 filter。流與序列的主要區(qū)別在于這些操作符中的代碼可以 調(diào)?掛起函數(shù)。

        舉例來說,?個請求中的流可以使? map 操作符映射出結(jié)果,即使執(zhí)??個?時間的請求操作也可以 使?掛起函數(shù)來實現(xiàn):

      end fun performRequest(request: Int): String {
      delay(1000) // 模仿?時間運?的異步?作
      return "response $request"
      
      main() = runBlocking<Unit> {
      (1..3).asFlow() // ?個請求流
            .map { request -> performRequest(request) }
            .collect { response -> println(response) }    
      

        它產(chǎn)?以下三?,每??每秒出現(xiàn)?次:

      response 1
      response 2
      response 3
      

        轉(zhuǎn)換操作符:在流轉(zhuǎn)換操作符中,最通?的?種稱為 transform。它可以?來模仿簡單的轉(zhuǎn)換,例如 map 與 filter,以 及實施更復(fù)雜的轉(zhuǎn)換。使? transform 操作符,我們可以 發(fā)射 任意值任意次。

        ?如說,使? transform 我們可以在執(zhí)??時間運?的異步請求之前發(fā)射?個字符串并跟蹤這個響應(yīng):

      (1..3).asFlow() // ?個請求流
              .transform { request ->
                  emit("Making request $request")
                  emit(performRequest(request))
              }
          .collect { response -> println(response) }    
      

        這段代碼的輸出如下

      Making request 1
      response 1
      Making request 2
      response 2
      Making request 3
      response 3
      

        限長操作符:限?過渡操作符(例如 take)在流觸及相應(yīng)限制的時候會將它的執(zhí)?取消。協(xié)程中的取消操作總是通過 拋出異常來執(zhí)?,這樣所有的資源管理函數(shù)(如 try {...} finally {...} 塊)會在取消的情況下 正常運?:

      fun numbers(): Flow<Int> = flow {
          try {
              emit(1)
              emit(2)
              println("This line will not execute")
              emit(3)
          } finally {
              println("Finally in numbers")
          }
      }
      fun main() = runBlocking<Unit> {
          numbers()
                  .take(2) // 只獲取前兩個
                  .collect { value -> println(value) }
      }
      

        這段代碼的輸出清楚地表明,numbers() 函數(shù)中對 flow {...} 函數(shù)體的執(zhí)?在發(fā)射出第?個數(shù) 字后停?:

      1
      2
      Finally in numbers
      

        

      末端流操作符

        末端操作符是在流上?于啟動流收集的掛起函數(shù)。collect 是最基礎(chǔ)的末端操作符,但是還有另外?些 更?便使?的末端操作符:

      轉(zhuǎn)化為各種集合,例如 toList 與 toSet。
      獲取第?個(first)值與確保流發(fā)射單個(single)值的操作符。
      使? reduce 與 fold 將流規(guī)約到單個值。
      

        舉例來說:

      val sum = (1..5).asFlow()
              .map { it * it } // 數(shù)字 1 ? 5 的平?
              .reduce { a, b -> a + b } // 求和(末端操作符)
      println(sum)
      

        打印單個數(shù)字:

      55
      

        

      流是連續(xù)的

        流的每次單獨收集都是按順序執(zhí)?的,除?進?特殊操作的操作符使?多個流。該收集過程直接在協(xié)程 中運?,該協(xié)程調(diào)?末端操作符。默認情況下不啟動新協(xié)程。從上游到下游每個過渡操作符都會處理每 個發(fā)射出的值然后再交給末端操作符。 請參?以下?例,該?例過濾偶數(shù)并將其映射到字符串:

      (1..5).asFlow()
          .filter {
              println("Filter $it")
              it % 2 == 0
          }
          .map {
              println("Map $it")
              "string $it"
          }.collect {
              println("Collect $it")
          }
      

        執(zhí)行:

      Filter 1
      Filter 2
      Map 2
      Collect string 2
      Filter 3
      Filter 4
      Map 4
      Collect string 4
      Filter 5
      

        

      流上下文:

        流的收集總是在調(diào)?協(xié)程的上下?中發(fā)?。例如,如果有?個流 foo ,然后以下代碼在它的編寫者指定 的上下?中運?,??論流 foo 的實現(xiàn)細節(jié)如何:

      withContext(context) {
          foo.collect { value ->
              println(value) // 運?在指定上下?中
          }
      }
      

        流的該屬性稱為上下文保存:所以默認的,flow { ... } 構(gòu)建器中的代碼運?在相應(yīng)流的收集器提供的上下?中。舉例來說,考慮 打印線程的 foo 的實現(xiàn),它被調(diào)?并發(fā)射三個數(shù)字

      fun foo(): Flow<Int> = flow {
          log("Started foo flow")
          for (i in 1..3) {
              emit(i)
          }
      }
      fun main() = runBlocking<Unit> {
          foo().collect { value -> log("Collected $value") }
      }
      

        運?這段代碼:

      [main @coroutine#1] Started foo flow
      [main @coroutine#1] Collected 1
      [main @coroutine#1] Collected 2
      [main @coroutine#1] Collected 3
      

        由于 foo().collect 是在主線程調(diào)?的,則 foo 的流主體也是在主線程調(diào)?的。這是快速運?或 異步代碼的理想默認形式,它不關(guān)?執(zhí)?的上下?并且不會阻塞調(diào)?者。

       

          withContext發(fā)出錯誤:然?,?時間運?的消耗 CPU 的代碼也許需要在 Dispatchers.Default 上下?中執(zhí)?,并且更新 UI 的 代碼也許需要在 Dispatchers.Main 中執(zhí)?。通常,withContext ?于在 Kotlin 協(xié)程中改變代碼的上下 ?,但是 flow {...} 構(gòu)建器中的代碼必須遵循上下?保存屬性,并且不允許從其他上下?中發(fā)射 (emit)。

        嘗試運?下?的代碼:

      fun foo(): Flow<Int> = flow {
      // 在流構(gòu)建器中更改消耗 CPU 代碼的上下?的錯誤?式
          kotlinx.coroutines.withContext(Dispatchers.Default) {
              for (i in 1..3) {
                  Thread.sleep(100) // 假裝我們以消耗 CPU 的?式進?計算
                  emit(i) // 發(fā)射下?個值
              }
          }
      }
      fun main() = runBlocking<Unit> {
          foo().collect { value -> println(value) }
      }
      

        這段代碼產(chǎn)?如下的異常:

      Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
          Flow was collected in [CoroutineId(1),
      "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
          but emission happened in [CoroutineId(1),
      "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
          Please refer to 'flow' documentation or use 'flowOn' instead
      at ...
      

        flowOn 操作符:例外的是 flowOn 函數(shù),該函數(shù)?于更改流發(fā)射的上下?。以下?例展?了更改流上下?的正確?法, 該?例還通過打印相應(yīng)線程的名字以展?它們的?作?式:

      fun foo(): Flow<Int> = flow {
          for (i in 1..3) {
              Thread.sleep(100) // 假裝我們以消耗 CPU 的?式進?計算
              log("Emitting $i")
              emit(i) // 發(fā)射下?個值
          }
      }.flowOn(Dispatchers.Default) // 在流構(gòu)建器中改變消耗 CPU 代碼上下?的正確?式
      fun main() = runBlocking<Unit> {
          foo().collect { value ->
              log("Collected $value")
          }
      }
      

        注意,當收集發(fā)?在主線程中,flow { ... } 是如何在后臺線程中?作的: 這?要觀察的另?件事是 flowOn 操作符已改變流的默認順序性?,F(xiàn)在收集發(fā)?在?個協(xié)程中 (“coroutine#1”)?發(fā)射發(fā)?在運?于另?個線程中與收集協(xié)程并發(fā)運?的另?個協(xié)程 (“coroutine#2”)中。當上游流必須改變其上下?中的 CoroutineDispatcher 的時候,flowOn 操作符 創(chuàng)建了另?個協(xié)程。

       

      緩沖:

        從收集流所花費的時間來看,將流的不同部分運?在不同的協(xié)程中將會很有幫助,特別是當涉及到?時 間運?的異步操作時。例如,考慮?種情況,foo() 流的發(fā)射很慢,它每花費 100 毫秒才產(chǎn)??個元素;?收集器也?常慢,需要花費 300 毫秒來處理元素。讓我們看看從該流收集三個數(shù)字要花費多?時間:

      fun foo(): Flow<Int> = flow {
          for (i in 1..3) {
              delay(100) // 假裝我們異步等待了 100 毫秒
              emit(i) // 發(fā)射下?個值
          }
      }
      fun main() = runBlocking<Unit> {
          val time = measureTimeMillis {
              foo().collect { value ->
                  delay(300) // 假裝我們花費 300 毫秒來處理它
                  println(value)
              }
          }
          println("Collected in $time ms")
      }
      

        它會產(chǎn)?這樣的結(jié)果,整個收集過程?約需要 1200 毫秒(3 個數(shù)字,每個花費 400 毫秒):

      1
      2
      3
      Collected in 1220 ms
      

        我們可以在流上使? buffer 操作符來并發(fā)運? foo() 中發(fā)射元素的代碼以及收集的代碼,?不是順序運?它們:

      val time = measureTimeMillis {
          foo()
                  .buffer() // 緩沖發(fā)射項,?需等待
                  .collect { value ->
                      delay(300) // 假裝我們花費 300 毫秒來處理它
                      println(value)
                  }
      }
      println("Collected in $time ms")
      

        它產(chǎn)?了相同的數(shù)字,只是更快了,由于我們?效地創(chuàng)建了處理流?線,僅僅需要等待第?個數(shù)字產(chǎn)? 的 100 毫秒以及處理每個數(shù)字各需花費的 300 毫秒。這種?式?約花費了 1000 毫秒來運?:

      1
      2
      3
      Collected in 1071 ms
      注意,當必須更改 CoroutineDispatcher 時,flowOn 操作符使?了相同的緩沖機制,但是我們在 這?顯式地請求緩沖?不改變執(zhí)?上下?。

        合并:當流代表部分操作結(jié)果或操作狀態(tài)更新時,可能沒有必要處理每個值,?是只處理最新的那個。在本? 例中,當收集器處理它們太慢的時候,conflate 操作符可以?于跳過中間值。構(gòu)建前?的?例:

      val time = measureTimeMillis {
            foo()
                  .conflate() // 合并發(fā)射項,不對每個值進?處理
                  .collect { value ->
                      delay(300) // 假裝我們花費 300 毫秒來處理它
                      println(value)
                  }
      }
      println("Collected in $time ms")
      

        我們看到,雖然第?個數(shù)字仍在處理中,但第?個和第三個數(shù)字已經(jīng)產(chǎn)?,因此第?個是 conflated ,只 有最新的(第三個)被交付給收集器:

      1
      3
      Collected in 758 ms
      

        處理最新值:當發(fā)射器和收集器都很慢的時候,合并是加快處理速度的?種?式。它通過刪除發(fā)射值來實現(xiàn)。另?種 ?式是取消緩慢的收集器,并在每次發(fā)射新值的時候重新啟動它。有?組與 xxx 操作符執(zhí)?相同基本 邏輯的 xxxLatest 操作符,但是在新值產(chǎn)?的時候取消執(zhí)?其塊中的代碼。讓我們在先前的?例中嘗 試更換 conflate 為 collectLatest:

      val time = measureTimeMillis {
              foo()
                  .collectLatest { value -> // 取消并重新發(fā)射最后?個值
                      println("Collecting $value")
                      delay(300) // 假裝我們花費 300 毫秒來處理它
                      println("Done $value")
                  }
      }
      println("Collected in $time ms")
      

        由于 collectLatest 的函數(shù)體需要花費 300 毫秒,但是新值每 100 秒發(fā)射?次,我們看到該代碼塊對每 個值運?,但是只收集最后?個值:

      Collecting 1
      Collecting 2
      Collecting 3
      Done 3
      Collected in 741 ms
      

        由于 collectLatest 的函數(shù)體需要花費 300 毫秒,但是新值每 100 秒發(fā)射?次,我們看到該代碼塊對每 個值運?,但是只收集最后?個值:

      Collecting 1
      Collecting 2
      Collecting 3
      Done 3
      Collected in 741 ms
      

        

      組合多個流,組合多個流有很多種方式

        Zip:就像 Kotlin 標準庫中的 Sequence.zip 擴展函數(shù)?樣,流擁有?個 zip 操作符?于組合兩個流中的相關(guān)值:

      val nums = (1..3).asFlow() // 數(shù)字 1..3
      val strs = flowOf("one", "two", "three") // 字符串
      nums.zip(strs) { a, b -> "$a -> $b" } // 組合單個字符串
          .collect { println(it) } // 收集并打印
      

        ?例打印如下:

      1 -> one
      2 -> two
      3 -> three
      

        Combine:當流表??個變量或操作的最新值時(請參閱相關(guān)?節(jié) conflation),可能需要執(zhí)?計算,這依賴于相應(yīng) 流的最新值,并且每當上游流產(chǎn)?值的時候都需要重新計算。這種相應(yīng)的操作符家族稱為 combine。 例如,先前?例中的數(shù)字如果每 300 毫秒更新?次,但字符串每 400 毫秒更新?次,然后使? zip 操作 符合并它們,但仍會產(chǎn)?相同的結(jié)果,盡管每 400 毫秒打印?次結(jié)果:

      val nums = (1..3).asFlow().onEach { delay(300) } // 發(fā)射數(shù)字 1..3,間隔 300 毫秒
      val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發(fā)射?次字符串
      val startTime = System.currentTimeMillis() // 記錄開始的時間
      nums.zip(strs) { a, b -> "$a -> $b" } // 使?“zip”組合單個字符串
          .collect { value -> // 收集并打印
              println("$value at ${System.currentTimeMillis() - startTime} ms from start")
      }
      

        然?,當在這?使? combine 操作符來替換 zip:

      val nums = (1..3).asFlow().onEach { delay(300) } // 發(fā)射數(shù)字 1..3,間隔 300 毫秒
      val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發(fā)射?次字符串
      val startTime = System.currentTimeMillis() // 記錄開始的時間
      nums.combine(strs) { a, b -> "$a -> $b" } // 使?“combine”組合單個字符串
          .collect { value -> // 收集并打印
              println("$value at ${System.currentTimeMillis() - startTime} ms from start")
      }
      

        我們得到了完全不同的輸出,其中,nums 或 strs 流中的每次發(fā)射都會打印??:

      1 -> one at 452 ms from start
      2 -> one at 651 ms from start
      2 -> two at 854 ms from start
      3 -> two at 952 ms from start
      3 -> three at 1256 ms from start
      

        

      展平流:流表?異步接收的值序列,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另?個值序列的請求。?如 說,我們可以擁有下?這樣?個返回間隔 500 毫秒的兩個字符串流的函數(shù):

      fun requestFlow(i: Int): Flow<String> = flow {
          emit("$i: First")
          delay(500) // 等待 500 毫秒
          emit("$i: Second")
      }
      

        現(xiàn)在,如果我們有?個包含三個整數(shù)的流,并為每個整數(shù)調(diào)? requestFlow ,如下所?:

      (1..3).asFlow().map { requestFlow(it) }
      

        然后我們得到了?個包含流的流( Flow<flow> ),需要將其進?展平為單個流以進?下? 步處理。集合與序列都擁有 flatten 與 flatMap 操作符來做這件事。然?,由于流具有異步的性質(zhì),因此 需要不同的展平模式,為此,存在?系列的流展平操作符。

        flatMapConcat:連接模式由 flatMapConcat 與 flattenConcat 操作符實現(xiàn)。它們是相應(yīng)序列操作符最相近的類似物。它 們在等待內(nèi)部流完成之前開始收集下?個值,如下?的?例所?:

      val startTime = System.currentTimeMillis() // 記錄開始時間
      (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
          .flatMapConcat { requestFlow(it) }
          .collect { value -> // 收集并打印
              println("$value at ${System.currentTimeMillis() - startTime} ms from start")
      }
      

        在輸出中可以清楚地看到 flatMapConcat 的順序性質(zhì):

      1: First at 121 ms from start
      1: Second at 622 ms from start
      2: First at 727 ms from start
      2: Second at 1227 ms from start
      3: First at 1328 ms from start
      3: Second at 1829 ms from start
      

        flatMapMerge:另?種展平模式是并發(fā)收集所有傳?的流,并將它們的值合并到?個單獨的流,以便盡快的發(fā)射值。它 由 flatMapMerge 與 flattenMerge 操作符實現(xiàn)。他們都接收可選的?于限制并發(fā)收集的流的個數(shù)的 concurrency 參數(shù)(默認情況下,它等于 DEFAULT_CONCURRENCY)。

      val startTime = System.currentTimeMillis() // 記錄開始時間
      (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
          .flatMapMerge { requestFlow(it) }
          .collect { value -> // 收集并打印
              println("$value at ${System.currentTimeMillis() - startTime} ms from start")
      }
      

        flatMapMerge 的并發(fā)性質(zhì)很明顯:

      1: First at 136 ms from start
      2: First at 231 ms from start
      3: First at 333 ms from start
      1: Second at 639 ms from start
      2: Second at 732 ms from start
      3: Second at 833 ms from start
      注意,flatMapMerge 會順序調(diào)?代碼塊(本?例中的 { requestFlow(it) }),但是并發(fā)收集結(jié) 果流,相當于執(zhí)?順序是?先執(zhí)? map { requestFlow(it) }
      然后在其返回結(jié)果上調(diào)? flattenMerge。

        flatMapLatest:與 collectLatest 操作符類似(在"處理最新值" ?節(jié)中已經(jīng)討論過),也有相對應(yīng)的“最新”展平模式,在 發(fā)出新流后?即取消先前流的收集。這由 flatMapLatest 操作符來實現(xiàn)。

      val startTime = System.currentTimeMillis() // 記錄開始時間
      (1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
          .flatMapLatest { requestFlow(it) }
          .collect { value -> // 收集并打印
           println("$value at ${System.currentTimeMillis() - startTime} ms from start")
      }
      

        該?例的輸出很好的展?了 flatMapLatest 的?作?式:

      1: First at 142 ms from start
      2: First at 322 ms from start
      3: First at 425 ms from start
      3: Second at 931 ms from start
      注意,flatMapLatest 在?個新值到來時取消了塊中的所有代碼 (本?例中的 { requestFlow(it) })。這在該特定?例中不會有什么區(qū)別,由于調(diào)? requestFlow ??的速度是很快的,
      不會發(fā)?掛起,所以不會被取消。然?,如果我們要在塊中調(diào)?諸如 delay 之類的掛 起函數(shù),這將會被表現(xiàn)出來。

        

      流異常:當運算符中的發(fā)射器或代碼拋出異常時,流收集可以帶有異常的完成。有?種處理異常的?法。

        收集器 try 與 catch:收集者可以使? Kotlin 的 try/catch 塊來處理異常:

      fun foo(): Flow<Int> = flow {
          for (i in 1..3) {
              println("Emitting $i")
              emit(i) // 發(fā)射下?個值
          }
      }
      fun main() = runBlocking<Unit> {
          try {
              foo().collect { value ->
                  println(value)
                  check(value <= 1) { "Collected $value" }
              }
          } catch (e: Throwable) {
              println("Caught $e")
          }
      }
      

        這段代碼成功的在末端操作符 collect 中捕獲了異常,并且,如我們所?,在這之后不再發(fā)出任何值:

      Emitting 1
      1
      Emitting 2
      2
      Caught java.lang.IllegalStateException: Collected 2
      

        一切都已捕獲:前?的?例實際上捕獲了在發(fā)射器或任何過渡或末端操作符中發(fā)?的任何異常。例如,讓我們修改代碼 以便將發(fā)出的值映射為字符串,但是相應(yīng)的代碼會產(chǎn)??個異常:

      fun foo(): Flow<String> =
              flow {
                  for (i in 1..3) {
                      println("Emitting $i")
                      emit(i) // 發(fā)射下?個值
                  }
              }.map { value ->
                  check(value <= 1) { "Crashed on $value" }
                  "string $value"
              }
      fun main() = runBlocking<Unit> {
          try {
              foo().collect { value -> println(value) }
          } catch (e: Throwable) {
              println("Caught $e")
          }
      }
      

        仍然會捕獲該異常并停?收集:

      Emitting 1
      string 1
      Emitting 2
      Caught java.lang.IllegalStateException: Crashed on 2
      

        

      異常透明性:發(fā)射器的代碼如何封裝其異常處理?為?

        流必須對異常透明,即在 flow { ... } 構(gòu)建器內(nèi)部的 try/catch 塊中發(fā)射值是違反異常透明性 的。這樣可以保證收集器拋出的?個異常能被像先前?例中那樣的 try/catch 塊捕獲。

        發(fā)射器可以使? catch 操作符來保留此異常的透明性并允許封裝它的異常處理。catch 操作符的代碼塊 可以分析異常并根據(jù)捕獲到的異常以不同的?式對其做出反應(yīng):

      可以使? throw 重新拋出異常。
      可以使? catch 代碼塊中的 emit 將異常轉(zhuǎn)換為值發(fā)射出去。
      可以將異常忽略,或??志打印,或使??些其他代碼處理它
      

        例如,讓我們在捕獲異常的時候發(fā)射?本:

      foo()
          .catch { e -> emit("Caught $e") } // 發(fā)射?個異常
          .collect { value -> println(value) }
      

        即使我們不再在代碼的外層使? try/catch,?例的輸出也是相同的。

        透明捕獲:catch 過渡操作符遵循異常透明性,僅捕獲上游異常( catch 操作符上游的異常,但是它下?的不是)。 如果 collect { ... } 塊(位于 catch 之下)拋出?個異常,那么異常會逃逸:

      fun foo(): Flow<Int> = flow {
          for (i in 1..3) {
              println("Emitting $i")
              emit(i)
          }
      }
      fun main() = runBlocking<Unit> {
          foo()
               .catch { e -> println("Caught $e") } // 不會捕獲下游異常
               .collect { value ->
                   check(value <= 1) { "Collected $value" }
                   println(value)
                }
      }
      

        盡管有 catch 操作符,但不會打印“Caught …”消息:

        聲明式捕獲:我們可以將 catch 操作符的聲明性與處理所有異常的期望相結(jié)合,將 collect 操作符的代碼塊移動到 onEach 中,并將其放到 catch 操作符之前。收集該流必須由調(diào)??參的 collect() 來觸發(fā):

      foo()
          .onEach { value ->
              check(value <= 1) { "Collected $value" }
              println(value)
          }
          .catch { e -> println("Caught $e") }
          .collect()
      

        現(xiàn)在我們可以看到已經(jīng)打印了“Caught …”消息,并且我們可以在沒有顯式使? try/catch 塊的情況 下捕獲所有異常:

       

      流完成:

        當流收集完成時(普通情況或異常情況),它可能需要執(zhí)??個動作。你可能已經(jīng)注意到,它可以通過兩 種?式完成:命令式或聲明式。

        命令式finally塊:除了 try / catch 之外,收集器還能使? finally 塊在 collect 完成時執(zhí)??個動作

        

      fun foo(): Flow<Int> = (1..3).asFlow()
      fun main() = runBlocking<Unit> {
          try {
              foo().collect { value -> println(value) }
          } finally {
              println("Done")
          }
      }
      

        這段代碼打印出 foo() 流產(chǎn)?的三個數(shù)字,后?跟?個“Done”字符串:

      1
      2
      3
      Done
      

        聲明式處理:對于聲明式,流擁有 onCompletion 過渡操作符,它在流完全收集時調(diào)???梢允? onCompletion 操作符重寫前?的?例,并產(chǎn)?相同的輸出:

      foo()
          .onCompletion { println("Done") }
          .collect { value -> println(value) }
      

        onCompletion 的主要優(yōu)點是其 lambda 表達式的可空參數(shù) Throwable 可以?于確定流收集是正常 完成還是有異常發(fā)?。在下?的?例中 foo() 流在發(fā)射數(shù)字 1 之后拋出了?個異常:

      fun foo(): Flow<Int> = flow {
          emit(1)
          throw RuntimeException()
      }
      fun main() = runBlocking<Unit> {
          foo()
               .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
                     .catch { cause -> println("Caught exception") }
                     .collect { value -> println(value) }
      }
      

        如你所期望的,它打印了:

      1
      Flow completed exceptionally
      Caught exception
      

        onCompletion 操作符與 catch 不同,它不處理異常。我們可以看到前?的?例代碼,異常仍然流向下 游。它將被提供給后?的 onCompletion 操作符,并可以由 catch 操作符處理。

        成功完成:與 catch 操作符的另?個不同點是 onCompletion 能觀察到所有異常并且僅在上游流成功完成(沒有 取消或失?。┑那闆r下接收?個 null 異常。

      fun foo(): Flow<Int> = (1..3).asFlow()
      fun main() = runBlocking<Unit> {
          foo()
                  .onCompletion { cause -> println("Flow completed with $cause") }
                  .collect { value ->
                      check(value <= 1) { "Collected $value" }
                      println(value)
                  }
      }
      

        我們可以看到完成時 cause 不為空,因為流由于下游異常?中?:

      1
      Flow completed with java.lang.IllegalStateException: Collected 2
      Exception in thread "main" java.lang.IllegalStateException: Collected 2
      

        

      命令式還是聲明式

        現(xiàn)在我們知道如何收集流,并以命令式與聲明式的?式處理其完成及異常情況。這?有?個很?然的問 題是,哪種?式應(yīng)該是?選的?為什么?作為?個庫,我們不主張采?任何特定的?式,并且相信這兩種 選擇都是有效的,應(yīng)該根據(jù)??的喜好與代碼?格進?選擇。

       

      啟動流

        使?流表?來??些源的異步事件是很簡單的。在這個案例中,我們需要?個類似 addEventListener 的函數(shù),該函數(shù)注冊?段響應(yīng)的代碼處理即將到來的事件,并繼續(xù)進?進?步的 處理。onEach 操作符可以擔任該??。然?,onEach 是?個過渡操作符。我們也需要?個末端操作符 來收集流。否則僅調(diào)? onEach 是?效的。 如果我們在 onEach 之后使? collect 末端操作符,那么后?的代碼會?直等待直?流被收集

      // 模仿事件流
      fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
      fun main() = runBlocking<Unit> {
          events()
                  .onEach { event -> println("Event: $event") }
                  .collect() // <--- 等待流收集
          println("Done")
      }
      

        你可以看到它的輸出:

      Event: 1
      Event: 2
      Event: 3
      Done
      

        launchIn 末端操作符可以在這?派上?場。使? launchIn 替換 collect 我們可以在單獨的協(xié)程 中啟動流的收集,這樣就可以?即繼續(xù)進?步執(zhí)?代碼:

      fun main() = runBlocking<Unit> {
          events()
                  .onEach { event -> println("Event: $event") }
                  .launchIn(this) // <--- 在單獨的協(xié)程中執(zhí)?流
          println("Done")
      }
      

        它打印了:

      Done
      Event: 1
      Event: 2
      Event: 3
      

        launchIn 必要的參數(shù) CoroutineScope 指定了?哪?個協(xié)程來啟動流的收集。在先前的?例中這個 作?域來? runBlocking 協(xié)程構(gòu)建器,在這個流運?的時候,runBlocking 作?域等待它的?協(xié)程執(zhí)? 完畢并防? main 函數(shù)返回并終?此?例。 在實際的應(yīng)?中,作?域來?于?個壽命有限的實體。在該實體的壽命終?后,相應(yīng)的作?域就會被取 消,即取消相應(yīng)流的收集。這種成對的 onEach { ... }.launchIn(scope) ?作?式就像 addEventListener ?樣。?且,這不需要相應(yīng)的 removeEventListener 函數(shù),因為取消與結(jié)構(gòu) 化并發(fā)可以達成這個?的。 注意,launchIn 也會返回?個 Job,可以在不取消整個作?域的情況下僅取消相應(yīng)的流收集或?qū)ζ溥M ? join。

       

      流(Flow)與響應(yīng)式流(Reactive Streams)

        對于熟悉響應(yīng)式流(Reactive Streams)或諸如 RxJava 與 Project Reactor 這樣的響應(yīng)式框架的?來 說,F(xiàn)low 的設(shè)計也許看起來會?常熟悉。 確實,其設(shè)計靈感來源于響應(yīng)式流以及其各種實現(xiàn)。但是 Flow 的主要?標是擁有盡可能簡單的設(shè)計, 對 Kotlin 以及掛起友好且遵從結(jié)構(gòu)化并發(fā)。沒有響應(yīng)式的先驅(qū)及他們?量的?作,就不可能實現(xiàn)這?? 標。你可以閱讀 Reactive Streams and Kotlin Flows 這篇?章來了解完成 Flow 的故事。 雖然有所不同,但從概念上講,F(xiàn)low 依然是響應(yīng)式流,并且可以將它轉(zhuǎn)換為響應(yīng)式(規(guī)范及符合 TCK)的 發(fā)布者(Publisher),反之亦然。這些開箱即?的轉(zhuǎn)換器可以在 kotlinx.coroutines 提供的相關(guān)響 應(yīng)式模塊( kotlinx-coroutines-reactive ?于 Reactive Streams,kotlinx-coroutinesreactor ?于 Project Reactor,以及 kotlinx-coroutines-rx2 / kotlinx-coroutinesrx3 ?于 RxJava2/RxJava3)中找到。集成模塊包含 Flow 與其他實現(xiàn)之間的轉(zhuǎn)換,與 Reactor 的 Context 集成以及與?系列響應(yīng)式實體配合使?的掛起友好的使??式。

       

      posted @ 2021-03-17 21:47  王世楨  閱讀(631)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲人成在线观看网站不卡| 国产成人一区二区免av| 自偷自拍亚洲综合精品| 不卡高清AV手机在线观看 | 蜜桃av色偷偷av老熟女| 亚洲综合av男人的天堂| 亚洲av区一区二区三区| 国产老头多毛Gay老年男| 四虎永久精品在线视频| 久久久久久久久18禁秘| 粗壮挺进邻居人妻无码| 武威市| 蜜臀一区二区三区精品免费| 国产草草影院ccyycom| 清远市| 久久国产成人午夜av影院| 国产成人无码免费看视频软件| 辛集市| 色色97| 日韩放荡少妇无码视频| 99久久无码私人网站| 玖玖在线精品免费视频| 性色a∨精品高清在线观看| 精品综合久久久久久97| 日韩人妖精品一区二区av| 精品无码国产日韩制服丝袜| 国产精品日韩av在线播放| 美女爽到高潮嗷嗷嗷叫免费网站| 国产成人亚洲精品狼色在线 | a级亚洲片精品久久久久久久| 国产乱码精品一区二三区| 国产成人高清精品亚洲一区| 九九热精彩视频在线免费| 亚洲欧美综合精品成人网站| 中文字幕少妇人妻精品| 银川市| 免费可以在线看a∨网站| 伊人久久精品一区二区三区| 国产乱码精品一区二区三| 成人综合人人爽一区二区| 日韩一区二区三区女优丝袜|