Android开发中必不可少会遇到轮询或定时任务,在RxJava诞生之前,我们常常使用Handler+postDelay,或者Java中的Timer来实现,实际上RxJava也可以实现这类需求。下面,我们将分别介绍这几种方案的实现原理。

方案一:使用Handler实现轮询。

Handler提供了postDelay方法可以延迟执行某个Runnable,如果我们在Runnable的run方法中继续将当前Runnable postDelay到Handler中,则可以实现轮询。

下面代码将会间隔1秒打印从1到10。

Handler handler = new Handler(Looper.getMainLooper()); // 全局变量

int count = 0; // 全局变量

void testCount() {

handler.postDelay(new Runnable() {

void run() {

// do something

count++;

Log.d(TAG, "count: " + count);

if (count < 10) {

handler.postDelay(this, 1000);

}

}

}, 1000);

}

原理如上,但是实际上开发过程,我们需要考虑更多,比如如何方便的取消任务,当Activity销毁时候,要及时取消任务以防止内存泄漏,或者出现其他异常导致崩溃。另外,上面代码对于每个任务都需要去手动调用postDelay,属于重复性工作。我们可以对其进行封装,抽象出轮询以及取消的接口,而不需要管内部的实现。

下面我封装一个HandlerTimer类,内部提供下面的接口:

// 延迟执行任务

public TimerTask schedule(final Runnable runnable, final long delay, final TimeUnit delayTimeUnit);

// 轮询任务

public TimerTask schedule(final Runnable runnable, final long delay, final long period, final TimeUnit timeUnit);

// 取消任务

public void cancel(TimerTask timerTask);

schedule方法一个支持延迟执行任务,一个支持轮询,两个方法都返回一个TimerTask类,这个类有个cancel方法可以取消任务执行。当然,HandlerTimer类也提供了cancel接口来取消任务。

源码如下:

public class HandlerTimer {

private Handler handler;

public HandlerTimer(Handler handler) {

this.handler = handler;

}

public TimerTask schedule(final Runnable runnable, final long delay, final TimeUnit delayTimeUnit) {

TimerTask timerTask = new TimerTask() {

@Override

public void doRun() {

runnable.run();

}

};

handler.postDelayed(timerTask, delayTimeUnit.toMillis(delay));

return timerTask;

}

public TimerTask schedule(final Runnable runnable, final long delay, final long period, final TimeUnit timeUnit) {

TimerTask timerTask = new TimerTask() {

@Override

public void doRun() {

runnable.run();

handler.postDelayed(this, timeUnit.toMillis(period));

}

};

handler.postDelayed(timerTask, timeUnit.toMillis(delay));

return timerTask;

}

public void cancel(TimerTask timerTask) {

if (timerTask != null) {

timerTask.cancel();

handler.removeCallbacks(timerTask);

}

}

public static abstract class TimerTask implements Runnable {

private volatile boolean isCancelled;

public abstract void doRun();

public void cancel() {

isCancelled = true;

}

@Override

public void run() {

if (!isCancelled) {

doRun();

}

}

}

}

我们需要传入Handler来构造HandlerTimer类,TimerTask运行在Handler的Looper中,所以如果你的任务比较耗时,切记不要传入MainLooper的Handler,以免产生ANR。可以构造一个HandlerThread,利用其内部的Looper来构造Handler,这样TimerTask则运行在HandlerThread的内部线程中。另外,要记住不用的时候(比如Activity销毁时)还需要调用handlerThread.quit方法来停止线程运行避免内存泄漏。原理如下:

HandlerThread handlerThread = new HandlerThread("loop-timer");

handlerThread.start();

Handler handler = new Handler(handlerThread.getLooper());

HandlerTimer handlerTimer = new HandlerTimer(handler);

...

OK,使用我们封装的类来实现从1打印到10。

Handler handler = new Handler(Looper.getMainLooper()); // 全局变量

int count = 0; // 全局变量

HandlerTimer handlerTimer = new HandlerTimer(handler);

TimeTask timeTask;

void testCount() {

timerTask = handlerTimer.schedule(new Runnable() {

@Override

public void run() {

count++;

Log.d(TAG, "count: " + count);

if (count == 10) {

handlerTimer.cancel(timerTask);

}

}

}, 1, 1, TimeUnit.SECONDS);

}

