文章目录

  • 一、流的上下文
    • 1、上下文保存
    • 2、流收集函数原型
    • 3、流发射函数原型
    • 4、代码示例 - 查看流发射和收集的协程
    • 5、代码示例 - 不能在不同协程中执行相同流的发射和收集操作
  • 二、修改流发射的协程上下文
    • 1、Flow#flowOn 函数原型
    • 2、代码示例

一、流的上下文


1、上下文保存

Flow 异步流 收集元素 的操作 , 一般是在 协程上下文 中进行的 , 如 : 在协程中调用 Flow#collect 函数 , 收集元素 ;

收集元素 时 的 协程上下文 , 会 传递给 发射元素 的 流构建器 , 作为 流构建器的 上下文 ;

Flow 异步流 在 收集元素 时 , 才调用 流构建器 中的代码 , 收集元素操作在协程中执行 , 流构建器 也同样在相同的协程中运行 ;

流收集元素 和 发射元素 在相同的协程上下文中 的 属性 , 称为 上下文保存 ;

2、流收集函数原型

Flow#collect 函数原型如下 : Flow#collect 函数 由 suspend 关键字修饰 , 该函数是 suspend 挂起函数 , 因此 该函数必须在 协程中调用 ;

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =collect(object : FlowCollector<T> {override suspend fun emit(value: T) = action(value)})

3、流发射函数原型

Flow 异步流的 构建器 函数 : 流构建器 不是 suspend 挂起函数 , 可以在普通的线程中运行 , 不必在协程中运行 ;

  • flow 构建器 :
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
  • asFlow 构建器 :
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {emit(invoke())
}
  • flowOf 构建器 :
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {for (element in elements) {emit(element)}
}

4、代码示例 - 查看流发射和收集的协程

代码示例 : 在 流收集 时 和 流构建时 , 分别打印线程名称 , 查看是在哪个线程中执行的 ;

package kim.hsl.coroutineimport android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlockingclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)// 携程中调用挂起函数返回一个 Flow 异步流runBlocking {println("流收集时的协程上下文 : ${Thread.currentThread().name}")// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素flowFunction().collect {// 每隔 500ms 即可拿到一个 Int 元素// 并且该操作是异步操作, 不会阻塞调用线程println(it)}}}/*** 使用 flow 构建器 Flow 异步流* 在该异步流中, 异步地产生 Int 元素*/suspend fun flowFunction() = flow<Int> {println("流构建器的上下文 : ${Thread.currentThread().name}")for (i in 0..2) {// 挂起函数 挂起 500ms// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令delay(500)// 每隔 500ms 产生一个元素// 通过调用 FlowCollector#emit 生成一个元素emit(i)}}
}

执行结果 : 最终执行时 , 流构建器和流收集 都是在 主线程中执行的 , 这是 由 runBlocking 协程构建器 将 主线程 包装后的 协程 ;

2022-12-23 14:29:06.315 17484-17484/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:29:06.323 17484-17484/kim.hsl.coroutine I/System.out: 流构建器的上下文 : main
2022-12-23 14:29:06.875 17484-17484/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:29:07.399 17484-17484/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:29:07.940 17484-17484/kim.hsl.coroutine I/System.out: 2

5、代码示例 - 不能在不同协程中执行相同流的发射和收集操作

在流构建器中 , 将代码定义在如下协程中执行 , 使用 Dispatchers.IO 调度器 , 也就是协程在子线程中执行 ;

withContext(Dispatchers.IO){}

在流收集时 , 在 使用 runBlocking 将主线程包装后的 协程 中 , 收集元素 , 协程在主线程中执行 ;

runBlocking {}

代码示例 :

