一、前言

  • ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
  • 即创建一个和 Observable 具有相同生命周期的 disposable 资源。

  • 可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。
  • 如下所示:
class MyDisposables: Disposable {func dispose() {print("dispose")}
}......let _ = Observable.using({ () -> MyDisposables inreturn MyDisposables()}) { _ inreturn Observable<Int>.interval(1, scheduler: MainScheduler.instance).take(5)}.subscribe(onNext: {print($0)})
  • 执行结果:
0
1
2
3
4
dispose
  • 可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。

二、监听 Obervable

  • RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
let signingIn = ActivityIndicator()
self.signingIn = signingIn.asObservable()let usernameAndPassword = Observable.combineLatest(input.username, input.password) { ($0, $1) }signedIn = input.loginTaps.withLatestFrom(usernameAndPassword).flatMapLatest { (username, password) inreturn API.signup(username, password: password).observeOn(MainScheduler.instance).catchErrorJustReturn(false).trackActivity(signingIn)}.flatMapLatest { loggedIn -> Observable<Bool> inlet message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"return wireframe.promptFor(message, cancelAction: "OK", actions: [])// propagate original value.map { _ inloggedIn}}.shareReplay(1)
  • signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下:
    • 按下登录按钮;
    • 使用当前用户名及密码进行登录;
    • 展示登录结果。
  • Rx 可观察对象的交互如下:

  • 上面的 GitHub 登录涉及到 Rx 的相关操作:
    • combineLatest:合并最后的 username 和 password,形成一个新的 Observable;
    • withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。

三、如何实现监听?

  • 上面的 GitHub 登录涉及到记录开始登录的操作如下,那么它是如何监听到当前是否正在登录呢?
......API.signup(username, password: password)
.observeOn(MainScheduler.instance)
.catchErrorJustReturn(false)
.trackActivity(signingIn)......public extension ObservableConvertibleType {public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<E> {return activityIndicator.trackActivityOfObservable(self)}
}
  • 可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?
  • 最直白的想法应该如下:
    • 设置当前状态为正在执行登录;
    • 执行登录操作;
    • 设置当前状态为没有执行登录。
  • 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆操作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情:
    • 保留登录动作 ObservableA,返回自定义的一个 ObservableB;
    • 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录;
    • 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA;
    • 登录操作执行完毕后,设置当前状态为没有执行登录。
  • signingIn 所属类 ActivityIndicator 的实现如下:
public class ActivityIndicator : DriverConvertibleType {public typealias E = Boolprivate let _lock = NSRecursiveLock()private let _variable = Variable(0)private let _loading: Driver<Bool>public init() {_loading = _variable.asDriver().map { $0 > 0 }.distinctUntilChanged()}fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> {return Observable.using({ () -> ActivityToken<O.E> inself.increment()return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)}) { t inreturn t.asObservable()}}private func increment() {_lock.lock()_variable.value = _variable.value + 1_lock.unlock()}private func decrement() {_lock.lock()_variable.value = _variable.value - 1_lock.unlock()}public func asDriver() -> Driver<E> {return _loading}
}
  • _variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。
  • Variable 的示例如下:
let v = Variable(0)
v.asObservable().subscribe(onNext: {print($0)})v.value = 1
v.value = 2
  • 执行结果:
0
1
2
  • 继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading = _variable.asDriver().map { $0 > 0 }.distinctUntilChanged()
  • 其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。
  • 重点还是在 trackActivityOfObservable 方法:
fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> {return Observable.using({ () -> ActivityToken<O.E> inself.increment()return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)}) { t inreturn t.asObservable()}
}
  • 对应的 resourceFactory 如下:
{ () -> ActivityToken<O.E> inself.increment()return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)}
  • observableFactory 如下:
{ t inreturn t.asObservable()}
  • ActivityToken 的实现如下:
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {private let _source: Observable<E>private let _dispose: Cancelableinit(source: Observable<E>, disposeAction: @escaping () -> ()) {_source = source_dispose = Disposables.create(with: disposeAction)}func dispose() {_dispose.dispose()}func asObservable() -> Observable<E> {return _source}
}
  • 可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下操作:
    • 增加当前正在执行的动作数;
    • 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。
  • 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的操作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。
  • 监听当前是否正在登录就是通过 using 操作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。

四、using 内部实现

  • using 的内部实现:
public static func using<R: Disposable>(_ resourceFactory: @escaping () throws -> R, observableFactory: @escaping (R) throws -> Observable<E>) -> Observable<E> {return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)}
  • using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
class Using<SourceType, ResourceType: Disposable>: Producer<SourceType> {typealias E = SourceTypetypealias ResourceFactory = () throws -> ResourceTypetypealias ObservableFactory = (ResourceType) throws -> Observable<SourceType>fileprivate let _resourceFactory: ResourceFactoryfileprivate let _observableFactory: ObservableFactoryinit(resourceFactory: @escaping ResourceFactory, observableFactory: @escaping ObservableFactory) {_resourceFactory = resourceFactory_observableFactory = observableFactory}override func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {let sink = UsingSink(parent: self, observer: observer)sink.disposable = sink.run()return sink}
}
  • Producer 的实现如下:
