操作符(Operator)

合并多个Flux

combineLatest

concat

concatMap

merge

repeat


cache

行为(behavior)

聚合操作

collect


reduce

distinct

group by


scan

其他


错误处理

Backpressure(背压)

event

retry

using

dematerialize,materialize

发布和订阅(hot流和cold流)

subscribe

 //subscribe(consumer,...) 最终调用public final Disposable subscribe(@Nullable Consumer<? super T> consumer,@Nullable Consumer<? super Throwable> errorConsumer,@Nullable Runnable completeConsumer,@Nullable Context initialContext) {return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,completeConsumer,null,initialContext));}public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {subscribe(subscriber);return subscriber;}public final void subscribe(Subscriber<? super T> actual) {CorePublisher publisher = Operators.onLastAssembly(this);CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);try {//如果Publisher是OptimizableOperator,则转换递归调用为loop调用。if (publisher instanceof OptimizableOperator) {OptimizableOperator operator = (OptimizableOperator) publisher;while (true) {subscriber = operator.subscribeOrReturn(subscriber);if (subscriber == null) {// null means "I will subscribe myself", returning...return;}OptimizableOperator newSource = operator.nextOptimizableSource();if (newSource == null) {publisher = operator.source();break;}operator = newSource;}}publisher.subscribe(subscriber);}catch (Throwable e) {Operators.reportThrowInSubscribe(subscriber, e);return;}}
 public final Flux<T> subscriberContext(Context mergeContext) {return subscriberContext(c -> c.putAll(mergeContext));}public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) {return new FluxContextStart<>(this, doOnContext);}

将已有Cold流转变为Hot流

使用Processor构造Hot流

replay

基于时间的操作

timeout

超过设置的时间没有emit 元素,则抛出异常。


delay

defer

元素相关Operator

filter

public final Flux<T> filter(Predicate<? super T> p) {if (this instanceof Fuseable) {return onAssembly(new FluxFilterFuseable<>(this, p));}return onAssembly(new FluxFilter<>(this, p));
}
final class FluxFilter<T> extends InternalFluxOperator<T, T> {final Predicate<? super T> predicate;static final class FilterSubscriber<T>implements InnerOperator<T, T>,Fuseable.ConditionalSubscriber<T> {final CoreSubscriber<? super T> actual;final Context ctx;final Predicate<? super T> predicate;Subscription s;boolean done;
... ... ...                            @Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(this.s, s)) {this.s = s;actual.onSubscribe(this);}}@Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t,  this.ctx);return;}boolean b;try {b = predicate.test(t);}catch (Throwable e) {Throwable e_ = Operators.onNextError(t, e,  this.ctx, s);if (e_ != null) {onError(e_);}else {s.request(1);}Operators.onDiscard(t,  this.ctx);return;}if (b) {//发射出 元素。actual.onNext(t);}else {//触发 onDiscardOperators.onDiscard(t,  this.ctx);//请求下一个。s.request(1);}}}
}

filterWhen与filter过程类似,不过将发射这一步修改为放入buffer中,直到流结束将整个buffer返回。

