rxjava 观察者模式

In this tutorials, we’ll be discussing RxJava Observables and Observers in length. We’ll discuss their various types and what each of them has to offer.

在本教程中,我们将详细讨论RxJava Observable和Observers。 我们将讨论它们的各种类型以及它们各自提供的功能。

观察者与观察者 (Observables and Observers)

In RxJava, Observables are the source which emits items to the Observers. For Observers to listen to the Observables, they need to subscribe first. The instance created after subscribing in RxJava2 is called Disposable.

在RxJava中,Observables是向观察者发射项目的源。 为了让观察者收听可观察对象,他们需要首先订阅。 订阅RxJava2之后创建的实例称为Disposable

In order to stop listening to Observables, we can call unsubscribe by calling the method dispose() on the Disposable instance.

为了停止监听Observable,我们可以通过在Disposable实例上调用方法dispose()来调用取消订阅。

here.此处 。

创建可观察物 (Creating Observables)

We can create Observables in many ways. One of the ways are:

我们可以通过多种方式创建Observable。 方法之一是:

Observable<Integer> observable = new ObservableCreate<Integer>(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(10);emitter.onNext(20);emitter.onComplete();}});

Observable.OnSubscribe is an interface which defines the action to be taken when a subscriber subscribes to the Observable. The subscribe method would only run when an Observer is subscribed to the Observable.

Observable.OnSubscribe是一个接口,它定义订户订阅Observable时要采取的操作。 只有在将观察者订阅到Observable时,subscribe方法才会运行。

onNext is used to emit the next item.
onError is triggered when an error occurs.
onComplete is called after the last item is emitted.

onNext用于发射下一个项目。
发生错误时触发onError
发出最后一项后,将调用onComplete

Now in order to catch these values, we must subscriber an observer. For that we have to create an observer first:

现在,为了捕获这些值,我们必须订阅一个观察者。 为此,我们必须首先创建一个观察者:

Observer<Integer> observer = new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("onSubscribe");}@Overridepublic void onNext(Integer o) {System.out.println("onNext " + o);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {System.out.println("onComplete");}};

Let’s subscribe to it:

让我们订阅它:

observable.subscribe(observer);

This creates a Subscription between the observer and observable. The Observable would now emit values which would be caught by the onNext of the Observer.

这将在观察者和可观察者之间创建一个订阅。 Observable现在将发出将由Observer的onNext捕获的值。

The output from the console is:

控制台的输出为:

Output:
onSubscribe
onNext 10
onNext 20
onComplete

If you subscribe() multiple times, each time the items would be emitted.

如果您多次subscribe() ,则每次发射该项目。

创建可观察物的方法 (Methods to Create Observables)

We can create Observables in the following ways:

我们可以通过以下方式创建Observables:

  • Observable.from()Observable.from()
  • Observable.just() – Pass one or more values inside this.Observable.just()–在其中传递一个或多个值。
  • Observable.range – The first argument expects the starting value. The second expects the size. Eg: Observable.range(1,2) would emit 1 and 2.Observable.range –第一个参数需要起始值。 第二个期望大小。 例如: Observable.range(1,2)将发出1和2。
  • Observable.interval() – Emits the values in the interval defined. The values emitted would be of the type Long. More on this later.Observable.interval()–发出定义的时间间隔中的值。 发出的值将是Long类型。 稍后再详细介绍。

For more info check out the RxJava Tutorial.

有关更多信息,请查看RxJava教程 。

冷观测和热观测 (Cold Observables and Hot Observables)

Cold Observables are Observables that emit one or values such that each Subscriber would receive all the values from the beginning.

冷可观察变量是发出一个或多个值以便每个订阅服务器从一开始就接收所有值的可观察变量。

Hot Observables are Observables in which the Observer won’t be able to receive items emitted before it subscribed. Only items emitted after the Observer is emitted could be received.

热点可观察者是指观察者在订阅之前无法接收发出的项目的可观察者。 只能接收发出观察者之后发出的项目。

The example we’d defined above was a Cold Observable.

我们上面定义的示例是Cold Observable。

