有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。最近发现RabbitMQ可以很方便的实现即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现即时通讯

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。

  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。

  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。

  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。

  • Payload(负载);可以理解为发送消息的内容。

  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:

    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;

    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;

    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

  • 首先我们需要安装并启动RabbitMQ,对RabbitMQ不了解的朋友可以参考《花了3天总结的RabbitMQ实用技巧,有点东西!》;

  • 接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

MQTT客户端

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTBox这个客户端工具。

  • 首先下载并安装好MQTTBox,下载地址:http://workswithweb.com/mqttbox.html

  • 点击Create MQTT Client按钮来创建一个MQTT客户端;

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

  • 再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;

  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯

既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_web_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;

  • WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js

  • 实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<div><label>目标Topic:<input id="targetTopicInput" type="text"></label><br><label>发送消息:<input id="messageInput" type="text"></label><br><button onclick="sendMessage()">发送</button><button onclick="clearMessage()">清空</button><div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>//RabbitMQ的web-mqtt连接地址const url = 'ws://localhost:15675/ws';//获取订阅的topicconst topic = getQueryString("topic");//连接到消息队列let client = mqtt.connect(url);client.on('connect', function () {//连接成功后订阅topicclient.subscribe(topic, function (err) {if (!err) {showMessage("订阅topic:" + topic + "成功!");}});});//获取订阅topic中的消息client.on('message', function (topic, message) {showMessage("收到消息:" + message.toString());});//发送消息function sendMessage() {let targetTopic = document.getElementById("targetTopicInput").value;let message = document.getElementById("messageInput").value;//向目标topic中发送消息client.publish(targetTopic, message);showMessage("发送消息给" + targetTopic + "的消息:" + message);}//从URL中获取参数function getQueryString(name) {let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");let r = window.location.search.substr(1).match(reg);if (r != null) {return decodeURIComponent(r[2]);}return null;}//在消息列表中展示消息function showMessage(message) {let messageDiv = document.getElementById("messageDiv");let messageEle = document.createElement("div");messageEle.innerText = message;messageDiv.appendChild(messageEle);}//清空消息列表function clearMessage() {let messageDiv = document.getElementById("messageDiv");messageDiv.innerHTML = "";}
</script>
</html>
  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):

    • 第一个订阅主题testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA

    • 第二个订阅主题testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB

  • 之后互相发送消息,让我们来看看效果吧!

在SpringBoot中使用

没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。

  • 首先我们需要在pom.xml中添加MQTT相关依赖;

<!--Spring集成MQTT-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
  • application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;

rabbitmq:mqtt:url: tcp://localhost:1883username: guestpassword: guestdefaultTopic: testTopic
  • 编写一个Java配置类从配置文件中读取配置便于使用;

/*** MQTT相关配置* Created by macro on 2020/9/15.*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {/*** RabbitMQ连接用户名*/private String username;/*** RabbitMQ连接密码*/private String password;/*** RabbitMQ的MQTT默认topic*/private String defaultTopic;/*** RabbitMQ的MQTT连接地址*/private String url;
}
  • 添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;

/*** MQTT消息订阅者相关配置* Created by macro on 2020/9/15.*/
@Slf4j
@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());//设置消息质量:0->至多一次;1->至少一次;2->只有一次adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//处理订阅消息log.info("handleMessage : {}",message.getPayload());}};}
}
  • 添加MQTT消息发布者相关配置;

/*** MQTT消息发布者相关配置* Created by macro on 2020/9/15.*/
@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { mqttConfig.getUrl()});options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publisherClient", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}
  • 添加MQTT网关,用于向主题中发送消息;

/*** MQTT网关,通过接口将数据传递到集成流* Created by macro on 2020/9/15.*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 发送消息到默认topic*/void sendToMqtt(String payload);/*** 发送消息到指定topic*/void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);/*** 发送消息到指定topic并设置QOS*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;

/*** MQTT测试接口* Created by macro on 2020/9/15.*/
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;@PostMapping("/sendToDefaultTopic")@ApiOperation("向默认主题发送消息")public CommonResult sendToDefaultTopic(String payload) {mqttGateway.sendToMqtt(payload);return CommonResult.success(null);}@PostMapping("/sendToTopic")@ApiOperation("向指定主题发送消息")public CommonResult sendToTopic(String payload, String topic) {mqttGateway.sendToMqtt(payload, topic);return CommonResult.success(null);}
}
  • 调用接口向主题中发送消息进行测试;

  • 后台成功接收到消息并进行打印。

2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息
2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息

总结

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

项目源码地址

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

