原标题:使用RxJava来实现网络请求轮询功能

近日有媒体报道称,腾讯重金入股永辉超市旗下生鲜超市超级物种,目前交易已经完成。受此刺激,永辉超市股价迅速涨停,午后临时停牌。若此举成行,超级物种将更有底气对垒阿里巴巴的盒马鲜生,生鲜商超的新零售市场将展开激烈争战。

作者简介

各位小伙伴们大家早上好,新的一周又开始了,希望大家都能有个好心情迎接新的一周。

本篇来自juexingzhe的投稿,分享了一种通过RxJava来轮询的实现方式,希望大家喜欢!

juexingzhe的博客地址:

http://www.jianshu.com/u/ea71bb3770b4

前言

轮询的功能很常见了,之前 Android 中比较常用的方式就是通过 Handler 来实现,发送一个 Deley 消息,在 handlerMessage 再根据条件发送消息,这种方式需要小心内存泄漏,需要自己处理这个问题。这个不是我们今天的重点,今天来看下另外一种轮询的实现方式,通过 RxJava 来实现。

开始

通过 RxJava 来实现的方式有下面两个特点:

自动解除轮询和订阅关系,没有内存泄漏的风险;

可以跟 Activity 或者 Fragment 生命周期绑定,自动停止轮询

涉及到的RxJava知识点:

Subject

TakeUntil

Filter

Compose

因为后面很多逻辑用到上面的操作符,所以先简单看下这几个操作符,热热身。

Subject

从代码可以看出来 Subject 既可以当观察者也可以当被观察者。

publicabstractclassSubject extendsObservable implementsObserver

所以可以在生命周期中通过 Subject 发送事件然后又自己接收,从而根据事件类型做相应的操作。

Subject 总共有四种类型

AsyncSubject

BehaviorSubject

PublishSubject

ReplaySubject

今天我们就说下第二种类型 BehaviorSubject,它可以给订阅者发送订阅前最近的事件和订阅后发送的事件:

图中橙色的就是订阅前最近发送的事件,在订阅后也可以收到。文字解释始终太苍白,我们来看下代码:

BehaviorSubject behaviorSubject = BehaviorSubject.create(); behaviorSubject.onNext( 1); behaviorSubject.onNext( 2); behaviorSubject.subscribe( newConsumer() { @Overridepublicvoidaccept(Integer integer)throwsException{ Timber.tag(TAG).d( "running num : "+ integer); } });

behaviorSubject.onNext( 3); behaviorSubject.onNext( 4);

上面代码运行结果就是收到2, 3,4

TakeUntil

这是一个操作符,可以这样用

AObservable.takeUntil(BObservable)

可以 AObservable 监听另外一个 BObservable,如果 BObservable 开始发送数据,AObservable 就不再发送数据。

看一下官方的图片解释,B 发送 0 数据后,A 就停止发送数据了。

talk is cheap, show me the code:

Observable.interval( 1, TimeUnit.SECONDS). subscribeOn(Schedulers.io()). takeUntil(Observable.timer( 5, TimeUnit.SECONDS)). subscribe( newConsumer() { @Overridepublicvoidaccept(Long num)throwsException{ Timber.tag(TAG).d( "running num : "+ num); } });

上面代码的意思就是从0开发每隔1秒发送一个数据,5s时停止发送,看下运行结果,和我们的预期完美一致:

Filter

filter 操作符就是过滤的意思,只有事件满足过滤条件时被观察者才会发送给观察者。看下官方的解释图,很清晰明了我就不做解释了哈。

看一下怎么用,这个代码的意思还是每个1s发送数据,但是会进行过滤只发送偶数,也是5秒后停止发送:

Observable.interval( 1, TimeUnit.SECONDS). subscribeOn(Schedulers.io()). filter( newPredicate() { @Overridepublicbooleantest(Long aLong)throwsException{ returnaLong % 2== 0; } }). takeUntil(Observable.timer( 5, TimeUnit.SECONDS)). subscribe( newConsumer() { @Overridepublicvoidaccept(Long num)throwsException{ Timber.tag(TAG).e( "running num : "+ num); } });