方案二:使用Java的Timer和TimerTask实现轮询。

代码写法同我们封装的HandlerTimer差不多。

Timer timer = new Timer();

TimerTask countDownTask;

void testCount() {

if (countDownTask != null) {

countDownTask.cancel();

}

count = 0;

countDownTask = new TimerTask() {

@Override

public void run() {

count++;

if (count == 10) {

countDownTask.cancel();

}

}

};

timer.schedule(countDownTask, 1000, 1000);

}

void stopTimer() {

if (timer != null) {

timer.cancel();

}

}

Java的Timer内部有且仅有一个线程,用于执行TimerTask,使用完成后记得调用Timer的cancel方法来关闭整个Timer。单个任务的执行时长,也会影响其他任务的执行。当然上面的HandlerTimer也是如此。因为任务运行在子线程中,如果有更新UI的需求,可以利用Handler post到主线程中执行。

方案三:利用RxJava实现轮询

RxJava的interval操作符可每隔一定时间发射数据,数据从0开始。所以从1打印到10,我们可以这么写:

Disposable disposable; // 全局变量

void testCount() {

disposable = Observable.interval(0, 1, TimeUnit.SECONDS)

.map(new Function() {

@Override

public Long apply(Long aLong) throws Exception {

return aLong + 1;

}

})

.subscribe(new Consumer() {

@Override

public void accept(Long count) throws Exception {

Log.d(TAG, "count: " + count);

if (count == 10) {

if (disposable != null) {

disposable.dispose();

}

}

}

});

}

void stop() {

if (disposable != null) {

disposable.dispose();

}

}

上面我们采用了当满足某个条件(count == 10)时候,手动调用了disposable的dispose方法来终止数据继续发射。实际上,RxJava提供了take操作符,可以用来限定要接收的数据数。那么,改写一下:

disposable = Observable.interval(0, 1, TimeUnit.SECONDS)

.map(new Function() {

@Override

public Long apply(Long aLong) throws Exception {

return aLong + 1;

}

})

.take(10)

.subscribe(new Consumer() {

@Override

public void accept(Long count) throws Exception {

Log.d(TAG, "count: " + count);

}

});

对于上面的例子,实际上还可以再简单一点,RxJava提供了intervalRange操作符,可以限定要发送的数据个数。再改写一下:

disposable = Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS)

.map(new Function() {

@Override

public Long apply(Long aLong) throws Exception {

return aLong + 1;

}

})

.subscribe(new Consumer() {

@Override

public void accept(Long count) throws Exception {

Log.d(TAG, "count: " + count);

}

});

讲了这么多,上面的例子貌似没太多用,毕竟都是简单的轮询计数,也就用来做个UI方面的倒计时有用点。假如,我们需要轮询去做一个耗时操作,比如轮询请求网络呢,这个用RxJava该怎么实现呢?

假设我们的网络请求是从服务器获得某个数,因为是网络请求,所以需要考虑失败的情况。我们伪造一个数据源如下:

private Observable getDataFromServer() {

return Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

if (emitter.isDisposed()) {

return;

}

int randomSleep = new Random().nextInt(5);

try {

Thread.sleep(randomSleep * 1000);

} catch (Exception e) {}

if (emitter.isDisposed()) {

return;

}

if (randomSleep % 2 == 0) {

emitter.onError(new Exception("get fake error for " + randomSleep));

return;

}

emitter.onNext(randomSleep);

emitter.onComplete();

}

});

}

第一个版本:定时发送消息,然后收到消息就执行网络请求

结合上面,我们很容易想到,可以利用interval或者intervalRange操作符定时发送消息,接收到消息后就开始执行网络请求。

CompositeDisposable compositeDisposable = new CompositeDisposable();

@Override

public void start() {

compositeDisposable.dispose();

compositeDisposable = new CompositeDisposable();

loopAtFixRate();

//loopSequence();

}

// 嵌套风格loop, 不管实际结果,反正到点了就执行。

private void loopAtFixRate() {

compositeDisposable.add(Observable.interval(0, 5, TimeUnit.SECONDS)

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Exception {

Log.d(TAG, "interval: " + aLong);

getData();

}

}));

}

private void getData() {

compositeDisposable.add(getDataFromServer()

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

Log.d(TAG, "getData: " + integer);

view.showText(integer + "");

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "getData error " + throwable.getMessage());

view.showText(throwable.getMessage());

}

}));

}

