之前有分享过springBoot集成Websocket推送信息。今天主要是来继续分享升级版,这次是采用STOMP协议。用这个的好处有很多,比如可以屏蔽浏览器之间的差异,更方便对接消息中间件等。

一、协议理解

HTTP、WebSocket 等应用层协议,都是基于 TCP 协议来传输数据的。
HTTP不足在于它与服务器的全双工通信依靠轮询实现,对于需要从服务器主动发送数据的情境,会给服务器资源造成很大的浪费,WebSocket是针对HTTP在这种情况下的补充。
对于 WebSocket 来说,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
WebSocket是一个完整的应用层协议,包含一套标准的 API 。
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。 STOMP协议可以建立在WebSocket之上,也可以建立在其他应用层协议之上。

二、依赖包

<!-- websocket支持 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 有说需要依赖这个的,我这里实际没有引入
<dependency><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId>
</dependency>
-->

三、能力集成核心代码

websocket用户类


import java.security.Principal;/*** @author zhengwen**/
public class WebSocketUser implements Principal {/*** 用户信息*/private final String name;public WebSocketUser(String name) {this.name = name;}@Overridepublic String getName() {return name;}
}

消息对象类
WebSocketMsgVo


import lombok.Data;import java.time.LocalDateTime;/*** websocket信息vo对象** @author zhengwen**/
@Data
public class WebSocketMsgVo<T> {/*** 发送方*/private String from;/*** 接收方*/private String to;/*** 时间*/private LocalDateTime time = LocalDateTime.now();/*** 平台来源*/private String platform;/*** 主题通道*/private String topicChannel;/*** 信息业务对象*/private T data;
}

这里data定义为抽象类,由业务系统自行定义。因为我们这是提供能力,所以尽量不要固定死。

配置类


import com.easylinkin.bm.handler.MyHandshakeHandler;
import com.easylinkin.bm.interceptor.MyHandshakeInterceptor;
import com.easylinkin.bm.interceptor.WebSocketUserInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;/*** websocket stomp协议配置类** @author zhengwen**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {/*** 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,* 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs** @param registry*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {/** 1. 将 /stomp/websocketJs路径注册为STOMP的端点,*    用户连接了这个端点后就可以进行websocket通讯,支持socketJs* 2. setAllowedOriginPatterns("*")表示可以跨域* 3. withSockJS()表示支持socktJS访问* 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器* 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息*///配置客户端连接地址registry.addEndpoint("/stomp/websocketJS").setAllowedOriginPatterns("*").addInterceptors(new MyHandshakeInterceptor()).setHandshakeHandler(new MyHandshakeHandler()).withSockJS();/** 添加多个端点* 它的实现类是WebMvcStompEndpointRegistry ,* addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,* 所以可以添加多个端点*/registry.addEndpoint("/stomp/websocket");}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 自定义调度器,用于控制心跳线程ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();// 线程池线程数,心跳连接开线程taskScheduler.setPoolSize(1);// 线程名前缀taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");// 初始化taskScheduler.initialize();// 设置广播节点registry.enableSimpleBroker("/ad", "/device", "/pay", "/data", "/warn", "/alone").setHeartbeatValue(new long[]{10000, 10000}).setTaskScheduler(taskScheduler);// 客户端向服务端发送消息需有/app 前缀registry.setApplicationDestinationPrefixes("/app");// 指定用户发送(一对一)的前缀 /user/registry.setUserDestinationPrefix("/user");}/*** 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间** @param registration*/@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {/** 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节* 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节* 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒*/registration.setMessageSizeLimit(10240).setSendBufferSizeLimit(10240).setSendTimeLimit(10000);}/*** 配置客户端入站通道拦截器* 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间** @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {/** 配置消息线程池* 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务* 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程* 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒*/registration.taskExecutor().corePoolSize(10).maxPoolSize(20).keepAliveSeconds(60);registration.interceptors(new WebSocketUserInterceptor());}/*** 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间** @param registration*/@Overridepublic void configureClientOutboundChannel(ChannelRegistration registration) {registration.taskExecutor().corePoolSize(10).maxPoolSize(20).keepAliveSeconds(60);//registration.interceptors(new WebSocketUserInterceptor());}}