上面代码的运行效果,确实是只收到了偶数。

Compose

compose 操作符是用来对 Observable 进行转换操作的,并且可以保证调用链不被破坏。

比如我们经常这样用:

Observable.interval( 1,TimeUnit.SECONDS) .subscribeOn(Schedulers.io()). observeOn(AndroidSchedulers.mainThread());

这部分代码经常写,怎么进行封装呢?可能有的小伙伴立马就想到下面的方法:

privateObservable composeObservable(Observable observable){ returnobservable.subscribeOn(Schedulers.io()). observeOn(AndroidSchedulers.mainThread()); }

但是上面这样用就破坏了调用链了,因为你肯定得这样调用,这样就会变得怪怪的,不是Observable 开头了,变成函数开头。

composeObservable(Observable.interval( 1,TimeUnit.SECONDS)).subscribe( newConsumer() { @Overridepublicvoidaccept(Long aLong)throwsException{ } });

这个问题用 compose 就可以完美解决:

Observable.interval( 1, TimeUnit.SECONDS). compose(bindUntil( 5)). subscribe( newConsumer() { @Overridepublicvoidaccept(Long num)throwsException{ Timber.tag(TAG).d( "running num : "+ num); } }); privateObservableTransformer bindUntil( finallongdeleyTime) { returnnewObservableTransformer() { @OverridepublicObservableSource apply(Observable upstream){ returnupstream.subscribeOn(Schedulers.io()).takeUntil(Observable.timer(deleyTime, TimeUnit.SECONDS)); } }; }

操作符就到这了,需要详细了解的小伙伴可以自行参考官方文档哈。下面进入我们的正文,首先看下怎么使用。

使用

目前有两种使用方式:

bindIntervalEvent 就是绑定事件进行轮询,事件发生时将停止轮询

bindLifeCycle 就是绑定生命周期,在指定生命周期发生时停止轮询

在开始之前我们先定义事件 Event,其中 FragmentEvent 对应 Fragment 的生命周期,ActivityEvent 对应 Activity 的生命周期,BizEvent 对应我们自定义的事件:

INTERVAL 就是对应 RxJava 中的 interval 操作符产生的周期事件,可以制定轮询间隔;

TIMER 就是对应的 timer 事件,可以制定多长事件后产生一个事件;

STOP 就是停止事件,这个是自定义的;

ALL 可以匹配所有事件。

public

interfaceEvent{

enumFragmentEvent implements EventInterface{ ATTACH, CREATE, CREATE_VIEW, START, RESUME, PAUSE, STOP, DESTROY_VIEW, DESTROY, DETACH }

enumActivityEvent implements EventInterface{ CREATE, START, RESUME, PAUSE, STOP, DESTROY }

enumBizEvent implements EventInterface{ INTERVAL, TIMER, STOP, ALL } }

talk is cheap, show me the code, 使用起来也很简单,首先看下第一种的使用,在 Activity中添加两个按钮,一个开始轮询,一个停止轮询,布局太简单了就不贴代码了哈,看下重点代码:

privatestaticfinalString TAG = MainActivity.class.getSimpleName() + "_POLLING"; //开启轮询

PollingManager.getInstance().bindIntervalEvent( 1, TAG, Event.BizEvent.INTERVAL, null); //停止轮询

PollingManager.getInstance().stopPolling(TAG, Event.BizEvent.INTERVAL);

看下日志打印情况,接收到 INTERVAL 事件后就停止轮询了。

接着看下第二种使用方式,有两个步骤:

承 BaseActivity,其中接口 LifeInterface 需要自己实现

publicabstractclassBaseActivityextendsActivityimplementsLifeInterface

publicinterfaceLifeInterface{ voidbindLife(); String getTag(); }

在需要轮询的 Activity 实现接口 LifeInterface 的两个方法,看下例子:

