异步流(官方文档)

目录

1、多个值的表示

1.1、使用Sequences

1.2、使用suspend函数

1.3、使用Flow

2、Flow是延迟执行的

3、取消flow的基本使用

4、flow构建器

5、中间流操作符

5.1、变换算子(Transform operator)

5.2、容量限制操作符(Size-limiting operators)

6、终结操作符(Terminal flow operators)

7.flow流是顺序的(Flows are sequential)

8、flow上下文(Flow context)

8.1、错误的使用withContext(Wrong emission withContext)

8.2、flowOn操作符

9、缓冲(Buffering)

9.1、合并(Conflation)

9.2、处理最新的值(Processing the latest value)

10、组合fow(Composing multiple flows)

10.1、Zip

10.2、Combine

11、Flattening flows

11.1、flatMapConcat

11.2、flatMapMerge

11.3、flatMapLatest

12、Flow的异常处理

12.1、由Collector端处理异常

12.2、Everyingthing is caught

13、Exception transparency

13.1、Transparent catch

13.2、Catching declaratively

14、Flow completion

14.1、命令式的try/finally

14.2、声明式的执行

14.3、正常结束(Successful completion)

15、Imperative versus declarative

16、Launching flow

17、检查flow的取消

17.1、Making busy flow cancellable

18、Flow及响应式流


一个suspend函数异步的返回一个值,但是我们怎么返回多个异步计算的值呢?这就是kotlin流(Flows)存在的理由。

1、多个值的表示

在kotlin中可以使用 集合collections来表示多个值,比如我们有个simple函数用于返回一个包含3个值的List,然后使用 forEach把他们打印出来,代码如下:

package com.cool.cleaner.testfun simple(): List<Int> =  listOf(1, 2, 3)fun main(): Unit {simple().forEach { value -> println(value) }
}

下面是输出:

1
2
3Process finished with exit code 0

1.1、使用Sequences

假如我们要获取的值是需要耗时(每个100ms)计算的,那我们可以使用 Sequence来包装我们的值,代码如下:

package com.cool.cleaner.testfun simple(): Sequence<Int> =  sequence {for (i in 1..3) {Thread.sleep(1000)yield(i)//yield next value}
}fun main(): Unit {simple().forEach { value -> println(value) }println("done")
}

输出结果和上面一样,但是在输出每个数字之前都会有1000ms的延时。

1.2、使用suspend函数

不过上面使用Sequence的方式是会阻塞主线程的(很容易看到上面的例子只有simple执行完后才会打印出"done"),当这些值都是异步计算的时候我们可以把simple函数改写成suspend的,然后它就可以非阻塞的计算然后把结果作为一个list返回,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.runBlockingsuspend fun simple(): List<Int> {delay(1000)return listOf(1, 2, 3)
}fun main() = runBlocking<Unit> {simple().forEach { value -> println(value) }println("done")
}

1秒后输出3个数字。

1.3、使用Flow

使用返回类型List<Int>意味着我们只能一次返回所有的值,使用Flow<Int> 可以表示数值流是异步计算的,就像我们使用Sequence<Int>表示同步计算一样,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(1000)emit(i)//emit next value}
}fun main() = runBlocking<Unit> {// Launch a concurrent coroutine to check if the main thread is blockedlaunch {for (k in 1..3) {println("I'm not blocked $k")delay(1000)}}simple().collect { value -> println(value) }println("done")
}

每隔1000ms打印出一个数值,并且没有阻塞主线程,同时通过运行于主线程中的一个协程每隔1000ms打印 "I'm not blocked"证明确实没有阻塞主线程,输出如下:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
doneProcess finished with exit code 0

下面是在代码中使用 Flow与之前的例子不同的地方:

  1. Flow 类型的构建函数是 flow。
  2. 位于flow { ... } 中的代码可以suspend。
  3. simple 函数不再标记为suspend的。
  4. 从flow中发射数据使用emit函数。
  5. 从flow从收集数据使用collect 函数。

在simple函数的代码块flow { ... } 中,我们可以把delay替换成Thread.sleep,此时你将看到main线程已经阻塞。

2、Flow是延迟执行的

