文章目录

  • lamda与FunctionalInterface
  • Reactive Programming、Reactive Streams和Reactor
  • Thread per Connection 和 Reactor in Single Thread
  • spring mvc和spring webflux
  • Reactor使用
  • 总结
  • 写在最后

lamda与FunctionalInterface

lamda表达式,其本质是一段函数,确切的说是一段闭包。我们当然可以通过定义普通的函数来代替lamda,但是lamda形式的函数使得函数的定义更加灵活,同时可以让你的代码看起来很简洁。

在java8以前,如果想使用lamda表达式(确切的说是像使用lamda表达式那样取调用一个函数),只能使用接口+匿名类的方式:

    // 定义一个接口interface TestInterface {void method();}static void main(String[] args) {// 定义接口的匿名实现TestInterface testInterface = new TestInterface() {@Overridepublic void method() {System.out.println("hello world");}};// 调用接口testInterface.method();}

从Java8开始,引入了lamda表达式和functioninterface。所谓functioninterface,是指当接口中有且只有一个需要实现的函数时,可以给此接口加上@FunctionalInterface注解,同时此接口可以采用lamda的形式进行实现。

使用lamda表达式:

    // 定义一个接口@FunctionalInterface//注意这里添加了注解interface TestInterface {void method();}static void main(String[] args) {// 定义接口的匿名实现TestInterface testInterface = () -> {System.out.println("hello world");};// 调用接口testInterface.method();}

@FunctionalInterface注解严格来说更像一种标识,用来标识当前接口符合FunctionalInterface规范,因此如果接口本身已经符合了 FunctionalInterface规范,是可以不用添加@FunctionalInterface注解的。因此上面示例中的@FunctionalInterface注解是可以省略的。

java8在引入FunctionalInterface的同时,还将一些高频使用的场景进行了封装,这些接口都位于java.util.function路径下,例如:

public interface Consumer<T>:只有一个参数的函数

public interface BiConsumer<T, U>:两个参数的函数

public interface Function<T, R>:一个入参,一个返回值的函数

public interface Supplier<T>:只有一个返回值的函数

其他更多的示例可以直接查看源码,这些基本能覆盖80%以上的场景。

同时,一些Java以前的接口,例如RunnableCallable,也都添加了@FunctionalInterface注解,因此在使用这些接口时,可以直接使用lamda表达式去编写他们的实现。


Reactive Programming、Reactive Streams和Reactor

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术( Promise编程)。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

和Reactive Programming对应的就是Imperative Programming(指令式编程),我们常用的 Java、Python 等语言写代码的常见编程风格即为指令式编程,此风格的特点是代码执行顺序和编写顺序基本一致。

Reactive Programming也可以称为Observable模式(可以类比观察者模式),Imperative Programming可以称为Iterable模式,对应的前者即为推模式,后者为拉模式,推的是事件,拉的是指令。

在Java 平台上,Netflix、TypeSafe、Pivatol共同制定了一个被称为 Reactive Streams 的项目规范,用于制定反应式编程相关的规范以及接口,此规范主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 主要包含了 onNext、onError、onCompleted 这三个方法。对于 Reactive Streams,大家只需要理解其思想就可以。

在Spring 5中,作为在背后支持其反应式编程的框架 Reactor,onNext对应Reactor的doOnNext()方法;onError对应Reactor的onErrorContinue()onErrorResume()onErrorReturn()方法;onCompleted对应Reactor的doOnSuccess()方法。

一句话,Reactive Programming是一种编程方式;Reactive Streams是针对Reactive Programming制定的编程规范;Reactor是实现了Reactive Streams的编程框架(有具体的组件包),这些为spring webflux提供了组件基础。


Thread per Connection 和 Reactor in Single Thread

在操作系统中,进程(process)是线程(thread)的集合,线程是进程中的最小执行单位。在处理并发时,我们有两种编程思路,一个是多线程方式,一个是协程方式(python里的概念,这里我想不到更好的名词了)。

  • Thread per Connection:即一请求一线程,为每个请求分配一个线程,此线程负责请求的执行,当请求执行结束时,线程也随之结束。在没有nio之前,这是传统的java网络编程和servlet等所采用的即为线程模型。此方案的优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
  • Reactor in Single Thread:即为将需要执行的多个任务放置到一个队列中,并通过事件驱动的方式(通常做法),将每个任务交由某个线程去执行。在此方式下,即使只有一个线程,也可以实现一种伪多线程(参考python的协程),此方式的实现方式之一即为IO多路复用。此方案的优点是不受线程数的限制,且适合于CPU资源紧张的应用上。基于nio的mina、netty等框架,就是使用的此方式;缺点是受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用。

