Flow 上下文

Flow 的收集动作总是发生在调用协程的上下文当中,而非定义 Flow 的上下文。

fun log(msg: String) = println("[${Thread.currentThread().name}], $msg")
fun myMethod(): Flow<Int> = flow {log("started")for(i in 1..3){emit(i)}
}
fun main() = runBlocking {myMethod().collect { log("collected: $it")}
}

运行输出结果如下(打开 debug 参数):

[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3

可以看出 flow 运行于 collect 调用时的协程,即 runBlocking 开启的协程。这无疑会阻塞住主线程。因此我们可以将 flow 运行在其它上下文:

private fun log(msg:String) = println("[${Thread.currentThread().name}], $msg")
private fun myMethod(): Flow<Int> = flow {withContext(Dispatchers.Default) {for(i in 1..4){Thread.sleep(100)emit(i)}}
}
fun main() = runBlocking {myMethod().collect { log(it) }
}

程序报出如下异常:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@405324edf, BlockingEventLoop@44235043],
but emission happened in [DispatchedCoroutine{Active}@40452e23e, DefaultDispatcher].
...

大意是收集线程发生在主线程,但 emit 线程发生在后台线程。withContext(Dispatchers.Default) 一句修改了 flow 的上下文,将 flow 的分发器修改到了 Dispatchers.Default。但 collect 的代码使用的分发器则是 runBlocking 的分发器。

flowOn 运算符

为了解决这个问题,kotlin 引入了 flowOn 方法:

private fun myMethod(): Flow<Int> = flow {for(i in 1..4){Thread.sleep(100)log("emit: $i")emit(i)}.flowOn(Dispatchers.Default)
}
fun main() = runBlocking {myMethod().collect { println("collect: $it") }
}

运行程序,错误不再出现。flowOn 方法允许修改 中间操作 的 上下文,使得中间操作和终止操作运行于不同的上下文。flowOn 操作改变了 Flow 本身默认的顺序。现在收集操作和发射操作发生在不同的协程(线程):