package kim.hsl.coroutineimport android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContextclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)// 携程中调用挂起函数返回一个 Flow 异步流runBlocking {println("流收集时的协程上下文 : ${Thread.currentThread().name}")// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素flowFunction().collect {// 每隔 500ms 即可拿到一个 Int 元素// 并且该操作是异步操作, 不会阻塞调用线程println(it)}}}/*** 使用 flow 构建器 Flow 异步流* 在该异步流中, 异步地产生 Int 元素*/suspend fun flowFunction() = flow<Int> {// 在后台线程中发射元素withContext(Dispatchers.IO){println("流构建器的上下文 : ${Thread.currentThread().name}")for (i in 0..2) {// 挂起函数 挂起 500ms// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令delay(500)// 每隔 500ms 产生一个元素// 通过调用 FlowCollector#emit 生成一个元素emit(i)}}}
}

执行程序后的报错信息如下 :

2022-12-23 14:39:05.805 19710-19710/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:39:05.850 19710-19738/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-1
2022-12-23 14:39:06.436 19710-19710/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
2022-12-23 14:39:06.462 19710-19710/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: mainProcess: kim.hsl.coroutine, PID: 19710java.lang.RuntimeException: Unable to start activity ComponentInfo{kim.hsl.coroutine/kim.hsl.coroutine.MainActivity}: java.lang.IllegalStateException: Flow invariant is violated:Flow was collected in [BlockingCoroutine{Active}@daf39f2, BlockingEventLoop@a6f9843],but emission happened in [DispatchedCoroutine{Active}@8ba6ec0, Dispatchers.IO].Please refer to 'flow' documentation or use 'flowOn' insteadat android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)at android.os.Handler.dispatchMessage(Handler.java:106)at android.os.Looper.loop(Looper.java:193)at android.app.ActivityThread.main(ActivityThread.java:6718)at java.lang.reflect.Method.invoke(Native Method)at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)Caused by: java.lang.IllegalStateException: Flow invariant is violated:Flow was collected in [BlockingCoroutine{Active}@daf39f2, BlockingEventLoop@a6f9843],but emission happened in [DispatchedCoroutine{Active}@8ba6ec0, Dispatchers.IO].Please refer to 'flow' documentation or use 'flowOn' insteadat kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)at kim.hsl.coroutine.MainActivity$flowFunction$2$1.invokeSuspend(MainActivity.kt:43)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
2022-12-23 14:39:06.505 19710-19710/kim.hsl.coroutine I/Process: Sending signal. PID: 19710 SIG: 9

二、修改流发射的协程上下文


在上述 流的收集 和 流的发射 都 必须在同一个协程中执行 , 这样并不是我们想要的 ;

如 : 下载时 , 想要在后台线程中下载 , 在主线程中更新 UI , 那么对应 Flow 异步流应该是在 后台线程中 发射元素 , 在主线程中 收集元素 ;

使用 flowOn 操作符 , 可以修改 流发射 的协程上下文 , 不必必须在 流收集 的协程上下文中执行 流发射操作 ;

1、Flow#flowOn 函数原型

Flow#flowOn 函数原型如下 :