Flow就像sequences一样,位于flow 构建者内部的代码只有在flow上调用collect的时候才会执行,下面的代码很容易证明这点:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {println("Flow started")for (i in 1..3) {delay(1000)emit(i)//emit next value}
}fun main() = runBlocking<Unit> {println("Calling simple function...")val flow = simple()println("Calling collect...")flow.collect { value -> println(value) }println("Calling collect again...")flow.collect { value -> println(value) }
}

下面是输出:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3Process finished with exit code 0

这是simple函数(返回一个flow流)没有用suspend修饰的原因,simple()本身很快就会返回而不用等待任何东西;每次在flow上调用collect的时候flow都会开始执行,这就是为什么当我们再次调用collect的时候会看到输出 "Flow started"。

3、取消flow的基本使用

flow会与协程的协作式取消绑定在一起,通常情况下当flow挂起在(suspend)一个可以取消的suspend函数(比如 delay)中的时候,flow集合也是可以取消的。下面的代码展示了当位于withTimeoutOrNull 中的代码因为超时而阻塞并且停止执行的时候它是如何取消flow的执行的:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNullfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(100)println("Emitting $i")emit(i)//emit next value}
}fun main() = runBlocking<Unit> {withTimeoutOrNull(250) {//250ms后超时返回simple().collect { value -> println(value) }}println("Done")
}

请注意在simple函数中仅有2个数字从flow中发射出来(其实我只看到输出了1,你可以试试),输出如下:

Emitting 1
1
Emitting 2
2
Done

4、flow构建器

上面使用的flow { ... }构建器是最基本的一个,还有其他更简单的方式可以声明异步流flows:

  1. flowOf 构建器发射固定的一些值。
  2. 不同的集合(collection)和序列(sequence)可以调用扩展函数.asFlow转换成flow。

因此从一个flow中打印出1到3的例子可以如下编写:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {(1..3).asFlow().collect { value -> println(value) }
}

5、中间流操作符

flow可以通过操作符来转换,就像你将要使用的collection和sequence一样。中间操作符会作用于一个上游的流并返回一个下游的流,这些中间操作符就像flow一样都是惰性的。对这样一个操作符的调用本身不属于挂起函数,它很快返回一个经过变换的flow。

基本的操作符都有一些熟悉的名字,比如 map 和 filter,它和sequence的最重要的区别就是这些操作符里面可以调用suspend函数。

比如,即使一个要映射到的请求是由suspend实现的耗时操作,flow也可以把进来的请求映射到这样的一系列耗时请求上,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlockingsuspend fun performRequest(request: Int): String {delay(1000)return "response $request"
}fun main() = runBlocking<Unit> {(1..3).asFlow().map { request -> performRequest(request) }.collect { response -> println(response) }
}

每隔一秒输出一行,输出如下:

response 1
response 2
response 3Process finished with exit code 0

5.1、变换算子(Transform operator)

在flow变换的一系列操作符中,其中最基础的就是transform,它可以用来模仿一些简单的变换,比如: map 和 filter,当然也可以用来实现一些复杂的变换;使用transform 操作符我们可以对任意值发射任意次数。

举个例子,使用transform 我们可以在运行异步耗时任务请求之前发射一个字符串,然后才是发射耗时任务的执行结果,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlockingsuspend fun performRequest(request: Int): String {delay(1000)return "response $request"
}fun main() = runBlocking<Unit> {(1..3).asFlow().transform { request ->emit("Making request $request")emit(performRequest(request))}.collect { response -> println(response) }
}

下面是输出结果:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3Process finished with exit code 0

5.2、容量限制操作符(Size-limiting operators)

类似于take 的容量限制操作符会在达到相应阈值时取消flow的执行,由于协程中的取消是用跑异常来表示的,因此你可以在取消操作的情况下使用所有的资源管理功能操作(比如代码块try { ... } finally { ... }),代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlockingfun numbers(): Flow<Int> = flow {try {emit(1)emit(2)println("This line will not execute")emit(3)} finally {println("Finally in numbers")}
}fun main() = runBlocking<Unit> {numbers().take(2).collect { number -> println(number) }
}

从输出结果中可以看出在函数numbers()中的代码块flow { ... }在发射两个数据后就停止发射数据了,输出如下:

1
2
Finally in numbersProcess finished with exit code 0

6、终结操作符(Terminal flow operators)

在flow上的终结符操作符都是suspend函数,它会开始在flow上执行collection;操作符 collect是最基本的一个终结操作符,但是还有其他一些更方便的终结操作符,比如:

  1. 转化为其它集合的方法比如:toList、toSet。
  2. 获取第一个元素的first 操作符以及确保只发射一个元素的single操作符。
  3. reduce一个flow的操作符比如 reduce 和 fold。

举个例子,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }println(sum)
}

只打印出一个结果,如下:

55Process finished with exit code 0

7.flow流是顺序的(Flows are sequential)

除非使用了操作多个flow的操作符否则每一次的collection动作都是顺序的(处理了flow中的第一个元素然后第二个元素,依次下去),默认情况下collection操作运行在调用终结操作符的协程,而不会启动新的协程。每一个从flow中发射出来的数据都会从上游到下游经过中间操作符的处理最后传送给终结操作符。

请看下面的例子,首页过滤掉奇数,然后再映射到字符串:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {(1..5).asFlow().filter {println("Filter $it")it % 2 == 0}.map {println("Map $it")"string $it"}.collect {println("Collect $it")}
}

