目录

  • 一、buffer()
    • 1、示例
    • 2、原理
  • 二、buffer(Duration bufferingTimespan)
    • 1、示例
    • 2、原理
  • 三、buffer(Duration bufferingTimespan, Duration openBufferEvery)
    • 1、示例
    • 2、原理

一、buffer()

1、示例

元素经过 buffer 方法后,转换成了 list 传递给订阅者

 @Testpublic void buffer() {Flux.range(0, 5).buffer(2).subscribe(list -> list.stream().forEach(System.out::println));Flux.range(0, 20).buffer(4,2).subscribe(list -> list.stream().forEach(System.out::println));Flux.range(0, 20).buffer(4,2).subscribe(list -> list.stream().forEach(System.out::println));}

2、原理

(1)FluxBuffer

 @Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super C> actual) {if (size == skip) {//每经过size个元素,消费一次return new BufferExactSubscriber<>(actual, size, bufferSupplier);}else if (skip > size) {//丢弃一部分元素return new BufferSkipSubscriber<>(actual, size, skip, bufferSupplier);}else {//一部分元素重叠return new BufferOverlappingSubscriber<>(actual, size, skip, bufferSupplier);}}

(2)BufferExactSubscriber

     @Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}C b = buffer;if (b == null) {try {//获取到 Collection 容器b = Objects.requireNonNull(bufferSupplier.get(),"The bufferSupplier returned a null buffer");}catch (Throwable e) {Context ctx = actual.currentContext();onError(Operators.onOperatorError(s, e, t, ctx));Operators.onDiscard(t, ctx); //this is in no bufferreturn;}buffer = b;}//将元素添加到容器中b.add(t);//校验size,达到设置的size,重置 buffer 并消费元素if (b.size() == size) {buffer = null;actual.onNext(b);}}
     @Overridepublic void onComplete() {if (done) {return;}done = true;C b = buffer;//如果容器中还有元素,消费掉if (b != null && !b.isEmpty()) {actual.onNext(b);}actual.onComplete();}

(3)BufferSkipSubscriber

     @Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, this.ctx);return;}C b = buffer;long i = index;//每经过 skip 个元素,重新获取依次容器if (i % skip == 0L) {try {b = Objects.requireNonNull(bufferSupplier.get(),"The bufferSupplier returned a null buffer");}catch (Throwable e) {onError(Operators.onOperatorError(s, e, t, this.ctx));Operators.onDiscard(t, this.ctx); //t hasn't got a chance to end up in any bufferreturn;}buffer = b;}if (b != null) {//将元素放入容器,达到指定个数时,消费元素b.add(t);if (b.size() == size) {//将buffer设置为null,则之后的元素在下一次获取容器前,都会被丢弃buffer = null;actual.onNext(b);}}else {//droppingOperators.onDiscard(t, this.ctx);}index = i + 1;}

(4)BufferOverlappingSubscriber

     @Overridepublic void onNext(T t) {if (done) {Operators.onNextDropped(t, actual.currentContext());return;}long i = index;//每经过 skip 个元素,重新获取依次容器if (i % skip == 0L) {C b;try {b = Objects.requireNonNull(bufferSupplier.get(),"The bufferSupplier returned a null buffer");}catch (Throwable e) {Context ctx = actual.currentContext();onError(Operators.onOperatorError(s, e, t, ctx));Operators.onDiscard(t, ctx); //didn't get a chance to be added to a bufferreturn;}//放入队列offer(b);}//取出第一个队列元素C b = peek();//如果集合中元素+ 1 达到了指定的sizeif (b != null && b.size() + 1 == size) {//从队列中移除poll();//添加元素b.add(t);//消费元素actual.onNext(b);produced++;}//将元素添加到队列中的每个集合,由于skip小于size,则集合中元素个数达到size时,队列中的集合数量大于1,这是会给每个集合中添加同一个元素for (C b0 : this) {b0.add(t);}index = i + 1;}

二、buffer(Duration bufferingTimespan)

每经过一段时间,传递给订阅者一次数据

通过线程池定时调度 IntervalRunnable,将定时周期内收集到的元素传递给下一个订阅者

1、示例

    @Testpublic void bufferDuration() {Flux.range(0, 20).buffer(Duration.ofSeconds(2)).subscribe(list -> list.stream().forEach(System.out::println));}

2、原理

(1)buffer(Duration.ofSeconds(2))

public final Flux<List<T>> buffer(Duration bufferingTimespan) {return buffer(bufferingTimespan, Schedulers.parallel());}public final Flux<List<T>> buffer(Duration bufferingTimespan, Scheduler timer) {return buffer(interval(bufferingTimespan, timer));}//构建 FluxIntervalpublic static Flux<Long> interval(Duration delay, Duration period, Scheduler timer) {return onAssembly(new FluxInterval(delay.toNanos(), period.toNanos(), TimeUnit.NANOSECONDS, timer));}//构建 FluxBufferBoundary,并将 FluxInterval 作为 other 参数传入public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier) {return onAssembly(new FluxBufferBoundary<>(this, other, bufferSupplier));}

(2)FluxBufferBoundary

 @Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super C> actual) {//存放元素的 buffer 容器C buffer = Objects.requireNonNull(bufferSupplier.get(),"The bufferSupplier returned a null buffer");BufferBoundaryMain<T, U, C> parent =new BufferBoundaryMain<>(source instanceof FluxInterval ?actual : Operators.serialize(actual),buffer, bufferSupplier);actual.onSubscribe(parent);//FluxInterval 作为 BufferBoundaryOther 的发布者,BufferBoundaryOther  包装了 FluxBufferBoundaryother.subscribe(parent.other);return parent;}

(3)FluxInterval

 @Overridepublic void subscribe(CoreSubscriber<? super Long> actual) {Worker w = timedScheduler.createWorker();//创建线程 IntervalRunnableIntervalRunnable r = new IntervalRunnable(actual, w);actual.onSubscribe(r);try {//定时调度线程w.schedulePeriodically(r, initialDelay, period, unit);}catch (RejectedExecutionException ree) {if (!r.cancelled) {actual.onError(Operators.onRejectedExecution(ree, r, null, null,actual.currentContext()));}}}

(4)IntervalRunnable

a. run()

     @Overridepublic void run() {if (!cancelled) {if (requested != 0L) {//调用 BufferBoundaryOther 的 onNext()actual.onNext(count++);if (requested != Long.MAX_VALUE) {REQUESTED.decrementAndGet(this);}} else {cancel();actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +" (interval doesn't support small downstream requests that replenish slower than the ticks)"));}}}

b. onNext(U t)

BufferBoundaryOther.java@Overridepublic void onNext(U t) {//调用 FluxBufferBoundary 的otherNext()main.otherNext();}

c. otherNext()

 void otherNext() {C c;try {//获取容器c = Objects.requireNonNull(bufferSupplier.get(),"The bufferSupplier returned a null buffer");}catch (Throwable e) {otherError(Operators.onOperatorError(other, e, this.ctx));return;}C b;synchronized (this) {//重置 bufferb = buffer;buffer = c;}//buffer 为空if (b == null || b.isEmpty()) {return;}//传递 bufferemit(b);}

d. emit(b)

 boolean emit(C b) {long r = requested;if (r != 0L) {//将 buffer 传递给下一个订阅者actual.onNext(b);if (r != Long.MAX_VALUE) {REQUESTED.decrementAndGet(this);}return true;}else {actual.onError(Operators.onOperatorError(this, Exceptions.failWithOverflow(), b, this.ctx));Operators.onDiscardMultiple(b, this.ctx);return false;}}}

(5)将元素存入buffer的过程

FluxBufferBoundary.java@Overridepublic void onNext(T t) {synchronized (this) {C b = buffer;if (b != null) {//将元素存入 buffer 中b.add(t);return;}}Operators.onNextDropped(t, this.ctx);}

三、buffer(Duration bufferingTimespan, Duration openBufferEvery)

bufferingTimespan:收集元素的时间间隔
openBufferEvery:创建新的 buffer 的时间间隔

1、示例

 @Testpublic void bufferDuration() {Flux.range(0, 2000).buffer(Duration.ofSeconds(2),Duration.ofSeconds(1)).subscribe(list -> list.stream().forEach(System.out::println));}

2、原理

(1)buffer(Duration.ofSeconds(2),Duration.ofSeconds(1))

public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery) {return buffer(bufferingTimespan, openBufferEvery, Schedulers.parallel());}public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer) {//两个时间参数一样if (bufferingTimespan.equals(openBufferEvery)) {return buffer(bufferingTimespan, timer);}//创建 FluxInterval,使用参数 openBufferEvery,用于创建 buffer//创建 MonoDelay,使用参数 bufferingTimespan,用于移除buffer,并将移除队列中buffer 发布给订阅者//创建 FluxBufferWhenreturn bufferWhen(interval(Duration.ZERO, openBufferEvery, timer), aLong -> Mono.delay(bufferingTimespan, timer));}

(2)FluxBufferWhen

 @Overridepublic CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super BUFFER> actual) {//创建 BufferWhenMainSubscriberBufferWhenMainSubscriber<T, OPEN, CLOSE, BUFFER> main =new BufferWhenMainSubscriber<>(actual, bufferSupplier, queueSupplier, start, end);actual.onSubscribe(main);//创建 BufferWhenOpenSubscriberBufferWhenOpenSubscriber<OPEN> bos = new BufferWhenOpenSubscriber<>(main);if (main.subscribers.add(bos)) {//FluxInterval 作为 BufferWhenOpenSubscriber 的发布者start.subscribe(bos);return main;}else {return null;}}

(3)FluxInterval

a. run()

 @Overridepublic void run() {if (!cancelled) {if (requested != 0L) {//调用 BufferWhenOpenSubscriber 的 onNext()actual.onNext(count++);if (requested != Long.MAX_VALUE) {REQUESTED.decrementAndGet(this);}} else {cancel();actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +" (interval doesn't support small downstream requests that replenish slower than the ticks)"));}}}

b. onNext()

BufferWhenOpenSubscriber.java@Overridepublic void onNext(OPEN t) {//调用 BufferWhenMainSubscriber 的 open()parent.open(t);}

c. open(t)

void open(OPEN token) {Publisher<? extends CLOSE> p;BUFFER buf;try {//创建新的bufferbuf = Objects.requireNonNull(bufferSupplier.get(), "The bufferSupplier returned a null Collection");//p 是 MonoDelayp = Objects.requireNonNull(bufferClose.apply(token), "The bufferClose returned a null Publisher");}catch (Throwable ex) {..}long idx = index;index = idx + 1;synchronized (this) {Map<Long, BUFFER> bufs = buffers;if (bufs == null) {return;}//新的buffer放入缓存bufs.put(idx, buf);}//创建 BufferWhenCloseSubscriberBufferWhenCloseSubscriber<T, BUFFER> bc = new BufferWhenCloseSubscriber<>(this, idx);subscribers.add(bc);//MonoDelay 作为 BufferWhenCloseSubscriber 的发布者p.subscribe(bc);}

(4)BufferWhenCloseSubscriber

MonoDelay 在延时后会调用 BufferWhenCloseSubscriber 的 onNext()

     @Overridepublic void onNext(Object t) {Subscription s = subscription;if (s != Operators.cancelledSubscription()) {SUBSCRIPTION.lazySet(this, Operators.cancelledSubscription());s.cancel();//调用 BufferWhenMainSubscriber 的 close 关闭 index 处的 bufferparent.close(this, index);}}
void close(BufferWhenCloseSubscriber<T, BUFFER> closer, long idx) {//移除subscribers.remove(closer);boolean makeDone = false;if (subscribers.size() == 0) {makeDone = true;Operators.terminate(S, this);}synchronized (this) {Map<Long, BUFFER> bufs = buffers;if (bufs == null) {return;}//移除buffer,并添加到队列中queue.offer(buffers.remove(idx));}if (makeDone) {done = true;}//drain();}

从队列中取出buffer,并将buffer发布给订阅者

 void drain() {if (WINDOWS.getAndIncrement(this) != 0) {return;}int missed = 1;long e = emitted;Subscriber<? super BUFFER> a = actual;Queue<BUFFER> q = queue;for (;;) {long r = requested;while (e != r) {if (cancelled) {Operators.onDiscardQueueWithClear(q, this.ctx, BUFFER::stream);return;}boolean d = done;if (d && errors != null) {Operators.onDiscardQueueWithClear(q, this.ctx, BUFFER::stream);Throwable ex = Exceptions.terminate(ERRORS, this);a.onError(ex);return;}BUFFER v = q.poll();boolean empty = v == null;if (d && empty) {a.onComplete();return;}if (empty) {break;}a.onNext(v);e++;}if (e == r) {if (cancelled) {Operators.onDiscardQueueWithClear(q, this.ctx, BUFFER::stream);return;}if (done) {if (errors != null) {Operators.onDiscardQueueWithClear(q, this.ctx, BUFFER::stream);Throwable ex = Exceptions.terminate(ERRORS, this);a.onError(ex);return;}else if (q.isEmpty()) {a.onComplete();return;}}}emitted = e;missed = WINDOWS.addAndGet(this, -missed);if (missed == 0) {break;}}}

(5)将元素存入buffer的过程

BufferWhenMainSubscriber.java@Overridepublic void onNext(T t) {synchronized (this) {//所有创建的 buffersMap<Long, BUFFER> bufs = buffers;if (bufs == null) {return;}if (bufs.isEmpty()) {Operators.onDiscard(t, this.ctx);return;}for (BUFFER b : bufs.values()) {//将元素添加到每个 bufferb.add(t);}}}

flux 中的 buffer 的原理相关推荐

  1. 二十五、Node中的Buffer缓冲器和EventEmitter事件触发器

    @Author:Runsen @Date:2020/6/5 作者介绍:Runsen目前大三下学期,专业化学工程与工艺,大学沉迷日语,Python, Java和一系列数据分析软件.导致翘课严重,专业排名 ...

  2. buffer pool mysql_理解Mysql中的Buffer pool

    Buffer Pool在数据库里的地位 1.回顾一下Buffer Pool是个什么东西? 数据库中的Buffer Pool是个什么东西?其实他是一个非常关键的组件,数据库中的数据实际上最终都是要存放在 ...

  3. 网站统计中的数据收集原理及实现(js埋点实现)

    网站统计中的数据收集原理及实现 网站统计 埋点 Web Openresty 网站数据统计分析工具是网站站长和运营人员经常使用的一种工具,比较常用的有谷歌分析.百度统计和腾讯分析等等.所有这些统计分析工 ...

  4. 嵌入式操作系统VxWorks中网络协议存储池原理及实现

    嵌入式操作系统VxWorks中网络协议存储池原理及实现 周卫东 蔺妍 刘利强 (哈尔滨工程大学自动化学院,黑龙江 哈尔滨,150001) 摘  要  本文讨论了网络协议存储池的基本原理和在嵌入式操作系 ...

  5. Linux操作系统中内存buffer和cache的区别

    我们一开始,先从Free命令说起. free 命令相对于top 提供了更简洁的查看系统内存使用情况: $ free                      total  used   free  s ...

  6. jQuery中getJSON跨域原理详解

    详见:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp28 jQuery中getJSON跨域原理详解 前几天我再开发一个叫 河蟹工 ...

  7. 超专业解析!10分钟带你搞懂Linux中直接I/O原理

    导语 | 本文主要以一张图为基础,向大家介绍Linux在I/O上做了哪些事情,即Linux中直接I/O原理,希望本文的经验和思路能为读者提供一些帮助和思考. 引言 我们先看一张图: 这张图大体上描述了 ...

  8. Linux中内存buffer和cache的区别

    Linux中内存buffer和cache的区别 分类: LINUX 原文地址:Linux中内存buffer和cache的区别 作者:platinaluo 细心的朋友会注意到,当你在linux下频繁存取 ...

  9. DockOne微信分享(一一二):Flannel中vxlan backend的原理和实现

    本文讲的是DockOne微信分享(一一二):Flannel中vxlan backend的原理和实现[编者的话]Overlay网络是kubernetes网络模型的重要解决方案之一,而Flannel作为焦 ...

最新文章

  1. SAP RETAIL 参考PO创建分配表之二
  2. linux编程两个子进程,Linux中fork同时创建多个子进程的方法
  3. Visual Studio 2008 安装失败(“Web 创作组件”无法安装)解决方法
  4. adb 命令的个人记录
  5. proc文件系统探索 之 以数字命名的目录
  6. b站弹幕姬python_自用 Bilibili 弹幕姬 for macOS
  7. linux系统 锐捷_Client for RuiJie(锐捷客户端 for linux) 升级版
  8. 为什么我们要写单元测试用例?
  9. The JSR-133 Cookbook for Compiler Writers 中英对照版翻译
  10. activiti设计器会签人员配置
  11. Eclipse常用便捷设置
  12. 戴尔服务器重装系统识别不到硬盘,戴尔台式机重装系统(戴尔台式机重装系统找不到硬盘)...
  13. cf596B. Wilbur and Array
  14. 电脑通过wifi连接手机(adb移动设备连接电脑)
  15. 某头部证券机构云化与信创双转型深度解析|信创专题
  16. CCPC 1010 YJJ's Salesman
  17. php 按汉字拼音排序,php 数组按中文拼音排序
  18. 【黑马Java笔记+踩坑】Maven高级
  19. 微信小程序电商项目商品详情页开发实战之数据绑定与事件应用
  20. C语言练习二 :找出一个二维数组的鞍点

热门文章

  1. 调试输出信息OutPutDebugString
  2. 贝尔生物再度备战上市:拟赴上交所主板IPO,已实现连续盈利
  3. 视频监控RTSP 客户端
  4. FPU与VFP最全面解释
  5. H5兼容性问题解决方法
  6. java调用命令行校对系统时间
  7. 字符串转为日期,日期转为字符串
  8. 泼辣修图服务器没有响应,泼辣修图使用常见问题整理,为你答疑解惑
  9. django-视图集ViewSet
  10. c语言程序24转换12时间,C语言将24小时制转换为12小时制的方法