/*** 将此流执行的上下文更改为给定的[context]。* 此操作符是可组合的,仅影响前面没有自己上下文的操作符。* 这个操作符是上下文保护的:[context] **不会**泄漏到下游流中。** 例如:** ```* withContext(Dispatchers.Main) {*     val singleValue = intFlow // will be executed on IO if context wasn't specified before*         .map { ... } // Will be executed in IO*         .flowOn(Dispatchers.IO)*         .filter { ... } // Will be executed in Default*         .flowOn(Dispatchers.Default)*         .single() // Will be executed in the Main* }* ```** 有关上下文保存的更多说明,请参考[Flow]文档。** 如果更改上下文不需要更改,则此操作符保留流的_sequential_性质* (调度)[CoroutineDispatcher]。否则,如果需要更改dispatcher,它将进行收集* 使用指定[上下文]运行的协同例程中的流发射,并从另一个协同例程中发射它们* 使用带有[default][channel]的通道与原始收集器的上下文连接。BUFFERED]缓冲区大小* 在两个协程之间,类似于[buffer]操作符,除非显式调用[buffer]操作符* 在' flowOn '之前或之后,请求缓冲行为并指定通道大小。** 注意,跨不同调度程序操作的流在取消时可能会丢失一些正在运行的元素。* 特别是,该操作符确保下游流不会在取消时恢复,即使元素* 已经被上游的气流释放出来了。** ###算子融合** 相邻的[channelFlow]、[flowOn]、[buffer]和[produceIn]的应用是* 始终融合,以便只有一个正确配置的通道用于执行。** 多个“flowOn”操作符融合到一个具有组合上下文的单一“flowOn”。上下文的要素* 第一个' flowOn '操作符自然优先于第二个' flowOn '操作符的元素* 当它们具有相同的上下文键时,例如:** ```* flow.map { ... } // Will be executed in IO*     .flowOn(Dispatchers.IO) // This one takes precedence*     .flowOn(Dispatchers.Default)* ```** 请注意,[SharedFlow]的实例本身没有执行上下文,* 所以应用' flowOn '到' SharedFlow '没有效果。参见[SharedFlow]关于Operator Fusion的文档。** @throws [IllegalArgumentException] 如果所提供的上下文包含[Job]实例。*/
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {checkFlowContext(context)return when {context == EmptyCoroutineContext -> thisthis is FusibleFlow -> fuse(context = context)else -> ChannelFlowOperatorImpl(this, context = context)}
}

2、代码示例

代码示例 :

package kim.hsl.coroutineimport android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlockingclass MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)// 携程中调用挂起函数返回一个 Flow 异步流runBlocking {println("流收集时的协程上下文 : ${Thread.currentThread().name}")// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素flowFunction().collect {// 每隔 500ms 即可拿到一个 Int 元素// 并且该操作是异步操作, 不会阻塞调用线程println(it)}}}/*** 使用 flow 构建器 Flow 异步流* 在该异步流中, 异步地产生 Int 元素*/suspend fun flowFunction() = flow<Int> {println("流构建器的上下文 : ${Thread.currentThread().name}")for (i in 0..2) {// 挂起函数 挂起 500ms// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令delay(500)// 每隔 500ms 产生一个元素// 通过调用 FlowCollector#emit 生成一个元素emit(i)}}.flowOn(Dispatchers.IO)
}

执行结果 : 没有报错 , 并且 流发射 在子线程中执行 , 流收集 在 主线程中执行 ;

2022-12-23 14:50:32.925 21339-21339/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:50:32.991 21339-21374/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-2
2022-12-23 14:50:33.512 21339-21339/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:50:34.038 21339-21339/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:50:34.583 21339-21339/kim.hsl.coroutine I/System.out: 2

