【Android】之 Flow 的使用和浅析
Flow 简介
Google 推荐在 MVVM 架构中使用 Kotlin Flow,可见其发展前景是非常好的。
Kotlin Flow 可以用于替换 Rxjava,也可以用于替换 LiveData,功能十分强大,它是 Kotlin 协程库提供的一部分功能,因此,如果我们项目中已经引用了 Kotlin 协程,则不需要额外引入 Flow 相关的依赖。
在协程中,挂起函数最多仅能返回一个值,而数据流 Flow 可按顺序发出多个值,例如,我们可以通过数据流从数据库中实时接收更新。数据流使用挂起函数通过异步方式生成和使用值,也就是说,数据流可安全地发出网络请求以生成下一个值,而不会阻塞主线程。
数据流 Flow 包含三个重要角色:
- 数据提供方:生成数据,并添加到数据流中
- 中介(可选):可修改发送到数据流的值,或修正数据流本身
- 数据使用方:使用数据流中的值
创建数据流
flow
构建器函数会创建一个新数据流,然后我们可使用 emit
函数将新值发送到数据流中。
val latestNews: Flow<List<NewsData>> = flow {while (true) {val latestNews = newsApi.fetchLatestNews()emit(latestNews)delay(5000)}
}
修改数据流
中介可以利用中间运算符在不使用值的情况下修改数据流,例如:
- filter : 对待操作的值进行过滤
- map :对值进行加工后继续向后传递
- flatMapLatest:转换成一个新的流,需要返回一个转换后的新的流。(如果下个值来了,上一个值变换还没结束,上一个值的转换会被取消)
- onEach:接收到的每一个值
val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews// 先过滤列表数据大于3的数据(大于3才能通过).filter {it.size >= 3}// 对结果进行加工后继续向后传递.map { list ->// 调用 list.filter 进一步筛选出 id==1 的新闻list.filter { it.id == 1 }}// 转换成一个新的流,需要返回一个转换后的新的流。(如果下个值来了,上一个值变换还没结束,上一个值的转换会被取消).flatMapLatest {flow {emit(it)}}.onEach {// todo 获取到筛选后 新闻列表(结果) 数据}
收集数据流
只有在 收集数据流时 才会触发 数据提供方 刷新最新数据。除非使用其他中间运算符指定流,否则数据流始终为冷流并延迟执行。
fun getNewsData() {viewModelScope.launch(Dispatchers.Main) {remoteRepository.news.catch {// todo 收集异常}.collect {// 收到到的数据}}
}
数据流收集可能会由于以下原因而停止:
- 收集数据的协程被取消,此操作也会让 数据提供方 停止活动。
- 数据提供方完成发出数据项。在这种情况下,数据流将关闭,调用
collect
的协程则继续执行。
捕获异常
如需处理异常,可以使用 catch
运算符,如:
fun getNewsData() {viewModelScope.launch {remoteRepository.news.catch {// todo 在这里收集异常}.collect {newsData.value = it}}
}
另外,catch
还可执行 emit
操作,向数据流发出新的数据项,例如,如果我们在 上游 发现了异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据),如:
class MyRemoteRepository @Inject constructor(private val myRemoteDataSource: MyRemoteDataSource,
) {// 返回 id 等于 1 的新闻val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews.map { news ->// 筛选出 id==1 的新闻news.filter { it.id == 1 }}.onEach {// todo 获取到筛选后 新闻列表(结果) 数据}.catch {// 如果在 上游 收集到异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据)// 例如:emit(lastCachedNews())}
}
协程作用域切换
默认情况下,flow
上游数据提供方 会基于 下游数据收集方 的协程 CoroutineContext
执行,也就是说,默认情况,下游和上游会运行在同一个协程作用域中。
并且,它无法从不同协程作用域对值执行 emit
操作。
如果需要更改数据流的的协程作用域,可以使用中间运算符 flowOn
运算符。
flowOn
会更改上游数据流的 CoroutineContext
,但不会影响到下游数据流的作用域。
如果有多个 flowOn
运算符,每个运算符都会更改当前位置的上游数据流。
// 上游数据流代码,上游数据流将会在 Dispatchers.IO 作用域上执行:
class MyRemoteRepository @Inject constructor(private val myRemoteDataSource: MyRemoteDataSource,
) {val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews.flowOn(Dispatchers.IO).catch { }
}// 下游数据流代码,下游数据流将会在 Dispatchers.Main 作用域上执行
fun getNewsData() {viewModelScope.launch(Dispatchers.Main) {remoteRepository.news.collect { }}
}
Flow Demo 演示
下面的代码将演示通过 Flow 不断获取最新的新闻列表。
步骤一:创建 News 类,定义新闻格式
data class NewsData(var id: Int, var content: String)
步骤二:创建 NewsApi 接口,用于请求最新的新闻列表
interface NewsApi {/*** 请求最新的新闻列表*/suspend fun fetchLatestNews(): List<NewsData>companion object {fun create(): NewsApi {return NewsApiImpl()}}
}
步骤三:创建 DataSource 类,内部有一个 latestNews 变量,作为 Flow 的上游数据提供者,每隔 5 秒,通过 newsApi 请求新闻数据,并调用 emit 方法将新闻数据发出
class MyRemoteDataSource @Inject constructor(private val newsApi: NewsApi,
) {val latestNews: Flow<List<NewsData>> = flow {while (true) {val latestNews = newsApi.fetchLatestNews()emit(latestNews)delay(5000)}}
}
步骤四:创建 RemoteRepository 类,内部有一个 news 变量,作为 Flow 的数据中间处理者,筛选数据,切换上游作用域,收集上游异常等都可以在这里处理。
class MyRemoteRepository @Inject constructor(private val myRemoteDataSource: MyRemoteDataSource,
) {// 返回 id 等于 1 的新闻val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews.map { news ->// 筛选出 id==1 的新闻news.filter { it.id == 1 }}.onEach {// todo 获取到筛选后 新闻列表(结果) 数据}.flowOn(Dispatchers.IO) // 上游数据流将会在 Dispatchers.IO 作用域上执行.catch {// 如果在 上游 收集到异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据)// 例如:emit(lastCachedNews())}
}
步骤五:创建 ViewModel 类,定义了一个成员方法 getNewsData() ,作为 Flow 的下游数据接收者,另外,还定义了一个 LiveData 变量,监听最新的新闻数据。
@HiltViewModel
class MainViewModel @Inject constructor(private val remoteRepository: MyRemoteRepository,
) : ViewModel() {val newsData = MutableLiveData<List<NewsData>>()fun getNewsData() {viewModelScope.launch(Dispatchers.Main) {remoteRepository.news.catch {// todo 收集异常}.collect {newsData.value = it}}}
}
步骤六:编写 MainActivity 代码,接受最新的新闻数据并打印出来
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {private val mMainViewModel: MainViewModel by viewModels()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)initObserve()}private fun initObserve() {mMainViewModel.newsData.observe(this) {println("newsData=${Gson().toJson(it)}")}}fun click(view: View) {mMainViewModel.getNewsData()}}
步骤七:创建 NewsApi 的实现类
class NewsApiImpl : NewsApi {override suspend fun fetchLatestNews(): List<NewsData> {val list = ArrayList<NewsData>()list.add(NewsData(1, "news 1"))list.add(NewsData(2, "news 2"))list.add(NewsData(3, "news 3"))return list}
}
步骤八:编写 依赖注入类(di)
@InstallIn(SingletonComponent::class)
@Module
class MainModule {@Singleton@Providesfun provideAppDatabase(@ApplicationContext context: Context): NewsApi {return NewsApi.create()}
}// 另外,别忘了在 Application 中加上 @HiltAndroidApp 注解
@HiltAndroidApp
class MainApplication : Application()
步骤九:由于 demo 使用到了 Hilt ,因此我们需要加上如下依赖:
// Project-build.gradle
buildscript {ext {hiltVersion = '2.41'}dependencies {classpath "com.google.dagger:hilt-android-gradle-plugin:$hiltVersion"}
}// app-build.gradle
plugins {id 'kotlin-kapt'id 'dagger.hilt.android.plugin'
}
dependencies {kapt "com.google.dagger:hilt-android-compiler:$rootProject.hiltVersion"implementation "com.google.dagger:hilt-android:$rootProject.hiltVersion"
}
步骤十:为了能在 activity 或 fragment 中使用 by viewModels()
api,我们还需额外引入以下 依赖:
// app-build.gradle
dependencies {implementation "androidx.activity:activity-ktx:1.4.0"implementation "androidx.fragment:fragment-ktx:1.4.1"
}
StateFlw
StateFlow 是一个状态容器式的可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value
属性读取当前状态值。
StateFlow
非常适合需要让可变状态保持可观察的类。
Flow
是冷数据流,而 StateFlow
是热数据流,热流有如下特性:
- 调用
StateFlow.collect
收集数据不会触发任何数据提供方(上游)
的代码 - 上游数据流 如果已经处于活跃(发送)状态,即使没有任何地方调用
StateFlow.collect
,上游流仍会持续活跃(没有 Gc Root 引用自然也会被回收) - 它允许被多个观察者共用 (因此是共享的数据流)
当一个 新的数据接收方
开始从数据流中 collect
数据时,它将接收到信息流中的最近一个状态及任何后续状态。
注意:如果 StateFlow.value 接收的新数据和前一个旧数据一样时,下游并不会接收到数据的更新通知。
StateFlow 和 LiveData
StateFlow
和 LiveData
具有相似之处,两者都是可观察的数据容器类。
但也存在不同之处:
StateFlow
需要将初始状态传递给构造函数,而LiveData
不需要- 当 View 进入
STOPPED
状态时,LiveData.observe()
会自动取消注册使用方,但是从StateFlow
或任何其他数据流收集数据的操作并不会自动停止,如需实现与 LiveData 相同的行为,则需从Lifecycle.repeatOnLifecycle
块中收集数据流
StateFlow
的简单用法如下:
// ViewModel: (_uiState.value更新的地方属于上游)
@HiltViewModel
class MainViewModel @Inject constructor(private val remoteRepository: MyRemoteRepository,
) : ViewModel() {// 定义一个私有的 MutableStateFlow 变量(可变)private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))// UI 从此 StateFlow 收集以获取其状态更新val uiState: StateFlow<LatestNewsUiState> = _uiStatefun getNewsData() {viewModelScope.launch {remoteRepository.news.collect {// 接受到最新的新闻列表数据后,将数据赋值给 StateFlow 的 value_uiState.value = LatestNewsUiState.Success(it)}}}
}// Activity: (mMainViewModel.uiState.collect的地方属于下游)
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {private val mMainViewModel: MainViewModel by viewModels()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)initObserve()}private fun initObserve() {mMainViewModel.viewModelScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {mMainViewModel.uiState.collect {// 注意:如果 StateFlow.value 接收的新数据和前一个旧数据一样时,下游并不会接收到数据的更新通知when (it) {is LatestNewsUiState.Success -> {println("获取新闻成功,news=${Gson().toJson(it.news)}")}is LatestNewsUiState.Error -> {println("获取新闻失败,error=${Gson().toJson(it.exception)}")}}}}}}fun click(view: View) {mMainViewModel.getNewsData()}}
如下代码中,负责更新 MutableStateFlow
的类是 数据提供方(上游)
,从 StateFlow.collect
的类是 数据使用方(下游)
。
另外,repeatOnLifecycle
能使界面处于活跃状态下才会更新界面,要使用该 api,还需引入以下依赖:
// Project-build.gradle
buildscript {ext {lifecycleVersion = '2.4.1'}
}// app-build.gradle
dependencies {implementation "androidx.lifecycle:lifecycle-runtime-ktx:$rootProject.lifecycleVersion"
}
Flow 转为 StateFlow
如需将任何数据流转换为 StateFlow
,可以使用 stateIn
中间运算符。
stateIn
有两个重载函数,一般我们用第二个:
// 函数1:
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>// 函数2:
public fun <T> Flow<T>.stateIn(scope: CoroutineScope,started: SharingStarted,initialValue: T
): StateFlow<T>
其中函数1是一个挂起函数,且仅需要传一个 scope 参数既可,函数2是非挂起函数,需要传递三个参数,三个参数的含义如下:
1、scope
:共享流开始时所在的协程作用域范围
2、started
:控制共享的开始和结束的策略
3、initialValue
: 流的初始值
而 started
有三种取值可选:
SharingStarted.Eagerly
:立即启动上游数据流,且在scope
指定的作用域被结束时终止上游流SharingStarted.Lazily
:在第一个订阅者出现后开始启动上游数据流,且在scope
指定的作用域被结束时终止上游流SharingStarted.WhileSubscribed(stopTimeoutMillis)
:在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,指定时间后(默认是0)会取消上游数据流
stateIn
使用如下:
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(private val remoteRepository: MyRemoteRepository,
) : ViewModel() {// 将 news(Flow冷流) 转为 StateFlow 热流val mStateFlow: StateFlow<List<NewsData>> = remoteRepository.news.stateIn(scope = viewModelScope,started = WhileSubscribed(5000),initialValue = emptyList())
}// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {private val mMainViewModel: MainViewModel by viewModels()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)}// 点击开始收集上游数据fun click(view: View) {mMainViewModel.viewModelScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {mMainViewModel.mStateFlow.collect {println("收集到数据,it=${Gson().toJson(it)}")}}}}
}
例子中的 ViewModel,定义了一个变量,将 Flow 冷流通过 stateIn 操作符转为 StateFlow ,该 StateFlow 在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,会在 5秒 后取消上游数据流,另外,initialValue 初始值设置为 一个空列表 。在 MainActivity 的代码中,点击按钮将会触发 上游流开始发送数据,同时下游流也开始接收数据。
WhileSubscribed
传入了 5000 ,是为了实现等待5
秒后仍然没有订阅者存在就终止协程的功能,这个方法有以下功能:
- 应用转至后台运行后,5 秒钟后所有来自其他层的数据更新会停止,这样可以节省电量
- 在屏幕旋转时,因为重新订阅的时间在5s内,因此上游流不会中止
SharedFlow
SharedFlow
配置更为灵活,支持配置replay
,缓冲区大小等,StateFlow
是SharedFlow
的特化版本,replay
固定为1,缓冲区大小默认为0
我们可使用 shareIn
函数会返回一个热数据流 SharedFlow
, SharedFlow
会 向从其所有的 数据接收方(下游)
发出数据。
我们先看下 ShareFlow 的构造函数:
public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
其主要有3个参数:
replay
:有新的订阅者Collect
时,发送几个已经发送过的数据给它,默认为0extraBufferCapacity
:除了replay
,SharedFlow
还缓存多少数据,默认为0onBufferOverflow
:表示缓存策略,即缓冲区满了之后ShareFlow
如何处理,默认为挂起
StateFolw 和 SharedFlow 的区别
StateFolw
和 SharedFlow
都属于热流。
StateFlow
本质上是一个 replay
为 1,且没有缓冲区的 SharedFlow
,因此第一次订阅时会先获得默认值。
StateFlow
仅在值已更新,并且值发生了变化时才会返回,也就是说如果更新后的值没有变化,Collect
方法不会回调,但是 ShareFlow
是会回调的。
下面举个简单的使用 SharedFlow 的例子:
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(private val remoteRepository: MyRemoteRepository,
) : ViewModel() {// 定义一个私有的 MutableSharedFlow 变量(可变),当有新的订阅者时,会先发送1个之前发送过的数据给订阅者private val mMutableSharedFlow = MutableSharedFlow<List<NewsData>>(replay = 1)// 不可变的 shareFlowval shareFlow: SharedFlow<List<NewsData>> = mMutableSharedFlowfun getNewsData() {viewModelScope.launch {remoteRepository.news.collect {mMutableSharedFlow.emit(it)}}}
}// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {private val mMainViewModel: MainViewModel by viewModels()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)}// ShareFlow 开始发送数据 fun click(view: View) {mMainViewModel.getNewsData()}// 收集 ShareFlow 发送的数据fun click2(view: View) {mMainViewModel.viewModelScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {mMainViewModel.shareFlow.collect {println("获取到的新闻,news=${Gson().toJson(it)}")}}}}}
该例子中,ViewModel 中定义了一个 ShareFlow,并将其 replay 参数设置为 1 ,即当有新的订阅者时,会先发送 1 个之前发送过的数据给订阅者。在Activity 中,有两个按钮,按钮 1 触发 ShareFlow 上游开始发送数据, 按钮 2 触发 下游收集数据,按钮 2 按下后,下游会先收集到 1 个之前发送过的数据。
Flow 转为 SharedFlow
如需将任何数据流转换为 SharedFlow
,可以使用 ShareIn
中间运算符:
public fun <T> Flow<T>.shareIn(scope: CoroutineScope,started: SharingStarted,replay: Int = 0
): SharedFlow<T>
ShareIn
函数有三个参数:
- scope:共享流开始时所在的协程作用域范围
- started:控制共享的开始和结束的策略
- replay:有新的订阅者
Collect
时,发送几个已经发送过的数据给它,默认为0
举个简单的例子(跟 StateIn 的例子很像):
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(private val remoteRepository: MyRemoteRepository,
) : ViewModel() {// 将 news(Flow冷流) 转为 SharedFlow 热流val mSharedFlow: SharedFlow<List<NewsData>> = remoteRepository.news.shareIn(scope = viewModelScope,started = WhileSubscribed(5000),replay = 1)
}// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {private val mMainViewModel: MainViewModel by viewModels()override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceState)setContentView(R.layout.activity_main)}fun click(view: View) {// 点击开始收集上游数据mMainViewModel.viewModelScope.launch {repeatOnLifecycle(Lifecycle.State.STARTED) {mMainViewModel.mSharedFlow.collect {println("收集到的新闻数据,news=${Gson().toJson(it)}")}}}}
}
例子中的 ViewModel,定义了一个变量,将 Flow 冷流通过 shareIn 操作符转为 SharedFlow ,该 SharedFlow 在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,会在 5秒 后取消上游数据流,另外,replay设置为 1 ,当有下游收集者时,会将之前发过的最近一个值发给下游收集者。在 MainActivity 的代码中,点击按钮将会触发 上游流开始发送数据,同时下游流也开始接收数据。
【Android】之 Flow 的使用和浅析相关推荐
- Android Kotlin Flow 如何使用callbackflow
转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/121840157 本文出自[赵彦军的博客] callbackFlow 原理 call ...
- Android 系统(56)---深入浅析Android坐标系统
深入浅析Android坐标系统 1 背景 去年有很多人私信告诉我让说说自定义控件,其实通观网络上的很多博客都在讲各种自定义控件,但是大多数都是授之以鱼,却很少有较为系统性授之于渔的文章,同时由于自己也 ...
- android wifi flow
复习并记录下android wifi相关知识,因为下载的是androidp 的source code,因此codebase就用androidP的版本,复习整个android wifi的flow,我大概 ...
- [Android] AS 中 Gradle 配置运行浅析
既然是浅析,自然也就没有深入的地方,我也写不出深入的地方,自己没有用过,也不会写出来坑人的:仅仅就是在 Android Studio 中的 Gradle 使用配置而已. Gradle Gradle 是 ...
- Android系统Google Maps开发实例浅析
Google Map(谷歌地图)是Google公司提供的电子地图服务.包括了三种视图:矢量地图.卫星图片.地形地图.对于Android系统来说,可以利用Google提供的地图服务来开发自己的一些应用. ...
- Android开发学习笔记:对话框浅析
对话框式程序运行中弹出的窗口.Android系统中有四种默认的对话框:警告对话框AlertDialog.进度对话框ProgressDialog.日期选择对话框DatePickerDialog以及时间选 ...
- android stackover flow problem
2019独角兽企业重金招聘Python工程师标准>>> 在做android UI 的时候,遇到了一个问题,因为不同的UI之间需要相互切换.所以不加思索的写了下面的程式 public ...
- android actviity模糊,Framework启动过程浅析
浅显的总结一下Framework启动大概过程 总体 Android底层是linux系统,因而在开机时仍然是运行天字第一号进程inti,读取init.rc来创建第一个Dalvik进程zygote,下面是 ...
- Android RTC 自下往上浅析
1.首先搞清楚RTC在kernel内的作用: linux系统有两个时钟:一个是由主板电池驱动的"Real Time Clock"也叫做RTC或者叫CMOS时钟,硬件时钟.当操作系统 ...
最新文章
- 北美欧洲顶级大咖齐聚,在这里读懂 AIoT 未来!
- 最新OPhone 开发官网
- wasm + ffmpeg实现前端截取视频帧功能
- C和指针之动态内存分配之(编写calloc函数,函数内部使用malloc函数来获取内存)
- 【Python】【Python库】Python3.7.2 - 字符串str类 (2)
- 非结构化文件转移服务器,非结构化数据存储管理方法,服务器和系统 Unstructured data storage management method, and a system server...
- 虚拟机Linux(Centos)上用户密码忘记了怎么办?
- python数组初始化_python怎么初始化数组
- 国企转型----北京市供销社探索大数据之路!
- 高大上必备!D3.js对产品的贡献度剖析
- SpringBoot整合ThymeLeaf前后端分离使用案例
- R_Studio(关联)对Groceries数据集进行关联分析
- 163vip邮箱登录,163邮箱怎么登陆?如何登录163vip邮箱?
- c++基础题:判断奇偶数
- lambda表达式的3种写法
- Excel拆分字符判断是否有汉字
- stata学习笔记|异方差问题
- mysql客户端如何登录_MySQL-客户端登录问题
- 专有钉钉下载(windows、IOS、Android)地址
- 粮食行业视频监控系统互联互通技术规范
热门文章
- java有阴历年算法吗_中国农历算法java实现
- PDMS插件_三维地形工具
- 求一元二次方的根(虚根求法)
- 【论文笔记】Deep Reinforcement Learning for Robotic Pushing and Picking in Cluttered Environment
- JavaScript最新面试题
- 如何批量提取过期域名,如何批量查询权重域名、收录域名
- 【温故而知新】JavaWEB回顾(八)
- java 手动触发gc_java触发full gc的几种情况整理
- SpringBoot里参数校验/参数验证
- JSD-2204-Vue-ElementUI-Day06