输出如下:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5Process finished with exit code 0

8、flow上下文(Flow context)

collection的执行一般都是在调用collect动作的协程中,比如:假如有一个simple的flow,下面的代码将会执行在withContext指定的上下文中而不管simple是如何实现的。

withContext(context) {simple().collect { value ->println(value) // run in the specified context }
}

这个属性叫做上下文保持。

因此,默认情况下代码块flow { ... }中的代码运行于flow的收集者指定的上下文中。举个例子,下面的simple()函数会打印出运行的线程并且发射3个数据:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlockingfun log(msg: String) = println("[${Thread.currentThread().name}] $msg")fun simple(): Flow<Int> = flow {log("Started simple flow")for (i in 1..3) {emit(i)}
}fun main() = runBlocking<Unit> {simple().collect {value -> log("Collected $value")}
}

输出如下:

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3Process finished with exit code 0

因为simple().collect是从主线程中调用的,所以simple的flow也是运行于主线程中,这对于那些不关心运行上下文且不阻塞调用者的快速运行代码和异步代码来说,这种默认设置非常不错。

8.1、错误的使用withContext(Wrong emission withContext)

耗时的CPU密集型任务可以需要在协程上下文Dispatchers.Default 中运行而UI更新相关的需要在 Dispatchers.Main中执行,在kotlin协程中withContext是用来切换上下文的,但是构建器flow { ... }又得遵循“上下文保持”这个属性因此不允许从不同的上下文发射数据。

你可以运行下面的代码试试:

package com.cool.cleaner.testimport kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContextfun log(msg: String) = println("[${Thread.currentThread().name}] $msg")fun simple(): Flow<Int> = flow {withContext(Dispatchers.Default) {log("Started simple flow")for (i in 1..3) {emit(i)}}
}fun main() = runBlocking<Unit> {simple().collect {value -> log("Collected $value")}
}

你会发现产生如下异常:

[DefaultDispatcher-worker-1 @coroutine#1] Started simple flow
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@31265a9d, BlockingEventLoop@1161555e],but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@68d8d324, Dispatchers.Default].Please refer to 'flow' documentation or use 'flowOn' insteadat kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)at com.cool.cleaner.test.KotTestKt$simple$1$1.invokeSuspend(KotTest.kt:16)(Coroutine boundary)at com.cool.cleaner.test.KotTestKt$simple$1.invokeSuspend(KotTest.kt:13)at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:24)
Caused by: java.lang.IllegalStateException: Flow invariant is violated:Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@31265a9d, BlockingEventLoop@1161555e],but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@68d8d324, Dispatchers.Default].Please refer to 'flow' documentation or use 'flowOn' insteadat kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)at com.cool.cleaner.test.KotTestKt$simple$1$1.invokeSuspend(KotTest.kt:16)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)

8.2、flowOn操作符

异常指出了应该使用flowOn函数来改变发射flow的协程上下文,正确改变协程上下文的例子如下:

package com.cool.cleaner.testimport kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlockingfun log(msg: String) = println("[${Thread.currentThread().name}] $msg")fun simple(): Flow<Int> = flow {log("Started simple flow")for (i in 1..3) {delay(1000)log("Emitting $i")emit(i)}
}.flowOn(Dispatchers.Default)fun main() = runBlocking<Unit> {simple().collect {value -> log("Collected $value")}
}

此时collection发生在主线程而flow { ... }代码则运行在后台线程。

另外一个需要主意的问题是 flowOn 操作符改变了flow的默认有序性,现在collection运行在 ("coroutine#1") 而发射而运行在另外一个协程 ("coroutine#2") ,而这两个协程则同时运行在不同的线程。当一个上游的流需要改变CoroutineDispatcher 的时候flowOn操作符会为它创建一个协程。

9、缓冲(Buffering)

从整体运行时间来看,把一个flow的的不同部分运行在不同的协程是非常有用的,特别是当运行长时间的异步任务时。举个例子,当simple发射flow很慢,比如需要100ms,而collector也很慢,需要花300ms处理一个元素,现在看看收集一个有3个元素的flow需要花多长时间,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillisfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(1000)emit(i)}
}fun main() = runBlocking<Unit> {val time = measureTimeMillis {simple().collect { value ->delay(3000)println(value)}}println("Collected in $time ms")
}

输出如下,所有的处理时间大约12000ms(3个数字,每个花4000ms):

1
2
3
Collected in 12082 msProcess finished with exit code 0

我们可以在flow上使用 buffer 操作符使得simple流的发射代码与收集的代码并发的运行,而不是顺序执行:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillisfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(1000)emit(i)}
}fun main() = runBlocking<Unit> {val time = measureTimeMillis {simple().buffer().collect { value ->delay(3000)println(value)}}println("Collected in $time ms")
}