【Kotlin 协程】Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )相关推荐

  1. 【Kotlin 协程】Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

    文章目录 一.流的构建器函数 1.flow 构建器 2.flowOf 构建器 3.asFlow 构建器 一.流的构建器函数 1.flow 构建器 在之前的博客 [Kotlin 协程]Flow 异步流 ...

  2. 【Kotlin 协程】Flow 异步流 ⑧ ( 背压概念 | 使用缓冲处理背压问题 | 使用 flowOn 处理背压问题 | 从提高收集元素效率方向解决背压问题 )

    文章目录 一.背压概念 二.使用缓冲处理背压问题 三.使用 flowOn 处理背压问题 四.从提高收集元素效率方向解决背压问题 1.Flow#conflate 代码示例 2.Flow#collectL ...

  3. 【Kotlin 协程】Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

    文章目录 一.使用 Flow 异步流持续获取不同返回值 二.Flow 异步流获取返回值方式与其它方式对比 三.在 Android 中 使用 Flow 异步流下载文件 一.使用 Flow 异步流持续获取 ...

  4. 王学岗Kotlin协程(四)————Flow异步流

    参考文章 异步返回值的多个方案 1,什么时候用flow呢?----kotlin要表示多个值 如何表示多个值?挂起函数可以异步返回单个值,但是如果要异步返回多个计算好的值, 就只能用flow了.其它方案 ...

  5. Kotlin 协程Flow主要操作符(一)

    Kotlin 协程Flow主要操作符(一) 1. 主要导包 2. map 转换操作符 3. filter过滤操作符 4. take限长操作符 5. drop丢弃操作符 6. flowOn操作符 7. ...

  6. Kotlin 协程Flow、StateFlow、ShareFlow

    Kotlin 协程Flow.StateFlow.ShareFlow 数据流 数据流以协程为基础构建,可提供多个值.从概念上来讲,数据流是可通过异步方式进行计算处理的一组数据序列.所发出值的类型必须相同 ...

  7. 大型Android项目架构:基于组件化+模块化+Kotlin+协程+Flow+Retrofit+Jetpack+MVVM架构实现WanAndroid客户端

    前言:苟有恒,何必三更眠五更起:最无益,莫过一日曝十日寒. 前言 之前一直想写个 WanAndroid 项目来巩固自己对 Kotlin+Jetpack+协程 等知识的学习,但是一直没有时间.这里重新行 ...

  8. Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列

    内容目录: Gevent协程 Select\Poll\Epoll异步IO与事件驱动 Python连接Mysql数据库操作 RabbitMQ队列 Redis\Memcached缓存 Paramiko S ...

  9. vs 启动调用的目标发生异常_协程中的取消和异常 | 取消操作详解

    在日常的开发中,我们都知道应该避免不必要的任务处理来节省设备的内存空间和电量的使用--这一原则在协程中同样适用.您需要控制好协程的生命周期,在不需要使用的时候将它取消,这也是结构化并发所倡导的,继续阅 ...

最新文章

  1. Lintcode363 Trapping Rain Water solution 题解
  2. tableau linux无网络安装_举个栗子!Tableau 技巧(127):购物篮分析之关联购买
  3. 23期PHP基础班第四天
  4. 作者:程学旗(1972-),男,中国科学院计算技术研究所研究员、博士生导师、副所长,中国科学院网络数据科学与技术重点实验室主任。...
  5. Uber从Postgres切换到MySQL
  6. 【我的Android进阶之旅】 解决Android编译出现问题:AAPT: error: resource string/xxx (aka xxx:string/xxx) not found.
  7. 用python计算化学题_(完整版)化学计算题解题方法(含答案)
  8. mysql计算连续天数,mysql连续登录天数,连续天数统计
  9. HeadFirstC笔记_7 高级函数:发挥函数的极限
  10. [GXYCTF2019]Ping Ping Ping 1解题思路
  11. annaconda 安装 opencv(cv2)
  12. python for ArcGIS 绘制西安市板块地图
  13. 通信协议整理之 SPI 通信
  14. Wow64(32位进程)注入DLL到64位进程
  15. vlc 视频局部放大【WPF版】
  16. Windows PowerShell学习笔记(一)
  17. 电信 NB-IoT无缝对接阿里云IoT 物联网平台
  18. Android View的elevation属性,CardView始终在布局顶层覆盖其它控件的解决方式;
  19. 尽量避免bug的一些手法
  20. 唐骏的失败,检验真理的标准将向实干精神倾斜

热门文章

  1. [COI2012] TRAMPOLIN
  2. 水果贵到吃不起,“水果自由”要电商平台来实现?
  3. php网站 类设计,PHP网站开发与设计
  4. 系统引导文件之 boot.ini
  5. 2022.12.28雷神加速器更新问题
  6. java中 ^ 是什么意思
  7. 使用nginx搭建http代理服务器
  8. Error in if (x[i] == NA) { : missing value where TRUE/FALSE needed
  9. Vue.js 2.5新特性介绍
  10. 【最全】软件测试基础理论选择题(含答案)