[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4
缓冲

如果让 flow 的不同部分使用不同的协程执行,将有助于实现并行操作从而提升性能。这就会用到缓冲的概念。典型地,我们可以让 emit 操作和collect 操作并行。这样 emit 操作在collect 操作进行的同时不会被阻塞,而是继续 emit 下一个元素并将结果缓冲到缓存里,下一次收集将直接从缓存中读取。

privatet fun myMethod(): Flow<Int> = flow {for(i in 1..4){delay(100) // 模拟耗时操作emit(i)}
}
fun main() = runBlocking {val time = measureTimeMillis {myMethod().collect{ delay(200)println(it)}}println(time)
}

这里,发射一个元素需要0.1秒,收集一个元素耗时0.2秒,所以4个元素总共耗时 1.2 秒,所以输出结果如下:

1
2
3
4
1223

如果使用缓冲操作,则不需要耗时这么多时间了:

myMethod().buffer().collect{ delay(200)println(it)}

唯一的修改就是在 collect 之前增加了一个buffer() 操作,这样元素发射之后不会等待收集完成,而是直接将结果缓冲到缓存里。真正搜集到的实际上是缓存里的元素。这样无疑会加快整个发射和搜集的过程:

1
2
3
4
1003

第一次收集发生在 0.3秒(0.1秒发射+0.2秒收集),第二次收集发生在 0.5 秒,第三次收集发生在 0.7 秒,第四次收集发生在0.9 秒。加上一些建立、销毁缓冲的时间,大致总耗时1秒左右。

缓冲和 flowOn 有一定关系。本质上 flowOn 操作需要改变 CoroutineDispatcher 时也会使用同样的缓冲机制,比如上面 flowOn 的例子。

Flow 的组合

将两个flow的内容合并为一个flow。

fun main()=runBlocking<Unit> {val nums = (1..5).asFlow()val strings = flowOf("a","b","c","d","e")// zip - 合并操作nums.zip(strings){a, b -> "$a$b"    }.collect{ println(it) }
}

zip 操作将第一个流和第二个流中的元素依序取出,然后按照第二个参数的lambda 进行处理,然后发射。输出结果如下所示:

1a
2b
3c
4d
5e
打平操作

将一个包含了 f low 的 flow (类似 Flow<Flow)转换成不包含 flow 的flow(Flow)操作(类似将二维数组转换成一维数组)。

private fun myMethod(i: Int): Flow<String> = flow {emit("$i: First")delay(500)emit("$i: Second")
}
fun main() = runBlocking<Unit> {val startTime = System.currentTimeMillis()(1..3).asFlow().onEach{ // 1. 遍历元素delay(100)}.flatMapConcat{ // 2. 打平myMethod(it)}.collect { // 3. 终止println("$it: ${System.currentTimeMillis()-startTime} ms")}
}

myMethod 方法

输出结果如下:

1: 144 ms
1: 646 ms
2: 751 ms
2: 1256 ms
3: 1360 ms
3: 1865 ms
  1. 将1…3 转换为流,在每个元素(1…3)上 delay 0.1 秒。

  2. 将 3 个元素(1…3)转换为 3 个流,myMethod 方法每次都会产生两个相同元素的流(emit 两次)。这样 1…3 就会变成:

    [1,1],[2,2],[3,3]

    而且由于每个流之间有一个 delay 0.1秒,所以打印输出时,两个相同数字之间间隔0.5秒, 而两两一组之间间隔 0.1 秒。

  3. 此外 flatMapConcat 会将它从二维的流打平成一维的流,于是这个流在收集时会变成 6 个单独的元素:

    [1,1,2,2,3,3]

Flow 的异常

可以对 flow的异常进行传统的 try…catch 处理:

private fun myMethod():Flow<Int> = flow {for(i in 1..3) {println("emit: $i")emit(i)}
}
fun main() = runBlocking<Unit> {try {myMethod().collect {println(it)check(it <= 1){// check 函数的作用是检查第一个参数,当true 时执行第二个参数(lambda),否则抛出一个 IllegalStateException,同时 error message 的内容时lambda 表达式返回的 Any 类型。"collect $it"}}}catch(e: Throwable) {println("Caught $e")}
}

执行上述代码,输出:

emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect  2

当第二个元素为 2 时,check 判断失败,抛出异常,被 try…catch 所捕获。

在上面的例子中,异常发生在收集阶段。但 try…catch 也可以捕获flow 的发射阶段和中间操作阶段。

private fun myMethod():Flow<String> = flow {for(i in 1..3) {println("emit: $i")emit(i)}.map{ value ->check(value <= 1) { "crash on $value" }"string $value"}
}
fun main()=runBlocking<Unit> {try{myMethod().collect{ println(it) }}catch(e: Throwable){println("caught $e")}
}

运行程序,输出:

emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2

当元素 大于 1 时,异常被捕获,并且 crash on 2 被保存在了异常的 message 当中。

完成

当 flow 执行完毕之后,可以额外追加一个完成动作。这个动作可以是命令式的,也可以时声明式的。

private fun myMethod(): Flow<Int> = (1..10).asFlow()fun main()=runBlocking<Unit> {try {myMethod().collect { println(it) }}finally {println("finally")}
}

finally 块中的代码最终都会得到执行,无论 flow 是正常执行完还是抛出异常,这就是命令式,利用了 try…finally 语句:

1
2
...
10
finally

声明式则比较灵活,它就是 onCompletion 中间操作,它在 collect 或 取消 之后执行:

private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {myMethod().onCompletion{ println("onCompletion") }.collect { println(it) }
}

输出结果同之前一模一样。值得注意的是,onCompletion 的 action 参数是一个带参数的 lambda 表达式,这个参数的类型是可空的 Throwable。可以利用这个参数获取抛出的异常。

private fun myMethod(): Flow<Int> = flow {emit(1)throw RuntimeException() // 抛出异常
}
fun main() = runBlocking {myMethod().onCompletion { e -> if(e!=null) println("Flow stopped with an Exception.")}.catch { e -> println("Caught an Exception") }.collect { println(it) }
}

其中,catch 操作也是一个中间操作,用于捕获 flow 中的异常。

打印结果:

1
Flow stopped with an Exception.
Caught an Exception

但是有一点需要注意,onCompletion 只会看到来自于 flow 上游的异常,但无法捕获下游异常。

private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> {myMethod().onCompletion{ e -> println("Flow stopped with an Exception $e.")}.collect { value -> check{value <= 1} { "collect $value" }println(value)}
}

输出结果如下:

1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: Collect 2
...

这里,onCompletion 把 e 打印成 null,说明它并没有捕获到这个异常,因为 onCompletion 只是一个中间操作,而 collect 是终止操作,从这个角度上看 collect 是 onCompletion 的下游操作,因此 collect 中出现的异常无法被 onCompletion 捕获。

取消

flow 的取消实际上是通过协程的取消来实现的,本身没有所谓的取消操作。比如我们取消一个 cellect 操作,前提是 flow 本身在一个可取消的挂起函数(如 delay) 中被挂起了:

private fun myMethod(): Flow<Int> = flow {for (i int 1..4) {delay(100)println("Emit: $i")emit(i)}
}
fun main() = runBlocking<Unit> {withTimeoutOrNull(280) {// 设置超时时间 280 毫秒myMethod().collect { println(it) }}println("Done.")
}

控制台输出如下:

Emit: 1
1
Emit: 2
2
Done.

第3、4次循环因超时被取消。这里,withTimeoutOrNull 就是一个可取消的挂起函数。

【深入kotlin】 - Flow 进阶相关推荐

  1. Kotlin Flow响应式编程,操作符函数进阶

    本文同步发表于我的微信公众号,扫一扫文章底部的二维码或在微信搜索 郭霖 即可关注,每个工作日都有文章更新. 大家好,今天原创. 在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin ...

  2. RxJava VS kotlin flow

    1.基础概念介绍 1.1 观察者模式 观察者模式,其实对于Android开发者而言,并不陌生,button的setOnClickListener,就是一个典型的观察者模式.控件button是被观察者, ...

  3. Kotlin Flow响应式编程,基础知识入门

    Kotlin在推出多年之后已经变得非常普及了.相信现在至少有80%的Android项目已经在使用Kotlin开发,或者有部分功能使用Kotlin开发. 关于Kotlin方面的知识,我其实分享的文章并不 ...

  4. Android Kotlin Flow 如何使用callbackflow

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/121840157 本文出自[赵彦军的博客] callbackFlow 原理 call ...

  5. sharedpreferences使用方法_Google 推荐在 MVVM 架构中使用 Kotlin Flow

    前言 在之前分享过一篇 Jetpack 综合实战应用 Jetpack 实战:神奇宝贝 ,这个项目主要包了以下功能: 自定义 RemoteMediator 实现 network + db 的混合使用 ( ...

  6. avd android 5.1,Kotlin开发进阶

    Kotlin开发进阶 编辑 锁定 讨论 上传视频 本词条缺少信息栏.概述图,补充相关内容使词条更完整,还能快速升级,赶紧来编辑吧! <Kotlin开发进阶>是清华大学出版社出版的图书,作者 ...

  7. Kotlin Flow | SharedFlow和StateFlow详解

    文章目录 Getting Started SharedFlow Handling Shared Events Event Emission With SharedFlow Replay and Buf ...

  8. Kotlin第4篇 【Kotlin】进阶视频课程-关东升-专题视频课程

    Kotlin第4篇 [Kotlin]进阶视频课程-376人已学习 课程介绍         本视频是智捷课堂推出的一套"Kotlin语言学习立体教程"的视频第四部分,主要内容包括: ...

  9. Kotlin Flow啊,你将流向何方?

    前言 协程系列文章: 一个小故事讲明白进程.线程.Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotl ...

  10. 【Kotlin Flow】 一眼看全——Flow操作符大全

    作者:搬砖小子出现了 转载地址:https://juejin.cn/post/6989536876096913439 Kotlin Flow 基本上可以替代RxJava,其提供了诸多操作符来处理数据. ...

最新文章

  1. visualSVN-server的安装图解
  2. python面向对象编程的优点-Python面向对象编程——总结面向对象的优点
  3. Win2000 DDK 附带例子概览(图解)
  4. Linux中常用C/C++一些头文件的作用
  5. 从零起步CMFCToolBar用法详解
  6. 你不懂js系列学习笔记-异步与性能- 02
  7. linux服务器搭建_Linux怎么搭建ftp服务器,Windows怎么访问?按此教程10分钟完成...
  8. ios7中的UILabel自适决定大小
  9. SEO和SEM、ASO之间的区别?
  10. 计算机网络实验报告双机互联,双机互联实验报告.docx
  11. [html+css+js] 小米官网首页制作
  12. python简易程序教程_Python-自制简易程序挂机刷御魂
  13. Java 实现双向链表
  14. 例题6-21 uva506 System Dependencies 模拟
  15. 《数据结构与算法基础 严蔚敏版》第三章 堆栈与队列
  16. 世界上有多少Java开发人员?
  17. Anaconda 安装使用
  18. 谷歌2018博士生奖研金名单出炉,清华、上交大多人入选
  19. Maven插件wagon-maven-plugin自动化部署Java项目到Linux远程服务器
  20. 计算机二级(五)小笨鸟学飞版

热门文章

  1. 金蝶BOS,服务端执行SQL语句参考
  2. 大数据处理和编程实践Hadoop
  3. android菜单回弹,Android--实现ViewPager边界回弹效果(转)
  4. iPhone X UITabBarController UITabBar 适配解读
  5. 学习PLC有什么好方法吗
  6. SQL中的 update用法+示例
  7. 掌握typora使用方法
  8. uniapp 打包成H5
  9. AI智能分析在智慧电厂的典型应用
  10. 【读论文】A Deep Neural Network for Unsupervised Anomaly Detection and Diagnosis in Multivariate Time...