一、Subject

  • RxSwift 的核心逻辑 Observable 不具备发送事件的能力,创建一个 Observable 的时候就要预先将要发出的数据都准备好,等到有人订阅它时再将数据通过 Event 发出去。但有时希望 Observable 在运行时能动态地获得或者说产生一个新的数据,再通过 Event 发送出去。比如,订阅一个输入框的输入内容,当用户每输入一个字符之后,输入框关联的 Observable 就会发出一个带有输入内容的 Event,通知给所有订阅者。为此,RxSwift 提供了一种可以发送事件又可以订阅事件值的对象,它就是 Subject。
  • Subject 既是订阅者,也是 Observable:它是订阅者,是因为能够动态地接收新的值;它是 Observable,是因为当 Subjects 有了新的值之后,就会通过 Event 将新值发出给它的所有订阅者。
  • Subject 常用的方法:
    • onNext( : ):是 on(.next( : )) 的简写,该方法相当于 subject 接收到一个.next 事件;
    • onError( : ):是 on(.error( : )) 的简写,该方法相当于 subject 接收到一个 .error 事件;
    • onCompleted():是 on(.completed) 的简写,该方法相当于 subject 接收到一个 .completed 事件。

二、PublishSubject

  • PublishSubject 不需要初始值就能创建,它的订阅者从开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到它们在订阅前已发出的 Event,即 PublishSubject 仅仅发送在订阅之后由源 Observable 发送的数据。
  • 如下所示,最上面是 PublishSubject,下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 PublishSubject 的订阅者只能收到它们订阅后的 Event:

  • PublishSubject 一旦被建立就会立刻开始发送事件(除非采取方法去阻止它),这种机制有丢失事件的风险,因为在 Subject 被创建和被监听之间有一定的时间间隔,如果想保证所有的事件都可以被监听到的话,可以有两种方法:
    • 第一种方法是使用 Create 方法(在发送之前检查是否所有 observer 已经订阅);
    • 第二种方法是可以使用 ReplaySubject。
  • 如果源 Observable 被一个 error 中断,PublishSubject 将不会发送任何事件给后续的 observer,但是它会传递 error 信息,如下所示:

  • 使用示例如下:
