最近在完善毕设的路上,由于是设计一个远程控制物联网系统,所以服务端到硬件我选用了MQTT协议。因为MQTT的发布/订阅模式很适合这种场景。接下来就来聊聊遇到的一些问题以及解决思路吧。
毕设技术栈:springboot 、swagger、springdata、shiro、JWT、redis、rabbitmq、android(语音控制远程设备)、VUE、axios、ElementUI、arduino核心开发板、ESP32(无线接收模块)

小前奏

既然是基于MQTT协议的,那么前端(Vue)我就想着使用一个支持MQTT的库直接用就好。然后:MQTT库介绍
先安装个MQTT库:

npm install mqtt --save
 var mqtt = require('mqtt')var client  = mqtt.connect('mqtt://localhost:1883')client.on('connect', function () {client.subscribe('presence', function (err) {if (!err) {client.publish('presence', 'Hello mqtt')}})})client.on('message', function (topic, message) {// message is Bufferconsole.log(message.toString())client.end()})

运行上面的代码在vue中,浏览器直接报错了,

WebSocket connection to 'ws://localhost:1883/' failed:
Connection closed before receiving a handshake response

因为只能在node.js环境下才可以使用,浏览器环境是不支持的,因为数据包浏览器不支持解析,所以会报错。那么,该怎么办呢?
看下文:

一、安装RabbitMQ

这一步去官网直接下载安装就好了。RabbitMQ官网。
安装rabbitMQ,首先需要安装erlang
在这里查看版本适配:https://www.rabbitmq.com/which-erlang.html

二、开启插件

进入sbin目录。
1.启对mqtt的支持

rabbitmq-plugins enable rabbitmq_mqtt

2.开启web管理

rabbitmq-plugins enable rabbitmq_management

web在线管理:http://127.0.0.1:15672
默认账号密码:guest

3.开启 stomp

rabbitmq-plugins enable rabbitmq_web_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples

三、安装库

npm install sockjs-client --save
npm install stompjs --save

四、引入库

  import SockJS from 'sockjs-client';import Stomp from 'stompjs';

五、执行代码

 function mqttStart() {console.log("进入mqtt初始化");var ws = new WebSocket('ws://127.0.0.1:15674/ws');var client = Stomp.over(ws);var on_connect = function () {console.log('connected');client.subscribe('/topic/test', (msg)=> {console.log("收到:"+msg.body)});};var on_error = function () {console.log('error');};//参数依次为:用户名,密码,连接回调,错误回调,虚拟主机名client.connect('guest', 'guest', on_connect, on_error, '/');}

ps:如果不需要使用stomp协议,又找到可以直接在浏览器环境连接MQTT服务器的库:paho.mqtt,使用这个库的效果跟步骤三四五六一样,但是不推荐这种用法,下面会说原因。

六、开始测试

如上代码订阅了test,所以我给test节点发布了一条消息,前端也成功收到了。

接下来,本文到此结束…emm…不行吧。感觉这也太敷衍了吧。是啊,我们回头看看这种写法有什么不好呢?
看看这段代码吧:

client.connect('guest', 'guest', on_connect, on_error, '/');

这太不好了吧,直接把MQTT服务分配的账号密码暴露在前端,肯定不行吧。对。这样当然不行,怎么解决呢?
直接把Websocket换成sockjs就可以了,因为上面我们已经安装过了Sockjs库,所以改成如下即可:
可以看到,此时我们连接mqtt服务器无需再使用账号密码了,而是通过一个websocket连接 ,使用stomp协议转化,间接连接了MQTT服务器,连接时带上登陆后的token,后端验证token是否正确,如若正确然后就可以握手成功了。

 mqttStart() {console.log("进入mqtt初始化");this.ws = Stomp.over(new SockJS('http://127.0.0.1:8080/ws?token='+localStorage.token));this.ws.heartbeat.outgoing = 0;this.ws.heartbeat.incoming = 0;  this.ws.connect({//用户唯一识别信息 因为我的项目有web以及android类型,所以这里需要记录是什么类型@web//即为web类型name: this.userInfo.username+"@web",}, (frame) => {// 专属通知,其中user为前缀,必填,说明该订阅节点为单一通知节点this.ws.subscribe('/user/topic/reply', (msg) => {console.log( msg.body);});//系统通知,其中、topic为前缀,后面配置websocket使用了前缀topic,this.ws.subscribe('/topic/notice', (msg) => {console.log("notice");console.log(msg.body);});});}

七、配置后端

7.1 配置websocket

首先配置下springboot,让其websocket配置支持sockjs

package com.correspond.mqtt.rabbitmq.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;/*** @author Anumbrella* 通过EnableWebSocketMessageBroker* 开启使用STOMP协议来传输基于代理(message broker)的消息,* 此时浏览器支持使用@MessageMapping 就像支持@RequestMapping一样。*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConfig.class);@Autowiredprivate MyChannelInterceptor inboundChannelInterceptor;@Autowiredprivate AuthHandshakeInterceptor authHandshakeInterceptor;@Autowiredprivate MyHandshakeHandler myHandshakeHandler;// 配置消息代理,哪种路径的消息会进行代理处理@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {//将其目的地前缀设置为“/topic”这样的话,设置"/topic"// Spring就能知道所有目的地前缀为“/topic”的消息都会发送到STOMP代理中。registry.enableStompBrokerRelay("/topic").setRelayHost("localhost")      // rabbitmq-host服务器地址.setRelayPort(61613)     // rabbitmq-stomp 服务器服务端口.setClientLogin("guest")   // 登陆账户.setClientPasscode("guest"); // 登陆密码//定义一对一推送的时候前缀registry.setUserDestinationPrefix("/user/");//所有目的地以“/message”打头的消息都将会路由到带有@MessageMapping注解的方法中,而不会发布到代理队列或主题中//客户端需要把消息发送到/message/xxx地址registry.setApplicationDestinationPrefixes("/message");}/*** 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,*      并且可以指定是否使用socketjs** @param registry*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/ws").setAllowedOrigins("*").setHandshakeHandler(myHandshakeHandler).addInterceptors(authHandshakeInterceptor) //添加拦截处理,这里authHandshakeInterceptor 封装的认证用户信息.withSockJS();LOGGER.info("com.init rabbitmq websocket endpoint ");}/*** 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间** @param registration*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {registration.interceptors(inboundChannelInterceptor);registration.taskExecutor()    // 线程信息.corePoolSize(400)     // 核心线程池.maxPoolSize(800)      // 最多线程池数.keepAliveSeconds(60); // 超过核心线程数后,空闲线程超时60秒则杀死}/*** 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间** @param registration*/@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {registration.setSendTimeLimit(15 * 1000)    // 超时时间.setSendBufferSizeLimit(512 * 1024) // 缓存空间.setMessageSizeLimit(128 * 1024);   // 消息大小}}

7.2 配置websocket握手连接器

由于本项目是采用前后端分离架构,采用了JWT+shiro鉴权。因为我们要保证websocket服务器是不能被任意连接的,所以我们前端使用sockJS是还传递了一个Authorization参数,这个参数是用户登录成功后台生成的一个token值,前端要想连接我们的sockJS服务,就必须带着这个登录的Authorization参数值,由我们的握手连接器去验证,是否是合法连接。

package com.correspond.mqtt.rabbitmq.config;import com.web.jwt.util.TokenUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;/*** @author raven*/
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(AuthHandshakeInterceptor.class);@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {LOGGER.info("===============before handshake=============");// 比如,只有登录后,才可以进行websocket连接ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) serverHttpRequest;String token = serverRequest.getServletRequest().getParameter("Authorization");if (token != null &&!TokenUtil.verify(token) ) {LOGGER.error("Token验证失败,连接失败!");return false;}return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {LOGGER.info("===============after handshake=============");}
}

7.3 配置websocket握手处理器

握手之前验证完之后,我们就需要准备开始握手的处理器了。包装客户端的信息。


package com.correspond.mqtt.rabbitmq.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;/*** @author raven*/
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler {private static final Logger LOGGER = LoggerFactory.getLogger(AuthHandshakeInterceptor.class);@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {System.out.println("--------------------------------");if (request instanceof ServletServerHttpRequest) {ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request;HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest();/*** 这边就获取用户唯一信息用来包装*/final String token = httpRequest.getParameter("Authorization");return () -> token;}return null;}}

7.4 配置管道拦截器

握手成果之后,便开始对消息管道进行拦截,因为我们要用到Stomp代理我们的消息。

package com.correspond.mqtt.rabbitmq.config;import com.correspond.mqtt.rabbitmq.entity.MyPrincipal;
import com.correspond.mqtt.rabbitmq.util.SocketSessionRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;import java.util.LinkedList;
import java.util.Map;/*** @author Anumbrella*/
@Component
//ChannelInterceptorAdapter废弃了
public class MyChannelInterceptor implements ChannelInterceptor {@Autowiredprivate SocketSessionRegistry webAgentSessionRegistry;@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);/*** 1. 判断是否为首次连接请求,如果已经连接过,直接返回message* 、*/if (StompCommand.CONNECT.equals(accessor.getCommand())) {System.out.println("连接success");Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);if (raw instanceof Map) {Object nameObj = ((Map) raw).get("name");if (nameObj instanceof LinkedList) {String name = ((LinkedList) nameObj).get(0).toString();System.out.println("name:"+name);//设置当前访问器的认证用户accessor.setUser(new MyPrincipal(name));String sessionId = accessor.getSessionId();// 统计用户在线数,可通过redis来实现更好webAgentSessionRegistry.registerSessionId(name, sessionId);}}} else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {//点击断开连接,这里会执行两次,第二次执行的时候,message.getHeaders.size()=5,第一次是6。直接关闭浏览器,只会执行一次,size是5。System.out.println("断开连接");MyPrincipal principal = (MyPrincipal) message.getHeaders().get(SimpMessageHeaderAccessor.USER_HEADER);//  如果同时发生两个连接,只有都断开才能叫做不在线if (message.getHeaders().size() == 5 && principal.getName() != null) {String sessionId = accessor.getSessionId();webAgentSessionRegistry.unregisterSessionId(principal.getName(), sessionId);}}return message;}}

7.5 实现Principal

最后实现自己的Principal :

package com.correspond.mqtt.rabbitmq.entity;import java.security.Principal;/*** @author raven*/
public class MyPrincipal implements Principal {private String loginName;public MyPrincipal(String loginName) {this.loginName = loginName;}@Overridepublic String getName() {return loginName;}
}

这时候,我们的初始配置就完成了。接下来看看具体使用吧。
我们既可以提供http的形式推送消息,也可以使用原生的stomp推送消息。
首先看看http形式如何发布数据。
如下代码。

@Controller
@RequestMapping("/push_msg")
public class SendController {@Autowiredprivate SimpMessagingTemplate messagingTemplate;@GetMapping("/notice")public void notice(String msg) {//对所有用户通知消息messagingTemplate.convertAndSend("/topic/notice", msg);}@GetMapping("/user/{username}/{type}")public void notice(@PathVariable String username, @PathVariable String type,String msg) {messagingTemplate.convertAndSendToUser(username+"@"+type, "/topic/reply",msg);}
}

为了方便演示,我现在登录了两个用户,左图是admin,右图是test。
这时候,直接在浏览器内访问 /push_msg/notice?msg=hello world!,因为两个用户已经订阅了系统通知节点/topic/notice,所以可以看到消息已经推送至每个用户。

系统通知没有问题了。我们再试试对特定用户推送消息吧。

直接在浏览器访问 /push_msg/user/admin/web?msg=林深时见鹿,因为两个用户都已经订阅了自己独有的通知节点/user/topic/reply,且消息发送的路径用户为admin,所以可以看到消息已经推送至admin用户而没有推送给test用户。

如果我们想在前端通过Stmop发送消息呢?而不是通过HTTP,因为这只是一个测试,就安全角度来讲,直接使用HTTP推送消息也需要对该接口加密,而不是任意人都可以使用HTTP方式推送消息。

使用Sockjs,可以使用如下方式给后端推送信息。

//为什么要加前缀message呢?因为7.1配置只拦截/message 开头的消息
this.ws.send("/message/test", {}, JSON.stringify({'name': "123456"}));

可以看出来,使用注解@DestinationVariable获取路径里面的参数值

@Controller
public class ReceiveController {@MessageMapping("/web.{name}")@SendToUser("/topic/reply")public String say(String message, @DestinationVariable("name") String name) throws Exception {System.out.println("name:"+name+"用户来消息啦");return name+"发送:"+message;}
}

由于我们使用了@SendToUser注解,所以发送完成以后可以看到后台给前端推送的消息。
如果我们想把该用户传来的消息发送给所有用户,可以直接使用@SendTo("/topic/notice")注解,该场景适用于聊天群组消息推送。

场景一:如果我们现在想做一个聊天软件,用户A如何给用户B发送消息呢?很简单。

如下图可见,当我们点击发送按钮的时候,会发送一个格式化的字符串给后台,该字符串包含了当前输入的消息以及和要发送给的用户。这里为了方便演示,toUser写死为test,也就是说,现在我登录admin用户给test用户发信息看看test是否会受到呢?
我们先来看看前端怎么写:

我们再来看看后端怎么写

@Controller
public class ReceiveController {@Autowiredprivate SimpMessagingTemplate messagingTemplate;@MessageMapping("toFriend")public void toFriendMsg(String message){JSONObject msgJSON = JSON.parseObject(message);String username  = msgJSON.getString("toUser");String msg  = msgJSON.getString("message");messagingTemplate.convertAndSendToUser(username+"@web", "/topic/reply",msg);}
}

接下来看看效果图,test用户完全可以收到admin发送的消息,所以也就很轻松解决了该场景问题。

总结

本毕设现在已经到了尾声阶段,采用了前后端分离架构,加持shiro进行接口权限验证以及JWT无session保障用户状态。本篇博客总结了毕设的消息推送模块,应用如上应用场景,我们可以轻轻松松写一个适合自己的消息推送系统。本篇博客就到这里了,本人能力有限,如有地方书写错误,请留言批评指正!

如何利用springboot快速搭建一个消息推送系统相关推荐

  1. 利用springboot快速实现一个天气数据查询系统(附带页面)(五)

    pom文件: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3 ...

  2. windows和Linux利用Python快速搭建一个网站

    windows和Linux利用Python快速搭建一个网站 一.windows 步骤1:安装Python3(自行百度) 步骤2:在cmd窗口输入ipconfig查看本机ip地址,IPV4那一行.如:1 ...

  3. Flex通过Blazeds利用Remoteservice与后台java消息推送

    Flex通过Blazeds利用Remoteservice与后台java消息推送 准备工作:Myeclipse中先建立一个Web project工程,然后导入Blazeds的文件,再转换为Flex项目类 ...

  4. springboot实现微信模板消息推送

    springboot实现微信模板消息推送 在上一篇文章我们已经知道了怎么获取openid 还不知道的可以查看我的上一篇文章springboot+微信小程序用codeid换取openid 这次我们不光要 ...

  5. 如何构建一个消息推送平台

    01 背 景 B/S架构下很多业务场景下我们需要服务端主动推送消息到客户端,在html5之前一般使用长轮询(除此之外还有iframe流或者Flash Socket)的方式来实现,而长轮询的方式缺点很明 ...

  6. java/web/springboot项目使用WebSocket消息推送

    java/web/springboot项目使用WebSocket消息推送 最近项目中,有消息推送的广播和在线咨询的功能,以前也没搞过啊,有些小伙伴估计也是,那肯定要赶紧学习起来啊~ 不说废话,今天就告 ...

  7. Java版WebSocket消息推送系统搭建

    Java版WebSocket消息推送系统搭建 最近在做消息推送,网上查了一些资料,开始想的是用MQ来做,后面发现用WebSocket来做的话感觉应该要简单点,话不多说,准备撸代码. 后端核心代码 /* ...

  8. django+python搭建消息推送系统

    django+channels+websocket完成一个实时推送系统 前言 因公司项目需求,需要建立一个展示网站,接入两台摄像机,当摄像机的照片流传过来的时候,实时展示到网页中,所以需要做一个实时推 ...

  9. 消息推送系统——(零)推倒萝莉之术

    当一个初学Web开发的童鞋,产生让服务器"主动"给浏览器客户端发送数据的想法的时候,比他入门稍早的同学会说: "这是Web!只能由浏览器发起请求,然后得到服务器返回的数据 ...

最新文章

  1. LabVIEW机器视觉系统图像畸变、校准和矫正(基础篇—3)
  2. 南邮java实验一报告_南邮JAVA程序设计实验1 综合图形界面程序设计
  3. 人脸识别机器学习实战
  4. Visual Studio 2017错误:无法启动程序,在当前状态下操作不合法
  5. Matlab简单系统仿真示例1
  6. 杂项-QRCode:ZXing
  7. 如何更优雅的写出你的SQL语句
  8. 《Nmap渗透测试指南》—第7章7.8节后台打印机服务漏洞
  9. 在c语言中数据类型高低,C语言的数据类型
  10. yolo模型部署——tensorRT模型加速+triton服务器模型部署
  11. Windows PrintNightmare 漏洞和补丁分析
  12. php imagick gif,PHP基于php_imagick_st-Q8.dll实现JPG合成GIF图片的方法
  13. 《Inside C#》笔记(一) .NET平台
  14. 华为OLT快速配置指南
  15. steam加速_《盗贼之海》发行之初荣登Steam榜首,UU加速器为您开黑提供保障
  16. C语言实现部标JTT808
  17. 靖哥哥教你如何用java做爬虫抓取网站美女图片(详解步骤)
  18. git rebase使用简介
  19. 锐捷 linux 网卡信息失败,锐捷校园网linux有线认证图形客户端更新,解决了deepin下无反应问题...
  20. NB-IoT天线同轴电缆RG316、RG174、RG178

热门文章

  1. 一个Android菜鸟入门Flutter 笔记(一)
  2. latex表格过长的解决办法之一:缩小字体
  3. 2020-2021 ICPC - Gran Premio de Mexico - Repechaje
  4. 【转载】宜州市德胜镇
  5. 6s连接wifi上不了网络连接服务器未响应,iPhone 6/iPhone 6S 突然连不上wifi,wifi开了连不上wifi信号差...
  6. 【修色圣典】第一章 色彩、对比度和通道
  7. 统计表格中相同数据的出现个数
  8. pandas统计表格中politics列字符个数
  9. 银河麒麟V10(Kylin Linux V10)之DBeaver安装
  10. 计算机中汉字的顺序用什么牌,中国汉字的写做顺序,你知道吗?