1.什么是RxJava? 1.1什么是响应式编程? 是一种基于异步数据流概念的编程模式(异步数据流编程) 数据流 ->河流(被观测、被过滤、被操作)

1.2响应式编程的设计原则是: 保持数据的不变性 没有共享 阻塞是有害的

1.3在我们的Java里面提供了解决方案 - RxJava? RxJava:Reactive Extensions Java(Java响应式编程) 响应式编程最初诞生.Net里面 iOS开发中也有响应式编程(block)

      // 传统写法:加载文件
//      new Thread() {
//          @Override
//          public void run() {
//              super.run();
//              for (File folder : folders) {
//                  File[] files = folder.listFiles();
//                  for (File file : files) {
//                      if (file.getName().endsWith(".png")) {
//                          final Bitmap bitmap = getBitmapFromFile(file);
//                          // 更新UI线程
//                          runOnUiThread(new Runnable() {
//                              @Override
//                              public void run() {
//                                  imageCollectorView.addImage(bitmap);
//                              }
//                          });
//                      }
//                  }
//              }
//          }
//      }.start();
复制代码

RxJava写法

        File[] folders = new File[10];Observable.from(folders)//便利.flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}})//过滤.filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {//条件return file.getName().endsWith(".png");}})//加载图片.map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//更新UI.subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}});
复制代码

文件数组 flatMap:相当于我们手动的起嵌套循环 队列数据结构 你会发现以下这个简单的案例有哪些优势 第一点:你不需要考虑线程问题 第二点:你不要关心如何更新UI线程,如何调用

2.RxJava整体架构设计?

 整体架构设计 -> 主要观察者模式同时里面还采用其他的设计模式 代理模式、迭代器模式、Builder设计模式(构建者模式)整体RxJava框架,角色划分:Observable   :被观察者Observer      : 观察者Subscrible    : 订阅Subjects       : 科目Observable 和 Subjects 是两个“生产“实体,Observer和Subscrible是两个“消费”实体热Observables 和冷Observables从发射物的角度来看,有两种不同的Observables:热的和冷的。一个“热”的Observable典型的只要一创建完就开始发射数据。因此所有后续订阅它的观察者可能从序列中间得某个位置开始接收数据(有一些数据错过了)。一个“冷”的Observable会一直等待,知道由观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。热和冷热:主动场景:容器中目前只有一个观察者,向所有的观察者发送3条数据,因为热Observables一旦创建就立马发送消息,假设我现在发送到了第二条数据,突然之后增加了一个观察者,这个时候,第二个观察者就收不到之前的消息。
冷:被动场景:容器中目前只有1个观察者,因为冷Observables一旦创建就会等待观察者订阅,一定有观察者订阅了,我立马将所有的消息发送给这个观察者(订阅人)
复制代码

3.RxJava基本API? 第一个案例:如何创建Observables?

subscribe 相关源码:

    public final Subscription subscribe(final Observer<? super T> observer) {if (observer instanceof Subscriber) {return subscribe((Subscriber<? super T>)observer);}if (observer == null) {throw new NullPointerException("observer is null");}return subscribe(new ObserverSubscriber<T>(observer));}static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("subscriber can not be null");}if (observable.onSubscribe == null) {throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would// add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// in case the subscriber can't listen to exceptions anymoreif (subscriber.isUnsubscribed()) {RxJavaHooks.onError(RxJavaHooks.onObservableError(e));} else {// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}}return Subscriptions.unsubscribed();}}public class SafeSubscriber<T> extends Subscriber<T> {private final Subscriber<? super T> actual;boolean done;public SafeSubscriber(Subscriber<? super T> actual) {super(actual);this.actual = actual;}/*** Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.* <p>* The {@code Observable} will not call this method if it calls {@link #onError}.*/@Overridepublic void onCompleted() {if (!done) {done = true;try {actual.onCompleted();} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);RxJavaHooks.onError(e);throw new OnCompletedFailedException(e.getMessage(), e);} finally { // NOPMDtry {// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken// and we throw an UnsubscribeFailureException.unsubscribe();} catch (Throwable e) {RxJavaHooks.onError(e);throw new UnsubscribeFailedException(e.getMessage(), e);}}}}/*** Notifies the Subscriber that the {@code Observable} has experienced an error condition.* <p>* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onCompleted}.** @param e*          the exception encountered by the Observable*/@Overridepublic void onError(Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);if (!done) {done = true;_onError(e);}}/*** Provides the Subscriber with a new item to observe.* <p>* The {@code Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or* {@link #onError}.** @param t*          the item emitted by the Observable*/@Overridepublic void onNext(T t) {try {if (!done) {actual.onNext(t);}} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwOrReport(e, this);}}/*** The logic for {@code onError} without the {@code isFinished} check so it can be called from within* {@code onCompleted}.** @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>*/@SuppressWarnings("deprecation")protected void _onError(Throwable e) { // NOPMDRxJavaPlugins.getInstance().getErrorHandler().handleError(e);try {actual.onError(e);} catch (OnErrorNotImplementedException e2) { // NOPMD/** onError isn't implemented so throw** https://github.com/ReactiveX/RxJava/issues/198** Rx Design Guidelines 5.2** "when calling the Subscribe method that only has an onNext argument, the OnError behavior* will be to rethrow the exception on the thread that the message comes out from the observable* sequence. The OnCompleted behavior in this case is to do nothing."*/try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD}throw e2;} catch (Throwable e2) {/** throw since the Rx contract is broken if onError failed** https://github.com/ReactiveX/RxJava/issues/198*/RxJavaHooks.onError(e2);try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));}throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));}// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catchtry {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException(unsubscribeException);}}/*** Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.** @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}*/public Subscriber<? super T> getActual() {return actual;}
}复制代码