spring mvc和spring webflux

先说springmvc,springmvc刚诞生时,servlet 大行其道(现在也是),所以springmvc是完全基于servlet的。servlet的设计思路即为内部启动了一个线程池,池内会有一定数量的空闲线程,当有http请求进入时,servlet会从线程池中获取一空闲线程负责http请求的执行。因此springmvc的并发数极容易受线程池容量的限制,当请求数超过线程池的容量时,会直接导致请求无法被处理。因此在高并发场景下,如果使用springmvc,只能通过增加线程池上限(但线程数量又受cpu核心数和操作系统允许的open files数量限制)和扩展物理机器的方式来增加整个系统的吞吐量。其实对于servlet来将,其自身是典型的io密集型而非cpu密集型,是不需要启动很多线程的,在这种背景下spring webflux应运而生。

Spring 5中引入了一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。spring webflux将Netty和Reactor集成在一起用于处理web请求(所以说spring webflux的诞生离不开nio和Reactive Programming),能充分发挥两者的优势,从而极大增加框架的吞吐量(业务自身的耗时不在讨论之列)。
我们先来看以下两者的编程区别,当我们使用springmvc时,通常的编码是这样的:

@RestController
@RequestMapping("/test")
public class TestController {@RequestMapping("/hello")public String demo(){return "hello world";}}

当我们使用spring webflux时,通常的编码是这样的:

@RestController
@RequestMapping("/test")
public class TestController {@RequestMapping(value = "/foobar")public Mono<String> demo() {return Mono.just("hello world");}}

两者的变化即为由之前的直接返回目标对象改为返回一个Mono/Flux对象(Mono和Flux的介绍在后面)。

这里可以看到,当我们使用Reactor编程时,大部分情况下并不需要直接接触Publisher、Subscriber、Subcription等接口,因为这些接口已经被spring webflux封装了,我们只需要构建我们的Mono/Flux对象即可,spring webflux会自动把Mono/Flux对象所包含的任务推送到任务队列中并被Reactor和Netty处理。

此外,当我们使用feign进行服务调用时,在springmvc框架下和spring webflux框架下,所使用的feign是不同的。
在springmvc框架下,通常使用spring-cloud-starter-openfeign(内部实现是feign+ribbon)来进行http服务调用;在spring webflux下,目前还没有正式的框架可以使用,不过有一个官方还为正式发布的可以使用feign-reactor-spring-cloud-starter,不过目前此项目还在孵化器中。


Reactor使用

具体的说是Mono和Flux的使用,先放一段Reactor中关于Mono和Flux的介绍:

  • Mono

A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the “which operator do I need?” appendix
The rx operators will offer aliases for input Mono type to preserve the “at most one” property of the resulting Mono. For instance flatMap returns a Mono, while there is a flatMapMany alias with possibly more than 1 emission.
Mono should be used for Publisher that just completes without any value.
It is intended to be used in implementations and return types, input parameters should keep using raw Publisher as much as possible.
Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.

  • Flux

A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the “which operator do I need?” appendix .
It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
Note that using state in the java.util.function / lambdas used within Flux operators should be avoided, as these may be shared between several Subscribers.
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) used internally for Context passing. User provided Subscriber may be passed to this “subscribe” extension but will loose the available per-subscribe Hooks.onLastOperator.

通俗来说,Mono就是Reactor中一个基础的任务单元,或者说是一段lamda表达式指令。所以我们查看Mono的各种方法时,入参基本都是FunctionalInterface,这这段指令内,我们可以接收若干参数,同时返回若干结果值;甚至可以返回另一个Mono对象。

Flux是Mono的集合,Mono是Reactor中的基础任务单元,Flux就是若干个基础单元的集合。因此Flux更适合处理并发任务(作为类比,Mono是串行的)。

下面介绍Mono的一些创建方式。

从对象创建:

Mono.just("hello world")//从字符串创建一个Mono对象,当任务被执行时,会返回此字符串
.subscribe();
Mono.just(123)//从int创建一个Mono对象
.subscribe();
Mono.empty()//创建一个不返回任何值的Mono对象,函数原型为`public static <T> Mono<T> empty()`,因此返回值可以被当作任何类型
.subscribe();

RunnableFuture等对象创建,基本都是使用Mono.fromXxxx()方法创建