class Producer<Element> : Observable<Element> {override init() {super.init()}override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {if !CurrentThreadScheduler.isScheduleRequired {return run(observer)}else {return CurrentThreadScheduler.instance.schedule(()) { _ inreturn self.run(observer)}}}func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {abstractMethod()}
}
  • Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType> : Sink<O>, ObserverType where O.E == SourceType {typealias Parent = Using<SourceType, ResourceType>typealias E = O.Eprivate let _parent: Parentinit(parent: Parent, observer: O) {_parent = parentsuper.init(observer: observer)}func run() -> Disposable {var disposable = Disposables.create()do {let resource = try _parent._resourceFactory()disposable = resourcelet source = try _parent._observableFactory(resource)return Disposables.create(source.subscribe(self),disposable)} catch let error {return Disposables.create(Observable.error(error).subscribe(self),disposable)}}func on(_ event: Event<E>) {switch event {case let .next(value):forwardOn(.next(value))case let .error(error):forwardOn(.error(error))dispose()case .completed:forwardOn(.completed)dispose()}}
}
  • 可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。
  • 实际上 UsingSink 只是在 run 中做了两件特殊的事情:
    • 在让 source 订阅自身前,创建 resource(一般会在这里做额外的操作);
    • 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的操作比如 map、flatMap 等都是由上游给的)。

RxSwift之深入解析Using操作的应用和原理相关推荐

  1. RxSwift之深入解析dispose源码的实现原理

    一.前言 任何对象都有生命周期,有创建就要销毁.OC 中有 init 和 dealloc,swift 有 init 和 deinit,RxSwift 也不例外,有 create 和 dispose. ...

  2. RxSwift之深入解析Subject的使用和实现原理

    一.Subject RxSwift 的核心逻辑 Observable 不具备发送事件的能力,创建一个 Observable 的时候就要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过 Even ...

  3. RxSwift之深入解析核心逻辑Observable的底层原理

    一.前言 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示: // 创建序列let ob = Observable<Any>.create { (observer) ...

  4. RxSwift之深入解析map操作符的底层实现

    一.map 操作符的使用 map 操作符将源 Observable 的每个元素转换一遍,然后返回含有转换结果的 Observable: 现有如下示例: Observable<Int>.of ...

  5. RxSwift之深入解析场景特征序列的使用和底层实现

    一.引序 任何序列都可以用 Observable 描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收: Observable<Any>.create { (o ...

  6. KingbaseES V8R6运维案例之---wal日志解析DDL操作

    ​案例说明: 通过sys_waldump解析DDL操作,获取DDL操作的日志条目具体内容. 适用版本: KingbaseES V8R3/R6 一.DDL事务操作对应的wal日志文件 # 查看当前onl ...

  7. 【Hadoop】HDFS操作、数据上传与下载原理解析、高级特性及底层原理

    HDFS操作.数据上传与下载原理解析.高级特性及底层原理 1 HDFS操作 1.1 Web Console网页工具 1.2 命令行 1.2.1 普通的操作命令 1.2.2 管理员命令 1.3 Java ...

  8. jsoup 对 HTML 文档进行解析和操作

    使用 jsoup 对 HTML 文档进行解析和操作 使用 jsoup 对 HTML 文档进行解析和操作 jsoup 简介 Java 程序在解析 HTML 文档时,相信大家都接触过 htmlparser ...

  9. 一起谈.NET技术,VS2010测试功能之旅:编码的UI测试(3)-操作对象的识别原理...

    回顾  在之前的两章分别介绍了一个简单的示例, 操作动作的录制原理,通过修改UIMap.UItest文件控制操作动作代码的生成.想必大家对编码的UI测试操作动作的录制应该有一定了解了,在UI操作中,操 ...

最新文章

  1. 5G NPN 行业专网 — 部署模式
  2. binlog2mysql,MySQL 数据恢复工具之binlog2sql
  3. 简单子串查找--strstr的使用(ZCMU1108)
  4. iOS SDK: Send E-mail In-App
  5. mysql 截取逗号并形成新的字段_MySQL将一个字段中以逗号分隔的取出来形成新的字段实现...
  6. 对java支持并发的理解_Java并发知识(1)
  7. 关于tomcat和sessionCookieName和SESSION_PARAMETER_NAME以及disableURLRewriting参数原理和使用...
  8. Python 爬虫进阶必备——某体育网站登录令牌加密分析,赶紧收藏哦!
  9. 【jQuery笔记Part1】10-jQuery操作html-获取设置
  10. 物联网卡焊接式和插拔式各有什么优势
  11. 使用systemtap调试Linux内核
  12. 视频剪辑素材哪里找?这个几个网站就够了。
  13. 五种开源协议的比较(BSD,Apache,GPL,LGPL,MIT) – 整理
  14. 新浪微博PC端登录分析
  15. 多多情报通:拼多多推广账户金额可以通用吗?里面钱可以提出来吗?
  16. echarts 好看的柱形图
  17. RTC编程挑战赛 开源项目推荐
  18. Windows-驱动-解决Thinkpad e470c在Win8.1下WiFi无法连接的问题
  19. 评测三款最流行的azw3阅读器(ios手机适用)
  20. iphone手机显示itunes store无法连接服务器,iphone无法连接到itunes store怎么解决?

热门文章

  1. ChatGPT官宣数学能力再升级,网友:终于精通十以内加减法了
  2. Defocus(散焦)
  3. html实现鼠标悬停效果实现
  4. 视频直播,音频直播,m3u8
  5. Java,Android,计算机原理视频,500G视频资料
  6. 关于10的勾股数有哪些_关于“天风证券”“浪潮信息”的配股提示
  7. 马尔科夫链蒙特卡洛(MCMC)
  8. NATO(北大西洋公约组织)采用ADOBE FLEX作为作战支持系统
  9. 微信小程序获取系统时间、时间戳、时间时间戳加减
  10. SAP FICO资产卡片批导开发说明书(包括测试样例、程序代码仅作参考,不保证一定可以运行)