有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。最近发现RabbitMQ可以很方便的实现即时通讯功能,如果你没有特殊的业务需求,甚至可以不写后端代码,今天给大家讲讲如何使用RabbitMQ来实现即时通讯

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

MQTT相关概念

  • Publisher(发布者):消息的发出者,负责发送消息。

  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。

  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。

  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。

  • Payload(负载);可以理解为发送消息的内容。

  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0QoS 1QoS 2三个等级,下面分别介绍下:

    • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;

    • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;

    • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。

  • 首先我们需要安装并启动RabbitMQ,对RabbitMQ不了解的朋友可以参考《花了3天总结的RabbitMQ实用技巧,有点东西!》;

  • 接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。

MQTT客户端

我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTBox这个客户端工具。

  • 首先下载并安装好MQTTBox,下载地址:http://workswithweb.com/mqttbox.html

  • 点击Create MQTT Client按钮来创建一个MQTT客户端;

  • 接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

  • 再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;

  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯

既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;

  • WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js

  • 实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://localhost:15675/ws
<!DOCTYPE html><html lang="en"><head>    <meta charset="UTF-8">    <title>Title</title></head><body><div>    <label>目标Topic:<input id="targetTopicInput" type="text"></label><br>    <label>发送消息:<input id="messageInput" type="text"></label><br>    <button onclick="sendMessage()">发送</button>    <button onclick="clearMessage()">清空</button>    <div id="messageDiv"></div></div></body><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script><script>    //RabbitMQ的web-mqtt连接地址    const url = 'ws://localhost:15675/ws';    //获取订阅的topic    const topic = getQueryString("topic");    //连接到消息队列    let client = mqtt.connect(url);    client.on('connect', function () {        //连接成功后订阅topic        client.subscribe(topic, function (err) {            if (!err) {                showMessage("订阅topic:" + topic + "成功!");            }        });    });    //获取订阅topic中的消息    client.on('message', function (topic, message) {        showMessage("收到消息:" + message.toString());    });     //发送消息    function sendMessage() {        let targetTopic = document.getElementById("targetTopicInput").value;        let message = document.getElementById("messageInput").value;        //向目标topic中发送消息        client.publish(targetTopic, message);        showMessage("发送消息给" + targetTopic + "的消息:" + message);    }     //从URL中获取参数    function getQueryString(name) {        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");        let r = window.location.search.substr(1).match(reg);        if (r != null) {            return decodeURIComponent(r[2]);        }        return null;    }     //在消息列表中展示消息    function showMessage(message) {        let messageDiv = document.getElementById("messageDiv");        let messageEle = document.createElement("div");        messageEle.innerText = message;        messageDiv.appendChild(messageEle);    }     //清空消息列表    function clearMessage() {        let messageDiv = document.getElementById("messageDiv");        messageDiv.innerHTML = "";    }</script></html>
  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):

    • 第一个订阅主题testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA

    • 第二个订阅主题testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB

  • 之后互相发送消息,让我们来看看效果吧!

在SpringBoot中使用

没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。

  • 首先我们需要在pom.xml中添加MQTT相关依赖;
<!--Spring集成MQTT--><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-mqtt</artifactId></dependency>
  • application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic
  • 编写一个Java配置类从配置文件中读取配置便于使用;
/** * MQTT相关配置 * Created by macro on 2020/9/15. */@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {    /**     * RabbitMQ连接用户名     */    private String username;    /**     * RabbitMQ连接密码     */    private String password;    /**     * RabbitMQ的MQTT默认topic     */    private String defaultTopic;    /**     * RabbitMQ的MQTT连接地址     */    private String url;}
  • 添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;