subscriber 实际上就是Observer

RxJava基本使用 源码分析 Observable创建原理分析: 第一步:调用Observable.create()方法 第二步:添加观察者订阅监听Observable.OnSubscrible 第三步:在Observable.create方法中创建被观察者new Observable(hook.onCreate(f)); 第四步:在Observable类构造方法中保存了观察者订阅监听

订阅观察者原理分析: 第一步:注册观察者监听observable.subscribe(new Observer()) 第二步:在Observable类中调用了 public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber(observer)); } 方法中注册观察者 第三步:在Observable类中调用了 public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }方法 第四步:调用了Observable.subscribe(subscriber, this);方法 第五步:在 Observable.subscribe方法中调用了监听观察者订阅的回调接口 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

    private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);// 创建一个被观察者// 配置回调接口---OnSubscribe// 为什么要配置?// 监听观察者订阅,一旦有观察者订阅了,立马回调改接口observableString = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> observer) {Log.i("main", "回到了");//访问请求// 所以在这个方法里面我们可以干一些事情// 进行数据通信(说白了就是通知观察者)for (int i = 0; i < 5; i++) {observer.onNext("第" + i + "个数据");}//访问完成// 当我们的数据传递完成observer.onCompleted();}});}public void click(View v) {// 观察者订阅// 回调原理:// 核心代码:// hook.onSubscribeStart(observable,// observable.onSubscribe).call(subscriber);observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受数据Log.i("main", "观察者接收到了数据: " + item);}});}结果输出
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 回到了
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第0个数据
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第1个数据
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第2个数据
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第3个数据
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 观察者接收到了数据: 第4个数据
08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted---
复制代码

observableString.subscribe 中 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 调用call方法