let disposeBag = DisposeBag()// 创建一个PublishSubject
let subject = PublishSubject<String>()// 由于当前没有任何订阅者,所以该信息不会被输出到控制台
subject.onNext("1")// 第1次订阅subject
subject.subscribe(onNext: { string inprint("第1次订阅:", string)
}, onCompleted:{print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)// 当前有1个订阅,则该信息会被输出到控制台
subject.onNext("2")// 第2次订阅subject
subject.subscribe(onNext: { string inprint("第2次订阅:", string)
}, onCompleted:{print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)// 当前有2个订阅,则该信息会输出到控制台
subject.onNext("3")// subject结束
subject.onCompleted()// subject完成后会发出.next事件
subject.onNext("4")// subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件
subject.subscribe(onNext: { string inprint("第3次订阅:", string)
}, onCompleted:{print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
  • 运行结果如下:
第1次订阅: 2
第1次订阅: 3
第2次订阅: 3
第1次订阅:onCompleted
第2次订阅:onCompleted
第3次订阅:onCompleted

三、AsyncSubject

  • AsyncSubject 只发送由源 Observable 发送的最后一个事件,并且只在源 Observable 完成之后(如果源 Observable 没有发送任何值,AsyncSubject 也不会发送任何值):

  • AsyncSubject 会发送相同的值给所有 observer。但是,如果源 Observable 被 error 中断了发送,AsyncSubject 便不会发送任何事件,而是会发送从源 Observable 传来的 error 提示:

  • 使用示例:
let disposeBag = DisposeBag()
// 创建序列
let asynSub = AsyncSubject<Int>.init()
// 发送信号
asynSub.onNext(1)
asynSub.onNext(2)
// 订阅序列
asynSub.subscribe{ print("订阅到:",$0)}.disposed(by: disposeBag)
// 再次发送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "dw", code: 10086, userInfo: nil))
asynSub.onCompleted()
  • 运行结果如下:
订阅到: next(4)
订阅到: completed

四、BehaviorSubject

  • 当一个 observer 订阅一个 BehaviorSubject,它就开始发送最近由源 Observable 发送的事件(或者是还没有被发送的种子值/默认值),然后继续发送从源 Observable 接收到的其它事件。如下所示:

  • 如果源 Observable 被一个 error 中断,那么 BehaviorSubject 不会发送事件给后续的 observer,但会传递给它们 error 的信息。如下所示:

  • 使用示例:
let disposeBag = DisposeBag()// 创建一个BehaviorSubject
let subject = BehaviorSubject(value: "1")// 第1次订阅subject
subject.subscribe { event inprint("第1次订阅:", event)
}.disposed(by: disposeBag)// 发送next事件
subject.onNext("2")// 发送error事件
subject.onError(NSError(domain: "dw", code: 0, userInfo: nil))// 第2次订阅subject
subject.subscribe { event inprint("第2次订阅:", event)
}.disposed(by: disposeBag)
  • 运行结果如下:
第1次订阅: next(1)
第1次订阅: next(2)
第1次订阅: error(Error Domain=dw Code=0 "(null)")
第2次订阅: error(Error Domain=dw Code=0 "(null)")

五、ReplaySubject

  • ReplaySubject 在创建时候需要设置一个 bufferSize,表示它对于它发送过的 event 的缓存个数。比如一个 ReplaySubject 的 bufferSize 设置为 2,它发出了 3 个 .next 的 event,那么它会将后两个(最近的两个)event 给缓存起来。此时如果有一个 subscriber 订阅了这个 ReplaySubject,那么这个 subscriber 就会立即收到前面缓存的两个.next 的 event。
  • 如果一个 subscriber 订阅已经结束的 ReplaySubject,除了会收到缓存的 .next 的 event外,还会收到那个终结的 .error 或者 .complete 的event。
  • 如下所示,最上面是 ReplaySubject(bufferSize 设为为 2),下面分别表示两个新的订阅,它们订阅的时间点不同,可以发现 ReplaySubject 的订阅者一开始就能收到 ReplaySubject 之前发出的两个 Event(如果有):

  • 使用示例:
let disposeBag = DisposeBag()// 创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)// 连续发送3个next事件
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")// 第1次订阅subject
subject.subscribe { event inprint("第1次订阅:", event)
}.disposed(by: disposeBag)// 再发送1个next事件
subject.onNext("4")// 第2次订阅subject
subject.subscribe { event inprint("第2次订阅:", event)
}.disposed(by: disposeBag)// 让subject结束
subject.onCompleted()// 第3次订阅subject
subject.subscribe { event inprint("第3次订阅:", event)
}.disposed(by: disposeBag)
  • 运行结果如下:
第1次订阅: next(2)
第1次订阅: next(3)
第1次订阅: next(4)
第2次订阅: next(3)
第2次订阅: next(4)
第1次订阅: completed
第2次订阅: completed
第3次订阅: next(3)
第3次订阅: next(4)
第3次订阅: completed

六、Variable

  • Variable 其实就是对 BehaviorSubject 的封装,所以它也必须要通过一个默认的初始值进行创建,它具有 BehaviorSubject 的功能,能够向它的订阅者发出上一个 event 以及之后新创建的 event。
  • 不同的是,Variable 还会把当前发出的值保存为自己的状态,同时它会在销毁时自动发送 .complete 的 event,不需要也不能手动给 Variables 发送 completed 或者 error 事件来结束它。
  • 简单地说,就是 Variable 有一个 value 属性,改变这个 value 属性的值就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value 属性里,直到再次修改它。
  • Variables 本身没有 subscribe() 方法,但是所有 Subjects 都有一个 asObservable() 方法,可以使用这个方法返回这个 Variable 的 Observable 类型,拿到这个 Observable 类型就能订阅它。
  • 使用示例:
let disposeBag = DisposeBag()// 创建一个初始值为1的Variable
let variable = Variable("1")// 修改value值
variable.value = "2"// 第1次订阅
variable.asObservable().subscribe {print("第1次订阅:", $0)
}.disposed(by: disposeBag)// 修改value值
variable.value = "3"// 第2次订阅
variable.asObservable().subscribe {print("第2次订阅:", $0)
}.disposed(by: disposeBag)// 修改value值
variable.value = "4"
  • 运行结果:
第1次订阅: next(2)
第1次订阅: next(3)
第2次订阅: next(3)
第1次订阅: next(4)
第2次订阅: next(4)
第1次订阅: completed
第2次订阅: completed
  • Variable 虽然被废弃了,但是由于 Variable 的灵活性,因此在开发里面应用非常之多。

七、BehaviorRelay

  • BehaviorRelay 会替换原来的 Variable,储存一个信号,并且可以随时订阅响应,但响应发送的时候要注意 behaviorR.accept(20):
let disposeBag = DisposeBag()
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe(onNext: { (num) inprint(num)
.disposed(by: disposeBag)
print("打印:\(behaviorRelay.value)")behaviorRelay.accept(1000)

八、Subject 原理分析

① SubjectType

  • SubjectType 继承自 ObservableType,具有序列特性,并且关联观察者类型,具备观察者类型的能力,如下:
public protocol SubjectType : ObservableType {// 关联观察者类型,具备观察者类型的能力associatedtype SubjectObserverType : ObserverTypefunc asObserver() -> SubjectObserverType
}
  • 现有如下实例 subject:
let disposeBag = DisposeBag()
// 初始化序列
let publishSub = PublishSubject<Int>()
// 发送响应序列
publishSub.onNext(1)
// 订阅序列
publishSub.subscribe { print("订阅到:",$0)}.disposed(by: disposeBag)
// 再次发送响应
publishSub.onNext(2)
publishSub.onNext(3)
  • PublishSubject 很明显能够订阅信号(序列最基本的能力),并且能够发送响应,又是观察者的能力。

② 订阅响应流程

public override func subscribe -> Disposable {self._lock.lock()let subscription = self._synchronized_subscribe(observer)self._lock.unlock()return subscription
}func _synchronized_subscribe -> Disposable  {............let key = self._observers.insert(observer.on)return SubscriptionDisposable(owner: self, key: key)
}
  • self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显在合适的地方一次性全部执行。其中也返回这次订阅的销毁者,方便执行之后的工作:synchronizedUnsubscribe->self._observers.removeKey(disposeKey)。
mutating func removeKey(_ key: BagKey) -> T? {if _key0 == key {_key0 = nillet value = _value0!_value0 = nilreturn value}if let existingObject = _dictionary?.removeValue(forKey: key) {return existingObject}for i in 0 ..< _pairs.count where _pairs[i].key == key {let value = _pairs[i].value_pairs.remove(at: i)return value}return nil
}
  • 遍历通过 key 获取响应 bag 中的 value,执行集合移除,因为没有相应持有关系,达到自动释放销毁。

③ 发送信号流程

public func on(_ event: Event<Element>) {dispatch(self._synchronized_on(event), event)
}
  • 这里调用了 dispatch 函数,传了两个参数:self._synchronized_on(event) 和 event,查看 dispatch 函数源码:
func dispatch<E>(_ bag: Bag) {bag._value0?(event)if bag._onlyFastPath {return}let pairs = bag._pairsfor i in 0 ..< pairs.count {pairs[i].value(event)}if let dictionary = bag._dictionary {for element in dictionary.values {element(event)}}
}
  • bag._value0?(event) 首先执行事件的回调,然后判断 bag._onlyFastPath 的情况,默认会开启快速通道,如果是开启慢速通道,需要从刚刚添加进 bag 包里面的匹配,挨个进行 pairs[i].value(event) 外界事件回调,然后拿回外界封装的闭包的闭包调用 :element(event)。
func _synchronized_on(_ event: Event<E>) -> Observers {self._lock.lock(); defer { self._lock.unlock() }switch event {case .next:if self._isDisposed || self._stopped {return Observers()}return self._observerscase .completed, .error:if self._stoppedEvent == nil {self._stoppedEvent = eventself._stopped = truelet observers = self._observersself._observers.removeAll()return observers}return Observers()}
}
  • 如果 self._isDisposed || self._stopped 成立,就会返回一个空的集合,也就没有序列的响应。在 .completed, .error 都会改变状态 self._stopped = true,也就是说序列完成或者错误之后都无法再次响应。在.completed, .error 还会移除添加在集合里面的内容。
  • Subject 流程图如下(Subject 把订阅流程和响应流程都内部实现,所以没有必要引入 sink):

④ Subject 对比

  • PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、Variable,它们之间既有各自的特点,也有相同之处:
    • 首先它们都是 Observable,它们的订阅者都能收到它们发出的新的 Event;
    • 直到 Subject 发出 .complete 或者 .error 的 Event 后,该 Subject 便终结,同时它也就不会再发出 .next 事件;
    • 对于那些在 Subject 终结后再订阅它的订阅者,也能收到 subject 发出的一条 .complete 或 .error 的 event,告诉新的订阅者它已经终结。
  • 它们之间最大的区别只是在于:当一个新的订阅者刚订阅它的时候,能不能收到 Subject 以前发出过的旧 Event,如果能的话又能收到多少个 Event。

RxSwift之深入解析Subject的使用和实现原理相关推荐

  1. RxSwift之深入解析dispose源码的实现原理

    一.前言 任何对象都有生命周期,有创建就要销毁.OC 中有 init 和 dealloc,swift 有 init 和 deinit,RxSwift 也不例外,有 create 和 dispose. ...

  2. RxSwift之深入解析核心逻辑Observable的底层原理

    一.前言 现有一段 RxSwift 使用序列并监听序列消息发送的示例代码,如下所示: // 创建序列let ob = Observable<Any>.create { (observer) ...

  3. RxSwift之深入解析Using操作的应用和原理

    一.前言 ReactiveX 官方文档 对于 Using 的描述如下: create a disposable resource that has the same lifespan as the O ...

  4. RxSwift之深入解析map操作符的底层实现

    一.map 操作符的使用 map 操作符将源 Observable 的每个元素转换一遍,然后返回含有转换结果的 Observable: 现有如下示例: Observable<Int>.of ...

  5. RxSwift之深入解析场景特征序列的使用和底层实现

    一.引序 任何序列都可以用 Observable 描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收: Observable<Any>.create { (o ...

  6. 解析深度学习:卷积神经网络原理与视觉实践

    解析深度学习:卷积神经网络原理与视觉实践 魏秀参 著 ISBN:9787121345289 包装:平装 开本:16开 正文语种:中文 出版社: 电子工业出版社 出版时间:2018-11-01

  7. jQuery技术内幕:深入解析jQuery架构设计与实现原理

    为什么80%的码农都做不了架构师?>>>    jQuery技术内幕:深入解析jQuery架构设计与实现原理 本书由阿里巴巴资深前端开发工程师撰写,从源代码角度全面而系统地解读了jQ ...

  8. 解析BW:数据源提取数据的原理

    解析BW:数据源提取数据的原理 题记:忽然想到这么个问题,后勤数据源和非后勤数据初始化有何区别,然后进行周边的拓展,所以就形成了下文.大部分知识源于TBW350和SAP SDN. 对数据源抽取机制的深 ...

  9. XML的四种解析器(dom,sax,jdom,dom4j)原理及性能比较

    6月20日 XML的四种解析器(dom,sax,jdom,dom4j)原理及性能比较  1:DOM     DOM 是用与平台和语言无关的方式表示 XML 文档的官方 W3C 标准.DOM 是以层次结 ...

最新文章

  1. c 获取mysql列数据_转 用C API 操作MySQL数据库
  2. 113亿参数,中国最大 AI 模型!不仅能作诗,还能告诉你男朋友该不该分手!
  3. 《高性能Linux服务器构建实战》笔记
  4. iOS项目工程及目录结构
  5. 微软推出VS Code新特性,为TypeScript和JavaScript用户提供AI辅助开发功能
  6. 拒绝“肌肉记忆”,卡萨帝迈进场景品牌新赛道
  7. Shift register(RAM-based)------ALTSHIFT_TAPS
  8. linux 查redis状态_干货:用案例代码详解Redis中的事件驱动模型
  9. 如何隐藏你的 Linux 的命令行历史
  10. java中类与类的关系_Java中类与类的关系
  11. django使用mysql原始语句,Django中使用mysql数据库并使用原生sql语句操作
  12. 大数据_Flink_数据处理_资源的分配和并行度测试---Flink工作笔记0013
  13. python字符串format格式化
  14. 机器学习-监督学习之分类算法:K近邻法 (K-Nearest Neighbor,KNN)
  15. new char() 和 new char[]
  16. 数据结构--酒店管理系统(C语言)
  17. python爬虫 导出/乱码/中英文夹杂问题解决
  18. java实现表白动图
  19. Dominating Patterns UVALive - 4670 AC自动机
  20. python原始数据是什么_荐Python种MNE库模拟原始脑电数据

热门文章

  1. c++ 读写文本文件
  2. 第十届山东省ACM省赛题解
  3. 设计模式之控制反转和依赖注入的使用小结
  4. MySql cmd下的学习笔记 —— 有关建立数据库的操作(连接Mysql,建立数据库,删除数据库等等)...
  5. vs2003打开项目错误
  6. python paramiko sftp_python paramiko (ssh,sftp)
  7. 指定路径_Workbench中如何创建指定路径分析的变形曲线
  8. python open文件安全隐患_python的其他安全隐患
  9. java 判断端口是否被占用_java检测端口是否被占用详解
  10. Java黑皮书课后题第8章:*8.2(求矩阵对角线元素的和)使用下面的方法头编写一个方法,求n*n的double类型矩阵中主对角线上所有数字的和。编写一个程序,读取一个4*4的矩阵,显示主对角线和