kotlin協(xié)程——>異步流
異步流
掛起函數(shù)可以異步的返回單個值,但是該如何異步返回多個計算好的值呢?這正是 Kotlin 流(Flow)的 ?武之地。
表示多個值
在 Kotlin 中可以使?集合來表?多個值。?如說,我們可以擁有?個函數(shù) foo() ,它返回?個包含三 個數(shù)字的 List,然后使? forEach 打印它們:
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
這段代碼輸出如下:
1 2 3
序列:如果使??些消耗 CPU 資源的阻塞代碼計算數(shù)字(每次計算需要 100 毫秒)那么我們可以使? Sequence 來表?數(shù)字:
fun foo(): Sequence<Int> = sequence { // 序列構(gòu)建器
for (i in 1..3) {
Thread.sleep(100) // 假裝我們正在計算
yield(i) // 產(chǎn)?下?個值
}
}
fun main() {
foo().forEach { value -> println(value) }
}
這段代碼輸出相同的數(shù)字,但在打印每個數(shù)字之前等待 100 毫秒。
掛起函數(shù):然?,計算過程阻塞運?該代碼的主線程。當這些值由異步代碼計算時,我們可以使? suspend 修飾 符標記函數(shù) foo ,這樣它就可以在不阻塞的情況下執(zhí)?其?作并將結(jié)果作為列表返回:
suspend fun foo(): List<Int> {
delay(1000) // 假裝我們在這?做了?些異步的事情
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
foo().forEach { value -> println(value) }
}
這段代碼將會在等待?秒之后打印數(shù)字。
流:使? List 結(jié)果類型,意味著我們只能?次返回所有值。為了表?異步計算的值流(stream),我們可以使 ? Flow 類型(正如同步計算值會使? Sequence 類型):
foo(): Flow<Int> = flow { // 流構(gòu)建器
for (i in 1..3) {
delay(100) // 假裝我們在這?做了?些有?的事情
emit(i) // 發(fā)送下?個值
}
main() = runBlocking<Unit> {
// 啟動并發(fā)的協(xié)程以驗證主線程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集這個流
foo().collect { value -> println(value) }
這段代碼在不阻塞主線程的情況下每等待 100 毫秒打印?個數(shù)字。在主線程中運??個單獨的協(xié)程每 100 毫秒打印?次“I'm not blocked”已經(jīng)經(jīng)過了驗證。
I'm not blocked 1 1 I'm not blocked 2 2 I'm not blocked 3 3
注意使? Flow 的代碼與先前?例的下述區(qū)別:
名為 flow 的 Flow 類型構(gòu)建器函數(shù)。
flow { ... } 構(gòu)建塊中的代碼可以掛起。
函數(shù) foo() 不再標有 suspend 修飾符。
流使? emit 函數(shù) 發(fā)射 值。
流使? collect 函數(shù) 收集 值。
流是冷的
Flow 是?種類似于序列的冷流 — 這段 flow 構(gòu)建器中的代碼直到流被收集的時候才運?。這在以下的 ?例中?常明顯:
foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
打印如下:
Calling foo... Calling collect... Flow started 1 2 3 Calling collect again... Flow started 1 2 3
這是返回?個流的 foo() 函數(shù)沒有標記 suspend 修飾符的主要原因。通過它??,foo() 會盡快 返回且不會進?任何等待。該流在每次收集的時候啟動,這就是為什么當我們再次調(diào)? collect 時我 們會看到“Flow started”。
流取消
流采?與協(xié)程同樣的協(xié)作取消。然?,流的基礎(chǔ)設(shè)施未引?其他取消點。取消完全透明。像往常?樣,流 的收集可以在當流在?個可取消的掛起函數(shù)(例如 delay)中掛起的時候取消,否則不能取消。 下?的?例展?了當 withTimeoutOrNull 塊中代碼在運?的時候流是如何在超時的情況下取消并停 ?執(zhí)?其代碼的:
foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超時
foo().collect { value -> println(value) }
}
println("Done")
注意,在 foo() 函數(shù)中流僅發(fā)射兩個數(shù)字,產(chǎn)?以下輸出:
Emitting 1 1 Emitting 2 2 Done
流的構(gòu)建起
先前?例中的 flow { ... } 構(gòu)建器是最基礎(chǔ)的?個。還有其他構(gòu)建器使流的聲明更簡單:
flowOf 構(gòu)建器定義了?個發(fā)射固定值集的流。 使? .asFlow() 擴展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流。
因此,從流中打印從 1 到 3 的數(shù)字的?例可以寫成:
// 將?個整數(shù)區(qū)間轉(zhuǎn)化為流
(1..3).asFlow().collect { value -> println(value) }
過渡流操作符
可以使?操作符轉(zhuǎn)換流,就像使?集合與序列?樣。過渡操作符應(yīng)?于上游流,并返回下游流。這些操 作符也是冷操作符,就像流?樣。這類操作符本?不是掛起函數(shù)。它運?的速度很快,返回新的轉(zhuǎn)換流的 定義。
基礎(chǔ)的操作符擁有相似的名字,?如 map 與 filter。流與序列的主要區(qū)別在于這些操作符中的代碼可以 調(diào)?掛起函數(shù)。
舉例來說,?個請求中的流可以使? map 操作符映射出結(jié)果,即使執(zhí)??個?時間的請求操作也可以 使?掛起函數(shù)來實現(xiàn):
end fun performRequest(request: Int): String {
delay(1000) // 模仿?時間運?的異步?作
return "response $request"
main() = runBlocking<Unit> {
(1..3).asFlow() // ?個請求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
它產(chǎn)?以下三?,每??每秒出現(xiàn)?次:
response 1 response 2 response 3
轉(zhuǎn)換操作符:在流轉(zhuǎn)換操作符中,最通?的?種稱為 transform。它可以?來模仿簡單的轉(zhuǎn)換,例如 map 與 filter,以 及實施更復(fù)雜的轉(zhuǎn)換。使? transform 操作符,我們可以 發(fā)射 任意值任意次。
?如說,使? transform 我們可以在執(zhí)??時間運?的異步請求之前發(fā)射?個字符串并跟蹤這個響應(yīng):
(1..3).asFlow() // ?個請求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
這段代碼的輸出如下
Making request 1 response 1 Making request 2 response 2 Making request 3 response 3
限長操作符:限?過渡操作符(例如 take)在流觸及相應(yīng)限制的時候會將它的執(zhí)?取消。協(xié)程中的取消操作總是通過 拋出異常來執(zhí)?,這樣所有的資源管理函數(shù)(如 try {...} finally {...} 塊)會在取消的情況下 正常運?:
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只獲取前兩個
.collect { value -> println(value) }
}
這段代碼的輸出清楚地表明,numbers() 函數(shù)中對 flow {...} 函數(shù)體的執(zhí)?在發(fā)射出第?個數(shù) 字后停?:
1 2 Finally in numbers
末端流操作符
末端操作符是在流上?于啟動流收集的掛起函數(shù)。collect 是最基礎(chǔ)的末端操作符,但是還有另外?些 更?便使?的末端操作符:
轉(zhuǎn)化為各種集合,例如 toList 與 toSet。 獲取第?個(first)值與確保流發(fā)射單個(single)值的操作符。 使? reduce 與 fold 將流規(guī)約到單個值。
舉例來說:
val sum = (1..5).asFlow()
.map { it * it } // 數(shù)字 1 ? 5 的平?
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)
打印單個數(shù)字:
55
流是連續(xù)的
流的每次單獨收集都是按順序執(zhí)?的,除?進?特殊操作的操作符使?多個流。該收集過程直接在協(xié)程 中運?,該協(xié)程調(diào)?末端操作符。默認情況下不啟動新協(xié)程。從上游到下游每個過渡操作符都會處理每 個發(fā)射出的值然后再交給末端操作符。 請參?以下?例,該?例過濾偶數(shù)并將其映射到字符串:
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
執(zhí)行:
Filter 1 Filter 2 Map 2 Collect string 2 Filter 3 Filter 4 Map 4 Collect string 4 Filter 5
流上下文:
流的收集總是在調(diào)?協(xié)程的上下?中發(fā)?。例如,如果有?個流 foo ,然后以下代碼在它的編寫者指定 的上下?中運?,??論流 foo 的實現(xiàn)細節(jié)如何:
withContext(context) {
foo.collect { value ->
println(value) // 運?在指定上下?中
}
}
流的該屬性稱為上下文保存:所以默認的,flow { ... } 構(gòu)建器中的代碼運?在相應(yīng)流的收集器提供的上下?中。舉例來說,考慮 打印線程的 foo 的實現(xiàn),它被調(diào)?并發(fā)射三個數(shù)字
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
運?這段代碼:
[main @coroutine#1] Started foo flow [main @coroutine#1] Collected 1 [main @coroutine#1] Collected 2 [main @coroutine#1] Collected 3
由于 foo().collect 是在主線程調(diào)?的,則 foo 的流主體也是在主線程調(diào)?的。這是快速運?或 異步代碼的理想默認形式,它不關(guān)?執(zhí)?的上下?并且不會阻塞調(diào)?者。
withContext發(fā)出錯誤:然?,?時間運?的消耗 CPU 的代碼也許需要在 Dispatchers.Default 上下?中執(zhí)?,并且更新 UI 的 代碼也許需要在 Dispatchers.Main 中執(zhí)?。通常,withContext ?于在 Kotlin 協(xié)程中改變代碼的上下 ?,但是 flow {...} 構(gòu)建器中的代碼必須遵循上下?保存屬性,并且不允許從其他上下?中發(fā)射 (emit)。
嘗試運?下?的代碼:
fun foo(): Flow<Int> = flow {
// 在流構(gòu)建器中更改消耗 CPU 代碼的上下?的錯誤?式
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的?式進?計算
emit(i) // 發(fā)射下?個值
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> println(value) }
}
這段代碼產(chǎn)?如下的異常:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1),
"coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1),
"coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
flowOn 操作符:例外的是 flowOn 函數(shù),該函數(shù)?于更改流發(fā)射的上下?。以下?例展?了更改流上下?的正確?法, 該?例還通過打印相應(yīng)線程的名字以展?它們的?作?式:
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假裝我們以消耗 CPU 的?式進?計算
log("Emitting $i")
emit(i) // 發(fā)射下?個值
}
}.flowOn(Dispatchers.Default) // 在流構(gòu)建器中改變消耗 CPU 代碼上下?的正確?式
fun main() = runBlocking<Unit> {
foo().collect { value ->
log("Collected $value")
}
}
注意,當收集發(fā)?在主線程中,flow { ... } 是如何在后臺線程中?作的: 這?要觀察的另?件事是 flowOn 操作符已改變流的默認順序性?,F(xiàn)在收集發(fā)?在?個協(xié)程中 (“coroutine#1”)?發(fā)射發(fā)?在運?于另?個線程中與收集協(xié)程并發(fā)運?的另?個協(xié)程 (“coroutine#2”)中。當上游流必須改變其上下?中的 CoroutineDispatcher 的時候,flowOn 操作符 創(chuàng)建了另?個協(xié)程。
緩沖:
從收集流所花費的時間來看,將流的不同部分運?在不同的協(xié)程中將會很有幫助,特別是當涉及到?時 間運?的異步操作時。例如,考慮?種情況,foo() 流的發(fā)射很慢,它每花費 100 毫秒才產(chǎn)??個元素;?收集器也?常慢,需要花費 300 毫秒來處理元素。讓我們看看從該流收集三個數(shù)字要花費多?時間:
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假裝我們異步等待了 100 毫秒
emit(i) // 發(fā)射下?個值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
foo().collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
}
它會產(chǎn)?這樣的結(jié)果,整個收集過程?約需要 1200 毫秒(3 個數(shù)字,每個花費 400 毫秒):
1 2 3 Collected in 1220 ms
我們可以在流上使? buffer 操作符來并發(fā)運? foo() 中發(fā)射元素的代碼以及收集的代碼,?不是順序運?它們:
val time = measureTimeMillis {
foo()
.buffer() // 緩沖發(fā)射項,?需等待
.collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
它產(chǎn)?了相同的數(shù)字,只是更快了,由于我們?效地創(chuàng)建了處理流?線,僅僅需要等待第?個數(shù)字產(chǎn)? 的 100 毫秒以及處理每個數(shù)字各需花費的 300 毫秒。這種?式?約花費了 1000 毫秒來運?:
1 2 3 Collected in 1071 ms
注意,當必須更改 CoroutineDispatcher 時,flowOn 操作符使?了相同的緩沖機制,但是我們在 這?顯式地請求緩沖?不改變執(zhí)?上下?。
合并:當流代表部分操作結(jié)果或操作狀態(tài)更新時,可能沒有必要處理每個值,?是只處理最新的那個。在本? 例中,當收集器處理它們太慢的時候,conflate 操作符可以?于跳過中間值。構(gòu)建前?的?例:
val time = measureTimeMillis {
foo()
.conflate() // 合并發(fā)射項,不對每個值進?處理
.collect { value ->
delay(300) // 假裝我們花費 300 毫秒來處理它
println(value)
}
}
println("Collected in $time ms")
我們看到,雖然第?個數(shù)字仍在處理中,但第?個和第三個數(shù)字已經(jīng)產(chǎn)?,因此第?個是 conflated ,只 有最新的(第三個)被交付給收集器:
1 3 Collected in 758 ms
處理最新值:當發(fā)射器和收集器都很慢的時候,合并是加快處理速度的?種?式。它通過刪除發(fā)射值來實現(xiàn)。另?種 ?式是取消緩慢的收集器,并在每次發(fā)射新值的時候重新啟動它。有?組與 xxx 操作符執(zhí)?相同基本 邏輯的 xxxLatest 操作符,但是在新值產(chǎn)?的時候取消執(zhí)?其塊中的代碼。讓我們在先前的?例中嘗 試更換 conflate 為 collectLatest:
val time = measureTimeMillis {
foo()
.collectLatest { value -> // 取消并重新發(fā)射最后?個值
println("Collecting $value")
delay(300) // 假裝我們花費 300 毫秒來處理它
println("Done $value")
}
}
println("Collected in $time ms")
由于 collectLatest 的函數(shù)體需要花費 300 毫秒,但是新值每 100 秒發(fā)射?次,我們看到該代碼塊對每 個值運?,但是只收集最后?個值:
Collecting 1 Collecting 2 Collecting 3 Done 3 Collected in 741 ms
由于 collectLatest 的函數(shù)體需要花費 300 毫秒,但是新值每 100 秒發(fā)射?次,我們看到該代碼塊對每 個值運?,但是只收集最后?個值:
Collecting 1 Collecting 2 Collecting 3 Done 3 Collected in 741 ms
組合多個流,組合多個流有很多種方式
Zip:就像 Kotlin 標準庫中的 Sequence.zip 擴展函數(shù)?樣,流擁有?個 zip 操作符?于組合兩個流中的相關(guān)值:
val nums = (1..3).asFlow() // 數(shù)字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 組合單個字符串
.collect { println(it) } // 收集并打印
?例打印如下:
1 -> one 2 -> two 3 -> three
Combine:當流表??個變量或操作的最新值時(請參閱相關(guān)?節(jié) conflation),可能需要執(zhí)?計算,這依賴于相應(yīng) 流的最新值,并且每當上游流產(chǎn)?值的時候都需要重新計算。這種相應(yīng)的操作符家族稱為 combine。 例如,先前?例中的數(shù)字如果每 300 毫秒更新?次,但字符串每 400 毫秒更新?次,然后使? zip 操作 符合并它們,但仍會產(chǎn)?相同的結(jié)果,盡管每 400 毫秒打印?次結(jié)果:
val nums = (1..3).asFlow().onEach { delay(300) } // 發(fā)射數(shù)字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發(fā)射?次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使?“zip”組合單個字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
然?,當在這?使? combine 操作符來替換 zip:
val nums = (1..3).asFlow().onEach { delay(300) } // 發(fā)射數(shù)字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發(fā)射?次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.combine(strs) { a, b -> "$a -> $b" } // 使?“combine”組合單個字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
我們得到了完全不同的輸出,其中,nums 或 strs 流中的每次發(fā)射都會打印??:
1 -> one at 452 ms from start 2 -> one at 651 ms from start 2 -> two at 854 ms from start 3 -> two at 952 ms from start 3 -> three at 1256 ms from start
展平流:流表?異步接收的值序列,所以很容易遇到這樣的情況:每個值都會觸發(fā)對另?個值序列的請求。?如 說,我們可以擁有下?這樣?個返回間隔 500 毫秒的兩個字符串流的函數(shù):
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
現(xiàn)在,如果我們有?個包含三個整數(shù)的流,并為每個整數(shù)調(diào)? requestFlow ,如下所?:
(1..3).asFlow().map { requestFlow(it) }
然后我們得到了?個包含流的流( Flow<flow> ),需要將其進?展平為單個流以進?下? 步處理。集合與序列都擁有 flatten 與 flatMap 操作符來做這件事。然?,由于流具有異步的性質(zhì),因此 需要不同的展平模式,為此,存在?系列的流展平操作符。
flatMapConcat:連接模式由 flatMapConcat 與 flattenConcat 操作符實現(xiàn)。它們是相應(yīng)序列操作符最相近的類似物。它 們在等待內(nèi)部流完成之前開始收集下?個值,如下?的?例所?:
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
在輸出中可以清楚地看到 flatMapConcat 的順序性質(zhì):
1: First at 121 ms from start 1: Second at 622 ms from start 2: First at 727 ms from start 2: Second at 1227 ms from start 3: First at 1328 ms from start 3: Second at 1829 ms from start
flatMapMerge:另?種展平模式是并發(fā)收集所有傳?的流,并將它們的值合并到?個單獨的流,以便盡快的發(fā)射值。它 由 flatMapMerge 與 flattenMerge 操作符實現(xiàn)。他們都接收可選的?于限制并發(fā)收集的流的個數(shù)的 concurrency 參數(shù)(默認情況下,它等于 DEFAULT_CONCURRENCY)。
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
flatMapMerge 的并發(fā)性質(zhì)很明顯:
1: First at 136 ms from start 2: First at 231 ms from start 3: First at 333 ms from start 1: Second at 639 ms from start 2: Second at 732 ms from start 3: Second at 833 ms from start
注意,flatMapMerge 會順序調(diào)?代碼塊(本?例中的 { requestFlow(it) }),但是并發(fā)收集結(jié) 果流,相當于執(zhí)?順序是?先執(zhí)? map { requestFlow(it) }
然后在其返回結(jié)果上調(diào)? flattenMerge。
flatMapLatest:與 collectLatest 操作符類似(在"處理最新值" ?節(jié)中已經(jīng)討論過),也有相對應(yīng)的“最新”展平模式,在 發(fā)出新流后?即取消先前流的收集。這由 flatMapLatest 操作符來實現(xiàn)。
val startTime = System.currentTimeMillis() // 記錄開始時間
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒發(fā)射?個數(shù)字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
該?例的輸出很好的展?了 flatMapLatest 的?作?式:
1: First at 142 ms from start 2: First at 322 ms from start 3: First at 425 ms from start 3: Second at 931 ms from start
注意,flatMapLatest 在?個新值到來時取消了塊中的所有代碼 (本?例中的 { requestFlow(it) })。這在該特定?例中不會有什么區(qū)別,由于調(diào)? requestFlow ??的速度是很快的,
不會發(fā)?掛起,所以不會被取消。然?,如果我們要在塊中調(diào)?諸如 delay 之類的掛 起函數(shù),這將會被表現(xiàn)出來。
流異常:當運算符中的發(fā)射器或代碼拋出異常時,流收集可以帶有異常的完成。有?種處理異常的?法。
收集器 try 與 catch:收集者可以使? Kotlin 的 try/catch 塊來處理異常:
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 發(fā)射下?個值
}
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
這段代碼成功的在末端操作符 collect 中捕獲了異常,并且,如我們所?,在這之后不再發(fā)出任何值:
Emitting 1 1 Emitting 2 2 Caught java.lang.IllegalStateException: Collected 2
一切都已捕獲:前?的?例實際上捕獲了在發(fā)射器或任何過渡或末端操作符中發(fā)?的任何異常。例如,讓我們修改代碼 以便將發(fā)出的值映射為字符串,但是相應(yīng)的代碼會產(chǎn)??個異常:
fun foo(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 發(fā)射下?個值
}
}.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
仍然會捕獲該異常并停?收集:
Emitting 1 string 1 Emitting 2 Caught java.lang.IllegalStateException: Crashed on 2
異常透明性:發(fā)射器的代碼如何封裝其異常處理?為?
流必須對異常透明,即在 flow { ... } 構(gòu)建器內(nèi)部的 try/catch 塊中發(fā)射值是違反異常透明性 的。這樣可以保證收集器拋出的?個異常能被像先前?例中那樣的 try/catch 塊捕獲。
發(fā)射器可以使? catch 操作符來保留此異常的透明性并允許封裝它的異常處理。catch 操作符的代碼塊 可以分析異常并根據(jù)捕獲到的異常以不同的?式對其做出反應(yīng):
可以使? throw 重新拋出異常。 可以使? catch 代碼塊中的 emit 將異常轉(zhuǎn)換為值發(fā)射出去。 可以將異常忽略,或??志打印,或使??些其他代碼處理它
例如,讓我們在捕獲異常的時候發(fā)射?本:
foo()
.catch { e -> emit("Caught $e") } // 發(fā)射?個異常
.collect { value -> println(value) }
即使我們不再在代碼的外層使? try/catch,?例的輸出也是相同的。
透明捕獲:catch 過渡操作符遵循異常透明性,僅捕獲上游異常( catch 操作符上游的異常,但是它下?的不是)。 如果 collect { ... } 塊(位于 catch 之下)拋出?個異常,那么異常會逃逸:
fun foo(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo()
.catch { e -> println("Caught $e") } // 不會捕獲下游異常
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
盡管有 catch 操作符,但不會打印“Caught …”消息:
聲明式捕獲:我們可以將 catch 操作符的聲明性與處理所有異常的期望相結(jié)合,將 collect 操作符的代碼塊移動到 onEach 中,并將其放到 catch 操作符之前。收集該流必須由調(diào)??參的 collect() 來觸發(fā):
foo()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
現(xiàn)在我們可以看到已經(jīng)打印了“Caught …”消息,并且我們可以在沒有顯式使? try/catch 塊的情況 下捕獲所有異常:
流完成:
當流收集完成時(普通情況或異常情況),它可能需要執(zhí)??個動作。你可能已經(jīng)注意到,它可以通過兩 種?式完成:命令式或聲明式。
命令式finally塊:除了 try / catch 之外,收集器還能使? finally 塊在 collect 完成時執(zhí)??個動作
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
foo().collect { value -> println(value) }
} finally {
println("Done")
}
}
這段代碼打印出 foo() 流產(chǎn)?的三個數(shù)字,后?跟?個“Done”字符串:
1 2 3 Done
聲明式處理:對于聲明式,流擁有 onCompletion 過渡操作符,它在流完全收集時調(diào)???梢允? onCompletion 操作符重寫前?的?例,并產(chǎn)?相同的輸出:
foo()
.onCompletion { println("Done") }
.collect { value -> println(value) }
onCompletion 的主要優(yōu)點是其 lambda 表達式的可空參數(shù) Throwable 可以?于確定流收集是正常 完成還是有異常發(fā)?。在下?的?例中 foo() 流在發(fā)射數(shù)字 1 之后拋出了?個異常:
fun foo(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
如你所期望的,它打印了:
1 Flow completed exceptionally Caught exception
onCompletion 操作符與 catch 不同,它不處理異常。我們可以看到前?的?例代碼,異常仍然流向下 游。它將被提供給后?的 onCompletion 操作符,并可以由 catch 操作符處理。
成功完成:與 catch 操作符的另?個不同點是 onCompletion 能觀察到所有異常并且僅在上游流成功完成(沒有 取消或失?。┑那闆r下接收?個 null 異常。
fun foo(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
foo()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
我們可以看到完成時 cause 不為空,因為流由于下游異常?中?:
1 Flow completed with java.lang.IllegalStateException: Collected 2 Exception in thread "main" java.lang.IllegalStateException: Collected 2
命令式還是聲明式
現(xiàn)在我們知道如何收集流,并以命令式與聲明式的?式處理其完成及異常情況。這?有?個很?然的問 題是,哪種?式應(yīng)該是?選的?為什么?作為?個庫,我們不主張采?任何特定的?式,并且相信這兩種 選擇都是有效的,應(yīng)該根據(jù)??的喜好與代碼?格進?選擇。
啟動流
使?流表?來??些源的異步事件是很簡單的。在這個案例中,我們需要?個類似 addEventListener 的函數(shù),該函數(shù)注冊?段響應(yīng)的代碼處理即將到來的事件,并繼續(xù)進?進?步的 處理。onEach 操作符可以擔任該??。然?,onEach 是?個過渡操作符。我們也需要?個末端操作符 來收集流。否則僅調(diào)? onEach 是?效的。 如果我們在 onEach 之后使? collect 末端操作符,那么后?的代碼會?直等待直?流被收集
// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- 等待流收集
println("Done")
}
你可以看到它的輸出:
Event: 1 Event: 2 Event: 3 Done
launchIn 末端操作符可以在這?派上?場。使? launchIn 替換 collect 我們可以在單獨的協(xié)程 中啟動流的收集,這樣就可以?即繼續(xù)進?步執(zhí)?代碼:
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- 在單獨的協(xié)程中執(zhí)?流
println("Done")
}
它打印了:
Done Event: 1 Event: 2 Event: 3
launchIn 必要的參數(shù) CoroutineScope 指定了?哪?個協(xié)程來啟動流的收集。在先前的?例中這個 作?域來? runBlocking 協(xié)程構(gòu)建器,在這個流運?的時候,runBlocking 作?域等待它的?協(xié)程執(zhí)? 完畢并防? main 函數(shù)返回并終?此?例。 在實際的應(yīng)?中,作?域來?于?個壽命有限的實體。在該實體的壽命終?后,相應(yīng)的作?域就會被取 消,即取消相應(yīng)流的收集。這種成對的 onEach { ... }.launchIn(scope) ?作?式就像 addEventListener ?樣。?且,這不需要相應(yīng)的 removeEventListener 函數(shù),因為取消與結(jié)構(gòu) 化并發(fā)可以達成這個?的。 注意,launchIn 也會返回?個 Job,可以在不取消整個作?域的情況下僅取消相應(yīng)的流收集或?qū)ζ溥M ? join。
流(Flow)與響應(yīng)式流(Reactive Streams)
對于熟悉響應(yīng)式流(Reactive Streams)或諸如 RxJava 與 Project Reactor 這樣的響應(yīng)式框架的?來 說,F(xiàn)low 的設(shè)計也許看起來會?常熟悉。 確實,其設(shè)計靈感來源于響應(yīng)式流以及其各種實現(xiàn)。但是 Flow 的主要?標是擁有盡可能簡單的設(shè)計, 對 Kotlin 以及掛起友好且遵從結(jié)構(gòu)化并發(fā)。沒有響應(yīng)式的先驅(qū)及他們?量的?作,就不可能實現(xiàn)這?? 標。你可以閱讀 Reactive Streams and Kotlin Flows 這篇?章來了解完成 Flow 的故事。 雖然有所不同,但從概念上講,F(xiàn)low 依然是響應(yīng)式流,并且可以將它轉(zhuǎn)換為響應(yīng)式(規(guī)范及符合 TCK)的 發(fā)布者(Publisher),反之亦然。這些開箱即?的轉(zhuǎn)換器可以在 kotlinx.coroutines 提供的相關(guān)響 應(yīng)式模塊( kotlinx-coroutines-reactive ?于 Reactive Streams,kotlinx-coroutinesreactor ?于 Project Reactor,以及 kotlinx-coroutines-rx2 / kotlinx-coroutinesrx3 ?于 RxJava2/RxJava3)中找到。集成模塊包含 Flow 與其他實現(xiàn)之間的轉(zhuǎn)換,與 Reactor 的 Context 集成以及與?系列響應(yīng)式實體配合使?的掛起友好的使??式。

浙公網(wǎng)安備 33010602011771號