Mono.fromRunnable(()->{//从Runnable创建,这里使用了lamda表达式,当Mono被执行时,此lamda包裹的代码段会被执行System.out.println("hello world");
});
Mono.fromCallable(() -> {//从Callable创建,和Runnable的区别是带返回值return "Hello world";
})
.subscribe();

创建一个Mono,且这个Mono会创建另一个Mono,此场景使用还是比较多的

Mono.defer(()->{return Mono.just("hello world");//注意这里返回了一个Mono,而非其他类型
})
.subscribe();

在上一个Mono处理完后,如果需要处理另一个Mono,可以使用Mono.then()方法进行连接:public final <V> Mono<V> then(Mono<V> other)

Mono.empty().then(Mono.fromRunnable(() -> {System.out.println("hello world");}))
.subscribe();

接收参数(消费者)

Mono.just("hello world").doOnNext((e) -> {//e的类型为Mono.just()返回的类型System.out.println(e);//打印hello world}).doOnNext((e) -> {//这里调用了两次doOnNext,因为变量会在doOnNext间传递System.out.println(e);//打印hello world})
.subscribe();

变量转换(变量变形,就是编程中的map操作)

Mono.just("hello world").map((e) -> {System.out.println(e);//打印hello worldreturn "simon's dream";//这里返回了一个新的变量}).doOnNext((e) -> {System.out.println(e);//打印simon's dream})
.subscribe();
Mono.just("hello world").flatMap((e) -> {//这里使用了flatMap,flatMap和map的区别是,map是直接转换,flatMap是返回一个包裹了新值的MonoSystem.out.println(e);//打印hello worldreturn Mono.just("simon's dream");}).doOnNext((e) -> {System.out.println(e);//打印simon's dream})
.subscribe();

缓存。当调用Mono.cache()方法后,当前产生的值会被缓存下来,当有新的订阅者加入后,Mono任务会从调用了cache()方法的位置开始执行,而非从头开始

Mono a1 = Mono.defer(() -> {System.out.println("start");return Mono.just("a");
});a1.subscribe((e) -> {System.out.println("s1:" + e);
});
a1.subscribe((e) -> {System.out.println("s2:" + e);
});System.out.println("==============================");
a1 = a1.cache()
;a1.subscribe((e) -> {System.out.println("s21:" + e);
});
a1.subscribe((e) -> {System.out.println("s22:" + e);
});

输出:

start < - 输出start

s1:a

start < - 输出start

s2:a

==============================

start < - 输出start

s21:a

s22:a

可以看到,当调用了cache后,新的subscribe会从cache处开始执行

异常处理

Mono.just("hello world").map((e) -> {throw new RuntimeException("这是手动扔出的异常");}).doOnError(e -> {//doOnError会拦截到异常,但不会捕获异常,这里可以打印异常的信息,同时异常会被继续向上抛e.printStackTrace();}).onErrorResume(e -> {//onErrorResume会捕获异常,同时产生一个新的Mono继续后续其他操作e.printStackTrace();return Mono.just("simon's dream");}).doOnNext(e -> {System.out.println(e);//打印simon's dream}).doOnSuccess(e -> {System.out.println(e);//打印simon's dream})
;

repeat,如果循环执行某个Mono,可以使用repeatWhen(),repeatWhen()会根据条件判断是否循环执行

int count = 0;
Mono.fromRunnable(() -> {System.out.println("hello world");//会被打印3次}).repeatWhen(Repeat.onlyIf((e) -> {count++;return count < 3;})).subscribe()
;

retry,如果需要在出现error时自动重试,可使用retryWhen()

int count = 0;
Mono.fromRunnable(() -> {System.out.println("hello world");//会打印三次throw new RuntimeException("手动扔出的异常");}).retryWhen(Retry.onlyIf((e) -> {count++;return count < 3;})).onErrorResume((e) -> {System.out.println(e.getMessage());//打印一次return Mono.empty();}).subscribe()
;

filter 过滤,对上一步产生的值进行判断,并返回true/false。当返回true时,后续的的doOnNext可以正常执行;当返回false时,后续的doOnNext会被忽略,其他不需要接收参数的任务不受影响,例如then等

Mono.just(10).filter((e) -> {return e > 5;}).doOnNext((e) -> {System.out.println("after filter1:" + e);//正常输出}).filter((e) -> {return e < 5;}).doOnNext((e) -> {System.out.println("after filter2:" + e);//没有输出}).then(Mono.fromRunnable(() -> {System.out.println("then");//正常输出})).subscribe()
;

doOnSuccess、doOnError、doFinally

当所有mono正常执行完毕,没有出现任何异常时,doOnSuccess会被执行;当有任何异常且异常未被捕获时,doOnError会被执行(注意doOnError自身并不会捕获异常);doFinally在任何条件下都会执行

Mono.just("hello world").doOnSuccess((e) -> {System.out.println(e);//打印hello world}).doFinally((e) -> {System.out.println("doFinally");//打印doFinally}).subscribe();
Mono.just("hello world").doOnNext((e) -> {System.out.println(e);//打印hello worldthrow new RuntimeException("手动扔出的异常");}).doOnSuccess((e) -> {//因为出现了Exception,因此doOnSuccess并不会被执行System.out.println("doOnSuccess:" + e);}).doOnError((e) -> {System.out.println(e.getMessage());//打印异常信息}).doFinally((e) -> {System.out.println("doFinally");//打印doFinally}).subscribe();

zip 并发。我们可以使用zip方法来同时执行多个Mono,并把他们的结果汇总在一起。注意以下多个Mono实际是并发执行的

Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);
Mono<Integer> mono3 = Mono.just(3);Mono.zip(mono1, mono2, mono3).doOnNext(e -> {System.out.println("结果:" + e);//输出[1,2,3],注意e是一个Tripple类型}).subscribe()

也可以用这种写法

Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);
Mono<Integer> mono3 = Mono.just(3);Mono.zip((e) -> {return e;}, mono1, mono2, mono3).doOnNext(e -> {System.out.println("结果:" + Arrays.asList(e));//输出[1,2,3],注意e是一个Object[]类型}).subscribe()
;

Mono的用法基本就是这些了,实际使用时就是这些用法的排列组合。

可以看到,Reactive Programming和Imperative Programming编程思路还是相差很大的,Reactive就是各种代码段的组合,然后变量会在各个代码段之间流动;Imperative Programming则是定义各个方法,然后各个方法之间互相调用。因此在使用Reactor时,需要考虑清除业务的处理流程,然后将整个业务流程拆分成一段一段的过程,然后用Mono去实现。

在来说说Flux,首先先看一下Mono和Flux的解释:

  • Mono

  • FLux

可以看到,Mono每次是对单个任务进行操作,Flux是对多个任务同时进行操作,因此Flux更适合多元素处理的场景。

这里先用前面的filter来演示一下Flux是运作的

Flux.just(1,2,3,4,5,6,7,8,9)//这里可以理解为同时产生了多个Mono,且每个Mono包裹了一个元素,每个Mono会被分别交给后续的任务进行处理.filter((e) -> {//每个元素都会被此过滤器过滤,其中通过的元素会流动到后续其他任务return e > 5;}).collectList()//收集所有元素并产生一个List.doOnNext((list) -> {//这里的入参是一个ListSystem.out.println("after filter1:" + list);//输出 after filter1:[6, 7, 8, 9]}).then(Mono.fromRunnable(() -> {System.out.println("then");//输出then})).subscribe()
;

Flux的另一个特点是能对结果集执行reduce操作

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Set set = new HashSet();Flux.fromIterable(list).reduce(set, (initial, e) -> {//将每个元素过滤并添加到Set中if (e > 5) {initial.add(e);}return initial;}).doOnNext(e -> {//e是Set类型System.out.println("结果:" + e);//输出:[6, 7, 8, 9]}).subscribe()
;

我们在了解了这些后,就可以在spring webflux编写Reactor风格的程序了。

总结

本文开始先介绍了lamda与FunctionalInterface,因为在Reactor中大量用到了它们;然后介绍了Reactive Programming、Reactive Streams和Reactor的区别与联系,只有理解了这些才能理解Reactor编程的思路;之后对比了Thread per Connection 和 Reactor in Single Thread及spring mvc和spring webflux,用来对比当下比较流行的处理http请求的方式;在最后列举了Reactor中Mono和Flux的常见用法。

写在最后

Reactor在编程复杂度和代码可读性上,相比传统的方式更加的复杂,因此对编写者的要求较高,而且当Reactor程序出现问题时排查问题也较为困难。但Reactor在高并发场景下优势更加明显,其编写的程序吞吐量相对传统风格程序有质的提升。因此我们需要仔细评估可能带来的技术风险和对程序性能的提升,最终确定是否需要在项目中使用。

Reactor编程之旅相关推荐

  1. 父与子的编程之旅:与小卡特一起学Python.pdf

    下载地址:网盘下载 编辑推荐 编程是一项充满乐趣的挑战,想要上手也非常容易!这本<父与子的编程之旅:与小卡特一起学Python>中,Warren和Carter父子以亲切的笔调.通俗的语言, ...

  2. 与小卡特一起学python 豆瓣_《父与子的编程之旅:与小卡特一起学Python》

    原标题:<父与子的编程之旅:与小卡特一起学Python> 内容简介 本书是一本家长与孩子共同学习编程的入门书.作者是一对父子,他们以Python语言为例,详尽细致地介绍了Python如何安 ...

  3. 《Microduino实战》——3.2 “Hello World!”—— 开启编程之旅

    本节书摘来自华章出版社<Microduino实战>一 书中的第3章,第3.1节,作者:姚琪 杨立斌,更多章节内容可以访问云栖社区"华章计算机"公众号查看. 3.2 &q ...

  4. python参考书推荐--父与子的编程之旅

    之学ython用的是 笨办法学Python 这本书讲的很细,但练习分得太细,太繁琐,光是打印就分了好几个练习,其实这些小知识点在用的过程中慢慢就记住了,大费周折的有点没意思. 后来在国图又看见这本 零 ...

  5. 学习参考《父与子的编程之旅python【第二版】》高清中文版PDF+高清英文版PDF+源代码...

    对于初步接触编程语言的朋友,推荐看一看<父与子的编程之旅第2版>,对于完全编程零基础的很友好! 图文并茂,过多的文字堆垒很容易让人产生厌倦情绪,也更容易让人产生放弃的想法.使用了大量插图, ...

  6. 新手编程之旅:零基础如何学习MySQL?

    SQL如何学习? SQL入门有一本常常提起的书<SQL in 10minutes>这本可以看看.常年amazon销量前列.或者动物社的,learning SQL,也是很好的. 你需要在实际 ...

  7. 高考完?入门级的开源项目带你开启编程之旅

    记得我刚高考结束时,并没有想象中的狂欢,反而是一种处于一种坐立不安的焦虑中,因为那时单纯地认为:这张试卷,将决定我的一生.对于将信仰寄托于高考的学生来说,当网页上高考成绩弹出的一瞬间,世界要么明亮要么 ...

  8. python面对对象建立自己的电子宠物的编码_父与子的编程之旅:与小卡特一起学Python(第3版)(全彩印刷)...

    Python青少年编程启蒙畅销书全新升级! 上一版豆瓣评分8.5分,被众多老师.家长推荐! 左耳朵耗子.爱编程的魏校长.周自恒倾力推荐! 1.第3版的示例使用Python 3 而不是Python 2, ...

  9. Storm入门与实践(3)通过WordCount展开Storm的编程之旅

    介绍 貌似WordCount已经成了大数据,分布式计算的入门标配程序,其实仔细想一下WordCount的例子,它还有很用应用的场景,例如统计过去一段时间网站中各个商品的浏览量,最近一段时间相同查询的数 ...

最新文章

  1. ajax content download,关于ajax的content-download时间过慢问题的解决方案与思考
  2. Hive Serde、Beeline、JDBC
  3. Could not get dialect instance.
  4. 大数据算法:对5亿数据进行排序
  5. JS的parseInt
  6. debconf: DbDriver config: /var/cache/debconf/config.dat is locked by another process
  7. java writeint_Java DataOutputStream.writeInt(int v)类型
  8. MSChart BarChart
  9. 香港年轻人买房压力有多大
  10. mysql 数据库安装命令_数据库mysql安装及最基本命令
  11. MSP430使用__delay_cycles实现延时1ms和1us
  12. 【Python3之模块及包的导入】
  13. 关于Spring Cloud Netflix
  14. PyQt4---QTextBrowser
  15. 【Python】:修改图片后缀
  16. 【MyBatis】 动态SQL——模糊查询 LIKE
  17. office2010如何使用excel冻结窗格
  18. 为什么你的人脉都没什么用!
  19. 金明的预算budget题解
  20. python气象绘图技巧之箱线图

热门文章

  1. XM7 FOR ANDROID,Android-UI组件(一):布局管理器
  2. java.io.IOException: Cannot run program “***.exe“ CreateProcess error=2, 系统找不到指定的文件,java调用可执行程序
  3. 0~5VDC线性模拟量电压输出开口式电流互感器
  4. 动真格!4名博士生,拟被985大学退学
  5. 欢迎光临我的公众号和我的博客
  6. 数据挖掘:降低汽油精制过程中的辛烷值损失模型(二)
  7. 爱客ikcrm企业级应用的“免费”是鸡汤?
  8. http_request
  9. JDK11变化详解,JDK8升级JDK11详细指南
  10. windows版grub2在NTFS格式U盘安装并使用grub4dos引导win8pe