//把upstream的值通过一个Publisher把 value 映射为一个true或false,仅Publisher第一个发射的值被考虑,如果Publisher发射的值为empty,则 输入值不会映射成任何值。
class FluxFilterWhen<T> extends InternalFluxOperator<T, T> {final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;final int bufferSize;@Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {return new FluxFilterWhenSubscriber<>(actual, asyncPredicate, bufferSize);}static final class FluxFilterWhenSubscriber<T> implements InnerOperator<T, T> {final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;//缓存大小final int                                               bufferSize;//缓存数组,数组大小会被初始化:大于bufferSize的2个指数值的最小值。通过hash值计算下标 final AtomicReferenceArray<T>                           toFilter;final CoreSubscriber<? super T>                         actual;final Context                                           ctx;int          consumed;//消费者下标long         consumerIndex;long         emitted;Boolean      innerResult;//生产者下标long         producerIndex;Subscription upstream;volatile boolean         cancelled;volatile FilterWhenInner current;volatile boolean         done;volatile Throwable       error;volatile long            requested;volatile int             state;volatile int             wip;//条件常量。代表取消。static final FilterWhenInner INNER_CANCELLED = new FilterWhenInner(null, false);@Overridepublic void request(long n) {if (Operators.validate(n)) {Operators.addCap(REQUESTED, this, n);drain();}}@Overridepublic void onNext(T t) {long pi = producerIndex;int m = toFilter.length() - 1;//计算下标int offset = (int)pi & m;//把元素加入缓存。toFilter.lazySet(offset, t);producerIndex = pi + 1;drain();}    @Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(upstream, s)) {upstream = s;actual.onSubscribe(this);s.request(bufferSize);}}      void drain() {if (WIP.getAndIncrement(this) != 0) {return;}int missed = 1;int limit = Operators.unboundedOrLimit(bufferSize);long e = emitted;long ci = consumerIndex;int f = consumed;int m = toFilter.length() - 1;Subscriber<? super T> a = actual;for (;;) {long r = requested;//发射的数量与requested数量不一样,则一直发送。while (e != r) {if (cancelled) {clear();return;}boolean d = done;//下标int offset = (int)ci & m;T t = toFilter.get(offset);boolean empty = t == null;//如果取消并且未获取到数据,则中断。if (d && empty) {Throwable ex = Exceptions.terminate(ERROR, this);if (ex == null) {a.onComplete();} else {a.onError(ex);}return;}//未取消,但是获取到null,退出。if (empty) {break;}int s = state;if (s == STATE_FRESH) {Publisher<Boolean> p;try {//获取映射到的Publisherp = Objects.requireNonNull(asyncPredicate.apply(t), "The asyncPredicate returned a null value");} catch (Throwable ex) {Exceptions.throwIfFatal(ex);Exceptions.addThrowable(ERROR, this, ex);p = null; //discarded as "old" below}//有Publisher,if (p != null) {//if (p instanceof Callable) {Boolean u;try {u = ((Callable<Boolean>)p).call();} catch (Throwable ex) {Exceptions.throwIfFatal(ex);Exceptions.addThrowable(ERROR, this, ex);u = null; //triggers discard below}//返回true,则发射元素元素t。if (u != null && u) {a.onNext(t);e++;}//false,则取消元素t。else {Operators.onDiscard(t, ctx);}} else {//Publisher不是callable,FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));if (CURRENT.compareAndSet(this,null, inner)) {state = STATE_RUNNING;p.subscribe(inner);break;}}}T old = toFilter.getAndSet(offset, null);Operators.onDiscard(old, ctx);ci++;if (++f == limit) {f = 0;upstream.request(limit);}} elseif (s == STATE_RESULT) {Boolean u = innerResult;innerResult = null;if (u != null && u) {a.onNext(t);e++;}else {Operators.onDiscard(t, ctx);}toFilter.lazySet(offset, null);ci++;if (++f == limit) {f = 0;upstream.request(limit);}state = STATE_FRESH;} else {break;}}//发射的数量与requested数量一样。if (e == r) {if (cancelled) {clear();return;}boolean d = done;int offset = (int)ci & m;T t = toFilter.get(offset);boolean empty = t == null;if (d && empty) {Throwable ex = Exceptions.terminate(ERROR, this);if (ex == null) {a.onComplete();} else {a.onError(ex);}return;}}int w = wip;if (missed == w) {consumed = f;consumerIndex = ci;emitted = e;missed = WIP.addAndGet(this, -missed);if (missed == 0) {break;}} else {missed = w;}}}}static final class FilterWhenInner implements InnerConsumer<Boolean> {final FluxFilterWhenSubscriber<?> parent;final boolean                     cancelOnNext;//是否已完成。boolean done;volatile Subscription sub;static final AtomicReferenceFieldUpdater<FilterWhenInner, Subscription> SUB =AtomicReferenceFieldUpdater.newUpdater(FilterWhenInner.class, Subscription.class, "sub");
... ... ... @Overridepublic void onSubscribe(Subscription s) {//设置sub,并且请求所有元素。if (Operators.setOnce(SUB, this, s)) {s.request(Long.MAX_VALUE);}}@Overridepublic void onNext(Boolean t) {if (!done) {if (cancelOnNext) {sub.cancel();}done = true;parent.innerResult(t);}}@Overridepublic void onError(Throwable t) {if (!done) {done = true;parent.innerError(t);} else {Operators.onErrorDropped(t, parent.currentContext());}}@Overridepublic void onComplete() {if (!done) {done = true;parent.innerComplete();}}void cancel() {Operators.terminate(SUB, this);}}

take

final class FluxTake<T> extends InternalFluxOperator<T, T> {@Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}long r = remaining;if (r == 0) {s.cancel();onComplete();return;}remaining = --r;boolean stop = r == 0L;actual.onNext(t);if (stop) {s.cancel();onComplete();}}}

skip

final class FluxSkip<T> extends InternalFluxOperator<T, T> {
static final class SkipSubscriber<T>
}
}