输出和上面一致,不过由于我们创建了一个高效的处理管道,所以整体上运行得更快;花1000ms等待第一个数字的到来,然后花3000处理每个数据,这样运行时间总共是10000ms左右:

1
2
3
Collected in 10346 msProcess finished with exit code 0

请注意,flowOn操作符会在需要切换CoroutineDispatcher的时候使用同样的缓冲机制,但是这里我们显示的请求缓冲而没有改变协程运行上下文。

9.1、合并(Conflation)

当flow代表部分的操作结果或者状态更新时,此时可能不需要处理所有的数据,而只需要处理最新的数据即可。在这种情况下,如果collector处理过慢那就就可以使用conflate操作符来跳过中间的值,示例如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillisfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(1000)emit(i)}
}fun main() = runBlocking<Unit> {val time = measureTimeMillis {simple().conflate().collect { value ->delay(3000)println(value)}}println("Collected in $time ms")
}

我们可以看到当第一个数据还在被处理的时候,第2个、第3个数据就已经产生了,因此第2个数据就被合并了且只有最新(第3个)的被传递给了collector:

1
3
Collected in 7347 msProcess finished with exit code 0

9.2、处理最新的值(Processing the latest value)

当发送者和处理者都比较慢的时候conflation是一种加快处理的方式,它是通过丢弃发射的数据来达到目的的;另外一种方法是当一个新数据到达的时候重启一个慢的数据处理者。有一类xxxLatest操作符,它们执行与xxx操作符相同的基本逻辑,但在新值上取消其块中的代码执行,例子如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillisfun simple(): Flow<Int> = flow {for (i in 1..3) {delay(1000)emit(i)}
}fun main() = runBlocking<Unit> {val time = measureTimeMillis {simple().collectLatest { value ->println("Collecting $value")delay(3000)println("Done $value")}}println("Collected in $time ms")
}

collectLatest 中的代码需要3000ms来处理数据,但是新数据每1000ms就会到来,你可以看到 collectLatest 块中的代码在新值到来时都会执行但是只有最后一个是完整执行的。

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 6451 msProcess finished with exit code 0

10、组合fow(Composing multiple flows)

有很多方法可以组合多个flow。

10.1、Zip

就像kotlin标准库中的Sequence.zip 扩展函数一样,flow也有一个zip操作符可以用来合并两个flow中的相应数据,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {val nums = (1..3).asFlow()val strs = flowOf("one", "two", "three")nums.zip(strs) {a, b -> "$a -> $b"}.collect { println(it) }
}

下面是输出:

1 -> one
2 -> two
3 -> threeProcess finished with exit code 0

10.2、Combine

当flow代表最新的值或者操作的的时候,如果上游发射了一个值那就需要根据最新的值重新计算,处理这种类型的相应操作符叫做combine。

比如上面的例子,每300ms产生一个数字,但是每400ms才产生一个字符,那使用zip操作符将会始终产生相同的结果,每400ms输出一个两两结合的值。