@Override

public void stop() {

if (compositeDisposable != null) {

compositeDisposable.dispose();

}

}

这个版本是最容易想到的,也是最简单的。但是有一些缺点,比如:嵌套风格的Rx代码显得比较丑陋。另外,

每个请求不能按照顺序执行,可能会出现后发的请求先到的情况。如果轮询的任务需要请求顺序执行的话,或者下次轮询的间隔跟请求结果相关联的话,这种方式就不适用。

针对这两种缺点,我们依次试试看。

第二个版本:去掉嵌套的固定请求间隔的loop

利用RxJava的flatMap操作符可以将获取数据的Observable链到原始消息数据源上。这样就不存在嵌套了。OK,先看看下面:

private void loopAtFixRateEx() {

compositeDisposable.add(Observable.interval(0, 5, TimeUnit.SECONDS)

.flatMap(new Function>() {

@Override

public ObservableSource apply(Long aLong) throws Exception {

return getDataFromServer();

}

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer value) throws Exception {

Log.d(TAG, "value: " + value);

view.showText(value + "");

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.e(TAG, "loopAtFixRateEx", throwable);

}

}));

}

上面代码看起来是OK的,但是你一跑起来就会发现从服务端拉取数据失败后上面并不会进行重试。原因就在于flatMap链接了getDataFromServer的数据源,而这个数据源抛出的异常会转移到最外面订阅的onError回调中,并且默认出现error,就会dispose整个数据源。所以也就终止了轮询操作。ok,马上分析怎么解决这个问题。

flatMap有个参数delayErrors,如果传入为true,表示遇到错误后不会立即抛出来,等到所有数据发射完了,或者dispose了之后再抛出来。所以,是不是传入delayErrors为true就解决了。

实际上,想多了,这个会带来新的问题,如果任务一直运行,所有的error都会累积到内存中,会导致内存溢出。另外,我实际测试发现,如果有error产生,我调用了dispose,会导致crash。具体怎么解决,我还没找到办法,所以,用flatMap这个就算了。

第三个版本:采用repeat、结合retry实现轮询

RxJava中的repeat操作符可以在原始数据源发射数据完成后重新订阅数据源,而retry可以在原始数据源产生错误后重新订阅数据源。结合起来就可以在无论是成功还是失败的都能重新执行任务,则实现了轮询请求。再结合delay操作符,实现延迟执行任务。

// 按照顺序loop,意味着第一次结果请求完成后,再考虑下次请求

private void loopSequence() {

Disposable disposable = getDataFromServer()

.doOnSubscribe(new Consumer() {

@Override

public void accept(Disposable disposable) throws Exception {

Log.d(TAG, "loopSequence subscribe");

}

})

.doOnNext(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

Log.d(TAG, "loopSequence doOnNext: " + integer);

}

})

.doOnError(new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "loopSequence doOnError: " + throwable.getMessage());

}

})

.delay(5, TimeUnit.SECONDS, true) // 设置delayError为true,表示出现错误的时候也需要延迟5s进行通知,达到无论是请求正常还是请求失败,都是5s后重新订阅,即重新请求。

.subscribeOn(Schedulers.io())

.repeat() // repeat保证请求成功后能够重新订阅。

.retry() // retry保证请求失败后能重新订阅

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

view.showText(integer + "");

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

view.showText(throwable.getMessage());

}

});

compositeDisposable.add(disposable);

}

上面这个例子是对轮询的其中一个场景的示范,实际上你的需求可能会千奇百怪,比如说,有轮询次数限制,并且轮询间隔需要根据轮询的次数做调整,或者由返回的结果来决定下次轮询的时间间隔等。这时候,可能就需要多尝试,多实验才能玩对了。

RxJava要实现一个完全正确可用的轮询,还是需要多测试的。不过,你对RxJava了解的越多,用起来就越爽。

综上,如果是比较简单的轮询,上面哪种方式都可以玩。如果稍微复杂点呢,如果你没办法用RxJava玩对的话,建议就用原始的Handler+postDelay或者Java的Timer来做,先保证做对,再保证做好。

