springboot 使用webflux响应式开发教程(一)
什么是webFlux
左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。
Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>格式,以便进行统一处理。
值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。
@GetMapping("/reactive/restaurants")public Flux<Restaurant> findAll() {return restaurantRepository.findAll();}
可以看到主要变化就是在 返回的类型上Flux<Restaurant>
Flux和Mono 是 Reactor 中的流数据类型,其中Flux会发送多次,Mono会发送0次或一次
使用webflux需要具备的基础是Reactive programming 的理解。 Reactor 的基础 和 熟练的java8 lambda使用
创建springboot应用
下面通过创建股票报价的demo来演示。
通过 https://start.spring.io 或idea自带功能创建springboot项目,groupId为io.spring.workshop,artifactId为 stock-quotes。
勾选 ReactiveWeb
修改 application.properties 配置文件,指定接口 8081
server.port=8081
启动应用,成功后控制台输出日志
日志显示使用Netty而不是tomcat,后续会使用Tomcat
股票报价生成
定义实体
@Data public class Quote {private static final MathContext MATH_CONTEXT = new MathContext(2);private String ticker;private BigDecimal price;private Instant instant;public Quote() {}public Quote(String ticker, BigDecimal price) {this.ticker = ticker;this.price = price;}public Quote(String ticker, Double price) {this(ticker, new BigDecimal(price, MATH_CONTEXT));}@Overridepublic String toString() {return "Quote{" +"ticker='" + ticker + '\'' +", price=" + price +", instant=" + instant +'}';} }
定义生成器
@Component public class QuoteGenerator {private final MathContext mathContext = new MathContext(2);private final Random random = new Random();private final List<Quote> prices = new ArrayList<>();/*** 生成行情数据*/public QuoteGenerator() {this.prices.add(new Quote("CTXS", 82.26));this.prices.add(new Quote("DELL", 63.74));this.prices.add(new Quote("GOOG", 847.24));this.prices.add(new Quote("MSFT", 65.11));this.prices.add(new Quote("ORCL", 45.71));this.prices.add(new Quote("RHT", 84.29));this.prices.add(new Quote("VMW", 92.21));}public Flux<Quote> fetchQuoteStream(Duration period) {// 需要周期生成值并返回,使用 Flux.intervalreturn Flux.interval(period)// In case of back-pressure, drop events .onBackpressureDrop()// For each tick, generate a list of quotes.map(this::generateQuotes)// "flatten" that List<Quote> into a Flux<Quote>.flatMapIterable(quotes -> quotes).log("io.spring.workshop.stockquotes");}/*** Create quotes for all tickers at a single instant.*/private List<Quote> generateQuotes(long interval) {final Instant instant = Instant.now();return prices.stream().map(baseQuote -> {BigDecimal priceChange = baseQuote.getPrice().multiply(new BigDecimal(0.05 * this.random.nextDouble()), this.mathContext);Quote result = new Quote(baseQuote.getTicker(), baseQuote.getPrice().add(priceChange));result.setInstant(instant);return result;}).collect(Collectors.toList());} }
使用webflux创建web应用
webflux的使用有两种方式,基于注解和函数式编程。这里使用函数式编程,先贴代码:
创建QuoteHandler
@Component public class QuoteHandler {private final Flux<Quote> quoteStream;public QuoteHandler(QuoteGenerator quoteGenerator) {this.quoteStream = quoteGenerator.fetchQuoteStream(ofMillis(1000)).share();}public Mono<ServerResponse> hello(ServerRequest request) {return ok().contentType(TEXT_PLAIN).body(BodyInserters.fromObject("Hello Spring!"));}public Mono<ServerResponse> echo(ServerRequest request) {return ok().contentType(TEXT_PLAIN).body(request.bodyToMono(String.class), String.class);}public Mono<ServerResponse> streamQuotes(ServerRequest request) {return ok().contentType(APPLICATION_STREAM_JSON).body(this.quoteStream, Quote.class);}public Mono<ServerResponse> fetchQuotes(ServerRequest request) {int size = Integer.parseInt(request.queryParam("size").orElse("10"));return ok().contentType(APPLICATION_JSON).body(this.quoteStream.take(size), Quote.class);} }
创建Router
@Configuration public class QuoteRouter {@Beanpublic RouterFunction<ServerResponse> route(QuoteHandler quoteHandler) {return RouterFunctions.route(GET("/hello").and(accept(TEXT_PLAIN)), quoteHandler::hello).andRoute(POST("/echo").and(accept(TEXT_PLAIN).and(contentType(TEXT_PLAIN))), quoteHandler::echo).andRoute(GET("/quotes").and(accept(APPLICATION_JSON)), quoteHandler::fetchQuotes).andRoute(GET("/quotes").and(accept(APPLICATION_STREAM_JSON)), quoteHandler::streamQuotes);} }
需要注意的是在springboot中Handler和Router都需要打上@Configuration。
HTTP请求交由Router转发给对应的Handler,Handler处理请求,并返回Mono<ServerResponse>,这里的Router类似@RequestMapping,Handler类似Controller。这么理解非常容易。
运行项目,浏览器输入 http://localhost:8081/hello 或者 使用curl,即可收到 "Hello Spring!"的文本信息。
到目前为止,一个简单的webflux示例已经完成,但是还没有体现出它与传统模式有何不同。
下面我们来做一下测试:
$ curl http://localhost:8081/echo -i -d "WebFlux workshop" -H "Content-Type: text/plain" HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: text/plainWebFlux workshop
还是没有区别T.T,看下一步。
$ curl http://localhost:8081/quotes -i -H "Accept: application/stream+json" HTTP/1.1 200 OK transfer-encoding: chunked Content-Type: application/stream+json{"ticker":"CTXS","price":82.77,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"DELL","price":64.83,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"GOOG","price":881,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"MSFT","price":67.3,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"ORCL","price":48.1,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"RHT","price":85.1,"instant":"2018-05-15T06:45:51.261Z"} {"ticker":"VMW","price":92.24,"instant":"2018-05-15T06:45:51.261Z"} -------------------------------无敌分割线------------------------------------- {"ticker":"CTXS","price":85.7,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"DELL","price":64.12,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"GOOG","price":879,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"MSFT","price":67.9,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"ORCL","price":46.43,"instant":"2018-05-15T06:45:52.260Z"} {"ticker":"RHT","price":86.8,"instant":"2018-05-15T06:45:52.260Z"} ...
上面的分割线是为了易于分辨人为加上去的,我们看到返回结果每隔一秒刷新一次,不终止的话会一直返回数据,传统的Request/Response是一次请求,一次返回。
注意是设置了Header Accept: application/stream+json ,
如果将Header设置为 Accept: application/json ,只会得到一次Response。
写测试
springboot的test模块包含WebTestClient,可以用来对webflux服务端进行测试。
@RunWith(SpringRunner.class) // We create a `@SpringBootTest`, starting an actual server on a `RANDOM_PORT` @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class StockQuotesApplicationTests {// Spring Boot will create a `WebTestClient` for you,// already configure and ready to issue requests against "localhost:RANDOM_PORT" @Autowiredprivate WebTestClient webTestClient;@Testpublic void fetchQuotes() {webTestClient// We then create a GET request to test an endpoint.get().uri("/quotes?size=20").accept(MediaType.APPLICATION_JSON).exchange()// and use the dedicated DSL to test assertions against the response .expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_JSON).expectBodyList(Quote.class).hasSize(20)// Here we check that all Quotes have a positive price value.consumeWith(allQuotes ->assertThat(allQuotes.getResponseBody()).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive()));}@Testpublic void fetchQuotesAsStream() {List<Quote> result = webTestClient// We then create a GET request to test an endpoint.get().uri("/quotes")// this time, accepting "application/stream+json" .accept(MediaType.APPLICATION_STREAM_JSON).exchange()// and use the dedicated DSL to test assertions against the response .expectStatus().isOk().expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON).returnResult(Quote.class).getResponseBody().take(30).collectList().block();assertThat(result).allSatisfy(quote -> assertThat(quote.getPrice()).isPositive());} }
参考文章:
https://docs.spring.io/spring-framework/docs/5.0.3.RELEASE/spring-framework-reference/web.html#web-reactive-server-functional
http://projectreactor.io/docs
https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html
https://blog.csdn.net/qq_34438958/article/details/78539234
springboot 使用webflux响应式开发教程(一)相关推荐
- springboot 使用webflux响应式开发教程(二)
本篇是对springboot 使用webflux响应式开发教程(一)的进一步学习. 分三个部分: 数据库操作 webservice websocket 创建项目,artifactId = tradin ...
- 浅谈响应式开发与自适应布局!
谈到响应式,大家不自觉的会想到什么? 首先映入眼帘的便是随着网页宽度变化而网页内容呈现出不同内容的效果!那么由来是什么呢? 2009时间段,互联网发生了一件天大的事情! 那就是在北京时间2009年6月 ...
- Android项目驱动式开发教程 第2版,《Android项目驱动式开发教程》第一章开发入门.ppt...
<Android项目驱动式开发教程>第一章开发入门 1.4 项目框架分析 4 android:versionName="1.0" > 5 8 第9行代码andro ...
- linux字体栅格化,响应式开发---网页的布局方式、媒体查询、栅格化布局、less语言...
1.响应式开发介绍 a.网页布局方式 b.响应式布局 优点:用一个页面适配不同终端的展示 缺点:产生代码冗余,同时使网页体积变得很庞大,不会因为终端的改变而改变网页的体积,不同终端上有些没有显示出来的 ...
- 用rem来做响应式开发
电脑版的商城昨晚做完了,今天赶着做手机端的,提到手机端的网站第一个想到的就是要 适应不同手机屏幕的宽度,保证在不同手机上都能正常显示给用户,我之前做这类网站都是无脑引进bootstrap的.但前一个项 ...
- 移动端WEB开发之响应式布局(响应式开发原理、bootstrap、阿里百秀案例)
移动端WEB开发之响应式布局 1.1 响应式开发原理 就是使用媒体查询针对不同宽度的设备进行布局和样式的设置,从而适配不同设备的目的. 设备的划分情况: <!DOCTYPE html> & ...
- 响应式开发中合理选定CSS媒体查询分割点
本文响应式开发中合理选定CSS媒体查询分割点翻译自David Gilbertson的The-100%-Correct-Way-To-Do-CSS-breakpoints一文.本文唔看上去有些拗口,不过 ...
- 响应式开发---网页的布局方式、媒体查询、栅格化布局、less语言
1.响应式开发介绍 a.网页布局方式 b.响应式布局 优点:用一个页面适配不同终端的展示 缺点:产生代码冗余,同时使网页体积变得很庞大,不会因为终端的改变而改变网页的体积,不同终端上有些没有显示出来的 ...
- 微金所页面制作(Bootstrap 响应式开发 栅格布局 响应式布局)
该页面适用于 PC端 和 移动端,在响应式开发的媒体查询下能够适配所有屏幕. 一.页面效果 二.结构样式说明 (需引入bootstrap 相关样式文件) 结构分为八块: 头部块:.wjs_header ...
最新文章
- 3D视觉技术的6个问答
- 5.1.5 IO核心子系统
- 考研【财经方向专场讲座】
- 如何为Myeclipse手工添加dtd支持
- java reader_Java Reader reset()方法与示例
- Quartz实现定时功能 job.xml文件的配置
- python中与label类似的控件是_python中tkinter的使用(控件整理)(一)
- win7系统电脑语言栏怎么更换输入法
- 台式电脑设置同时访问内外网
- [Excel]Excel函数和用法(10)——数组公式的使用方法与隔列求和
- 百度语音识别结合云知声离线TTSDemo(AS)
- android平台的开源框架的思考
- 复合函数求导经典例题_【2017年整理】多元函数求导经典例题.ppt
- 1080P、2k、4k、帧、fps等概念区别
- 【100%通过率】华为OD机试真题 Java 实现【完美走位】【2022.11 Q4新题】
- 实时时钟(RTC)的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
- MinGW最新版本下载
- 文献阅读---玉米干旱响应和耐受性基因表达的调控变异定位
- 贝叶斯决策理论和概率密度估计方法
- python实现堆排序用类的方法_GitHub - lil-q/sorting-algorithm-python: 十种排序算法的python实现及复杂度分析...