在这个例子中我们使用操作符 onEach 延时每一个元素,这样使得发射一个flow更清晰和简洁。

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {val nums = (1..3).asFlow().onEach { delay(300) }val strs = flowOf("one", "two", "three").onEach { delay(400) }val startTime = System.currentTimeMillis();nums.zip(strs) { a, b -> "$a -> $b" }.collect { value ->println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

然而如果使用 combine操作符,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {val nums = (1..3).asFlow().onEach { delay(300) }val strs = flowOf("one", "two", "three").onEach { delay(400) }val startTime = System.currentTimeMillis();nums.combine(strs) { a, b -> "$a -> $b" }.collect { value ->println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

此时将会得到完全不同的输出,当nums或者strs发射一个元素的时候都会得到一行新的输出,结果如下:

1 -> one at 727 ms from start
2 -> one at 943 ms from start
2 -> two at 1129 ms from start
3 -> two at 1244 ms from start
3 -> three at 1532 ms from startProcess finished with exit code 0

11、Flattening flows

Flow代表一系列异步收到的值,所以你很可能遇到一个值又触发产生另一个Flow的情况,比如我们可以使用下面的函数每隔500ms产生一个字符串:

fun requestFlow(i: Int): Flow<String> = flow {emit("$i: First") delay(500) // wait 500 msemit("$i: Second")
}

假如现在我们有一个会发射三个值的Flow,并且像下面这样调用requestFlow:

(1..3).asFlow().map { requestFlow(it) }

此时我们就会得到一个嵌套的Flow (Flow<Flow<String>>),而此时为了方便进一步的处理就需要把它扩展(flatten)成一个单一的Flow。 Collections 和sequences 中的操作符flatten及flatMap就是应对这种情况的;然而由于flow的异步性,将会存在不同形式的flatten,同样对于flow也有一系列的flatten操作符。

11.1、flatMapConcat

连接模式可以使用操作符flatMapConcat 及flattenConcat ,他们会在开始下一个flow之前等待内部的flow结束,如下例子所示:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun requestFlow(i: Int): Flow<String> = flow {emit("$i: First")delay(500)emit("$i: Second")
}fun main() = runBlocking<Unit> {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach { delay(100) }.flatMapConcat { requestFlow(it) }.collect { value ->println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

从输出很容易看出 flatMapConcat操作符的有序性:

1: First at 201 ms from start
1: Second at 703 ms from start
2: First at 804 ms from start
2: Second at 1305 ms from start
3: First at 1406 ms from start
3: Second at 1907 ms from startProcess finished with exit code 0

11.2、flatMapMerge

另一种flatten模式是并发的收集flow,然后把他们的值合并到一个flow中,使得flow中的数据可以尽快的发射出去。这种情况可以使用操作符flatMapMerge 和flattenMerge;他们都接收一个可选的concurrency (默认值为DEFAULT_CONCURRENCY )参数,该参数用来限制同时可以收集的flow的最大数量:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun requestFlow(i: Int): Flow<String> = flow {emit("$i: First")delay(500)emit("$i: Second")
}fun main() = runBlocking<Unit> {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach { delay(100) }.flatMapMerge { requestFlow(it) }.collect { value ->println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

操作符flatMapMerge 的并发特性可以很明显的从输出中看出来:

1: First at 315 ms from start
2: First at 388 ms from start
3: First at 491 ms from start
1: Second at 816 ms from start
2: Second at 890 ms from start
3: Second at 996 ms from startProcess finished with exit code 0

请注意:本例中flatMapMerge 调用的代码块({ requestFlow(it) } in this example)是顺序的,但是结果的收集是并发的;等价于先调用 map{requestFlow(it)}然后再调用flattenMerge

11.3、flatMapLatest

与在处理最新的值(9.2)使用的collectLatest 操作类似,当新的flow发射值的时候取消前面的flow这种情况也有相应的操作符flatMapLatest,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun requestFlow(i: Int): Flow<String> = flow {emit("$i: First")delay(500)emit("$i: Second")
}fun main() = runBlocking<Unit> {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach { delay(100) }.flatMapLatest { requestFlow(it) }.collect { value ->println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

从结果输出中很容易看出 flatMapLatest 是如何工作的。

1: First at 530 ms from start
2: First at 911 ms from start
3: First at 1015 ms from start
3: Second at 1516 ms from startProcess finished with exit code 0

请注意: flatMapLatest 在新值到来的时候会取消所有的代码块的执行(本例中的{ requestFlow(it) } )

12、Flow的异常处理

当操作符中的发射器或者代码发生异常的时候,flow的收集就出现异常,不过你有多种方法可以处理这些异常。

12.1、由Collector端处理异常

在collector端可以使用try/catch 处理异常,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {for (index in 1..3) {println("Emitting $index")emit(index)//emit next value}
}fun main() = runBlocking<Unit> {try {simple().collect { value ->println(value)check(value <= 1) {"Collected $value"}}} catch (e: Throwable) {println("Caught $e")}
}

上面的代码成功的捕获了异常,正如你所见捕获之后就没值再输出了:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2Process finished with exit code 0

12.2、Everyingthing is caught

上面的例子的确捕获了发射器、中间操作符或者终结操作符中抛出的异常;比如下面下面的例子,把int值map到string上,但是会产生相应的异常:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlockingfun simple(): Flow<String> =flow {for (index in 1..3) {println("Emitting $index")emit(index)//emit next value}}.map { value ->check(value <= 1) {"Crashed on $value"}"string $value"}fun main() = runBlocking<Unit> {try {simple().collect { value -> println(value)}} catch (e: Throwable) {println("Caught $e")}
}

这个异常将会被捕获并且停止收集,下面是相应输出:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2Process finished with exit code 0

13、Exception transparency

但是现在发射器如何封装异常捕获的代码呢?

流必须对异常透明,而在flow { ... }代码块中使用try/catch并发射数据是违背异常透明规则的;就像前面的例子一样,这样可以保证在collector端使用try/catch能够正常的捕获到异常。

发射器可以使用 catch操作符处理异常,该操作遵循异常透明规则并且允许封装异常处理;在catch 代码块中可以根据不异常类型作不同的处理:

  • 可以使用throw重新抛出异常。
  • 可以在catch块中使用emit把异常转变为发射数据。
  • 可以忽略异常、打印日志或者作其它处理。

比如,我们可以在捕获异常的时候把异常信息当成数据发射出去,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun simple(): Flow<String> =flow {for (index in 1..3) {println("Emitting $index")emit(index)//emit next value}}.map { value ->check(value <= 1) {"Crashed on $value"}"string $value"}fun main() = runBlocking<Unit> {try {simple().catch { e -> emit("Caught $e") }//emit on exception.collect { value -> println(value)}} catch (e: Throwable) {println("Caught $e")}
}

以下是输出结果,即使不使用try/catch得到的输出结果也和上面的一样:

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2Process finished with exit code 0

13.1、Transparent catch

中间操作符catch遵循透明异常规则,所以它只捕获上游的异常(在catch之上的操作符抛出的异常),假如collect { ... }(在catch的下面)中抛出了异常,此异常将不会被捕获,如下代码所示:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {for (index in 1..3) {println("Emitting $index")emit(index)//emit next value}
}fun main() = runBlocking<Unit> {simple().catch { e -> println("Caught $e") }//emit on exception.collect { value ->check(value <= 1) { "Collected $value" }println(value)}
}

上面的代码并不能捕获到下游抛出的异常,输出如下:

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit(Collect.kt:134)at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)at com.cool.cleaner.test.KotTestKt$simple$1.invokeSuspend(KotTest.kt:12)at com.cool.cleaner.test.KotTestKt$simple$1.invoke(KotTest.kt)at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)at kotlinx.coroutines.flow.FlowKt__ErrorsKt.catchImpl(Errors.kt:230)at kotlinx.coroutines.flow.FlowKt.catchImpl(Unknown Source)at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catch$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:113)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:80)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:16)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)

13.2、Catching declaratively

我们可以在想要处理异常的时候结合使用catch操作符,如何做呢?你只需要把collect操作符中的代码移到onEach中并放在catch操作符之前即可;当然收集这样的flow必须调用无参数版本的collect函数,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {for (index in 1..3) {println("Emitting $index")emit(index)//emit next value}
}fun main() = runBlocking<Unit> {simple().onEach { value ->check(value <= 1) { "Collected $value" }println(value)}.catch { e -> println("Caught $e") }.collect()
}

通过下面的输出结果我们知道异常已经得到处理,而你并不需要显示的使用try/catch代码块:

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2Process finished with exit code 0

14、Flow completion

当Fow结束(正常结束或者异常结束)的时候我们可能需要执行一个动作,正如你所见,你可以用声明式或者命令式的方式来执行你想要执行的动作。

14.1、命令式的try/finally

除了try/catch以外,collector还可以使用finally来执行你想要执行的动作,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = (1..3).asFlow()fun main() = runBlocking<Unit> {try {simple().collect{ value -> println(value)}} finally {println("Done")}
}

输出如下:

2
3
DoneProcess finished with exit code 0

14.2、声明式的执行

对于声明式的执行方式,flow 有一个操作符onCompletion ,该操作符会在flow完成的时候调用,前面的例子可以使用onCompletion 重写,输出结果也是一样的,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = (1..3).asFlow()fun main() = runBlocking<Unit> {simple().onCompletion { println("Done") }.collect{ value -> println(value)}
}

使用onCompletion 操作符的优势是它包含一个可为null的Throwable 参数,可以根据这个参数判断是否发生了异常,下面的例子中,simple flow在发射一个数据1之后就会抛出一个异常:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = flow {emit(1)throw RuntimeException()
}fun main() = runBlocking<Unit> {simple().onCompletion { cause ->if (null != cause) {println("Flow completed exceptionally")}}.catch { cause -> println("Caught exception") }.collect{ value -> println(value)}
}

正如你所期望的,下面是输出:

1
Flow completed exceptionally
Caught exceptionProcess finished with exit code 0

onCompletion 操作符并不像 catch操作符,它不会处理异常,正如上面的代码所看到的,异常还是继续向下游传播,当传播到到操作符onCompletion 之后还会继续向后传播最后被catch操作符处理。

14.3、正常结束(Successful completion)

catch操作符与onCompletion操作符另一个不同是onCompletion会接收到所有的异常并且只有在flow正常结束(没有取消或者失败)的情况下才会接收到null的异常,如下代码所示:

package com.cool.cleaner.testimport kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlockingfun simple(): Flow<Int> = (1..3).asFlow()fun main() = runBlocking<Unit> {simple().onCompletion { cause ->println("Flow completed with $cause")}.collect { value ->check(value <= 1) { "Collected $value" }println("$value")}
}

我们可以看到完成原因并不会空,因此flow的下游产生了一个异常,下面是输出:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:114)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:77)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:11)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)Process finished with exit code 1

15、Imperative versus declarative

现在我们知道如何collect flow,接收数据发射完成通知,以声明式或者命令式的方式处理异常;所以现在问题是哪一种处理异常的方式更好呢?作为一个库,我们不推荐哪一种方式更好,并且相信两种方式都是正确的,而你应该根据你的喜欢和代码风格来选择他们。

16、Launching flow

我们可以使用flow来代表一些从某个源发射的一些异步事件,在这 种情况下我们需要使用类似于addEventListener 的函数注册一个监听,当事件到来的时候执行我们的监听器并继续执行其它工作。onEach操作符正是为此而生;然而onEach是一个中间操作符,我们需要一个终结操作符来收集flow,否则仅仅调用onEach是没有效果的。

如果在onEach之后使用终结操作符collect,那么它后面的代码只有在flow收集完成之后才会开始执行,如下代码所示:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlockingfun events(): Flow<Int> = (1..3).asFlow().onEach {number ->delay(100)
}fun main() = runBlocking<Unit> {events().onEach { event -> println("Event: $event") }.collect()//<----- Collecting the flow waitsprintln("Done")
}

下面是输出:

Event: 1
Event: 2
Event: 3
Done

launch操作符在这种情况下就可以使用了,使用launchIn替换collect使得我们可以在一个新的协程中收集flow,因此后面的代码就可以马上执行而不用像collect一样需要等待collect执行完成才会执行后面的代码,如下所示:

package com.cool.cleaner.testimport kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlockingfun events(): Flow<Int> = (1..3).asFlow().onEach {number ->delay(100)
}fun main() = runBlocking<Unit> {events().onEach { event -> println("Event: $event") }.launchIn(this)//<----- Launching the flow in a separate coroutineprintln("Done")
}

下面是输出:

Done
Event: 1
Event: 2
Event: 3

launchIn的参数指定需要在哪个CoroutineScope 中启动协程来执行收集动作,在上面的例子中这来自协程构建器runBlocking中,因此当flow还没收集完成的时候, runBlocking作用域将会等待它所有的子协程完成并阻止主函数返回然后退出。

在一个实际应用中作用域将会来自一个有生命周期的实体。只要实体的生命周期结束那么相应的协程作用域也会被取消,相应flow的收集也会 被取消;从这种使用方式上来说onEach { ... }.launchIn(scope) 就类似于addEventListener,只是在结构化并发中并不需要调用相应的removeEventListener 。

请注意,launchIn还返回一个Job,I不过它只能用于取消相应的流集合协程,而不会取消整个范围或联接它。

17、检查flow的取消

为了操作上的方便,flow构建器会在发射 每一个值的时候调用ensureActive以检查flow是否已经被取消,这意味着在代码块flow { ... }中的循环是可取消的,如下代码所示:

package com.cool.cleaner.testimport kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlockingfun foo(): Flow<Int> = flow {for (i in 1..5) {println("Emitting $i")emit(i)}
}fun main() = runBlocking<Unit> {foo().collect { value ->if (3 == value) {cancel()}println(value)}
}

输出如下,我们只看到输出了前三个数字,当尝试发射4的时候就会抛出CancellationException的异常:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@123a439bat kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:134)at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:77)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)at com.cool.cleaner.test.KotTestKt$foo$1.invokeSuspend(KotTest.kt:12)at com.cool.cleaner.test.KotTestKt$foo$1.invoke(KotTest.kt)at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:212)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:80)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:16)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)Process finished with exit code 1