java定时轮询_RxJava应用场景之轮询定时任务相关推荐

  1. java定时开始和关闭_springboot自带定时器实现定时任务的开启关闭以及定时时间可以配置详解...

    一.序言: 最近项目需要用到定时任务,需要完成一个定时功能.经过了解,项目中目前实现定时任务,一般有三种选择,一是用Java自带的timer类.稍微看了一下,可以实现大部分的指定频率的任务的调度(ti ...

  2. java线程轮询_基于springboot实现轮询线程自动执行任务

    本文使用: Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务.使用这种方式可以让你的程序按照某一个频度执行, 但不能在指定 ...

  3. java 实现http长轮询,webim使用http长轮询如何保证消息的绝对实时性

    一.webim如何实现消息推送 webim通常有三种方式实现推送通道: 1)WebSocket 2)FlashSocket 3)http轮询 其中1)和2)是用Tcp长连接实现的,其消息的实时性很好理 ...

  4. java定时数据同步_java 定时同步数据的任务优化

    前言 定时任务在系统中并不少见,主要目的是用于需要定时处理数据或者执行某个操作的情况下,如定时关闭订单,或者定时备份.而常见的定时任务分为2种,第一种:固定时间执行,如:每分钟执行一次,每天执行一次. ...

  5. S7-200SMART PLC进行MODBUS通信轮询卡死时重新开启轮询的具体方法演示

    S7-200SMART PLC进行MODBUS通信轮询卡死时重新开启轮询的具体方法演示 我们在进行MODBUS通信轮询时,若遇到轮询卡死时,如何重新开始轮询,具体可以参考以下内容: 如下图所示,正常轮 ...

  6. java定时14点30分_单机定时任务的四种基本实现方式

    引言 在实际项目开发中,定时任务调度是经常会出现的一类需求. 定时任务的场景可以说非常广泛,例如: 购买某些视频网站的会员后,每天给会员送成长值,每月给会员送电影券 在保证最终一致性的场景中,利用定时 ...

  7. java定时关机源码_java实现电脑定时关机的方法

    本文实例讲述了java如何注册成windows服务程序及一个简单的java定时关机程序代码,分享给大家供大家参考.具体方法如下: 一.问题: 最近想找个软件来控制电脑的关机时间,在网上找了几个,都是可 ...

  8. Java定时任务调度工具

    tags: 定时任务调度, title: Java定时任务调度工具 为什么要使用定时任务调度工具? 在最近的开发中,想开发一个"个人备忘录"的功能,实际上就是用户在指定某个时间点后 ...

  9. Java 定时任务调度(8)--ElasticJob 入门实战(ElasticJob-Lite使用)

    ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成.本文主要介绍 ElasticJob-Lite 的 ...

最新文章

  1. iOS lldb调试
  2. Tensorflow警告:our CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
  3. Elasticsearch之Mapping Meta-Fields
  4. IOS中类和对象还有,nil/Nil/NULL的区别
  5. Linux利器:QEMU!用它模拟开发板能替代真开发板?
  6. STM32外部中断具体解释
  7. mysql的可视化工具_Mysql可视化工具Navicat的基本使用
  8. python显示实时时间校对_Python实现系统时间自动校正
  9. Red Giant Universe 3中文版
  10. 前端vue项目下载zip压缩包及附加
  11. 互联网络业的十大发展趋势
  12. 为何插入10万数据只需2秒
  13. 天嵌i.mx6q--2
  14. HTML 写代码流星雨
  15. JVM堆内存(heap)
  16. Springboot2中文件上传报java.io.FileNotFoundException: C:\Users\WIzarder\AppData\Local\Temp\tomcat.8080.589
  17. 短视频运营课程怎么样
  18. qq音乐正式版官方免费版
  19. java利用Date类做练习,实现倒计时30s功能
  20. 面试题整理出炉附答案,建议收藏

热门文章

  1. listagg()行转列函数
  2. 照着书敲linux下载安装命令?大汇总来咯!!!
  3. printk 内核打印
  4. nginx lua 调试 - 死磕
  5. java计算机毕业设计web家庭财务管理系统源码+数据库+系统+lw文档
  6. Android4.4 wifi代理流程
  7. Android判断网络连接是否可用,WiFi、移动数据是否打开等
  8. python获取实时股票价格,python股票价格实时数据馈送(脚本调试)
  9. 网络安全-防守-护网
  10. PostgreSQL数据库头胎——后台一等公民进程StartupDataBase StartupXLOG函数进入Recovery模式