另一种方式自动发送

 private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);List<String> items = new ArrayList<String>();items.add("Kpioneer");items.add("Xpioneer");items.add("haocai");items.add("Huhu");// 框架本身提供了这样的API// from: 一旦当你有观察者注册,立马发送消息序列// 框架内部实现// 框架内部调用create方法// 迭代器模式// OnSubscribeFromIterable类专门用于遍历集合// OnSubscribeFromArray类专门用于遍历数组observableString = Observable.from(items);}public void click(View v) {observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受数据Log.i("main", "观察者接收到了数据: " + item);}});}结果输出08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Kpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Xpioneer
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: haocai
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 观察者接收到了数据: Huhu
08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted---
复制代码
/*** Copyright 2014 Netflix, Inc.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package rx.internal.operators;import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;/*** Converts an {@code Iterable} sequence into an {@code Observable}.* <p>* ![](http://upload-images.jianshu.io/upload_images/1824809-fa9342290145e00e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)* <p>* You can convert any object that supports the Iterable interface into an Observable that emits each item in* the object, with the {@code toObservable} operation.* @param <T> the value type of the items*/
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {final Iterable<? extends T> is;public OnSubscribeFromIterable(Iterable<? extends T> iterable) {if (iterable == null) {throw new NullPointerException("iterable must not be null");}this.is = iterable;}@Overridepublic void call(final Subscriber<? super T> o) {Iterator<? extends T> it;boolean b;try {it = is.iterator();b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!o.isUnsubscribed()) {if (!b) {o.onCompleted();} else {o.setProducer(new IterableProducer<T>(o, it));}}}static final class IterableProducer<T> extends AtomicLong implements Producer {/** */private static final long serialVersionUID = -8730475647105475802L;// 具体的观察者private final Subscriber<? super T> o;// 具体的数据private final Iterator<? extends T> it;IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {this.o = o;this.it = it;}@Overridepublic void request(long n) {if (get() == Long.MAX_VALUE) {// already started with fast-pathreturn;}if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {fastPath();} elseif (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {slowPath(n);}}void slowPath(long n) {// backpressure is requestedfinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;long r = n;long e = 0;for (;;) {while (e != r) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}e++;}r = get();if (e == r) {r = BackpressureUtils.produced(this, e);if (r == 0L) {break;}e = 0L;}}}void fastPath() {// fast-path without backpressurefinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;for (;;) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b  = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}}}}}
复制代码

响应式编程RxJava (一)相关推荐

  1. 响应式编程Rxjava 书籍视频教程

    转载请注明 AIQ - 最专业的机器学习大数据社区 http://www.6aiq.com AIQ 机器学习大数据 知乎专栏 点击关注 响应式编程业界知秋大佬: <Java 编程方法论响应式 之 ...

  2. Android函数响应式编程——RxJava最快速度入门

    gradle // RxJava compile 'io.reactivex:rxjava:1.2.0' compile 'io.reactivex:rxandroid:1.2.1' 创建Observ ...

  3. RxJava响应式编程学习笔记

    1.概述 RxJava是一个著名的开源库,是ReactiveX(Reactive Extensions)的一种java实现.ReactiveX是一种响应式扩展框架,有很多实现,如RxAndroid,R ...

  4. Android【Retrofit(HTTP客户端),RxJAVA(响应式编程)】

    1 Retrofit(HTTP客户端) 1.1 简介 我们项目当中的每个app都需要用到网络和服务器进行交互,在Android项目开发中使用HTTP协议完成通信的话,基本上都要用到OkHttp或者Re ...

  5. Java响应式(反应式)编程——RxJava

    相关资源 RxJava文档:https://github.com/ReactiveX/RxJava/wiki RxJava中文文档:https://mcxiaoke.gitbooks.io/rxdoc ...

  6. Rxjava响应式编程

    一.Rxjava的思维 (1)响应式编程 (根据上一层的响应来影响下一层的变化) Rx全称:reactivex 链式编程:起点-需求1-需求2-......-终点 eg:登录操作: 触发登录按钮(起点 ...

  7. 响应式编程笔记(二):代码编写

    2019独角兽企业重金招聘Python工程师标准>>> 响应式编程笔记(二):代码编写 博客分类: 架构 原文:Notes on Reactive Programming Part ...

  8. 赠书:响应式编程到底是什么?

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 最近几年,随着Go.Node 等新语言.新技术的出现,J ...

  9. WebFlux基础之响应式编程

    上篇文章,我们简单的了解了WebFlux的一些基础与背景,并通过示例来写了一个demo.我们知道WebFlux是响应式的web框架,其特点之一就是可以通过函数式编程方式配置route.另外究竟什么是响 ...

最新文章

  1. PHP哈希表碰撞攻击原理
  2. 多元价值呼唤教育性父母
  3. 笔试算法题及解答(Python)
  4. MATLAB图像函数 块和邻域的处理
  5. PAT甲级1058 A+B in Hogwarts :[C++题解]字符串,进制,简单
  6. 服务器定期巡检项目,服务器定期巡检制度..docx
  7. js中document.referrer认识
  8. Avalonia跨平台入门第十二篇之动画效果
  9. go mongodb排序查询_《MongoDB》day two
  10. 启动盘Linux windows,Linux 中创建 USB 启动盘来拯救 Windows 用户
  11. ultraedit 运行的是试用模式_单元测试 —— 前后端分离开发模式下后端质量的保证...
  12. jocky1.0.3 (原joc) java混淆器 去除jdk版本限制
  13. 性能测试(二)确定需求,执行测试
  14. c++ 字符串拼接_python字符串零碎总结
  15. 7-10 统计字符出现次数 (20 point(s))
  16. linux screen会话命令
  17. OSS实现多文件多线程的断点下载(java)
  18. Golang 流媒体音视频网络传输开源项目-LAL
  19. Pytorch测试模型的GFLOPs和Param大小
  20. 宝宝泡药浴和直接吃药有什么区别吗?

热门文章

  1. C#中通过单例模式以及Dictionary实现键值对的映射,通过key获取value
  2. Webservice开发之xsd中开发list请求参数的接口
  3. ElementUI介绍以及安装
  4. 一个跨国银行的敏捷转型案例要点之Agile Center
  5. 四大价值观和12准则
  6. c语言程序朴素贝叶斯分类器,生成式学习算法(四)之----朴素贝叶斯分类器
  7. linux下redis权限,Linux(Centos)下Redis开机自启设置
  8. 神策数据荣获北京市广播电视局优秀推荐项目
  9. 奢侈品级别的广告位,到底要不要继续砸钱?
  10. 计算机如何读懂“人话”?五分钟了解文本挖掘那些事儿