@Override

publicString getTag(){ returnTAG; } @Override

publicvoidbindLife(){ PollingManager.getInstance().bindLifeCycle(getTag(), Event.ActivityEvent.PAUSE); }

上面这个例子监听 PAUSE 事件,在 Activity 进入 onPause 时会停止轮询,看下日志打印情况:

完全符合我们的预期哈。下面我们来看下代码实现。

PollingManager

主要逻辑在 PollingManager 中,这个是这个工具的门面,有点类似于外观模式。

首先是单例模式,activeSubjectMap 是 Subject 的仓库,所有注册的轮询 Model 保存的地方。

privateHashMap> activeSubjectMap; privatestaticPollingManager manager; privatePollingManager(){ activeSubjectMap = newHashMap<>(); } publicstaticPollingManager getInstance(){ if( null== manager) { synchronized(PollingManager.class) { if( null== manager) { manager = newPollingManager(); } } } returnmanager; }

对上面的轮询 Model 进行下说明,每个 Model 封装了轮询器,RxJava 订阅关系 disposable和 Subject。disposable 就是用来停止轮询的时候解除订阅关系防止内存泄漏。

//SubjectprivateBehaviorSubject behaviorSubject; //订阅关系privateDisposable disposable; //轮询器privatePollingRequest pollingRequest; publicvoidclearSubject(){ if( null== disposable || disposable.isDisposed()) return; disposable.dispose(); }

每个轮询需要做的工作可以抽象出来就是上面的 PollingRequest,注释比较清楚就不说了,每个 PollingRequest 对外接口就是 execute,其中 doAction 是在每个轮询到的时候会进行调用。

