目录

  • RSocket简介
  • Reactor Core
  • Java服务端
    • Maven依赖
    • 创建RSocket服务端
    • 服务端对四种交互方式的处理
      • fireAndForget
      • requestResponse
      • requestStream
      • requestChannel
      • wrapMsg
  • JS客户端
    • 创建项目 & 安装依赖
    • 连接服务端
    • Flowable API
    • 四种交互方式
      • fireAndForget
      • requestResponse
      • requestStream
      • requestChannel

RSocket简介

看这里

Reactor Core

关于Spring的Reactor项目,可以参考这里学习

Java服务端

Maven依赖

 <!-- 基于Netty的RSocket --><dependency><groupId>io.rsocket</groupId><artifactId>rsocket-core</artifactId><version>1.0.1</version></dependency><dependency><groupId>io.rsocket</groupId><artifactId>rsocket-transport-netty</artifactId><version>1.0.1</version></dependency>

创建RSocket服务端

共三步:

  1. 创建基于WebSocket的传输层transport;
// 创建一个Http服务,WebSocket协议依赖Http协议
HttpServer server = HttpServer.create().host("localhost").port(8009);// 实例化RSocket的transport,基于WebSocket实现
WebsocketServerTransport transport = WebsocketServerTransport.create(server);
  1. 创建一个用于实现四种交互方式的RSocket实现类
    private static class RSocketImpl implements RSocket {@Overridepublic Mono<Void> fireAndForget(Payload payload) {...}@Overridepublic Mono<Payload> requestResponse(Payload payload) {...}@Overridepublic Flux<Payload> requestStream(Payload payload) {...}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> payloads) {...}}
  1. 创建RSocket服务端,并用RSocket实现类接收rsocket连接请求
