响应式reactive是Java中高效应用的下一个前沿,但它目前主要有两个障碍:数据访问和网络。RSocket是一种新的第7层语言无关的应用网络协议(解决后者),它由Facebook,Netifi和Pivotal等工程师开发,提供Java,JavaScript,C ++和Kotlin等实现,RSocket与Servlet并不是同类的产品。

RSocket

RSocket RSocket是一个二进制的协议,以异步消息的方式提供4种对等的交互模型,以字节流的方式运行在TCP, WebSockets, Aeron等传输层之上。RSocket专门设计用于与Reactive风格应用配合使用,这些应用程序基本上是非阻塞的,并且通常(但不总是)与异步行为配对。它是传输无关的,支持 TCP、WebSocket和Aeron UDP协议,并支持无语义损失的混合传输协议——回压和流量控制仍然有效。

它还支持连接恢复。当你建立 RSocket 连接时,你可以指定前一个连接的 ID,如果流仍然在服务器的内存中,则你可以继续消费你的流。

Payload就是前面说到基于消息通讯,那就是拿到消息返回消息。对于一个消息来说,由两部分组成,原信息(metadata)和数据(data)。Mono和Flux是用来处理异步的关键字,这是Reactive编程要求。

public interface RSocket extends Availability, Closeable {/** * 推送元信息,数据可以自己定*/Mono<Void> metadataPush(Payload payload);/**请求/响应* 当你发送一个请求并接收一个响应时,该协议也比 HTTP 更具优势,因为它是异步且多路复用的*/Mono<Payload> requestResponse(Payload payload);/**即发即忘* 请求/响应的优化,在不需要响应时非常有用,比如用于非关键事件的日志记录*/Mono<Void> fireAndForget(Payload payload);/**请求/流* 类似于返回集合的请求/响应,集合将以流的方式返回,而不是等到查询完成,例如,发送一个银行帐号,使用一个实时的帐户事务流进行响应*/Flux<Payload> requestStream(Payload payload);/**通道* 允许任意交互模型的双向消息流*/Flux<Payload> requestChannel(Publisher<Payload> payloads);/**健康度检查* double值可以作为权重,如1.0表示处理能力非常好,0.8一般*/default double availability() {return isDisposed() ? 0.0 : 1.0;}
}

RSocket vs Servlet

Servlet是一套Java的API规范,基于HTTP协议之上。主要功能提供HTTP服务的class,就是通过HTTP Request,处理后,最终调用HTTP Response完成输出!

public abstract class HttpServlet extends Servlet {protected abstract void doGet(HttpServletRequest request,HttpServletResponse response)  throws ServletException, IOException;protected abstract void doPost(HttpServletRequest request,HttpServletResponse response)  throws ServletException, IOException;
}

协议层

Servlet 基于HTTP协议的,HTTP并非非常简单,1.1,2.0版本开始是有点复杂的
RSocket 自定义二进制协议,RSocket定位高性能通讯,比HTTP高非常多(号称10倍)

通讯模式

Servlet 都是request/response模式,所以也叫做 request command,其他例如流式推送、fireAndForget和双向通讯,Servlet2.0都不支持,但是这些指令都是为浏览器设计的,并非为服务通讯设计的
RSocket 对等通讯,不再介于传统的理解是Client -> Server模式,RSocket没有这个概念,大家的地位是对等的,都可以在server端,我调用你的服务,你也可以调用我的服务

message

Servlet HTTP1.1是基于文本的通讯,2.0是基于message的(二进制),基于message的好处是异步化。message都必须有一个ID,这个消息发送出去后,就不用等立即返回,可以继续发其他message,收到message后,再根据返回的message ID和之前的发出去的message ID进行匹配。
RSocket 基于message

RSocket && dubbo

Dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程提供了支持,用户可以非常方便的使用RSocket的语法。使用实例可以参阅官方,待正式版发布后,接触RSocket的机会也会越来越多。

RSocket &&  Spring

随着Spring Cloud的推出,Spring Framework 5.2 即将要把RSocket作为缺省的通讯协议,springBoot中提供相应支持。