publicabstractclassPollingRequest{ //每个Subject的唯一标识protectedString tag; //事件接口protectedEventInterface eventInterface; //轮询动作protectedPollingAction pollingAction; publicPollingRequest(String tag, EventInterface eventInterface, PollingAction pollingAction){ this.tag = tag; this.eventInterface = eventInterface; this.pollingAction = pollingAction; } publicabstractDisposable execute(PollingManager pollingManager); publicString getTag(){ returntag; } publicEventInterface getEventInterface(){ returneventInterface; } } publicinterfacePollingAction{ voiddoAction(Object accept); }

看下 IntervalPolling 的实现方式,逻辑也比较简单,就是每隔 intevals 进行轮询,轮询间隔会调用 doAction 完成动作。

publicclassIntervalPollingextendsPollingRequest{ privateintinteval; publicIntervalPolling(intinterval, String tag, EventInterface eventInterface, PollingAction action){ super(tag, eventInterface, action); this.inteval = interval; } @OverridepublicDisposable execute(PollingManager manager){ returnObservable.interval(inteval, TimeUnit.SECONDS). compose(manager.composeEvent(tag, eventInterface)). observeOn(AndroidSchedulers.mainThread()). doOnNext( newConsumer() { @Overridepublicvoidaccept(Long aLong)throwsException{ Timber.tag(Constants.TAG).d( "emit interval polling, Tag = "+ tag + ", num = "+ aLong); } }). subscribe( newConsumer() { @Overridepublicvoidaccept(Long num)throwsException{ if( null!= pollingAction) { pollingAction.doAction(num); } Timber.tag(Constants.TAG).d( "running interval polling, Tag = "+ tag + ", num = "+ num); } }); } }

上面可能比较费解的逻辑就是这一行:

compose(manager.composeEvent(tag, eventInterface))

调用 PollingManager 中的 composeEvent 方法,跟进去看看:

publicObservableTransformer composeEvent( finalString tag, finalEventInterface outEvent) { BehaviorSubject subject = getSubject(tag); if( null== subject) { Timber.tag(Constants.TAG).e( "subject = null"); returnnewEmptyObservableTransformer(); } finalObservable observable = subject.filter( newPredicate() { @Overridepublicbooleantest(EventInterface event)throwsException{ Timber.tag(Constants.TAG).i( "receive event: %s", event); booleanfilter = outEvent == event || event == ALL; if(filter) clearSubject(tag); returnfilter; } }); returnnewObservableTransformer() { @OverridepublicObservableSource apply(Observable upstream){ returnupstream.subscribeOn(Schedulers.io()).takeUntil(observable); } }; }

首先就是 takeUntil 操作符,当Subject发送数据时, IntervalPolling 就会停止轮询;

Subject 什么时候发送数据?就是在 subject.filter 返回真的时候。Subject 会根据接收到的Event 和订阅时的 Event 进行相等,或者接收到的事件是 ALL 都会返回真。

其实上面的逻辑需要对 RxJava 有一定的了解,这个不在本文的范围,小伙伴们自行网上查阅哈。轮询器,Model 和触发条件都有了,剩下的问题就是创建启动和销毁的问题了,先看下创建。

创建启动

先看下第一种绑定事件的创建方式:

publicBehaviorSubject bindIntervalEvent(intinterval, @NonNull String tag, @NonNull EventInterface eventInterface, PollingAction action){ //1.创建轮询器IntervalPolling intervalPolling = newIntervalPolling(interval, tag, eventInterface, action); //2.创建SubjectcreateSubject(intervalPolling); //3.启动轮询startPolling(tag); //4.返回SubjectreturnactiveSubjectMap.get(tag).getBehaviorSubject(); }

逻辑比较简单哈,其中第二步创建 Subject 时会将 Subject 和轮询器缓存到 HashMap> activeSubjectMap;其中 key 就是 Subject 的唯一标识 tag。

生命周期的创建方式也是一样的四个步骤,唯一不一样的就是这里轮询器是生命周期轮询器。

publicBehaviorSubject bindLifeCycle(@NonNull String tag,@NonNull EventInterface eventInterface){ //1.创建轮询器PollingRequest request = newLifePolling(tag, eventInterface, null); //2.创建SubjectcreateSubject(request); //3.启动轮询startPolling(tag); //4.返回SubjectreturnactiveSubjectMap.get(tag).getBehaviorSubject(); }

创建分析完了,就看下怎么停止轮询了。

停止

停止的逻辑其实就是发射事件给 Subject,这样 Subject 自己可以接收到,然后进入 Filter的逻辑进行判断,和创建的时候注册事件或者 ALL 事件一致就会停止轮询了。

publicbooleanstopPolling(String tag, EventInterface event){ BehaviorSubject subject = getSubject(tag); if( null== subject) { Timber.tag(Constants.TAG).e( "can not find subject according to the %s", tag); returnfalse; } subject.onNext(event); Timber.tag(Constants.TAG).i( "Stop Polling SubjectTag = "+ tag + ", Event = "+ event.toString()); returntrue; }

最后再补充一点就是发射事件的逻辑,会扫描 activeSubjectMap 中的所有 Subject,然后发射事件:

publicvoidemitEvent( @NonNull EventInterface event){ if( null== activeSubjectMap) return; for(Map.Entry> next : activeSubjectMap.entrySet()) { BehaviorSubject behaviorSubject = next.getValue().getBehaviorSubject(); if( null== behaviorSubject) return; behaviorSubject.onNext(event); } }

总结

到这里基本涉及的逻辑都分析完了,希望能提供给到家另外一种轮询的实现方式,如果有什么问题欢迎留言哈,谢谢!返回搜狐,查看更多

责任编辑:

java 轮询请求_使用RxJava来实现网络请求轮询功能相关推荐

  1. c++ post请求_前端工程师进阶:网络请求方法详解,GET和POST的区别

    1 前言 最近看了一些同学的面经,发现无论什么技术岗位,还是会问到 get 和 post 的区别,而搜索出来的答案并不能让我们装得一手好逼,那就让我们从 HTTP 报文的角度来撸一波,从而搞明白他们的 ...

  2. Android OkHttp+Retrofit+Rxjava+Hilt 的网络请求封装

    今天给大家简单的封装一个现在比较流行的网络请求框架 第一步是导入我们所需要的依赖还需要在android {}闭包下添加一个 buildFeatures{viewBinding true } imple ...

  3. rxswift 网络请求_使用RxSwift将身份验证请求链接到多个服务

    rxswift 网络请求 At a company that I have worked in the past, a high-traffic online classifieds, the bac ...

  4. ajax jq 图片上传请求头_全面分析前端的网络请求方式:Ajax ,jQuery ,axios,fetch

    链接:https://juejin.im/post/5c9ac607f265da6103588b31 一.前端进行网络请求的关注点 大多数情况下,在前端发起一个网络请求我们只需关注下面几点: 传入基本 ...

  5. java代码请求2次_Android基于OkHttpUtils网络请求的二次封装

    OkHttpUtils网络请求为什么进行二次封装? 1.减少代码量 2.后期换网络处理框架方便 二次封装的实现原理 1.将网络请求提取在一个方法中 2.对里面的可变参数,可以通过参数传递过去,也可以提 ...

  6. 6748如何设置edma为事件触发方式_全面分析前端的网络请求方式

    作者|ConardLi 编辑|覃云 来源|code 秘密花园公众号 一.前端进行网络请求的关注点 大多数情况下,在前端发起一个网络请求我们只需关注下面几点: 传入基本参数(url,请求方式) 请求参数 ...

  7. Retrofit网络请求框架使用简析——Android网络请求框架(四)

    题记:-- 很累,累到想要放弃,但是放弃之后将会是一无所有,又不能放弃, 唯有坚持,唯有给自忆打气,才能更勇敢的走下去,因为无路可退,只能前行, 时光一去不复返,每一天都不可追回,所以要更珍惜每一存光 ...

  8. Android Http请求框架二:xUtils 框架网络请求

    一:对Http不了解的请看 Android Http请求框架一:Get 和 Post 请求 二.正文 1.xUtils 下载地址 github 下载地址  : https://github.com/w ...

  9. uniapp网络请求获取数据_2.uni-app 发起网络请求

    ## uni.request(OBJECT) 发起网络请求. **OBJECT 参数说明** ![](https://box.kancloud.cn/a90bf284df069eddde4019c04 ...

最新文章

  1. python使用正则表达式删除字符串中的其它字符只保留数字和字母
  2. 域控制器活动目录之备份与恢复
  3. 【v8】一些关于内存泄漏的踏坑
  4. 使用Moq框架的各种模拟设置
  5. [置顶] Ubuntu 12.04中文输入法的安装
  6. go 并发的非阻塞缓存
  7. 技术沙龙 | 深度赋能AI全场景,揭秘你不知道的移动云
  8. 复制 在div层加滚动条
  9. 锂离子电池性能测试软件,锂离子电池有什么性能测试设备?
  10. linux修改系统语言为中文
  11. Nginx 配置子域名
  12. 图解机器学习算法(6) | 决策树模型详解(机器学习通关指南·完结)
  13. PCB学习笔记——DRC检查
  14. PHP开发手机自动拨号软件
  15. 如何把一个3D的stp文件导入AD做边框
  16. 苹果专用视频下载工具:Downie 3 for Mac
  17. 时艳强对话Ricky Ng:交易平台新势力 引领行业新变革
  18. STM8开发实例-I2C
  19. 曾经风靡的1394接口是什么?有什么作用?
  20. 用 Photoshop 计算命令优化美女照片皮肤

热门文章

  1. @Valid参数校验
  2. AFT三代引擎登录加密算法
  3. 23/24哈工程计算机考研解读
  4. SpringBoot集成SocketIO
  5. 【Python】如何简单获取糗事百科?【详细步骤】
  6. ViewBag的使用方法
  7. Node.JS实战60:解除“封印”!给Node更多的内存。
  8. group conv 和 depthwise conv
  9. 别人为什么愿意跟你相处
  10. Python爬虫教程-00-写在前面