To create a Hot Observable we do:

要创建热点可观察对象,请执行以下操作:

Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);PublishSubject<Long> publishSubject = PublishSubject.create();observableInterval.subscribe(publishSubject);publishSubject.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l));try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}publishSubject.subscribe(l -> System.out.println("Subscriber #2 onNext: " + l));try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}

To create a Hot Observable we need to use Subject. A Subject can act as an Observable or Observer at any given time.

要创建热点可观察对象,我们需要使用主题。 主体可以在任何给定时间充当可观察者或观察者。

Values from 0 would be emitted every 2 seconds. We’ve set the thread to sleep for 2 seconds after the first observer is subscribed. Hence the second observer won’t get the initial emitted items as shown in the output below:

从0开始的值将每2秒发出一次。 订阅第一个观察者后,我们将线程设置为Hibernate2秒钟。 因此,第二个观察者将无法获得初始发射的物品,如下面的输出所示:

可观察物的类型 (Types of Observables)

Following are the major types of Observables with each have a slightly different functionality and use case:

以下是Observable的主要类型,每种类型的功能和用例略有不同:

  • Observable – Emits one or more values. We have already discussed this above.可观察 –发出一个或多个值。 上面我们已经讨论过了。
  • Single – Emits a single value or throws an error.单一 -发出单一值或引发错误。
  • Maybe – This may or may not emit a value. Should be used when you need some data optionally.可能 –这可能会或可能不会发出值。 当您需要一些数据时应使用。
  • Flowable – Used when a huge amount of data has to be emitted. It is used for backpressure. More on this later.可流动 –在必须发射大量数据时使用。 用于背压。 稍后再详细介绍。
  • Completable – This just emits success or failure. No data is emitted.可完成 –只会产生成功或失败。 没有数据被发射。

观察者类型 (Types of Observers)

For every Observable type above we have an Observer type as well in RxJava.

对于上面的每个Observable类型,RxJava中也都有Observer类型。

  • Observer.观察员
  • SingleObservable单次可观察
  • MaybeObservable也许可以观察
  • CompletableObserverCompletableObserver
Subscribers are used instead of Observers订阅服务器代替观察者

Let’s now look at the basic implementation of each of the Observables with the Observers.

现在,让我们来看一下带有观察者的每个Observable的基本实现。

单 (Single)

This emits just one value. This can be used with Retrofit network calls.

这仅发出一个值。 可以与翻新网络调用一起使用。

Following is an example of Single:

以下是Single的示例:

Observable<Integer> integerObservable = Observable.just(1,2,3);Single<Integer> integerSingle = integerObservable.single(1);integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));integerSingle = integerObservable.singleOrError();integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

This would give an onError in both the cases since neither of has a single value.

在这两种情况下,这都将导致onError ,因为两者都不具有单个值。

single(Integer defaultValue) and singleOrError() are just two of the methods.
We can add plenty of other operators as well just as all, any,contains, count etc. Generally, a predicate is set in these methods which would return a single value.

single(Integer defaultValue)singleOrError()只是其中两个方法。
我们可以添加很多其他运算符,以及allanycontainscount等。通常,在这些方法中设置谓词,该谓词将返回单个值。

Example:

例:

Observable<Integer> integerObservable = Observable.just(1, 2, 3);Single<Boolean> booleanSingle = integerObservable.any(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer % 2 == 0;}});booleanSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));Single<Long> integerSingle = integerObservable.count();integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

This prints true and 3 respectively.

这分别输出true3

也许 (Maybe)

Maybe emits 0 or 1 items. The MaybeObserver has the method onSuccess in place of onNext().

也许发出0或1个项目。 MaybeObserver使用onSuccess方法代替onNext()

Following is an example using Maybe in which we print the maximum number from an Observable of Integers.

以下是使用Maybe的示例,其中我们从“可观察的整数”中打印最大数量。

Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5);Maybe<Integer> integerMaybe = integerObservable.reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {if (integer > integer2)return integer;elsereturn integer2;}});MaybeObserver<Integer> maybeObserver = new MaybeObserver<Integer>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("onSubscribe");}@Overridepublic void onSuccess(Integer o) {System.out.println("onSuccess : " + o);}@Overridepublic void onError(Throwable e) {System.out.println("onError");}@Overridepublic void onComplete() {System.out.println("onComplete");}};integerMaybe.subscribe(maybeObserver);