然而大部分flow操作符由于性能的原因是没有检查有没有取消协程的,如果你用 IntRange.asFlow去实现一个循环且没有调用suspend函数,那么是不会执行是否已经取消这个检查动作的,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {(1..5).asFlow().collect { value ->if (3 == value) cancel()println(value)}
}

从1到5的所有数字都会输出,而只有在从runBlocking返回之前才检查到取消操作,输出如下:

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@aec6354at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:71)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:9)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)Process finished with exit code 1

17.1、Making busy flow cancellable

在协程中当你有一个循环的时候,你必须显示的执行检查是否取消了协程这个动作,你可以调用.onEach { currentCoroutineContext().ensureActive() },但是还有一个可取消的操作符,代码如下:

package com.cool.cleaner.testimport kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlockingfun main() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->if (3 == value) cancel()println(value)}
}

从输出可以看到使用cancellable操作符后只有1到3的数字输出了,如下:

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@7fbe847cat kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:1579)at kotlinx.coroutines.CoroutineScopeKt.cancel(CoroutineScope.kt:217)at kotlinx.coroutines.CoroutineScopeKt.cancel$default(CoroutineScope.kt:215)at com.cool.cleaner.test.KotTestKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)at kotlinx.coroutines.flow.CancellableFlowImpl$collect$$inlined$collect$1.emit(Collect.kt:135)at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)at kotlinx.coroutines.flow.CancellableFlowImpl.collect(Context.kt:342)at com.cool.cleaner.test.KotTestKt$main$1.invokeSuspend(KotTest.kt:74)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt:10)at com.cool.cleaner.test.KotTestKt.main(KotTest.kt)Process finished with exit code 1

