java中的.take(),Rxjava2~take~timer~interval~buffer~filter等源码如何实现(你应该懂的)~学渣带你扣rxjava2...
take()
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver())
输出没错是123
我们面来看看源码
直接来看ObservableTake的subscribeActual,[不懂的同学请看我前面的学渣系列]
protected void subscribeActual(Observer super T> observer) {
source.subscribe(new TakeObserver(observer, limit));
}
这个source是ObservableSource的对象。 那么我们去找实现他的Observable
好吧 又回到了。
public final void subscribe(Observer super T> observer)
subscribeActual(observer);
其他的省略了
关键点一步,这回调用了谁的方法呢? 下面来揭晓
是ObservableObserveOn的subscribeActual
@Override
protected void subscribeActual(Observer super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
}
}
看到了吗 又会调用
source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
然后 又要调用的是ObservableSubscribeOn的subscribeActual
@Override
public void subscribeActual(final Observer super T> s) {
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
大家会好奇这两个地方为什么会被调用呢?
下面我给大家看一个地方
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
O(∩_∩)O
你没有看错
public final Observable observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}
大家可以看到不。 这两个方法返回的也是Observable对象。 所以 他们会分别调用这两个对象subscribeActual方法。好吧,让我们来像下进行。
【下面是一个小扩展 给大家一个小小的感觉】
Observable.just(1, 2, 3, 4, 5)
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver())
看到有什么不同了吗? 我注释掉了一个方法。我为什么要这么干?我注视掉了那么
source.subscribe 会调用谁呢? 我直接给出来答案。大家可以思考一个 当我直接注释之后会调用just的subscribeActual
public final class ObservableFromArray extends Observable {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer super T> s) {
FromArrayDisposable d = new FromArrayDisposable(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
相信大家看过我之前的应该可以看懂。
让我们回归正题当执行到ObservableSubscribeOn的subscribeActual的方法的时候
public void subscribeActual(final Observer super T> s) {
final SubscribeOnObserver parent = new SubscribeOnObserver(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
source.subscribe(parent); 看到这个方法了吗、首先它是异步的。另外执行
.source.subscribe(parent);的时候 ,实际上就执行了ObservableFromArray的subscribeActual
public void subscribeActual(Observer super T> s) {
FromArrayDisposable d = new FromArrayDisposable(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
剩下的就好理解了,都是分别执行onnext等方法。
到这里task的大体思路介绍完毕
2下面开始timer 定时器
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
重复的就不贴了。 都是差不多重复的。 只是给大家贴上关键代码
看到这里面了吗。delay 就是大家贴上的时间。 详细这个大家都是可以看明白的。,
3interval
做周期性操作,从翻译上大家就应该可以看明白
ComputationScheduler的schedulePeriodicallyDirect的方法
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
NewThreadWorker的schedulePeriodicallyDirect的方法
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
分别设置了 什么时候开始。多长时间执行一次
4buffer
Observable> buffered = getObservable().buffer(3, 2);
buffered.subscribe(getObserver());
ObservableBuffer的subscribeActual的方法
protected void subscribeActual(Observer super U> t) {
if (skip == count) {
BufferExactObserver bes = new BufferExactObserver(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {
source.subscribe(new BufferSkipObserver(t, count, skip, bufferSupplier));
}
}
好吧到了关键的地方 source.subscribe是调用谁的地方
Observable.just("one", "two", "three", "four", "five");
所以是ObservableFromArray的subscribeActual方法
public void subscribeActual(Observer super T> s) {
FromArrayDisposable d = new FromArrayDisposable(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
看到这个for方法了吗 这个就是决定你跳过的数量的。
5filter
Paste_Image.png
这个相信大家很熟悉,对就是过滤
fromArray(1, 0, 6)
.filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue() > 5;
}
})
这里只是放出来关键代码
ObservableFilter的onNext
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
actual.onNext(t);
}
} else {
actual.onNext(null);
}
}
这个b就是你的过滤条件。 下面的就是判断。 不符合的就不执行 actual.onNext(t);其实很简单的方式
6skip
和上面同理关键部分ObservableSkip的onNext方法
public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
actual.onNext(t);
}
}
7 scan
Paste_Image.png
RxJava的scan()函数可以看做是一个累加器函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,它将函数的结果填充回可观测序列,等待和下一次发射的数据一起使用。
关键代码
@Override
public void onNext(T t) {
if (done) {
return;
}
final Observer super T> a = actual;
T v = value;
if (v == null) {
value = t;
a.onNext(t);
} else {
T u;
try {
u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
value = u;
a.onNext(u);
}
}
执行的时候value 会累加。 a.onNext(u);在发射出去
8 replay
PublishSubject source = PublishSubject.create();
ConnectableObservable connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
/*
* it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
*/
connectableObservable.subscribe(getSecondObserver());
replay 这个是缓存操作。
第二次订阅之后,就是缓存后面两个数据
9concat
Paste_Image.png
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable aObservable = Observable.fromArray(aStrings);
final Observable bObservable = Observable.fromArray(bStrings);
Observable.concat(aObservable, bObservable)
.subscribe(getObserver());
他的过程是
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
concat操作符肯定也是有序的,实际上fromArray(sources)这么一个过程。
10merge
Paste_Image.png
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable aObservable = Observable.fromArray(aStrings);
final Observable bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable, bObservable)
.subscribe(getObserver());
无序的合并
11distinct 去除重复的
Paste_Image.png
enum HashSetCallable implements Callable> {
INSTANCE;
@Override
public Set call() throws Exception {
return new HashSet();
}
}
HashSet中 是不允许重复元素的
12last
Paste_Image.png
private void doSomeWork() {
getObservable().last("A1") // the default item ("A1") to emit if the source ObservableSource is empty
.subscribe(getObserver());
}
private Observable getObservable() {
return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
}
打印出来的是a6
ObservableFromArray的run方法
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
ObservableLastSingle的onComplete
public void onComplete() {
s = DisposableHelper.DISPOSED;
T v = item;
if (v != null) {
item = null;
actual.onSuccess(v);
} else {
v = defaultItem;
if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
last方法会返回Single
13throttleFirst
Paste_Image.png
private void doSomeWork() {
getObservable()
.throttleFirst(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable getObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
从这个可以理解到发送第一个之后。剩下的500之后才会接受第二个
14throttleLast
Paste_Image.png
从这个可以看出来,这是在一段时间内接受最后一个数据
getObservable()
.throttleLast(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable getObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
15debounce
getObservable()
.debounce(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable getObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
Paste_Image.png
这个接受的一一个时间跨度之内的数据
16window
Paste_Image.png
可以看出来大概 的意思就是截取被观察者组成一个新的被观察者
17delay
Paste_Image.png
java中的.take(),Rxjava2~take~timer~interval~buffer~filter等源码如何实现(你应该懂的)~学渣带你扣rxjava2...相关推荐
- java中mdc是什么_MDC是什么鬼?用法、源码一锅端
近期用到阿里的一款开源的数据同步工具 Canal,不经意之中看到了 MDC 的用法,而且平时项目中也多次用到 MDC,趁机科普一把. 通过今天的分享,能让你轻松 get 如下几点,绝对收获满满. a) ...
- Java中的String为什么是不可变的? -- String源码分析
什么是不可变对象? 众所周知, 在Java中, String类是不可变的.那么到底什么是不可变的对象呢? 可以这样认为:如果一个对象,在它创建完成之后,不能再改变它的状态,那么这个对象就是不可变的.不 ...
- 【JAVA进阶】java中的集合(番外篇3)- HashMap源码底层数据结构分析
写在前面的话 脑子是个好东西,可惜的是一直没有搞懂脑子的内存删除机制是什么,所以啊,入行多年,零零散散的文章看了无数,却总是学习了很多也忘了很多. 痛定思痛的我决定从今天开始系统的梳理下知识架构,记录 ...
- Java生鲜电商平台-促销系统的架构设计与源码解析
Java生鲜电商平台-促销系统的架构设计与源码解析 说明:本文重点讲解现在流行的促销方案以及源码解析,让大家对促销,纳新有一个深入的了解与学习过程. 促销系统是电商系统另外一个比较大,也是比较复杂的系 ...
- Java毕设项目菜鸟驿站快递分发系统计算机(附源码+系统+数据库+LW)
Java毕设项目菜鸟驿站快递分发系统计算机(附源码+系统+数据库+LW) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ ...
- java毕业设计—— 基于java+JSP+SSH的任务调度系统设计与实现(毕业论文+程序源码)——任务调度系统
基于java+JSP+SSH的任务调度系统设计与实现(毕业论文+程序源码) 大家好,今天给大家介绍基于java+JSP+SSH的任务调度系统设计与实现,文章末尾附有本毕业设计的论文和源码下载地址哦. ...
- Java毕设项目校园外卖系统Web端计算机(附源码+系统+数据库+LW)
Java毕设项目校园外卖系统Web端计算机(附源码+系统+数据库+LW) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ ...
- java计算机毕业设计ssm社区团购系统13kbd(附源码、数据库)
java计算机毕业设计ssm社区团购系统13kbd(附源码.数据库) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ Ec ...
- java计算机毕业设计广东省梅州市宇恒节能科技有限公司(附源码、数据库)
java计算机毕业设计广东省梅州市宇恒节能科技有限公司(附源码.数据库) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ ...
- Java毕设项目餐厅线上点菜系统计算机(附源码+系统+数据库+LW)
Java毕设项目餐厅线上点菜系统计算机(附源码+系统+数据库+LW) 项目运行 环境配置: Jdk1.8 + Tomcat8.5 + Mysql + HBuilderX(Webstorm也行)+ Ec ...
最新文章
- (最新最全)windows使用anaconda安装pytorch进行深度学习并使用GPU加速
- 项目实战-本地自动化部署
- 从零开始入门 K8s | Kata Containers 创始人带你入门安全容器技术
- Shiro-授权(RBAC)
- python合理拆分类别_如何用Python进行词组拆分?
- nginx ------反向代理和负载均衡
- 【APICloud系列|9】APICloud自定义APPloader一个月未更新,快速更新的办法
- JavaScript正则表达式之分组匹配 / 反向引用
- linux 查看gcc安装目录,linux安装GCC详解
- 难道是我洞悉了CSDN网站订阅专栏收益的秘密?带你看看网站专栏一天营收几何?
- html状态码206,http状态码204/206/200理解
- 认知科学、神经科学、和认知神经科学
- Code Review: Rietveld平台的搭建和Rietveld的使用。
- 极客空间-MySQL实战45天-第一天
- 无法连接数据库2003-cant connect to Mysql server on ‘localhost’(10038)
- 不懂java,这篇文章带你入门起飞
- Mutual Graph Learning for Camouflaged Object Detection阅读笔记
- Linux宝塔控制面板如何实现多个二级域名301重定向跳转
- Python中冷门但非常好用的内置函数
- JavaScriptCore内部原理(一):从JS源码到字节码的追踪
热门文章
- linux 下svn安装
- ci框架 乱码 mysql_CodeIgniter(CI)发邮件标题中文乱码解决方案
- html设置分割线怎么下移,HTML设置水平分割线
- 安装SqlServer2014出现(‘无法启动 Windows Management Instrumentation 服务。若要继续安装,必须找到问题并修复该服务‘)
- C#判断对象是不是数组
- C# 查询集合中某个元素里的值
- css 居中50%,CSS中的translate(-50%,-50%)实现水平垂直居中效果
- PHP把商品详情数据加入到商品数据,如果商品有多个详情只展示一次商品数据
- LINUX登录界面,输入密码,循环重启出现
- 泰山JDK8新实现的一个字体斜体效果