CloseableChannel channel = RSocketServer.create().acceptor((setup, rSocket) -> {// rSocket 是客户端的RSocket// 建立连接时会进入这里,在此处进行身份校验// 这里就简单的校验一下metadata的值是否为123456,生产中应该将鉴权信息解析出来并校验有效性String meta = setup.getMetadataUtf8();if (meta.equalsIgnoreCase("123456")) {// 校验成功时,需要返回一个RSocket实例,用于处理客户端的四种请求System.out.println("客户端连接建立");return Mono.just(new RSocketImpl());} else {// 校验失败时,返回错误信息,此时会触发客户端的onError,RSocket连接也没有成功建立System.out.println("密码错误");return Mono.error(new IllegalArgumentException("密码错误"));}})// 绑定到传输层transport,可以选择不同的transport实现,底层的传输协议可以不同(WebSocket\TCP\Aero).bind(transport).block();
System.out.println("server启动" + channel.address());

上面在接收新的客户端连接请求时,进行了鉴权操作,比较简单实现,生产环境中可以考虑使用用户名密码、AK\SK、JWT等携带鉴权信息进行校验。

服务端对四种交互方式的处理

在服务端想要处理一个客户端的四种交互请求,只需要在RSocket实现类中处理即可,这里是在RSocketImpl类中。

fireAndForget

@Override
publicMono<Void> fireAndForget(Payload payload) {System.out.println("[fireAndForget]Client:" + payload.getDataUtf8());return Mono.empty();
}

这个模式下,客户端发送消息后不关心服务端的返回,因此服务端在接收到数据后返回一个empty就可以了。请求一次。

requestResponse

@Override
public Mono<Payload> requestResponse(Payload payload) {System.out.println("[requestResponse]Client:" + payload.getDataUtf8());System.out.println("[requestResponse]Server:好的,多喝热水");return Mono.just(DefaultPayload.create(wrapMsg("好的,多喝热水")));
}

requestResponse类似Http协议的请求与响应,客户端的一次请求服务端需要有一个响应。因为只需要一个响应,所以返回一个Mono即可。请求一次,返回一次。

requestStream

@Override
public Flux<Payload> requestStream(Payload payload) {System.out.println("[requestStream]Client:" + payload.getDataUtf8());return Flux.range(0, 4).map(i -> {if (i == 0) {return "重要的事情说三遍";} else {return "多喝热水 * " + i;}}).doOnNext(str -> {System.out.println("[requestStream]Server:" + str);}).map(RSServer::wrapMsg).map(DefaultPayload::create);
}

requestStream模式下,客户端请求的是一系列数据,服务端会返回多个数据给客户端,因此方法返回的是一个Flux。Flux代表的事件源每发射一个事件,就会向客户端发送一个消息(事件)。请求一次,返回多次。

requestChannel

requestChannel模式得到一个双工通道,客户端和服务端均可以向通道中发送任意数量的消息,直到有一方终止通道。请求多次,返回多次。
在RSocket的服务端中,双工通道用一个客户端事件源Publisher和一个服务端事件源Flux来体现。当客户端或服务端事件源有一方调用了onComplete或onError时,通道会被关闭。
这里我验证双工通道的方法是将服务端写成一个echo服务,每次收到客户端消息的时候都返回一个数据。先看代码

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {// 这里接收到的第一个payload是申请channel时传递的payloadFlux<Payload> clientPayloadFlux = Flux.from(payloads);// 返回值的Flux是返回给客户端的payload,不断的异步发射payload来实现向客户端发送数据// 要实现在接收到客户端消息的时候,返回给客户端一个消息,首先需要订阅客户端的flux,然后再发射出去// 这里写了一个EchoController来实现EchoController controller = new EchoController();clientPayloadFlux.subscribe(controller);return Flux.from(controller);
}

这个方法里,入参payloads就是客户端的事件源Publisher,为了方便操作,一般也把它封装为Flux。为了得到客户端发送过来的数据,需要订阅clientPayloadFlux;为了发送数据给客户端,需要创建一个Publisher事件源来发射数据。在这里我创建了一个EchoController类,它既是订阅客户端Flux的Subscriber,也是发射事件给客户端的Publisher,声明如下。

class EchoController extends BaseSubscriber<Payload> implements Publisher<Payload> {// 用于统计从客户端收到了多少个消息private AtomicInteger msgCounter = new AtomicInteger(0);private Subscriber<? super Payload> subscriber;@Overridepublic void subscribe(Subscriber<? super Payload> s) {this.subscriber = s;}@Overrideprotected void hookOnNext(Payload payload) {// 在这个钩子函数中,处理客户端发送过来的数据String data = payload.getDataUtf8();System.out.println("[requestChannel]Client:" + data);// 处理完客户端发送的数据,再返回给客户端一个payload// 通过Subscriber来发射,但如果是第一个payload进来,这个时候publisher还没被订阅,subscriber为空,不能发射数据if (this.subscriber != null) {String msg = "知道了,你多喝热水(" + msgCounter.incrementAndGet() + ")";System.out.println("[requestChannel]Server:" + msg);this.subscriber.onNext(DefaultPayload.create(wrapMsg(msg)));}}@Overrideprotected void hookOnError(Throwable t) {System.out.println("[requestChannel]Client发送了一个异常");t.printStackTrace();}@Overrideprotected void hookOnComplete() {System.out.println("[requestChannel]Client结束了这个channel");}
}

EchoController通过继承BaseSubscriber来实现订阅者的功能。
BaseSubscriber中有hookOnXXX方法,重写这些方法即可实现Subscriber中onXXX方法同样的功能,而且子类实现的hookOnNext和hookOnComplete中如果抛出异常,会被自动转交给hookOnError处理,比较方便。继承BaseSubscriber相比较直接实现Subscriber的其他好处,待研究。

EchoController实现Publisher的subscribe方法,来将Subscriber存储下来,并在接收到客户端数据时,通过存储的Subscriber来发射数据给客户端。

wrapMsg

因为JS客户端使用JSON来格式化Payload,所以发送数据给客户端时,要发送一个JSON字符串,这个方法用来将字符串消息包装为JSON串。

private static String wrapMsg(String msg) {Map<String, String> data = new HashMap<>();data.put("msg", msg);return JacksonUtil.toJSONString(data);
}

JS客户端

RSocket的js实现有两种,一个是在Node环境中使用的基于TCP协议实现,另一个是在浏览器环境中使用的基于WebSocket实现,我们使用浏览器的实现。

创建项目 & 安装依赖

首先新建一个Vue项目,通过vue-cli创建(请自行百度)完成后,安装rsocket的npm依赖,共有两个。

npm install --save rsocket-core
npm install --save rsocket-websocket-client

连接服务端

在任意一个组件中可以开始创建RSocket客户端连接,我在App.vue组件的created函数中建立连接。

// 创建一个RSocket客户端
window.rsclient = new RSocketClient({// payload数据使用json序列化serializers: JsonSerializers,setup: {keepAlive: 60000,lifetime: 180000,dataMimeType: 'application/json',metadataMimieType: 'application/json',},// 在浏览器端,需要使用RSocketWebSocketClient作为传输协议transport: new RSocketWebSocketClient({ url: 'ws://localhost:8009' }),
})// 连接服务端
window.rsclient.connect();

这里创建连接会报错,提示REJECTED_SETUP密码错误,这是因为Java服务端在建立连接时校验了setup payload中的密码,而我在建立连接的时候并没传递setup payload,修改如下即可成功建立连接。
另外,为了方便测试,在连接服务端成功后,将rsocket实例赋值给全局变量引用。

// 创建一个RSocket客户端
window.rsclient = new RSocketClient({// payload数据使用json序列化serializers: JsonSerializers,setup: {keepAlive: 60000,lifetime: 180000,dataMimeType: 'application/json',metadataMimieType: 'application/json',payload: {metadata: 123456,}},// 在浏览器端,需要使用RSocketWebSocketClient作为传输协议transport: new RSocketWebSocketClient({ url: 'ws://localhost:8009' }),
});//连接服务端
window.rsclient.connect().then(rsocket => {console.info('成功连接到服务端');window.rsocket = rsocket;
});

Flowable API

RSocketJS并没有依赖reactor-core-js,而是自己实现了两个类Flowable、Single来实现类似RxJS的操作,分别对应reactor中的Flux和Mono。
Flowable是发射一系列事件的事件源;Single是只发射一个事件,而且直接会触发onComplete操作符。
使用方式与Flux和Mono类似,订阅即可。

flowable.subscribe({onNext: payload => {},onComplete: () => {},onError: error => {},onSubscribe: subscription => {subscription.request(0x7fffffff);// subscription.cancel();}
});
single.subscribe({onComplete: payload => {},onError: error => {},onSubscribe: cancel => {// 调用cancel方法来停止触发onComplete或onError  cancel();}
});

四种交互方式

fireAndForget

window.rsFireAndForget = (msg) => {// fireAndForget模式下,客户端将消息发送出去之后,不关心服务端的返回window.rsocket.fireAndForget({data: msg});
};window.rsFireAndForget('hello, faf');

连接服务端成功后,执行上面代码,可以在服务端看到控制台有如下输出

客户端连接建立
[fireAndForget]Client:"hello, faf"

requestResponse

window.rsRequestResponse = (msg) => {// requestResponse模式下,服务端会返回一个payload,在RSocketJS中用Single表示const single = window.rsocket.requestResponse({data: msg});single.subscribe({onComplete: payload => {console.info(`[requestResponse]${payload.data.msg}`)}});
};window.rsRequestResponse('requestAndResponse');
客户端连接建立
[requestResponse]Client:"requestAndResponse"
[requestResponse]Server:好的,多喝热水

requestStream

window.rsRequestStream = msg => {window.rsocket.requestStream({data: msg}).subscribe({onNext: payload => {console.info(`[requestStream]${payload.data.msg}`);},onComplete: () => {console.info('[requestStream][complete]')},onError: () => {debugger},// requestStream时,数据是懒发送的,只有在返回的Flowable被订阅,并且被request(n)请求数据之后才发送// 因此这里还需要request一下onSubscribe: s => {s.request(INT32_MAX);// 0x7fffffff}});
};window.rsRequestStream('hello, stream');

这里有一个坑,开始的时候,订阅flowable时没有写onSubscribe,也没有去request,导致不论怎么样都没有发送requestStream的数据包。后来查官方文档,requestStream接口有这样一句:

大致意思是,客户端请求requestStream的数据包是懒发送到服务端的,也就是并没有真正的发送,只有在返回的Flowable被订阅,且调用了request方法请求数据才表明需要请求交互,才会发送数据包。而且request的时候,参数n的最大值是32位有符号整型的最大值,也就是0x7fffffff,超过这个值会报错。
按照上面的方式发起请求后,服务端控制台可见如下输出:

客户端连接建立
[requestStream]Client:"hello, stream"
[requestStream]Server:重要的事情说三遍
[requestStream]Server:多喝热水 * 1
[requestStream]Server:多喝热水 * 2
[requestStream]Server:多喝热水 * 3

requestChannel

requestChannel得到一个双工通道,客户端与服务端均可以发送消息,我在获取到channel之后赋值给全局变量,这样方便在控制台测试输入。

window.rsRequestChannel = msg => {if (window.rsChannel) {return;}window.rsChannel = {};// 创建一个Flowable用于发射数据给服务端window.rsChannel.flowable = new Flowable(subscriber => {// subscriber用与发射数据window.rsChannel.subscriber = subscriber;// availableRequest表示服务端能接受多少个数据window.rsChannel.availableRequest = 0;// 发射数据,判断了服务端能不能接受新的数据,如果不能就丢弃,如果能就通过subscriber发射window.rsChannel.onNext = msg => {if (window.rsChannel.availableRequest > 0) {window.rsChannel.subscriber.onNext({data: msg});window.rsChannel.availableRequest--;}};// 当这个Flowable被订阅的时候,传递一个subscription,用于处理订阅者的取消订阅或请求数据操作subscriber.onSubscribe({cancel: () => {// 当订阅者取消订阅时,设置不能再发送数据// 一般是服务端发生异常主动断开连接或挂掉时window.rsChannel = null;},request: n => {// 当服务端准备好接受数据,会调用该方法提示能接受的数据数量nwindow.rsChannel.availableRequest += n;console.info('[requestChannel]服务端已就绪 ' + n);if (n === 1) {// 第一次就绪时发射第一个数据window.rsChannel.subscriber.onNext({data: msg});window.rsChannel.availableRequest--;}}})});window.rsocket.requestChannel(window.rsChannel.flowable).subscribe({onNext: payload => {console.info(`[requestChannel]${payload.data.msg}`);},onComplete: () => {window.rsChannel = null;console.info('[requestChannel][complete]')},onError: () => {debugger},onSubscribe: s => {console.info('[requestChannel]客户端已就绪');s.request(INT32_MAX);}});
}

和requestStream一样,返回Flowable时也是懒发送数据的,需要订阅返回的Flowable并request(n)。

上面创建客户端发送数据的Flowable时,考虑到服务端请求的数据数量可能不多,做了背压处理,如果觉得不需要,可以将创建Flowable的代码改成下面这样:

// 创建一个Flowable用于发射数据给服务端
window.rsChannel.flowable = new Flowable(subscriber => {// subscriber用与发射数据window.rsChannel.subscriber = subscriber;window.rsChannel.onNext = msg => {window.rsChannel.subscriber.onNext({data: msg});};// 当这个Flowable被订阅的时候,传递一个subscription,用于处理订阅者的取消订阅或请求数据操作subscriber.onSubscribe({cancel: () => {// 当订阅者取消订阅时,设置不能再发送数据// 一般是服务端发生异常主动断开连接或挂掉时window.rsChannel = null;},request: n => {// 当服务端准备好接受数据,会调用该方法提示能接受的数据数量nconsole.info('[requestChannel]服务端已就绪 ' + n);// 第一次就绪时发射第一个数据if (n === 1) {window.rsChannel.subscriber.onNext({data: msg});}}})
});

在控制台进行如下输入,可在服务端控制台看到对应的输出:

基于RSocket的Java与浏览器JS通信相关推荐

  1. 基于Topic消息路由的M2M设备间通信Node JS SDK 示例

    概述 M2M(即Machine-to-Machine)是一种端对端通信技术.本章节以Node JS SDK为例,使用基于Topic消息路由的M2M设备间通信,主要介绍如何基于物联网平台构建一个M2M设 ...

  2. 基于RSocket协议实现客户端与服务端通信

    RSocket基础开发demo package com.pshdhx.rsocket;import io.rsocket.Payload; import io.rsocket.RSocket; imp ...

  3. 基于Java NIO的Socket通信

    基于Java NIO的Socket通信 Java NIO模式的Socket通信,是一种同步非阻塞IO设计模式,它为Reactor模式实现提供了基础. 下面看看,Java实现的一个服务端和客户端通信的例 ...

  4. 基于Java的TCP Socket通信详解(计算机端/Android手机端)

    TCP Socket通信是一种比较常用的基于连接的网络通信方式.本文通过Java实现TCP Socket通信,并将其用于计算机端.Android手机端,同时做到代码规范化,实现代码最大化复用. 本文代 ...

  5. 基于asp.net + easyui框架,js实现上传图片之前判断图片格式,同时实现预览,兼容各种浏览器+下载...

    2019独角兽企业重金招聘Python工程师标准>>> 最近在做图片上传的一个前台页面,上传图片功能虽然很简单,但是需要我们学习的地方很多.在上传图片之前验证图片的格式,并同时实现预 ...

  6. 基于javaweb的高校运动会管理系统(java+ssm+jsp+js+jquery+mysql)

    基于javaweb的高校运动会管理系统(java+ssm+jsp+js+jquery+mysql) 运行环境 Java≥8.MySQL≥5.7.Tomcat≥8 开发工具 eclipse/idea/m ...

  7. 基于javaweb的社区居民户籍管理系统(java+ssm+jsp+js+html+mysql)

    基于javaweb的社区居民户籍管理系统(java+ssm+jsp+js+html+mysql) 运行环境 Java≥8.MySQL≥5.7.Tomcat≥8 开发工具 eclipse/idea/my ...

  8. 基于javaweb的律师事务所律师管理系统(java+ssm+html+js+jsp+mysql)

    基于javaweb的律师事务所律师管理系统(java+ssm+html+js+jsp+mysql) 运行环境 Java≥8.MySQL≥5.7.Tomcat≥8 开发工具 eclipse/idea/m ...

  9. 基于javaweb的药品进货销售管理系统(java+ssm+html+js+jsp+mysql)

    基于javaweb的药品进货销售管理系统(java+ssm+html+js+jsp+mysql) 运行环境 Java≥8.MySQL≥5.7.Tomcat≥8 开发工具 eclipse/idea/my ...

最新文章

  1. Winmail邮件服务器
  2. 计算机主板上电源怎么插,教大家电脑主板上的电源开关插头怎么接
  3. MyBatis开发步骤
  4. instanceof的用法①
  5. AOP和IOC个人理解
  6. PyTorch 深度学习:32分钟快速入门——DenseNet
  7. 为什么要用Vue.js的组件化开发
  8. python3-字符串常用操作
  9. unity 中画布随相机视野实时变化
  10. 已知两点坐标求水平距离_知道两个点的坐标X,Y,如何计算出两点间的距离以及角度,公式是什么...
  11. 旺旺怎么去服务器接收文件夹,xp系统下找到阿里旺旺安装路径文件夹的方法
  12. 8个免费、高质量PPT素材网站,值得收藏
  13. html字体随页面大小变化,字体大小随网页大小变化
  14. 腾讯android一键root工具,腾讯一键root手机版
  15. 一个善意的谎言拯救一个团队 (又叫沙漠中的指南针)
  16. 如何在printf中输出,特殊字符(如:%、\、““)或表示八进制012、十六进制0xc
  17. 送给1987年左右的朋友,看完是不是有些泪水
  18. 海胆状聚苯乙烯与α-氧化铁复合结构微球/聚苯乙烯/氧化石墨烯/CNTs复合微球研究方式
  19. [Error Code: 904, SQL State: 42000] ORA-00904 : 标识符无效
  20. oracle-ora 各种sql异常描述

热门文章

  1. 5大常见SCI投稿系统-Editorial Manager最全分步指导说明
  2. 艾伟:一次挂死(hang)的处理过程及经验
  3. 看完通辽可汗小约翰之后应该掌握的英语词汇 01 外交类
  4. 最短路问题 Bellman-Ford(单源最短路径)(图解)
  5. hinton深度学习nature_【深度学习】卷积神经网络之父LeCun:关于深度学习必须知道的传奇人物...
  6. 领扣算法:234 回文链表
  7. mysql二进制文件转文本文件_使用mysqlbinlog把mysql二进制文件转换文本
  8. 如何配置Java和tomcat环境变量
  9. 上海交大计算机科学技术导师介绍,上海交通大学
  10. Ubuntu下系统重启dns就被清空的解决方案