转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/117370700
本文出自【赵彦军的博客】

文章目录

  • 往期精彩文章
  • flow 是啥
  • flow咋用?
  • 创建flow
  • map 操作符
    • 例子1
    • 例子2
  • catch 捕获上游出现的异常
    • 例子1:不捕获异常
    • 例子2:捕获异常
  • 取消flow
  • collectIndexed
  • distinctUntilChanged 过滤重复的值
  • transform
  • filter 过滤
  • flowOn 数据发射的线程
  • launchIn
  • conflate()
  • withIndex
  • onEach
  • onStart 在数据发射之前触发
  • onCompletion
  • drop(n) 忽略最开始释放的值
  • dropWhile
  • sample
  • take 取指定数量的数据
  • takeWhile
  • 实现一个定时器功能

往期精彩文章

Kotlin实战指南十九:use 函数魔法
Kotlin实战指南十八:open、internal 关键字使用
Kotlin实战指南十七:JvmField、JvmStatic使用

flow 是啥

按顺序发出值并正常完成或异常完成的冷流异步数据流

flow咋用?

flow 是kotlin coroutines 库里面的类,所以使用 flow 之前,要确保添加了协程依赖

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.0"

引入依赖后,我们写一段代码:

       flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.collect {//接收结果Log.d("flow-", "value $it")}

如果你这样写就会报错

意思是:collect 方法是 suspend 修饰的挂起函数,只能在协程里,或者其他挂起函数中使用。我们来修改一下:

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.collect {//接收结果Log.d("flow-", "value: $it")}}}
}

输出结果:

D/flow-: value: 1
D/flow-: value: 2

创建flow

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {emit(1)emit(2)}val flow2 = flowOf(1, 2.3)val flow3 = mutableListOf(1, 2, 3, 4, 5).asFlow()val flow4 = (1..4).asFlow()}}
}

map 操作符

例子1

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.map {"map $it"}.collect {//接收结果Log.d("flow-", "value: $it")}}}
}

输出结果:

D/flow-: value: map 1
D/flow-: value: map 2