RSocket && 微服务

RSocket的主要障碍是应用程序之间必须要用RSocket通讯。微服务普及后,其为了“简化”微服务之间的通讯,引入了很多层的技术栈。这当然是好事,但是很多的决定是由于收到上一代的通讯协议的技术所限制。

示例

spring-boot-starter-rsocket其实也已经封装好了,使用起来比下面例子更加简单方便,感觉离rpc更近了一步

public final class ChannelEchoClient {static final Payload payload1 = ByteBufPayload.create("Hello ");public static void main(String[] args) {RSocketFactory.receive().frameDecoder(PayloadDecoder.ZERO_COPY).acceptor(new SocketAcceptorImpl()).transport(LocalServerTransport.create("localhost")).start().subscribe();RSocket socket =RSocketFactory.connect().keepAliveAckTimeout(Duration.ofMinutes(10)).frameDecoder(PayloadDecoder.ZERO_COPY).transport(LocalClientTransport.create("localhost")).start().block();Flux.range(0, 100000000).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast();}private static class SocketAcceptorImpl implements SocketAcceptor {@Overridepublic Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {return Mono.just(new AbstractRSocket() {@Overridepublic Mono<Void> fireAndForget(Payload payload) {//System.out.println(payload.getDataUtf8());payload.release();return Mono.empty();}@Overridepublic Mono<Payload> requestResponse(Payload payload) {return Mono.just(payload);}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> payloads) {return Flux.from(payloads).subscribeOn(Schedulers.single());}});}}
}
//request/response
public final class HelloWorldClient {public static void main(String[] args) {RSocketFactory.receive().acceptor((setupPayload, reactiveSocket) ->Mono.just(new AbstractRSocket() {boolean fail = true;@Overridepublic Mono<Payload> requestResponse(Payload p) {if (fail) {fail = false;return Mono.error(new Throwable());} else {return Mono.just(p);}}})).transport(TcpServerTransport.create("localhost", 7000)).start().subscribe();RSocket socket =RSocketFactory.connect().transport(TcpClientTransport.create("localhost", 7000)).start().block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.dispose();}
}
//request/stream
public final class StreamingClient {public static void main(String[] args) {RSocketFactory.receive().acceptor(new SocketAcceptorImpl()).transport(TcpServerTransport.create("localhost", 7000)).start().subscribe();RSocket socket =RSocketFactory.connect().transport(TcpClientTransport.create("localhost", 7000)).start().block();socket.requestStream(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).doOnNext(System.out::println).take(10).then().doFinally(signalType -> socket.dispose()).then().block();}private static class SocketAcceptorImpl implements SocketAcceptor {@Overridepublic Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {return Mono.just(new AbstractRSocket() {@Overridepublic Flux<Payload> requestStream(Payload payload) {return Flux.interval(Duration.ofMillis(100)).map(aLong -> DefaultPayload.create("Interval: " + aLong));}});}}
}

参阅资料

  • Introduction to RSocket

REST的最大限制是它与HTTP相关联,经常使用REST的原因是它易于调试,因为它是“人类可读”。开源RSocket专为服务而设计。它是一种面向连接的消息驱动协议,在应用程序级别具有内置流控制。它既可以在浏览器中同样使用,也可以在服务器上使用。这意味着您可以流式传输数据或执行Pub / Sub而无需设置应用程序队列。在Facebook,RSocket用于名为LiveServer的服务,该服务负责响应可被视为GraphQL订阅的实时查询。

响应式编程之网络新约:RSocket相关推荐

  1. 浅谈RSocket与响应式编程

    简介: RSocket是高效一个二进制的网络通讯协议,能够满足很多场景下使用.另外,RSocket也是一个激进的响应式捍卫者,激进到连API都跟响应式无缝集成.本文我们将和大家分享RSocket与响应 ...

  2. 赠书:响应式编程到底是什么?

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 最近几年,随着Go.Node 等新语言.新技术的出现,J ...

  3. Android什么是函数,什么是函数响应式编程(JavaAndroid版本)

    什么是函数响应式编程(Java&Android版本) 函数响应式编程(FRP)为解决现代编程问题提供了全新的视角.一旦理解它,可以极大地简化你的项目,特别是处理嵌套回调的异步事件,复杂的列表过 ...

  4. 响应式编程优点 有效_Reactive(响应式)编程

    Reactor 和Rxjava是Reactive Programming范例的一个具体实现,可以概括为: 反应式编程是一种涉及数据流和变化传播的异步编程范例.这意味着可以通过所采用的编程语言轻松地表达 ...

  5. Java的HTTP服务端响应式编程

    传统的Servlet模型走到了尽头 传统的Java服务器编程遵循的是J2EE的Servlet规范,是一种基于线程的模型:每一次http请求都由一个线程来处理. 线程模型的缺陷在于,每一条线程都要自行处 ...

  6. 响应式编程优点 有效_什么是响应式编程?

    响应式编程是一种通过异步和数据流来构建事物关系的编程模型.这里每个词都很重要,"事物的关系"是响应式编程的核心理念,"数据流"和"异步"是实 ...

  7. 阿里专家杜万:Java响应式编程,一文全面解读

    本篇文章来自于2018年12月22日举办的<阿里云栖开发者沙龙-Java技术专场>,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在<阿里云栖开发者沙龙-Java技术专场& ...

  8. Reactive(1) 从响应式编程到好莱坞

    目录 概念 面向流设计 异步化 响应式宣言 参考文档 概念 Reactive Programming(响应式编程)已经不是一个新东西了. 关于 Reactive 其实是一个泛化的概念,由于很抽象,一些 ...

  9. 【响应式编程的思维艺术】 (5)Angular中Rxjs的应用示例

    [摘要] Rxjs在angular中的基本应用 本文是[Rxjs 响应式编程-第四章 构建完整的Web应用程序]这篇文章的学习笔记. 示例代码托管在:http://www.github.com/das ...

最新文章

  1. QTreeWidgetItem和QTreeWidgetItemIterator
  2. AMD Developer Center有关ATI Stream的内容
  3. Hi3516A开发--RTC电路
  4. k8s argo workflow获取登录token的命令
  5. 管道过滤器(Pipe-And-Filter)模式
  6. 收藏 | 综述:目标检测二十年
  7. Linux下安装composer报错 The openssl extension is missing / The zlib extension is not loaded等等
  8. 继英伟达、三星后,育碧也遭攻击,员工密码重置
  9. 使用“ for”循环遍历字典
  10. diamond专题(一)– 简介和快速使用
  11. 固高GTS控制卡功能介绍2:NewWatch功能
  12. java实现网络下载进度_Retrofit+Rxjava下载文件进度的实现
  13. 4款好用流程图软件,都是经验总结出来的
  14. 【ReID】Pyramidal Person Re-IDentification via Multi-Loss Dynamic Training
  15. 用Hive、Impala查询Hbase数据
  16. Windows Workflow Foundation中实现人工活动的demo,按照XPDL规范的实现
  17. CreateProcess error=193, %1 不是有效的 Win32 应用程序
  18. C++ 定时每天十二点做某事
  19. 单词拆分(动态规划)
  20. 计算机系统 ahci模式,打开ahci模式后需要重装系统吗

热门文章

  1. 攻击方式 ---- SSH暴力破解
  2. STL 的 std::set 创建自定义结构体的对象,定义严格弱序的比较函数
  3. TiDB v5.4.0 与 v6.0.0 的 sysbench 性能对比
  4. CentOS 安装Httpie
  5. 通过sql实现模糊搜索按匹配度从高到低排序
  6. 萝卜家园 Ghost XP 新春装机版 V200801
  7. 竞争情报分析工具Alexa
  8. 学习一样新东西行而有效的方法
  9. 开关电源的缓启动Soft Start
  10. 月薪40k+测试·开发同步认可的FastAPI:Python 世界里最受欢迎的异步框架