This prints 5.

打印5。

Besides the reduce function, there are plenty of other functions such as firstElement(), lastElement() etc.

除了reduce函数外,还有许多其他函数,例如firstElement(),lastElement()等。


To create a zero emission observable, do:
要创建可观察到的零排放,请执行以下操作:

Maybe<Integer> emptySource = Maybe.empty();

可完成 (Completable)

Completable is used in cases where you need to know whether an operation is completable successfully or not. Example: Uploading an image to the server. Unlike Maybe and Single, the CompletableObserver doesn’t return any value at all. Neither does the Completable Observable has a type.

当您需要知道某个操作是否可以成功完成时,可以使用“完成”。 示例:将图像上传到服务器。 与Maybe和Single不同,CompletableObserver根本不返回任何值。 Completable Observable也没有类型。

Example:

例:

Observable<Integer> integerObservable = Observable.empty();Completable completable = integerObservable.ignoreElements();CompletableObserver completableObserver = new CompletableObserver() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("onSubscribe");}@Overridepublic void onComplete() {System.out.println("onComplete");}@Overridepublic void onError(Throwable e) {System.out.println("onError");}};completable.subscribe(completableObserver);

流动性 (Flowable)

Flowable is used when you need to handle lots of data. It supports backpressure. We’ll discuss it at length in another tutorial. For now, a Flowable Observable needs a Subscriber class as the Observer since RxJava2.

当您需要处理大量数据时,可以使用Flowable。 它支持背压。 我们将在另一个教程中详细讨论它。 从RxJava2开始,目前Flowable Observable需要一个Subscriber类作为Observer。

Following is a sample of Flowable:

以下是Flowable的示例:

Flowable<Integer> integerFlowable = Flowable.range(1,500000);integerFlowable.reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer + integer2;}});Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {System.out.println("onSubscribe");s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer integer) {System.out.println("onNext: " + integer);}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {System.out.println("onComplete");}};integerFlowable.subscribe(integerSubscriber);

For a Subscriber to start receiving emissions we must manually invoke request() on the Subscription instance as done above.

为了使订阅者开始接收排放,我们必须如上所述在Subscription实例上手动调用request()。

在可观察对象之间转换 (Converting Between Observables)

We have various helper methods to convert an Observable type into another.

我们有多种帮助程序方法可将Observable类型转换为另一个类型。

For example:

例如:

To convert any type to a Completable, either of the methods are available:

要将任何类型转换为Completable ,可以使用以下两种方法之一:

  • toCompletable()toCompletable()
  • ignoreElements()ignoreElements()

Similarly, to convert to Observable, toObservable() method is suffice.
FlowabletoFlowable()
MaybetoMaybe()
Singlereduce()/firstElement() etc.

同样,要转换为Observable ,可以使用toObservable()方法。
流动性 - toFlowable()
也许toMaybe()
单个reduce() / firstElement()

This brings an end to this tutorial on RxJava Observables.

这结束了有关RxJava Observables的本教程。

翻译自: https://www.journaldev.com/22594/rxjava-observables-observers

rxjava 观察者模式