例子2

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.map {"map $it"}GlobalScope.launch {flow.collect {//接收结果Log.d("flow-", "value: $it")}}}
}
输出结果:
```java
D/flow-: value: map 1
D/flow-: value: map 2

在接收数据的时候,我们用了 collect 方法,collect 英文含义就是收集的意思

catch 捕获上游出现的异常

当 flow 流操作中发生异常的情况时,程序会发生崩溃:

例子1:不捕获异常

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.map {0 / 0  //人为制造异常"map $it"}GlobalScope.launch {flow.collect {//接收结果Log.d("flow-", "value: $it")}}}
}

输出结果:程序发生崩溃

例子2:捕获异常

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow: Flow<String> = flow {emit(1)  //发射数字 1emit(2)  //发射数字 2}.map {0 / 0  //人为制造异常"map $it"}GlobalScope.launch {flow.catch {//捕获异常Log.d("flow-", "value: ${it.message}")}.collect {//接收结果Log.d("flow-", "value: $it")}}}
}

输出结果:

D/flow-: exception: divide by zero

可以看到程序可以正常运行,我们也正常捕获了异常

取消flow

Flow创建后并不返回可以cancel的句柄,但是一个flow的collect是suspend的,所以可以像取消一个suspend方法一样取消flow的collection。

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val job = GlobalScope.launch {flow {emit(1)kotlinx.coroutines.delay(3000)emit(2)kotlinx.coroutines.delay(3000)emit(3)}.collect {//接收结果Log.d("flow-", "value: $it")}}findViewById<Button>(R.id.cancel).setOnClickListener {job.cancel()}}
}

collectIndexed

输出带有索引的结果

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {Log.d("flow-", "init")emit(1)emit(2)}.collectIndexed { index, value ->Log.d("flow-", "onEach $index $value")}}}
}//输出结果
D/flow-: init
D/flow-: onEach 0 1
D/flow-: onEach 1 2

distinctUntilChanged 过滤重复的值

如果生产的值和上个发送的值相同,值就会被过滤掉

flow {emit(1)emit(1)emit(2)emit(2)emit(3)emit(4)
}.distinctUntilChanged()// 结果:1 2 3 4
// 解释:
// 第一个1被释放
// 第二个1由于和第一个1相同,被过滤掉
// 第一个2被释放
// 第二个2由于和第一个2相同,被过滤掉
// 第一个3被释放
// 第一个4被释放

可以传参(old: T, new: T) -> Boolean,进行自定义的比较

private class Person(val age: Int, val name: String)flow {emit(Person(20, "张三"))emit(Person(21, "李四"))emit(Person(21, "王五"))emit(Person(22, "赵六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }// 结果:张三 李四 赵六
// 解释:本例子定义如果年龄相同就认为是相同的值,所以王五被过滤掉了

可以用 distinctUntilChangedBy转换成年龄进行对比

flow {emit(Person(20, "张三"))emit(Person(21, "李四"))emit(Person(21, "王五"))emit(Person(22, "赵六"))
}.distinctUntilChangedBy { person -> person.age }// 结果:张三 李四 赵六

transform

对每个值进行转换,用一个新的 FlowCollector 发射数据

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flow {emit(1)emit(2)emit(3)emit(4)emit(5)}.transform {if (it % 2 == 0) {emit("value-$it")}}.collect {Log.d("flow-", "collect $it")}}}
}//输出结果
D/flow-: collect value-2
D/flow-: collect value-4

filter 过滤

val list = mutableListOf(1, 2, 3, 4, 5)val job = GlobalScope.launch {list.asFlow().filter {it > 3}.collect {Log.d("flow-", "value: $it")}
}

输出结果:

 D/flow-: value: 4D/flow-: value: 5

类似的还有 filterNot 反向过滤,这里就不举例子了

flowOn 数据发射的线程

可以切换CoroutineContext

说明:flowOn只影响该运算符之前的CoroutineContext,对它之后的CoroutineContext没有任何影响
我们先看一下默认情况下的线程问题

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {Log.d("flow-", "init->thread:${Thread.currentThread().name}")emit(1)emit(2)}val job = GlobalScope.launch {flow.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}}
}

输出结果

D/flow-: init->thread:DefaultDispatcher-worker-1
D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 1
D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 2

我们可以看到默认情况下,flow 数据发射的线程就是当前协程所在的线程。

我们可以用 flowOn 方法指定数据发射的线程


class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {Log.d("flow-", "init->thread:${Thread.currentThread().name}")emit(1)emit(2)}GlobalScope.launch {flow.flowOn(Dispatchers.Main) //指定数数据发射的线程为主线程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}}
}

输出结果

D/flow-: init->thread:main
D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 1
D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 2

可以看到数据发射的线程已经被切换到了主线程

多个操作符的情况下,如下

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {flowDemo()}}suspend fun flowDemo() {flow {Log.d("flow-", "init ${Thread.currentThread().name}")emit(1)}.map {Log.d("flow-", "map ${Thread.currentThread().name}")it}.flowOn(Dispatchers.Main).collect {Log.d("flow-", "collect ${Thread.currentThread().name} value:$it")}}
}
D/flow-: init main
D/flow-: map main
D/flow-: collect DefaultDispatcher-worker-1 value:1

launchIn

scope.launch { flow.collect() }的缩写, 代表在某个协程上下文环境中去接收释放的值

例子:

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {Log.d("flow-", "init")emit(1)emit(2)}.onEach {Log.d("flow-", "onEach $it")}flow.launchIn(this)}}
}

输出结果:

D/flow-: init
D/flow-: onEach 1
D/flow-: onEach 2

conflate()

如果值的生产速度大于值的消耗速度,就忽略掉中间未来得及处理的值,只处理最新的值。

val flow1 = flow {delay(2000)emit(1)delay(2000)emit(2)delay(2000)emit(3)delay(2000)emit(4)
}.conflate()flow1.collect { value ->println(value)delay(5000)
}// 结果: 1 3 4
// 解释:
// 2000毫秒后生产了1这个值,交由collect执行,花费了5000毫秒,当1这个值执行collect完成后已经经过了7000毫秒。
// 这7000毫秒中,生产了2,但是collect还没执行完成又生产了3,所以7000毫秒以后会直接执行3的collect方法,忽略了2这个值
// collect执行完3后,还有一个4,继续执行。

withIndex

将值封装成IndexedValue对象

flow {emit(1)emit(2)emit(3)emit(4)
}.withIndex()// 结果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)

onEach

每个值释放的时候可以执行的一段代码


onEach 函数,执行一段代码后,再释放值

flow {emit(1)emit(2)emit(3)emit(4)
}.onEach { println("接收到$it") }// 结果:
I/System.out: 接收到1
I/System.out: 1
I/System.out: 接收到2
I/System.out: 2
I/System.out: 接收到3
I/System.out: 3
I/System.out: 接收到4
I/System.out: 4

onStart 在数据发射之前触发

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {Log.d("flow-", "init->thread:${Thread.currentThread().name}")emit(1)emit(2)emit(3)}GlobalScope.launch {flow.onStart {Log.d("flow-", "onStart->thread:${Thread.currentThread().name}")}.flowOn(Dispatchers.Main) //指定数数据发射的线程为主线程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}}
}

输出结果:

D/flow-: onStart->thread:main
D/flow-: init->thread:main
D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 1
D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 2
D/flow-: collect->thread:DefaultDispatcher-worker-1 value: 3

结论:
onStart 方法在数据发射之前调用,onStart 所在的线程是数据产生的线程。

onCompletion

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)GlobalScope.launch {val flow = flow {Log.d("flow-", "init ${Thread.currentThread().name}")emit(1)emit(2)}flow.onStart {Log.d("flow-", "onStart ${Thread.currentThread().name}")}.onCompletion {Log.d("flow-", "onCompletion ${Thread.currentThread().name}")}.collect {Log.d("flow-", "collect ${Thread.currentThread().name} value:$it")}}}
}

输出结果:

D/flow-: onStart DefaultDispatcher-worker-1
D/flow-: init DefaultDispatcher-worker-1
D/flow-: collect DefaultDispatcher-worker-1 value:1
D/flow-: collect DefaultDispatcher-worker-1 value:2
D/flow-: onCompletion DefaultDispatcher-worker-1

drop(n) 忽略最开始释放的值

flow {emit(1)emit(2)emit(3)emit(4)
}.drop(2)// 结果:3 4
// 解释:
// 最开始释放的两个值(1,2)被忽略了

dropWhile

判断第一个值如果满足(T) -> Boolean这个条件就忽略

flow {emit(1)emit(2)emit(3)emit(4)
}.dropWhile {it % 2 == 0
}// 结果:1 2 3 4
// 解释:
// 第一个值不是偶数,所以1被释放flow {emit(1)emit(2)emit(3)emit(4)
}.dropWhile {it % 2 != 0
}// 结果:2 3 4
// 解释:
// 第一个值是偶数,所以1被忽略

sample

如果有一個資料來源大概每200ms 就會丟出來数据,但是在更新UI 的时候,我们不需要那么频繁的更新UI , 就需要用到采样。比如每 2 秒更新一次。

take 取指定数量的数据

class MainActivity : AppCompatActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)val flow = flow {emit(1)emit(2)emit(3)}GlobalScope.launch {flow.take(2).flowOn(Dispatchers.Main) //指定数数据发射的线程为主线程.collect {Log.d("flow-", "collect->thread:${Thread.currentThread().name} value: $it")}}}
}

输出结果:

D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 1
D/flow-: collect->thread:DefaultDispatcher-worker-2 value: 2

takeWhile

只会释放第一个值,但也不是必须释放,要看条件

判断第一个值如果满足(T) -> Boolean这个条件就释放

flow {emit(1)emit(2)emit(3)emit(4)
}.takeWhile { it%2 != 0 }// 结果:1
// 解释:
// 第一个值满足是奇数条件flow {emit(1)emit(2)emit(3)emit(4)
}.takeWhile { it%2 == 0 }// 结果:无
// 解释:
// 第一个值不满足是奇数条件

实现一个定时器功能

GlobalScope.launch {val flow = flow {while (true) {emit(0)//每隔一秒产生一个数据delay(1000)}}flow.collect {UtilLog.d(TAG, "定时任务..")}
}

Kotlin实战指南二十:flow相关推荐

  1. Kotlin实战指南二:变量、常量、静态常量

    转载请标明出处:https://blog.csdn.net/zhaoyanjun6/article/details/87811333 本文出自[赵彦军的博客] Kotlin初体验二:变量.常量.静态常 ...

  2. Kotlin实战指南十九:use 函数魔法

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/117366756 本文出自[赵彦军的博客] 文章目录 往期精彩文章 use函数 往期 ...

  3. Kotlin实战指南十八:open、internal 关键字使用

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/117365712 本文出自[赵彦军的博客] 文章目录 往期精彩文章 open关键字 ...

  4. Kotlin实战指南十七:JvmField、JvmStatic使用

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/116668666 本文出自[赵彦军的博客] 文章目录 往期精彩文章 @JvmFiel ...

  5. (转载)Android项目实战(二十八):使用Zxing实现二维码及优化实例

    Android项目实战(二十八):使用Zxing实现二维码及优化实例 作者:听着music睡 字体:[增加 减小] 类型:转载 时间:2016-11-21 我要评论 这篇文章主要介绍了Android项 ...

  6. Android项目实战(二十二):启动另一个APP or 重启本APP

    Android项目实战(二十二):启动另一个APP or 重启本APP 原文:Android项目实战(二十二):启动另一个APP or 重启本APP 一.启动另一个APP 目前公司项目需求,一个主AP ...

  7. Android项目实战(二十):浅谈ListView悬浮头部展现效果

    Android项目实战(二十):浅谈ListView悬浮头部展现效果 原文:Android项目实战(二十):浅谈ListView悬浮头部展现效果 先看下效果:需求是 滑动列表 ,其中一部分视图(粉丝数 ...

  8. Vue实战篇二十八:实现一个手机版的购物车

    系列文章目录 Vue基础篇一:编写第一个Vue程序 Vue基础篇二:Vue组件的核心概念 Vue基础篇三:Vue的计算属性与侦听器 Vue基础篇四:Vue的生命周期(秒杀案例实战) Vue基础篇五:V ...

  9. 转:Vim实战指南(二):光标移动技巧

    原文地址:Vim实战指南(二):光标移动技巧 Introduction 提升Vim/vi的打字效率的一个技巧就是快速移动光标.或许你觉得这不值一提,用hjkl或者上下左右也能移动,不过相信我,我下面要 ...

最新文章

  1. linux 脚本 符号,Shell脚本 入门 —— 符号篇
  2. 重大BUG:你的淘宝双十一订单可能多付钱了!
  3. 依赖: ros-melodic-desktop 但是它将不会被安装_npm系列之依赖管理
  4. 批处理:修改COM端口号
  5. liux 常用操作命令
  6. js文件位置--为甚有些js必须放在尾部
  7. 公司里面用的iTextSharp(教程)---关于PDF的属性设置
  8. java 取色器_Arava: 用 swing 写一个取色器
  9. STM32F107+LAN8720A使用STM32cubeMX配置网络连接+tcp主从机+UDP app
  10. vue使用openlayers描边中国地图
  11. 深圳自己做网站 服务器,深圳自己做网站 服务器
  12. Java Swing写的支持合并单元格的JTable
  13. 深入理解计算机系统arch lab
  14. calc()语法规则
  15. Unity3D学习 ③ 摄像机视角跟随
  16. 苹果MacBook电脑应用优化利器CleanMyMac X
  17. 构建高并发高可用的电商平台架构实践(一)
  18. 机器人中的 jog 是什么意思?
  19. 【HTML 教程】iframe
  20. 【机器学习面经】实验室祖传机器学习重难点(第一弹)

热门文章

  1. 单调不减序列查询第一个大于等于_[力扣84,85] 单调栈
  2. 五十六、TodoList的三种写法,祭奠我的前端之路
  3. keras从入门到放弃(十)手写数字识别训练
  4. 关于无法加载sass 模块问题。vue2.0中报错ERROR :scss can't resolve 'scss-loader'
  5. 直播 | WWW 2021论文解读:基于隐私保护的模型联邦个性化
  6. 论NLP可解释的评估:什么才是“好”的解释?
  7. 从动力学角度看优化算法:为什么学习率不宜过小?
  8. 如何使用知识图谱增强信息检索模型?
  9. 数字图像处理与Python实现笔记之图像特征提取
  10. 乘法逆元总结 3种基本方法