RabbitMQ实现即时通讯相关推荐

  1. RabbitMQ实现即时通讯-MQTT协议

    有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功.最近发现RabbitMQ可以很 ...

  2. RabbitMQ的MTQQ插件实现即时通讯

    RabbitMQ的MQTT插件实现即时通讯 MQTT协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish ...

  3. 一款 Java 开源的 Spring Boot 即时通讯 IM 聊天系统

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 开篇 电商平台最不能缺的就是即时通讯,例如通知类下发,客服 ...

  4. 推荐:一款Java开源的Springboot 即时通讯 IM 聊天系统

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 # 开篇 电商平台最不能缺的就是即时通讯,例如通知类下发,客服聊天 ...

  5. 即时通讯新手入门:一文读懂什么是Nginx?它能否实现IM的负载均衡?

    本文引用了"蔷薇Nina"的"Nginx 相关介绍(Nginx是什么?能干嘛?)"一文部分内容,感谢作者的无私分享. 1.引言 Nginx(及其衍生产品)是目前 ...

  6. Openfire XMPP Smack RTC IM 即时通讯 聊天 MD

    Markdown版本笔记 我的GitHub首页 我的博客 我的微信 我的邮箱 MyAndroidBlogs baiqiantao baiqiantao bqt20094 baiqiantao@sina ...

  7. 2022-2028年中国即时通讯市场投资分析及前景预测报告

    [报告类型]产业研究 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了即时通讯行业相关概述.中国即时通讯行业运行环境.分析了中国即时通讯行 ...

  8. 接入网易云信IM即时通讯的微信小程序聊天室

    微信小程序开发交流qq群   173683895    承接微信小程序开发.扫码加微信. 接入流程: 初次接触网易云通信IM服务,您可以通过以下产品介绍文档了解我们的产品功能.相关概念.业务限制: 产 ...

  9. 即时通讯下数据粘包、断包处理实例(基于CocoaAsyncSocket)

    来源:涂耀辉 www.jianshu.com/p/2e16572c9ddc 如有好文章投稿,请点击 → 这里了解详情 前言 本文旨以实例的方式,使用CocoaAsyncSocket这个框架进行数据封包 ...

  10. 在linux系统下实现音视频即时通讯的部分代码

    由于使用习惯,Linux在中国受欢迎程度远不如windows,相应的软件也比较少,尤其是音视频类的软件,但是,这并不代表就完全没有.下面介绍一款强大的音视频即时通讯平台给大家,它就是--Anychat ...

最新文章

  1. 上海.NET技术交流会
  2. 人脸识别应用场景不断拓展 刷脸要方便更要安全
  3. JS break语句和continue语句
  4. 开发常识 持续更新~~
  5. php iframe 上传图片,利用iframe+php实现图片的上传
  6. python opengl 截图_初试PyOpenGL二 (Python+OpenGL)基本地形生成与高度检测
  7. Linux计算求取文件长度
  8. 计算机视觉那些事儿(1):基本任务
  9. 关于Linux自带的python2.6.6升级到2.7.10版本步骤详解及pip、ipython的安装
  10. 设计模式学习笔记七:常用设计模式原则总结
  11. android html 换行_android TextView怎么设置个别字体颜色并换行?
  12. c++ struct与class
  13. 酒店ETL管理解决方案
  14. 远程审批、远程会议及远程培训就用天翼云办公
  15. 腾讯云COS全球加速让全球用户加速访问
  16. 分享123个ASP整站程序源码,总有一款适合您
  17. 【MATLAB】最速下降方法
  18. uniapp开发即时通讯聊天app,纯nvue仿微信,前后端开源
  19. vue的mounted和created方法的执行
  20. 中心透视投影和鱼眼投影的区别(Central perspective projection vs. fisheye projection)

热门文章

  1. mac用什么软件测试硬盘好坏,谁说果粉不在意性能?6款macOS下硬盘测速软件介绍...
  2. 汽车车载智能终端T-BOX
  3. linux 蓝牙 iphone,Linux On iPhone 7 现在可运行 Wayland
  4. R语言使用dplyr包计算dataframe分组聚合四分位距IQR值(四分位距(interquartile range, IQR),又称四分差)
  5. Shim、Polyfill
  6. html css前端框架,GitHub - zyj1022/wee: WEE—简单快速的响应式HTML/CSS前端框架
  7. C语言中.和-的区别
  8. Android Studio升级到3.5之后xml格式化问题
  9. 【高等数学】第二章 导数与微分——第一节 导数的概念
  10. 软件测试岗位面试经验分享