SharedFlow

SharedFlow是一个hot stream. sharedflow有以下特点:

  1. 没有默认值
  2. 可以保持旧值
  3. emit会挂起直到所有的订阅者处理完成
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay: 重新发送给新订阅者之前值的个数

extraBufferCapacity:除了replay缓冲区个数之外的缓冲区的值。当有剩余空间的时候emit就不会挂起

onBufferOverflow:当extraBufferCapacity溢出时的处理。有下面三种处理方式:

public enum class BufferOverflow {/*** Suspend on buffer overflow.*/SUSPEND,/*** Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.*/DROP_OLDEST,/*** Drop **the latest** value that is being added to the buffer right now on buffer overflow* (so that buffer contents stay the same), do not suspend.*/DROP_LATEST
}

使用demo来测试这几个参数的作用:

1.测试replay的作用

fun `sharedflow Reply = 1`()= runBlocking{var sharedflow = MutableSharedFlow<Int>(replay = 1)GlobalScope.launch {sharedflow.emit(111)sharedflow.emit(222)}val job1 = GlobalScope.launch {delay(5000)sharedflow.collect{println("first job : $it")}}val job2 = GlobalScope.launch {delay(10000)sharedflow.collect{println("second job : $it")}}job1.join()job2.join()
}

结果:

first job : 222
second job : 222

上面的例子可以看出,设置了replay为1(默认为0),job1和job2最后打印的都是“222”

2.测试extraBufferCapacity的作用