/** * MQTT消息订阅者相关配置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {    @Autowired    private MqttConfig mqttConfig;     @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }     @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //设置消息质量:0->至多一次;1->至少一次;2->只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }     @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {             @Override            public void handleMessage(Message<?> message) throws MessagingException {                //处理订阅消息                log.info("handleMessage : {}",message.getPayload());            }         };    }}
  • 添加MQTT消息发布者相关配置;
/** * MQTT消息发布者相关配置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {     @Autowired    private MqttConfig mqttConfig;     @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }     @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }     @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}
  • 添加MQTT网关,用于向主题中发送消息;
/** * MQTT网关,通过接口将数据传递到集成流 * Created by macro on 2020/9/15. */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    /**     * 发送消息到默认topic     */    void sendToMqtt(String payload);     /**     * 发送消息到指定topic     */    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);     /**     * 发送消息到指定topic并设置QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
/** * MQTT测试接口 * Created by macro on 2020/9/15. */@Api(tags = "MqttController", description = "MQTT测试接口")@RestController@RequestMapping("/mqtt")public class MqttController {     @Autowired    private MqttGateway mqttGateway;     @PostMapping("/sendToDefaultTopic")    @ApiOperation("向默认主题发送消息")    public CommonResult sendToDefaultTopic(String payload) {        mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }     @PostMapping("/sendToTopic")    @ApiOperation("向指定主题发送消息")    public CommonResult sendToTopic(String payload, String topic) {        mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}
  • 调用接口向主题中发送消息进行测试;

  • 后台成功接收到消息并进行打印。
2020-09-17 14:29:01.689  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息2020-09-17 14:29:06.101  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息2020-09-17 14:29:07.384  INFO 11192 --- [ubscriberClient] c.m.mall.tiny.config.MqttInboundConfig   : handleMessage : 来自网页上的消息

总结

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

项目源码地址

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

【备:其他参考网址
SpringBoot集成MQTT (超稳定)
MQTT协议,终于有人讲清楚了

RabbitMQ实现即时通讯-MQTT协议相关推荐

  1. RabbitMQ的MTQQ插件实现即时通讯

    RabbitMQ的MQTT插件实现即时通讯 MQTT协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish ...

  2. 即时通讯有标准 IM的四种即时通讯协议简介

    IM(Instant Messaging)正在被广泛地采用,特别是在公司与它们的客户互动联接方案上.为了解决即时通讯的标准问题,IETF成立了专门的工作小组,研究和开发与IM相关的协议. 目前IM有四 ...

  3. 2022-2028年中国即时通讯市场投资分析及前景预测报告

    [报告类型]产业研究 [出版时间]即时更新(交付时间约3个工作日) [发布机构]智研瞻产业研究院 [报告格式]PDF版 本报告介绍了即时通讯行业相关概述.中国即时通讯行业运行环境.分析了中国即时通讯行 ...

  4. Tigase进行即时通讯的实现

    文章目录 Tigase介绍 XMPP与MQTT协议介绍 安装Tigase 通过smack进行使用 导入maven 实现 结语 Tigase介绍 它是基于XMPP协议开发的服务器,与openfire类似 ...

  5. 关于 IMPP/XMPP/SIMPLE 这几种即时通讯协议和NAT traversal

    IM(Instant Messaging)正在被广泛地采用,特别是在公司与它们的客户互动联接方案上.为了解决即时通讯的标准问题,IETF成立了专门的工作小组,研究和开发与IM相关的协议. 目前IM有四 ...

  6. 主流的四种IM(IM:instant messaging,即时消息)协议

    转载:http://hi.baidu.com/zhaojinwei1986/blog/item/fe3b9f12d5784e24dd5401db.html XMPP(Extensible Messag ...

  7. 在VUE中利用MQTT协议实现即时通讯

    前言 建议先阅读: 在Node.js下运用MQTT协议实现即时通讯及离线推送 以前尝试在vue中用上mqtt,了解到mqtt实质上是基于websocket进行数据通信,所以上文中在node下实现的服务 ...

  8. NB-IOT模块与MQTT.fx使用MQTT协议通讯

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级消息协议,MQTT最大优 ...

  9. 兼具高效与易用,融云 IM 即时通讯长连接协议设计思路

    无论是 PC 端还是移动端,接入网络实现通信都需要建立双端的连接.关注[融云全球互联网通信云]了解更多 客户端和服务端建立连接后不断开,然后进行通信(也就是发送报文)的方式就是长连接. 与之相反,短连 ...

最新文章

  1. session过期设置
  2. C语言string.h常用函数总结
  3. [转]IE11下Forms身份认证无法保存Cookie的问题
  4. Javascript原型钩沉
  5. python通信原理_用python通过原始套接字发送scapy包
  6. ttysac1 java_基于Android的串口聊天室 (基于tiny4412) 一
  7. 常用机器学习算法汇总
  8. 基金前端代码和后端代码的区别 基金后端代码和基金前端代码区别
  9. sql 删除重复数据 只留一条
  10. ssr使用mysql数据库_MySQL数据库安装与配置详解
  11. matlab统计颗粒数,一种基于Matlab的谷物颗粒计数方法
  12. python reset_在python中创建'reset'方法以重置已编辑的字符串
  13. python 兼职多少钱一小时_无印良品兼职一小时多少钱?看完后就清楚了!
  14. 在用argparse的add_argument添加运行参数时,(bool类型)参数不生效
  15. 服务器导出word文档中有乱码,使用Aspose.word DOC转PDF文件乱码问题-Doc文件
  16. 蓝桥杯: Cowboys
  17. 2019.1.12日 PYTHON多线程爬虫笔记
  18. 数据集标注(在线标注,方便快捷)/YOLOV5自建数据集
  19. C++基础实例-文件Io等(5)
  20. Rust crates源国内加速镜像配置说明

热门文章

  1. linux下google浏览器字体不清晰,google浏览器的字体模糊的原因是什么_怎么解决 - 驱动管家...
  2. 高德地图marker屏蔽Label
  3. [结构光三维重建] 2、基于结构光的三维重建系统工作原理总结
  4. [大学物理实验-5]波尔共振实验
  5. vue 项目中页面打印实现(去除页眉页脚)
  6. 微信二维码扫描下载APK
  7. 部署Python的框架下的web app的详细教程
  8. java将小写数字变成大写输出
  9. java实现i18n国际化
  10. SAP中汇率取值选择逻辑分析测试