18、Flow及响应式流

对于熟悉响应式流的读者来说会觉得Flow非常熟悉。

实际上Flow的设计就是受到响应式流的启发而设计的,但是Flow的设计是尽可能的简单,而要达到这个目标是离不开响应式编程开发者们的支持的,在这里你可能了解到相应的故事。

5、异步流(Asynchronous Flow)相关推荐

  1. C# 8 新特性 - 异步流 Asynchronous Streams

    异步流 Asynchronous Streams 例子 这是一个很简单的控制台程序.它有一个NumberFactory,它可以根据传递的参数来产生一串数字(IEnumerable<int> ...

  2. 【Kotlin 协程】Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

    文章目录 一.使用 Flow 异步流持续获取不同返回值 二.Flow 异步流获取返回值方式与其它方式对比 三.在 Android 中 使用 Flow 异步流下载文件 一.使用 Flow 异步流持续获取 ...

  3. 【Kotlin 协程】Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

    文章目录 一.流的构建器函数 1.flow 构建器 2.flowOf 构建器 3.asFlow 构建器 一.流的构建器函数 1.flow 构建器 在之前的博客 [Kotlin 协程]Flow 异步流 ...

  4. 【Kotlin 协程】Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )

    文章目录 一.流的上下文 1.上下文保存 2.流收集函数原型 3.流发射函数原型 4.代码示例 - 查看流发射和收集的协程 5.代码示例 - 不能在不同协程中执行相同流的发射和收集操作 二.修改流发射 ...

  5. .NET特性:异步流

    自从VB/C#开始支持async/await后,开发者一直在期待异步版本的IEnumerable.但直到C# 7和ValueTask发布前,从性能的角度来看这一要求几乎是不可能实现的. \\ 在老版本 ...

  6. corda_Corda服务的异步流调用

    corda 如何使流程更快? 如果您与Corda合作已有一段时间,那么您很有可能已经考虑过这一点. 您可以通过以下几方面进行合理的调整以提高性能:事务大小,优化查询并减少整个Flow执行过程中所需的网 ...

  7. Corda服务的异步流调用

    如何使流程更快? 如果您已经与Corda合作了一段时间,那么您很有可能已经考虑过这一点. 您可以通过以下几方面进行合理的调整来提高性能:事务大小,优化查询并减少整个Flow执行过程中所需的网络跃点数. ...

  8. SAP SD基础知识之凭证流(Document Flow)

    SAP SD基础知识之凭证流(Document Flow) 一,根据参考创建Create with reference 可以参考之前的凭证来创建销售凭证,可以在初始画面,也可以在凭证处理过程中,通过u ...

  9. postgresql主从备份_基于PG12.2实现主从异步流复制及主从切换教程(下)

    概述 今天主要介绍如何搭建PG主从流复制及主从切换,仅供参考. PS:上篇的地址在文末链接. PostgreSQL数据库主从异步流复制搭建 环境说明: 1.安装PG数据库(主从库进行) 用脚本进行,略 ...