rxjava 观察者模式_RxJava可观察对象和观察者相关推荐

  1. Angular RxJS入门笔记 (Observable可观察对象、Subscribe订阅、Observer观察者、Subscription对象)

    RxJS入门笔记,关于Observable可观察对象.Observer观察者.Subscribe订阅,Subscription Observable可观察对象 Observer观察者 总结整体 本笔记 ...

  2. rxjava教程_RxJava教程

    rxjava教程 In this tutorial, we'll be introducing you to Reactive Programming and the library RxJava. ...

  3. rxjava结合_RxJava结合最新,与最新发件人

    rxjava结合 In this tutorial, we'll be discussing the two important operators of RxJava and how they di ...

  4. Angular 可观察对象(Observable)

    可观察对象(Observable) 可观察对象支持在应用的发布者和订阅者之间传递消息. 可观察对象是声明式的 -- 即定义的用于发布值的函数,在有消费者订阅它之前,这个函数不会实际执行. 可观察对象可 ...

  5. UG NX 12 观察对象显示

    在使用UG NX 12建模的过程中,在绘图区中对点.线.实体.特征.线型.颜色以及网格线等对象的编辑或修改统称为对象的操作.本节将介绍对象的观察显示.隐藏对象.删除与恢复对象等相关操作. 观察对象显示 ...

  6. 优雅のJava(五)—— 优雅的观察对象与订阅信息 观察者模式 好莱坞原则

    文章目录 专栏导航 前言 好莱坞原则 工厂模式的思路借鉴 IOC的思路借鉴 问题核心 即时通讯的思路借鉴 GUI设计上的思路借鉴 后记 专栏导航 优雅のJava(零)-- 面向问题的学习 前言 这篇主 ...

  7. javafx阴影_JavaFX技巧来节省内存! 属性和可观察对象的阴影场

    javafx阴影 在 JavaFX的世界中, Properties API允许UI开发人员将值绑定到UI控件. 这种功能出奇的简单,但是当对象模型经常使用属性时,应用程序可能会很快耗尽内存. 我通常会 ...

  8. 简单的对象监听器 观察者设计模式

    该代码实现了一个可以注册监听类的类,如果注册了监听类,那么在类的一个方法执行前会执行监听类的方法.并且该监听类方法的参数就是被监听对象. 监听类就是事件监听器,被监听对象就是事件源,事件监听器的参数就 ...

  9. java中退订程序怎么写_如何在RxJava中的自定义Observable中获得观察者的退订操作的通知...

    订阅者抽象类实际上有一个方法add,可以添加将取消订阅订阅的订阅. def myObservable = Observable.create({ aSubscriber -> val liste ...

最新文章

  1. Science | 谷岩/王朗团队揭示大脑中的免疫细胞竟是记忆遗忘的“主谋”
  2. 服务器怎么可以维修荒野行动,荒野行动PC版 设置单独服务器让你尽情畅玩
  3. go get报错unrecognized import path “golang.org/x/net/context”…
  4. Python 技术篇-基于随机数的uuid码的生成
  5. tomact如何处理一个http请求?
  6. eureka同步原理_eureka原理剖析
  7. 解题:USACO15JAN Grass Cownoisseur
  8. tabBar颜色改动
  9. 泛微为芯片行业搭建数字化平台:研发、生产、销售、文档一体管理
  10. yarn lib cli.js SyntaxError: Unexpected token
  11. 投资 - 什么是EBITDA
  12. [已解决]你的支付授权失败。请核对你的信息并重试,或尝试其他支付方式。请联系你的银行了解更多信息
  13. 路由器dns被劫持怎么办 路由器DNS被劫持解决方法
  14. MachineLearning in Action (机器学习实战)源码和数据集下载地址
  15. USYD悉尼大学DATA1002 详细作业解析Module5
  16. python educoder苹果梨子_pythoneducoder苹果梨子煮水的功效_苹果梨子汤的功效
  17. JavaWeb项目监听数据表变化并通知前台(数据来源于自身)
  18. 格密码学重要概念: 分叉引理Forking lemma
  19. STM32F4内的FLASH和RAM
  20. 深度学习系列2:框架tensorflow

热门文章

  1. 【java读书笔记】——java开篇宏观把控 + HelloWorld
  2. [转载] python type() 判断数据类型
  3. [转载] JAVA出现空指针异常(初学者)
  4. Java匹马行天下之 Java国出了个Java——举国欢庆
  5. 浏览器提取已安装扩展教程
  6. 单点登录实现机制:web-sso
  7. Mac OS 10.12 - 如何关闭Rootless机制?
  8. 【计算机视觉-从入门到精通系列】 第三章 立体匹配
  9. PyTorch 入坑四 梯度、链式法则、计算图与反向传播
  10. c++中int double 与char,string之间的转换