sebsocket的http握手拦截器
MyHandshakeInterceptor


import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;/*** @author zhengwen**/
@Slf4j
public class MyHandshakeInterceptor implements HandshakeInterceptor {/*** websocket握手之前*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {log.info("--websocket的http连接握手之前--");ServletServerHttpRequest req = (ServletServerHttpRequest) request;WebSocketUser user = null;//获取token认证String token = req.getServletRequest().getParameter("token");//解析token获取用户信息//鉴权,我的方法是,前端把token传过来,解析token,判断正确与否,return true表示通过,false请求不通过。//TODO 鉴权设置用户if (StringUtils.isNotBlank(token)) {user = new WebSocketUser(token);}//如果token认证失败user为null,返回false拒绝握手if (user == null) {return false;}//保存认证用户attributes.put("user", user);return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {}
}

websocket的握手之后拦截器


import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;import java.security.Principal;
import java.util.Map;/*** @author zhengwen**/
@Slf4j
public class MyHandshakeHandler extends DefaultHandshakeHandler {@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {log.info("--websocket的http连接握手之后--");//设置认证用户return (Principal) attributes.get("user");}
}

websocket设置自定义的连接通道拦截器
WebSocketUserInterceptor


import com.easylinkin.bm.vo.websocket.WebSocketUser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;import java.util.Map;/*** @author zhengwen**/
@Slf4j
public class WebSocketUserInterceptor implements ChannelInterceptor {@Overridepublic void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {log.info("--websocket信息发送后--");ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);}/*** 获取包含在stomp中的用户信息*/@SuppressWarnings("rawtypes")@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {log.info("--websocket信息发送前--");StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (accessor != null) {if (StompCommand.CONNECT.equals(accessor.getCommand())) {Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object nameObj = ((Map) raw).get("name");if (nameObj != null) {// 设置当前访问器的认证用户,或者做其他业务WebSocketUser webSocketUser = new WebSocketUser(String.valueOf(nameObj));accessor.setUser(webSocketUser);}}}}return message;}
}

WebSocketStompController


import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** websocket stomp协议controller** @author zhengwen**/
@Slf4j
@RestController
@RequestMapping("/web/socket/stomp")
public class WebSocketStompController {@Autowiredprivate WebSocketService webSocketService;/*** 发送信息 stomp** @param webSocketMsgVo 信息对象vo* @return 统一出参*/@PostMapping("/sendStompMsg")@MessageMapping("/sendStompMsg")public Result<?> sendStompMsg(@RequestBody WebSocketMsgVo webSocketMsgVo) {log.info("--发送信息--");return webSocketService.sendStompMsg(webSocketMsgVo);}}

WebSocketService实现类


import com.alibaba.fastjson.JSON;
import com.easylinkin.bm.core.Result;
import com.easylinkin.bm.core.ResultGenerator;
import com.easylinkin.bm.service.WebSocketService;
import com.easylinkin.bm.vo.websocket.WebSocketMsgVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;/*** @author zhengwen**/
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class WebSocketServiceImpl implements WebSocketService {@Autowiredprivate SimpMessagingTemplate simpMessagingTemplate;@Overridepublic Result<?> sendStompMsg(WebSocketMsgVo webSocketMsgVo) {String topicChannel = webSocketMsgVo.getTopicChannel();if (StringUtils.isNotBlank(topicChannel)) {topicChannel = "/" + topicChannel;}String message = JSON.toJSONString(webSocketMsgVo);String to = webSocketMsgVo.getTo();try {if (StringUtils.isNotBlank(to)) {//MD 不明原因用convertAndSendToUser不能收到,确认订阅没有问题//simpMessagingTemplate.convertAndSendToUser(to, topicChannel, message);simpMessagingTemplate.convertAndSend(topicChannel + "/" + to, message);} else {simpMessagingTemplate.convertAndSend(topicChannel, message);}return ResultGenerator.genSuccessResult();} catch (Exception e) {return ResultGenerator.genFailResult("发送失败");}}
}

这里一对一发送按道理应该用template的convertAndSendToUser方法,但是死活没效果。这里先用这种方式实现的。

四、测试html

<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>Insert title here</title><link rel="stylesheet" href="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/css/bootstrap.min.css"><script src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script><script src="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/js/bootstrap.min.js"></script><script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script><script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script><script type="text/javascript">var userName = "zs";var sendTopic = "ad";//var subsc = '/ad';//广播//var subsc = '/user/' + userName + '/' + sendTopic;//一对一  /user/zs/ad//var subsc = "/user/zs/ad";var subsc = "/ad/zs";// 建立连接对象(还未发起连接)var socket = new SockJS("http://localhost:8099/stomp/websocketJS?token=zw");// 获取 STOMP 子协议的客户端对象var stompClient = Stomp.over(socket);stompClient.debug = function(str) {console.log("DEBUG---->" + str);};// 向服务器发起websocket连接并发送CONNECT帧stompClient.connect({name:userName,token:userName},function connectCallback(frame) {// 连接成功时(服务器响应 CONNECTED 帧)的回调方法setMessageInnerHTML("连接成功");console.log("---订阅:" + subsc);stompClient.subscribe(subsc, function (res) {console.log("----res:"+res);re = JSON.parse(res.body);console.log(re);setMessageInnerHTML("")setMessageInnerHTML("你接收到的消息为:" + re.data);});},function errorCallBack(error) {// 连接失败时(服务器响应 ERROR 帧)的回调方法setMessageInnerHTML("连接失败");});//发送消息function send() {var message = $("#content").val();var msg = {"data":message,"topicChannel":sendTopic,"to":"zs"};var messageJson = JSON.stringify(msg);stompClient.send("/app/sendStompMsg", {}, messageJson);sendMessageInnerHTML("/app/sendStompMsg 你发送的消息:" + message);}//将消息显示在网页上function setMessageInnerHTML(innerHTML) {$("#in").html(innerHTML + '<br/>');}function sendMessageInnerHTML(innerHTML) {$("#out").append(innerHTML + '<br/>');}$(function(){$("#btn").click(function(){send();});})</script></head>
<body><input id="content" class="form-control"><button id="btn" class="btn btn-info">发送</button><div id="in"></div><div id="out"></div>
</body>
</html>

上面的代码我就不多解释,我写了一些注释,上面也是找的博友的改改就开测了。

五、看效果

一对一发送

接口发送

六、总结
1、同事有springBoot直接集成websocket的,页面用的定时器做心跳重连,但是还是会出现被nginx断掉。这里用stomp是可以设置心跳的,用到项目上生产环境,让运维设置nginx放行ws协议的超时 + 设置指定url的超时限制,目前看是ok的。
2、controller上的注解标签很有迷惑性,postman请求的url很特别,可以自己去探索下。
3、stomp、webscoket的集成网上很多博友都有写,各有特色。我觉得写的不错的,可以看博友老郑来了的分享,还是我本家哦。
希望可以帮到到家。

SpringBoot集成websocket能力(stomp)相关推荐

  1. SpringBoot集成WebSocket

    一.什么是WebSocket WebSocket是一种网络传输协议,位于OSI模型的应用层.可在单个TCP连接上进行全双工通信,能更好的节省服务器资源和带宽并达到实时通讯. 客户端和服务端只需完成一次 ...

  2. springboot集成webSocket实现实时推送

    springboot集成webSocket实现实时推送 webSocket实现推送 webSocket是什么? 需求说明 websocket集成步骤 pom.xml webSocket实现 自定义处理 ...

  3. SpringBoot集成WebSocket实现及时通讯聊天功能!!!

    1:在SpringBoot的pom.xml文件里添加依赖: <!-- websocket --> <dependency><groupId>org.springfr ...

  4. Springboot集成websocket实现消息推送和在线用户统计

    一.HTTP 说到websocket首先要说Http,Http大家都知道是一个网络通信协议,每当客户端浏览器需要访问后台时都会发一个请求,服务器给出响应后该连接就会关闭,请求只能有客户端发起,服务端是 ...

  5. springboot集成websocket(一)客户端、客户端断线重连、客户端连接验证

    springboot集成websocket客户端 一.首先是导入依赖包 1.在pom.xml中加入下述即可 <!--websocket作为客户端 --><dependency> ...

  6. SpringBoot 集成 WebSocket,实现后台向前端推送信息

    作者 | 大树先生 来源 | https://blog.csdn.net/MacWx/article/details/111319558 前言 在一次项目开发中,使用到了Netty网络应用框架,以及M ...

  7. SpringBoot集成WebSocket,实现后台向前端推送信息

    作者 | 大树先生 来源 | https://blog.csdn.net/MacWx/article/details/111319558 前言 在一次项目开发中,使用到了Netty网络应用框架,以及M ...

  8. Springboot集成websocket实例

    一.简介 WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议. WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数 ...

  9. SpringBoot 集成 webSocket,实现后台向客户端推送消息

    图文等内容参考链接 SpringBoot2.0集成WebSocket,实现后台向前端推送信息_Moshow郑锴的博客-CSDN博客_springboot websocket WebSocket 简介 ...

最新文章

  1. Linux命令技巧之30个必会的命令技巧
  2. 看完这篇文章之后,终于明白了编译到底怎么回事
  3. Daily scrum[2013.11.28]
  4. python读取CIFAR10数据集并将数据集转换为PNG格式存储
  5. 【直播预告 | 今天10:30】多媒体技术 PI 第一期:OSS圆桌
  6. Codeforces Round #737 (Div. 2)
  7. 如何编写一个python项目
  8. vue添加html开启服务器_Vue 项目(HTML5 History 模式) 部署服务器
  9. 基于pyspark 大数据分析_基于阿里云平台的大数据教学案例 —— B站弹幕数据分析...
  10. github中markdown语言的使用规则
  11. 面试、笔试中常用的SQL语句(数据库知识必杀)一共50个!!!
  12. java 数据类型转换
  13. 基于车牌形状和颜色的车牌定位
  14. Android仿人人客户端(v5.7.1)——新鲜事之分享照片
  15. 记录下自己拙计的算法之旅 LeetCode Rotate Array
  16. 谷歌浏览器(chrome)无法正常打开网页的解决办法
  17. RabbitMQ高级特性-惰性队列
  18. 75道经典AI面试题,我就想把你们安排的明明白白的!(含答案)
  19. bootstrap 元素
  20. framework初始化错误,面试大厂应该注意哪些问题?隔壁都馋哭了

热门文章

  1. 网上邻居找不到打印机,手动IP安装流程
  2. AI一分钟|恒大集团总裁夏海钧兼任FF董事长,贾跃亭为CEO;特斯拉Model 3最快一个月就能提车...
  3. MSSQLSERVER服务无法启动的解决方案
  4. 2022-2028年中国酸辣粉产业发展动态及投资策略研究报告
  5. 基于java+springboot+mybatis+vue+elementui的羽毛球商城
  6. mysql union如何排序_Mysql中UNION用法与排序
  7. C#如何通过显示一次子窗体,判断返回的DialogResult的具体值
  8. 将html元素转成图片
  9. Web——企业信息文档管理系统
  10. BAPI相关知识整理