目录

  • 一、服务端配置
    • 1、引入依赖包
    • 2、编写Config类进行websocket的配置
    • 3、编写拦截器WebSocketInterceptor
    • 4、编写管理在线用户的WebSocketManager
    • 5、编写消息的接收和发送的Controller
  • 二、客户端配置
    • 1、引入SockJS以及stompjs
    • 2、编写公用的websocket.js供其他地方调用
    • 3、业务代码中引用websocket进行消息发送和接收
  • 三、Spring cloud gateway网关配置
    • 1、修改gateway网关的配置文件
    • 2、在网关中添加全局过滤器
  • 四、前端代理的配置
  • 五、问题记录

一、服务端配置

整个服务端是基于ruoyi的微服务版本做的。

1、引入依赖包

首先引入websocket的maven依赖,版本号自行修改。

         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>${spring-boot.version}</version></dependency>

2、编写Config类进行websocket的配置

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.*;//头部加注解EnableWebSocketMessageBroker,允许使用Stomp方式。
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {@Autowiredprivate WebSocketInterceptor authChannelInterceptor;@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {//允许原生的websocket,如果只允许源生的websocket,用这段代码即可//registry.addEndpoint("/ws")//      .setAllowedOrigins("*");//允许跨域//请求地址:ws://ip:port/wsSockJsServiceRegistration registration = registry.addEndpoint("/ws").setAllowedOrigins("*")//允许跨域.withSockJS();//允许sockJS//下面注解的代码主要用于客户端不支持websocket的情况下,SockJS降级使用xhr-stream或者pjson等等传输方式的时候使用。//registration.setClientLibraryUrl("//cdn.jsdelivr.net/npm/sockjs-client@1.5.2/dist/sockjs.min.js");}/*** 注册相关的消息频道** @param config*/@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {//设置两个频道,topic用于广播,queue用于点对点发送config.enableSimpleBroker("/topic/", "/queue/");//设置应用目的地前缀config.setApplicationDestinationPrefixes("/app");//设置用户目的地前缀config.setUserDestinationPrefix("/user");}/*** 加入拦截器主要是为了验证权限的** @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.interceptors(authChannelInterceptor);}//这个是为了解决和调度任务的冲突重写的bean@Primary@Beanpublic TaskScheduler taskScheduler(){ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(10);taskScheduler.initialize();return taskScheduler;}
}

3、编写拦截器WebSocketInterceptor

拦截器主要是处理权限用的,防止没有获得权限的用户访问到服务器。


import com.ruoyi.common.core.constant.CacheConstants;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.redis.service.TokenRedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;import java.security.Principal;
import java.util.List;
import java.util.Map;@Component
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class WebSocketInterceptor implements ChannelInterceptor {@Autowiredprivate WebSocketManager webSocketManager;@Autowiredprivate TokenRedisService tokenRedisService;/*** 连接前监听** @param message* @param channel* @return*/@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);//1、判断是否首次连接if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {//2、判断tokenList<String> nativeHeader = accessor.getNativeHeader(CacheConstants.HEADER);if (nativeHeader != null && !nativeHeader.isEmpty()) {String token = nativeHeader.get(0);if (StringUtils.isNotBlank(token)) {Map<String,String> pass =  tokenRedisService.validation(token);if("pass".equals(pass.get("result"))){Principal principal = new Principal() {@Overridepublic String getName() {return pass.get("username")+"_"+ accessor.getSessionId();}};accessor.setUser(principal);webSocketManager.addUser(principal.getName());return message;}}}return null;}//不是首次连接,已经登陆成功return message;}// 在消息发送后立刻调用,boolean值参数表示该调用的返回值@Overridepublic void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);Principal principal = accessor.getUser();// 忽略心跳消息等非STOMP消息if(accessor.getCommand() == null){return;}switch (accessor.getCommand()){// 首次连接case CONNECT:break;// 连接中case CONNECTED:break;// 下线case DISCONNECT:if(principal!=null){webSocketManager.deleteUser(principal.getName());}break;default:break;}}}

4、编写管理在线用户的WebSocketManager


import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.concurrent.CopyOnWriteArraySet;@Component
public class WebSocketManager {private ThreadPoolTaskScheduler taskScheduler;private Long onlineCount;private CopyOnWriteArraySet<String> onlines;private static final Integer POOL_MIN = 10;@PostConstructpublic void init() {taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(POOL_MIN);taskScheduler.initialize();this.onlines = new CopyOnWriteArraySet<>();this.onlineCount = 0L;}public boolean isOnline(String username) {return onlines.contains(username);}public void addUser(String username) {onlines.add(username);onlineCount = Long.valueOf(onlines.size());}public void deleteUser(String username) {onlines.remove(username);onlineCount = Long.valueOf(onlines.size());}
}

5、编写消息的接收和发送的Controller

此处是编写系统处理前端发送消息的业务代码,大家可以根据自己的项目需求进行更替,这里只编写简单例子供大家参考。


import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.zt.service.ZTRealDataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;import java.security.Principal;@Controller
public class RealDataWebSocketController {@Autowiredprivate ZTRealDataService ztRealDataService;@Autowiredprivate SimpMessagingTemplate simpMessagingTemplate;//MessageMapping的注解是接收的目的地为app/datapoint的消息处理,只会处理客户端SEND发送的消息。//SendToUser注解是结果以点对点方式发送到目的地user/queue/datavalue@MessageMapping("/datapoint")@SendToUser("/queue/datavalue")public AjaxResult datapoint(Principal principal, @Payload String message) {System.out.println(principal.getName());System.out.println(message);return AjaxResult.success(ztRealDataService.getData(message.split(",")));}//SubscribeMapping的注解是订阅目的地为app/news的消息处理,只会处理客户端SUBSCRIBE发送的消息。//SendTo注解是结果发送到目的地app/topic/news@SubscribeMapping("/news")@SendTo("/topic/news")public String subscribeNews(@Payload String message) {return message;}//接收前端send命令,但是点对点返回@MessageMapping("/realdata")@SendToUser("/queue/realdata")public String realdata(Principal principal, @Payload String message) {System.out.println(principal.getName());System.out.println(message);//可以手动发送,同样有queuesimpMessagingTemplate.convertAndSendToUser(principal.getName(),"/queue/test","111");return "111";}
}

二、客户端配置

1、引入SockJS以及stompjs

安装SockJS客户端以及stompjs。
@stomp/stompjs是最新的版本,当然也可以使用stompjs,写法上略有不同,新版的支持断线重连的机制,所以本文采用了新版方式实现。

npm install sockjs-client
npm install @stomp/stompjs

2、编写公用的websocket.js供其他地方调用

import SockJS from 'sockjs-client';
import {Client} from '@stomp/stompjs';import {getToken} from '@/utils/auth'const socket = () => {//请求的起始地址,根据开发环境变量确定let baseUrl = process.env.VUE_APP_BASE_API;let stompClient = new Client({//可以不赋值,因为后面用SockJS来代替//brokerURL: 'ws://localhost:9527/dev-api/ws/',//获得客户端token的方法,把token放到请求头中connectHeaders: {"Authorization": 'Bearer ' + getToken()},debug: function (str) {//debug日志,调试时候开启console.log(str);},reconnectDelay: 10000,//重连时间heartbeatIncoming: 4000,heartbeatOutgoing: 4000,});// //用SockJS代替brokenURLstompClient.webSocketFactory = function () {//因为服务端监听的是/ws路径下面的请求,所以跟服务端保持一致return new SockJS(baseUrl + '/ws', null, {timeout: 10000});};return {stompClient: stompClient,connect(callback) {//连接stompClient.onConnect = (frame) => {callback(frame);};//错误stompClient.onStompError = function (frame) {console.log('Broker reported error: ' + frame.headers['message']);console.log('Additional details: ' + frame.body);//这里不需要重连了,新版自带重连};//启动stompClient.activate();},close() {if (this.stompClient !== null) {this.stompClient.deactivate()}},//发送消息send(addr, msg) {//添加app的前缀,并发送消息,publish是新版的stomp/stompjs发送api,老版本更改下就可以。this.stompClient.publish({destination: '/app'+addr,body: msg})},//订阅消息subscribe(addr, callback) {this.stompClient.subscribe(addr, (res)=>{//这里进行了JSON类型的转化,因为我的服务端返回的数据都是json,消息本身是string型的,所以进行了转化。var result = JSON.parse(res.body);callback(result);});}}
}
export default socket

3、业务代码中引用websocket进行消息发送和接收

调用封装后的websocket.js,这样业务代码更加的简单清晰,如果有多了连接,多new 几个Websocket对象就可以了。

//在vue中直接引用
import Websocket from '@/utils/websocket'var socket = new Websocket();
socket.connect(() => {//发送消息到app/datapoint,app的前缀我是在websocket里面已经封装,此处不用再添加socket.send("/datapoint", "123123123");//订阅目的地/user/queue/datavalue的消息socket.subscribe("/user/queue/datavalue", (res) => {console.log(res)});}

三、Spring cloud gateway网关配置

在网关中配置转发websocket的服务,因为是微服务架构,所以所有的websocket请求都必须经过网关,必须对网关进行配置,服务端才能正确响应websocket请求(以ws:开头的)。

1、修改gateway网关的配置文件

在网关的配置文件中添加路由信息,/ws路径都转发到刚才配置websocket的服务(ruoyi-zt)中,

      # 模块其他请求- id: ruoyi-zturi: lb://ruoyi-ztpredicates:- Path=/zt/**filters:- StripPrefix=1# 模块的微服务请求- id: ruoyi-zt-websocketuri: lb:ws://ruoyi-ztpredicates:- Path=/ws/**

2、在网关中添加全局过滤器

SockJS 客户端首先发送GET /info从服务器获取基本信息。之后,它必须决定使用什么传输。如果可能,使用 WebSocket。如果没有,在大多数浏览器中,至少有一个 HTTP 流选项。如果不是,则使用 HTTP(长)轮询。按照我们上述网关的配置,网关会将此请求路由成websocket请求,会导致请求的失败,所以我们必须编写一个过滤器,将第一次的这个http请求,从websocket请求还原成http请求。具体代码如下。

SockJS所有传输请求都具有以下 URL 结构:

https://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
在哪里:

{server-id} 用于在集群中路由请求,但不用于其他用途。

{session-id} 关联属于 SockJS 会话的 HTTP 请求。

{transport}表示传输类型(例如,websocket、xhr-streaming等)。

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;import java.net.URI;import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;/*** @author: wang.mh* 2019/6/19 17:05*/
@Component
public class WebSocketFilter implements GlobalFilter, Ordered {public final static String DEFAULT_FILTER_PATH = "/ws/info";public final static String DEFAULT_FILTER_WEBSOCKET = "websocket";/**** @param exchange ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,*                 并公开额外的 服务器 端处理相关属性和特性,如请求属性* @param chain* @return*/@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {String upgrade = exchange.getRequest().getHeaders().getUpgrade();URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();//如果不是ws的请求直接通过if (!"ws".equals(scheme) && !"wss".equals(scheme)) {return chain.filter(exchange);//如果是/ws/info的请求,把它还原成http请求。} else if (DEFAULT_FILTER_PATH.equals(requestUrl.getPath())) {String wsScheme = convertWsToHttp(scheme);URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);//如果是sockJS降级后的http请求,把它还原成http请求,也就是地址{transport}不为websocket的所有请求} else if (requestUrl.getPath().indexOf(DEFAULT_FILTER_WEBSOCKET)<0) {String wsScheme = convertWsToHttp(scheme);URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);}return chain.filter(exchange);}@Overridepublic int getOrder() {return Ordered.LOWEST_PRECEDENCE - 2;}static String convertWsToHttp(String scheme) {scheme = scheme.toLowerCase();return "ws".equals(scheme) ? "http" : "wss".equals(scheme) ? "https" : scheme;}
}

四、前端代理的配置

在开发环境中,一般使用前后端分离的方式进行,这样前端的请求都是通过代理的方式访问到服务端,所以我们还要进行代理的设置,本文采用的是vue2.0的开发环境,调整vue.config.js中的代理配置,加上支持ws请求。

proxy: {[process.env.VUE_APP_BASE_API]: {target: `http://127.0.0.1:8080`,changeOrigin: true,ws: true,        //如果要代理 websockets,配置这个参数pathRewrite: {['^' + process.env.VUE_APP_BASE_API]: ''}},}

这样就完成了所有的配置信息。

五、问题记录

  我的开发环境,在按照上述完成整个配置后,发现sockJS客户端以websocket连接后,会迅速断开链接,然后降级成xhr-stream等其他方式进行数据请求,找了好久都没有发现原因。
  最后,发现是SockJS的超时设置有问题,如果采用默认的超时参数,SockJS将计算一个合理的超时时间进行等待,如果等待超时的情况下,会降级成其他方式进行数据请求,估计是计算的超时时间不合理,还没等服务端响应,就发生超时,切换到其他方式传输了。
  这种情况我们手动设置超时时间就可以了。

new SockJS(baseUrl + '/ws', null, {timeout: 10000});

Spring Cloud整合Websocket(SockJs Stomp方式)相关推荐

  1. Elasticsearch学习(3) spring boot整合Elasticsearch的原生方式

    前面我们已经介绍了spring boot整合Elasticsearch的jpa方式,这种方式虽然简便,但是依旧无法解决我们较为复杂的业务,所以原生的实现方式学习能够解决这些问题,而原生的学习方式也是E ...

  2. spring cloud整合OpenFeign

    spring cloud整合OpenFeign pom.xml配置 <!-- https://mvnrepository.com/artifact/org.springframework.clo ...

  3. spring cloud整合Ribbon

    spring cloud整合Ribbon 使用Eureka中自带的Ribbon 如果你使用的是Eureka做服务发现和注册的话,在比较新版本的Eureka中,Eureka已经集成了Ribbon进入Eu ...

  4. Spring cloud整合zookeeper

    Spring cloud整合zookeeper pom.xml依赖 spring cloud整合zookeeper需要依赖spring-cloud-starter-zookeeper-discover ...

  5. eureka集群只注册一个_Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇

    Spring cloud系列教程第十篇- Spring cloud整合Eureka总结篇 本文主要内容: 1:spring cloud整合Eureka总结 本文是由凯哥(凯哥Java:kagejava ...

  6. Spring Cloud 整合 seata 实现分布式事务极简入门

    Spring Cloud 整合 seata 实现分布式事务极简入门 seata Spring Cloud 整合 seata 实现分布式事务极简入门 1. 概述 2. 部署nacos 3. 部署seat ...

  7. phoenix+hbase+Spark整合,Spark处理数据操作phoenix入hbase,Spring Cloud整合phoenix

    1 版本要求 Spark版本:spark-2.3.0-bin-hadoop2.7 Phoenix版本:apache-phoenix-4.14.1-HBase-1.4-bin HBASE版本:hbase ...

  8. Sentinel实现限流熔断及与Spring Cloud整合

    why 在分布式中,为了保证服务高可用,就必须对请求进行限流或服务降级的方式才能够保证不会被流量拖垮导致雪崩效应, what–什么是sentinel? 它是面向分布式服务架构的轻量级流量控制组件,主要 ...

  9. 解决 Spring Cloud 整合 zipkin 报错:org.springframework.boot.actuate.health.CompositeHealthIndicator......

    文章目录 一.问题描述 二.解决方法 一.问题描述 我的 Spring Boot 版本是 2.3.4,Spring Cloud 版本是 Hoxton.SR1. 要整合 zipkin,先在服务端导入了以 ...

最新文章

  1. iphone objective-c内存管理
  2. python编写自动化脚本 与shell_脚本安装Discuz论坛(shell + Python 实现自动化安装)...
  3. 输入和if else和switch的应用
  4. RHEL环境下调试Shell脚本时遇到字符串转换整数的问题
  5. 实验一 线性表、堆栈和队列的操作与实现
  6. JetBrains —— JetBrains系列IDE优化配置(提高启动和运行速度)
  7. Linux(二) 常用命令和目录结构
  8. 多继承-注意父类之间注意不要有重名方法或属性
  9. windows下dubbo-admin和zookeeper安装部署
  10. android手机最低内存,原神手机端需要哪些配置 手机端最低配置要求介绍
  11. pwn环境搭建_pwndbg、pwntools环境搭建(Unix系统)
  12. 猎豹浏览器打飞机_墙内最好浏览器,微软带来完整版谷歌浏览器,扩展、同步无限制!...
  13. 和大家分享2015年我逐步形成的六个管理认识
  14. vue查询列表中所有用户信息_vue实现全匹配搜索列表内容
  15. Json 与 JS对象的关系与转换
  16. cad批量页码lisp_源代码:批量改页码(加前缀)及提取属性块
  17. 微信公众平台和微信开放平台的区别
  18. 2012服务器系统有什么版本的,Windows server 2012操作系统有哪几个版本
  19. 记录mysql查询过去十二个月中每个月的数据情况(含本月)
  20. 曾有一个人,爱我如生命(2)

热门文章

  1. android定位软件开发,android gps定位app源码(GpsTracker)
  2. 计算机主机通常包括( ) a运算器,通常我们把( )称为计算机主机. A. 运算器 B. 运算器.控制器和内存 C. 运算器和控制器 D. 运算器和内存...
  3. 朋友圈九宫格android,CUTTT - 四六九宫格藏图,玩出朋友圈新高度 - Android 应用 - 图像 - 【最美应用】...
  4. 2020中国高校计算机大赛网络技术挑战赛,中国高校计算机大赛-网络技术挑战赛的新跨越...
  5. pdf以文件流的形式导出乱码问题解决
  6. QQ邮箱SMTP限流
  7. VM30031:1 Uncaught ReferenceError: xxx is not defined
  8. 数据分析 时间序列分析 MA模型
  9. 占星家眼中的十二星座--处女座
  10. java创始人现在在哪里直播,三年败光120亿,他曾是某电商巨头创始人,如今却靠直播度日!...