最新文章

  1. 【Tool】Augmentor和imgaug——python图像数据增强库
  2. 使用Angular HTTP client对数据模型进行创建操作
  3. docker 安装nacos_康过来!Nacos配置和管理微服务的使用
  4. win32 c语言编程,win32环境C语言实现最基本的DLL编写及调用实例,测试通过[原]
  5. RuntimeError: expected backend CUDA and dtype Float but got backend CUDA and dtype Long
  6. [蓝桥杯]试题 基础练习 Sine之舞
  7. centos 6.3 64bit 安装VMware workstation 9.1 64bit
  8. IOS 打包后安装崩溃,debug正常运行
  9. mongodb 日期分组聚合_如何在MongoDB中按其他字段分组时聚合时间序列数据?
  10. MyBatis 安装下载 及入门案例
  11. cmd关闭计算机指令,取消CMD自动关机的命令是什么
  12. 阿里云Docker仓库
  13. 产品管理,产品策划,产品设计
  14. 震惊!这浏览器居然进过全球 Top10
  15. i2c-test工具说明文档
  16. python中PyGame的下载与安装
  17. 用java流复制文件不能复制全,少几十兆!!
  18. c语言上机总结报告,C语言程序设计上机实践心得报告
  19. Sixth season ninth episode,Ross got high blamed on Chandler so Monica‘s parents do not like him????
  20. CT值及CT常用窗宽、窗位

热门文章

  1. word模板中添加图片
  2. Visual Graph常见问题回答(FAQ)
  3. Typecho权限管理插件 - 权限狗
  4. npm install XXX 报错:error An unexpected error occurred:
  5. python3 输出中文、日文等等乱码问题的解决办法
  6. 任性安装苹果应用,安装包在手天下我有
  7. 【整活】修改U盘的图标,让你的U盘与众不同
  8. 皇甫懒懒 Java 学习笔记 第一章
  9. 懒懒的周末 (r8笔记第30天)
  10. Angular primeng tree 组件数据解析(适用于Angular2+)