sample

single

element


插入元素

switch

忽略元素值


then

sort


map

 public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {if (this instanceof Fuseable) {return onAssembly(new FluxMapFuseable<>(this, mapper));}return onAssembly(new FluxMap<>(this, mapper));}
final class FluxMap<T, R> extends InternalFluxOperator<T, R> {final Function<? super T, ? extends R> mapper;public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) {if (actual instanceof Fuseable.ConditionalSubscriber) {Fuseable.ConditionalSubscriber<? super R> cs =(Fuseable.ConditionalSubscriber<? super R>) actual;return new MapConditionalSubscriber<>(cs, mapper);}//包装Subscriber为MapSubscriberreturn new MapSubscriber<>(actual, mapper);} static final class MapSubscriber<T, R>implements InnerOperator<T, R> {final CoreSubscriber<? super R>        actual;final Function<? super T, ? extends R> mapper;boolean done;//原始 SubscriptionSubscription s;
... ...@Overridepublic void onSubscribe(Subscription s) {if (Operators.validate(this.s, s)) {this.s = s;actual.onSubscribe(this);}}@Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}R v;try {//应用mapperv = Objects.requireNonNull(mapper.apply(t),"The mapper returned a null value.");}catch (Throwable e) {//对异常进行处理。Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);if (e_ != null) {onError(e_);}else {s.request(1);}return;}//把mapper处理之后的值 发射出去。actual.onNext(v);}@Overridepublic void onError(Throwable t) {if (done) {Operators.onErrorDropped(t, actual.currentContext());return;}done = true;actual.onError(t);}@Overridepublic void onComplete() {if (done) {return;}done = true;actual.onComplete();}
... ...@Overridepublic void request(long n) {//s.request(n);}}}

expand

杂项


拆分

window

buffer

回到同步操作


to

调试类Operator

log

elapsed

timestamp


checkpoint


webflux系列--reactor源码(二)相关推荐

  1. webflux系列--reactor源码(一)

    文章目录 基础 顶级接口 reactor 核心原理 core 声明阶段 FluxOperator OptimizableOperator InternalFluxOperator InnerOpera ...

  2. AQS源码二探-JUC系列

    Python微信订餐小程序课程视频 https://edu.csdn.net/course/detail/36074 Python实战量化交易理财系统 https://edu.csdn.net/cou ...

  3. MVC系列——MVC源码学习:打造自己的MVC框架(一:核心原理)(转)

    阅读目录 一.MVC原理解析 1.MVC原理 二.HttpHandler 1.HttpHandler.IHttpHandler.MvcHandler的说明 2.IHttpHandler解析 3.Mvc ...

  4. MVC系列——MVC源码学习:打造自己的MVC框架(一:核心原理)

    前言:最近一段时间在学习MVC源码,说实话,研读源码真是一个痛苦的过程,好多晦涩的语法搞得人晕晕乎乎.这两天算是理解了一小部分,这里先记录下来,也给需要的园友一个参考,奈何博主技术有限,如有理解不妥之 ...

  5. Java集合Collection源码系列-ArrayList源码分析

    Java集合系列-ArrayList源码分析 文章目录 Java集合系列-ArrayList源码分析 前言 一.为什么想去分析ArrayList源码? 二.源码分析 1.宏观上分析List 2.方法汇 ...

  6. 100种目标检测数据集【voc格式yolo格式json格式coco格式】+YOLO系列算法源码及训练好的模型

    提示:本文介绍并分享了应用于各行业.各领域非常有用的目标检测数据集(感谢您的关注+三连,数据集持续更新中-),其中绝大部分数据集作者已应用于各种实际落地项目,数据集整体质量好,标注精确,数据的多样性充 ...

  7. 阅读react-redux源码(二) - createConnect、match函数的实现

    阅读react-redux源码 - 零 阅读react-redux源码 - 一 阅读react-redux源码(二) - createConnect.match函数的实现 上一节看了Provider组 ...

  8. 抖音seo源码 短视频seo源码二次开发,怎么使用抖音seo源码,视频seo源码私有化部署?

    抖音seo源码 短视频seo源码二次开发,怎么使用抖音seo源码,短视频seo源码私有化部署? 抖音seo源码 短视频seo源码二次开发,怎么使用抖音seo源码,短视频seo源码私有化部署到本地.首先 ...

  9. 拉拉米抢单发单系统源码+二开ui带视频介绍+ 放量功能

    拉拉米抢单发单系统源码+二开ui带视频介绍+ 放量功能 安装搭建说明 服务器系统:Linux+宝塔 亲测环境:Nginx1.16.1+PHP5.6+Mysql5.5 修改数据库配置文件:/config ...

最新文章

  1. json格式 转换的时候 注意是否是类还是数组 微信json为null
  2. android本地xml文件怎么打开,android 打开本地文件
  3. mode: 'history', 去掉路由地址的#
  4. c语言p,用C语言实现P、V操作
  5. 论文多到读不完?不如看看我们为你精选的这 15 篇
  6. java事件大全_Java sctipt常用事件汇总介绍
  7. 程序如何在两个gpu卡上并行运行_深度学习分布式训练相关介绍 - Part 1 多GPU训练...
  8. 【p081】ISBN号码
  9. aspnet_UsersInRoles_GetUsersInRoles
  10. 牛客网数据开发题库_牛客网SQL题库NO.32~40
  11. python网络爬虫资源库名_Python网络爬虫
  12. MAC OS上将项目提交到github
  13. 探索专有领域的端到端ASR解决之道
  14. 史上最低价Surface!微软Surface Laptop Go上架 3700元起
  15. canvas换图时候会闪烁_基于Canvas实现的高斯模糊(上)「JS篇」
  16. 基于动态代理 Mock dubbo 服务的实现方案
  17. rffc2071_基于RFFC2071的变频器设计
  18. C++程序设计:字符图形输出(空白三角形)
  19. Simulink代码生成:通过Matlab Function集成C函数
  20. 算笔账:养老保险应该少交还是多交

热门文章

  1. 由 Windows 向 Linux 迁移字体
  2. 项目经理主要工作职责
  3. 纯CSS无hacks的跨游览器多列布局(转)
  4. 解决引入 lombok 注解不生效
  5. 如何快速压测电商网站?
  6. 静默安装oracle11.2.0.4
  7. 【Python实战】机型自动化标注(搜狗爬虫实现)
  8. GreenPlum的并行查询优化策略
  9. jQuery 9 相对选择器
  10. android底层rsa加密,android 下RSA加密解密