fun `sharedflow emit suspend`() = runBlocking{var sharedflow = MutableSharedFlow<Int>()val job1 = GlobalScope.launch {sharedflow.collect{//println("${gettime()}:first job: $it")}}val job2 = GlobalScope.launch {sharedflow.collect{delay(5000)//println("${gettime()}:second job: $it")}}val job3 = GlobalScope.launch {for (i in 1..5){delay(500)println("${gettime()}:start--------emit $i")sharedflow.emit(i)println("${gettime()}:end--------emit $i")}}job1.join()job2.join()job3.join()
}

结果:

第一个emit没有挂起
11:20:46:699:start--------emit 1
11:20:46:704:end--------emit 1
第二个emit挂起了4s左右
11:20:47:206:start--------emit 2
11:20:51:711:end--------emit 2
第三个emit挂起了6s左右
11:20:52:217:start--------emit 3
11:20:56:716:end--------emit 3
第四个emit挂起了4s左右
11:20:57:219:start--------emit 4
11:21:1:717:end--------emit 4
第五个emit挂起了6s左右
11:21:2:218:start--------emit 5
11:21:6:720:end--------emit 5

代码中job2在collect时候delay了5s,从emit的结果看,从emit 2开始从start emit到end emit中间大概执行了4s多的时间。extraBufferCapacity默认为0

下面再测试一下,将extraBufferCapacity设置为2:

var sharedflow = MutableSharedFlow<Int>(extraBufferCapacity = 2)

测试结果:

emit没有挂起
11:24:47:843:start--------emit 1
11:24:47:848:end--------emit 1
emit没有挂起
11:24:48:354:start--------emit 2
11:24:48:356:end--------emit 2
emit没有挂起
11:24:48:860:start--------emit 3
11:24:48:861:end--------emit 3
emit挂起3s多
11:24:49:365:start--------emit 4
11:24:52:852:end--------emit 4
emit挂起6s多
11:24:53:355:start--------emit 5
11:24:57:857:end--------emit 5

从结果看emit1 到 emit3 的emit没有耗费太多的时间,从emit4开始中间分别耗费了3s,6s。从demo的测试来看当缓冲区满了之后执行了emit执行了挂起操作。

3.测试onBufferOverflow值对sharedflow的影响,修改onBufferOverflow = BufferOverflow.DROP_OLDEST,这个意思是丢弃最先的值

 fun `sharedflow emit suspend`() = runBlocking{var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_OLDEST)val job1 = GlobalScope.launch {sharedflow.collect{println("${gettime()}:first job: $it")}}val job2 = GlobalScope.launch {sharedflow.collect{delay(5000)println("${gettime()}:second job: $it")}}val job3 = GlobalScope.launch {for (i in 1..5){delay(500)//println("${gettime()}:start--------emit $i")sharedflow.emit(i)// println("${gettime()}:end--------emit $i")}}job1.join()job2.join()job3.join()}

运行结果:

1:20:26:627:first job: 1
1:20:27:129:first job: 2
1:20:27:637:first job: 3
1:20:28:140:first job: 4
1:20:28:643:first job: 5
1:20:31:632:second job: 1
1:20:36:639:second job: 5

从上面运行结果看job1执行完成了,job2只输出了第一个值和最后一个值。job2将2,3,4几个值都丢失了。

onBufferOverflow != BufferOverflow.SUSPEND时,replay和extraBufferCapacity不能同时为0(默认值),否则运行时会出现如下错误:

replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDEST
java.lang.IllegalArgumentException: replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy DROP_OLDESTat kotlinx.coroutines.flow.SharedFlowKt.MutableSharedFlow(SharedFlow.kt:246)at com.example.flowdemo.ExampleUnitTest$sharedflow emit suspend$1.invokeSuspend(ExampleUnitTest.kt:165)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.example.flowdemo.ExampleUnitTest.sharedflow emit suspend(ExampleUnitTest.kt:164)

将onBufferOverflow 的值改为DROP_LATEST

var sharedflow = MutableSharedFlow<Int>(replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_LATEST)

运行结果:

1:36:44:978:first job: 1
1:36:45:481:first job: 2
1:36:49:983:second job: 1
1:36:54:989:second job: 2

从结果看只输出去了前两个值,后面的值全部丢掉了。

4.sharedflow是不防抖的,即如果连续放入相同的值,那么每个值collect都会响应一次。

fun `sharedflow shake`()= runBlocking{val sharedflow = MutableSharedFlow<String>()val job1 = GlobalScope.launch {sharedflow.collect {println("job---$it")}}val job2 = GlobalScope.launch {for (i in 1..5){delay(100)sharedflow.emit("hello laworks")}}job1.join()job2.join()
}

运行结果

job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks
job---hello laworks

这个和stateflow做一个对比

@Test
fun `stateflow shake`()= runBlocking{val stateflow = MutableStateFlow("state init")val job1 = GlobalScope.launch {stateflow.collect {println("job---$it")}}val job2 = GlobalScope.launch {for (i in 1..5){delay(100)stateflow.emit("I'am stateflow")}}job1.join()job2.join()
}

运行结果:

job---state init
job---I'am stateflow

从上面对比的结果可以看出stateflow是防抖的。

stateFlow

sharedflow是不防抖的,但是stateflow是防抖的,从上面的例子可以看出,下面是源码的分析:

源码如下:

synchronized(this) {val oldState = _state.valueif (expectedState != null && oldState != expectedState) return false // CAS support/*********return if equal*********/if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true_state.value = newStatecurSequence = sequenceif (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)curSequence++ // make it oddsequence = curSequence} else {// update is already in process, notify it, and returnsequence = curSequence + 2 // change sequence to notify, keep it oddreturn true // updated}curSlots = slots // read current reference to collectors under lock}

从源码上可以看出来,当值没有变化的时候就直接return了,不会做任何的处理。这也就是为什么stateflow是防抖的。

下面看看关于stateflow值丢失的问题:

@Test
fun `stateflow suspend`() = runBlocking {val stateflow = MutableStateFlow(0)val job1 = GlobalScope.launch {stateflow.collect{println("job1----$it")}}val job2 = GlobalScope.launch {stateflow.collect{delay(5000)println("job2----$it")}}val job3 = GlobalScope.launch {for(i in 1..5){//delay(500)println("${gettime()}:start--------emit $i")stateflow.emit(i)println("${gettime()}:end--------emit $i")}}job1.join()job2.join()job3.join()
}

输入的结果如下:

10:3:3:681:start--------emit 1
job1----0
10:3:3:684:end--------emit 1
job1----1
10:3:3:684:start--------emit 2
10:3:3:684:end--------emit 2
10:3:3:684:start--------emit 3
10:3:3:684:end--------emit 3
10:3:3:684:start--------emit 4
10:3:3:684:end--------emit 4
10:3:3:684:start--------emit 5
10:3:3:685:end--------emit 5
job1----5
job2----0
job2----5

结果看job1分别输出了0,1,5. job2分别输出了0和5. 并且emit很快就完成了,这是为什么?为什么job1没有输出所有的值,job2的延迟也没有像sharedflow一样将emit挂起?

首先说stateflow没有buffer的概念,它只能存一个值,所以在有值的时候就会去更新。再看看collect的实现:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =collect(object : FlowCollector<T> {override suspend fun emit(value: T) = action(value)})

从上面的方法可以看出flow的collect方法会继续调用带参数的collect方法,参数里面的emit方法就是我们外面的实现。

在看看带参数的collect在stateflow里面的实现:

override suspend fun collect(collector: FlowCollector<T>) {val slot = allocateSlot()try {if (collector is SubscribedFlowCollector) collector.onSubscription()val collectorJob = currentCoroutineContext()[Job]var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)// The loop is arranged so that it starts delivering current value without waiting firstwhile (true) {....if (oldState == null || oldState != newState) {//emit is the user actioncollector.emit(NULL.unbox(newState))oldState = newState}....}} finally {freeSlot(slot)}
}

从代码中可以看出:

1.这个方法里面是一个无限循环

2.这个里面在不断的取值,并和之前的值做对比,如果一样的话那么就不会再调用emit方法了。

在这个方法里面假设collector.emit(NULL.unbox(newState)) 执行的时间很长,那么就有可能出现stateflow的值变化之后没有及时取的情况。

Kotlin之SharedFlow和Stateflow相关推荐

  1. Kotlin Flow | SharedFlow和StateFlow详解

    文章目录 Getting Started SharedFlow Handling Shared Events Event Emission With SharedFlow Replay and Buf ...

  2. Kotlin上的反应式流-SharedFlow和StateFlow

    点击上方蓝字关注我,知识会给你力量 在本教程中,你将学习Kotlin中的反应式流,并使用两种类型的流--SharedFlow和StateFlow,构建一个应用程序. 事件流已经成为Android的标准 ...

  3. Kotlin Flow 冷流 StateFlow 热流 StateFlow 的应用

    Flow是冷流.简单来说.如果Flow有了订阅者Collector以后,发射出来的值才会存在内存中, 这和懒加载的概念很像 与之相对的是热流,StateFlow和SharedFlow 是热流,在垃圾回 ...

  4. Kotlin 协程Flow、StateFlow、ShareFlow

    Kotlin 协程Flow.StateFlow.ShareFlow 数据流 数据流以协程为基础构建,可提供多个值.从概念上来讲,数据流是可通过异步方式进行计算处理的一组数据序列.所发出值的类型必须相同 ...

  5. Kotlin鱿鱼游戏大奖赛

    点击上方蓝字关注我,知识会给你力量 鱿鱼游戏来了,现在开始,看看你闯过第几关. 在不借助IDE的情况下,看你的人肉编译器能否编译出正确的结果. Scala-like functions fun hel ...

  6. Android SingleLiveEvent Redux with Kotlin Flow

    点击上方蓝字关注我,知识会给你力量 这个系列我做了协程和Flow开发者的一系列文章的翻译,旨在了解当前协程.Flow.LiveData这样设计的原因,从设计者的角度,发现他们的问题,以及如何解决这些问 ...

  7. Kotlin 之 协程

    初识协程,启动取消协程,Flow异步流,协程并发 目录 (一)初识协程 协程是什么? Android中协程解决了什么问题? 协程的挂起与恢复 挂起和阻塞 协程的调度器 Dispatchers 任务泄露 ...

  8. Android SharedFlow详解

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/121911675 本文出自[赵彦军的博客] 文章目录 系列文章 什么是SharedF ...

  9. 【译】LiveData三连

    点击上方蓝字关注我,知识会给你力量 这个系列我做了协程和Flow开发者的一系列文章的翻译,旨在了解当前协程.Flow.LiveData这样设计的原因,从设计者的角度,发现他们的问题,以及如何解决这些问 ...

最新文章

  1. SAP Retail 寄售门店关键配置
  2. 记工作中的git遇到的问题
  3. 4.1.3 OS之文件目录目录结构(单级-两级-多级-无环图)、索引节点FCB瘦身
  4. 解密NTFS下经EFS加密的文件
  5. java cpu 内存使用情况_java高cpu占用和高内存占用问题排查 (转)
  6. linux安装各种文件格式,Embeded linux中的各类文件系统
  7. 看看什么样的人适合网上开店( 转载)
  8. 如何检查手机上的 App 是不是正版?
  9. 3-26 C++ 学习
  10. AAAI2021-基于对比学习的三元组生成式抽取方法
  11. db2查看数据库端口
  12. [区块链]对话区块链人工智能顶级实力公司ObEN
  13. java一元二次方程用if_用javascript写一个求一元二次方程的页面 用JAVA写一个求解一元二次方程的类...
  14. js原型、原型链、原型链继承详解
  15. 无法使用以下不同的参数继承com.baomidou.mybatisplus.extension.service.IService: <> 和 <com.itheima.rijidao.en
  16. mysql load 导入csv或者unl 如果文件跟表的字段不一致的情况
  17. 各种EDA软件的PCB文件后缀名
  18. bp神经网络和cnn神经网络,RNN神经网络适用于什么
  19. os-level版本控制工具
  20. /proc/cpuinfo 里的 CPU flags

热门文章

  1. anacoda里面安装包显示失败_Photoshop解决安装失败!怎么办?!!!
  2. C# 获取MD5 (MD5计算,MD5小工具)
  3. django-CBVS (转载知乎彧神)
  4. 零中频接收机的问题以及设计解决方案
  5. 香港服务器租用数据真的进入了刷脸时代吗
  6. 互联网成为中国公益活行动的辽阔平台
  7. 【B站弹幕游戏开发笔记01】Win10系统下给Unity项目导入Protobuf
  8. QSV文件格式简单分析
  9. 【无标题】数据仓库-学习
  10. 膝盖中了一箭之康复篇-第八个月暨2月份目标总结