RabbitMQ的MQTT插件实现即时通讯

MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
MQTT相关概念
Publisher(发布者):消息的发出者,负责发送消息。
Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
Payload(负载);可以理解为发送消息的内容。
QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
QoS 2(Exactly Once):只有一次,确保消息只到达一次。
启用MQTT
需要先安装RabbitMQ,这里就不做介绍了,百度安装教程即可
启用RabbitMQ的MQTT插件了,默认是不启用的,在RabbitMQ的安装目录下的sbin目录里在地址栏输入cmd并回车启动命令行,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_mqtt

开启成功后查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。
记得重启rabbitmq-service

MQTT客户端
这里我们使用的是MQTTBox,首先下载并安装好MQTTBox,下载地址:http://workswithweb.com/mqttbox.html
点击Create MQTT Client按钮来创建一个MQTT客户端;

接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;

再配置一个订阅者,订阅者订阅demoTopicA这个主题,我们会向这个主题发送消息;
配置好之后点击Subscribe

点击Publish发送消息,客端收到消息

前端直接实现即时通讯
既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,我将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!
由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;

rabbitmq-plugins enable rabbitmq_web_mqtt

开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;

WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js
接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目录下了,需要先启动应用再访问):


在SpringBoot中使用
没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了

在pom.xml中添加MQTT相关依赖;

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

在application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;

rabbitmq:mqtt:url: tcp://localhost:1883username: guestpassword: guestdefaultTopic: demoTopic

编写一个Java配置类从配置文件中读取配置便于使用;

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {/*** RabbitMQ连接用户名*/private String username;/*** RabbitMQ连接密码*/private String password;/*** RabbitMQ的MQTT默认topic*/private String defaultTopic;/*** RabbitMQ的MQTT连接地址*/private String url;
}

添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;

@Slf4j
@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());//设置消息质量:0->至多一次;1->至少一次;2->只有一次adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//处理订阅消息log.info("handleMessage : {}",message.getPayload());}};}
}

添加MQTT消息发布者相关配置;

@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { mqttConfig.getUrl()});options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publisherClient", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}

添加MQTT网关,用于向主题中发送消息;

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 发送消息到默认topic*/void sendToMqtt(String payload);/*** 发送消息到指定topic*/void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);/*** 发送消息到指定topic并设置QOS*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;

@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttGateway mqttGateway;/*** 向默认主题发送消息* @param payload* @return*/@PostMapping("/sendToDefaultTopic")public CommonResult sendToDefaultTopic(String payload) {System.out.println("payload:"+payload);mqttGateway.sendToMqtt(payload);return CommonResult.success(null);}/*** 向指定主题发送消息* @param payload* @param topic* @return*/@PostMapping("/sendToTopic")public CommonResult sendToTopic(String payload, String topic) {mqttGateway.sendToMqtt(payload, topic);return CommonResult.success(null);}
}

postman测试

后台接收消息打印

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!

RabbitMQ的MTQQ插件实现即时通讯相关推荐

  1. RabbitMQ实现即时通讯-MQTT协议

    有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功.最近发现RabbitMQ可以很 ...

  2. Udesk即时通讯网页插件发送咨询对象(一、使用内嵌代码)

    作者:张振琦 最近接到了一个客户工单,咨询是否可以在即时通讯的聊天窗口里发送商品信息.Udesk即时通讯网页插件是提供了这个功能的,叫做咨询对象.我也整理了一下,网页插件实现发送咨询对象可分为三种方式 ...

  3. 微信小程序嵌入Udesk即时通讯网页插件

    作者:张振琦 小程序除了使用原生通讯方式接入Udesk,还可以使用web-view组件嵌入Udesk即时通讯网页插件的专用链接来实现. 原生接入方式可以参考以下文章 <Udesk微信小程序即时通 ...

  4. Android嵌入Udesk即时通讯网页插件(一、入门)

    作者:张振琦 Android 系统上实现Udesk即时通讯,除了使用Udesk提供的原生sdk以外还可以嵌入Udesk提供的即时通讯网页插件.Udesk即时通讯网页插件的内容,大家可以参考<Ud ...

  5. Udesk即时通讯网页插件离线消息推送

    作者:张振琦 Udesk即时通讯网页插件提供了,当客户离线后,客服回复的消息可以推送到我们自己的一个服务地址上的功能.这样就给了我们很大的发挥空间,我们可以使用短信.邮件等方式提醒客户上线查看消息,或 ...

  6. Udesk即时通讯(IM)网页插件入门

    作者:张振琦 Udesk能够支持APP,微信,企业微信,微信小程序,微博,web页面,六大即时通讯渠道. web渠道只需要管理员在后台建立即时通讯网页插件,然后将代码嵌入网页即可实现.在此基础上,还提 ...

  7. Udesk即时通讯网页插件: 按钮设置

    作者:张振琦 前篇我们介绍了Udesk即时通讯网页插件的创建以及如何使用.在网页插件的管理页面内除了基本信息还有其他的一些选项卡,按钮设置.窗口设置.邀请设置.导航菜单和其他设置. 本篇来介绍一下按钮 ...

  8. Udesk即时通讯网页插件专用链接传参介绍

    作者:张振琦 在<Udesk即时通讯(IM)网页插件入门>中说到了即时通讯的专用链接,也演示了浏览器直接访问和网页iframe加载的效果.本篇来介绍一下,专用链接支持的参数,通过给专用链接 ...

  9. 即时通首页html代码,Udesk即时通讯(IM)网页插件入门

    Udesk即时通讯(IM)网页插件入门 Udesk即时通讯(IM)网页插件入门 作者:张振琦 Udesk能够支持APP,微信,企业微信,微信小程序,微博,web页面,六大即时通讯渠道. web渠道只需 ...

最新文章

  1. 基于jquery的serializeArray
  2. poj 2492A Bug's Life(并查集)
  3. Tomcat - ClassFormatException的解决方法
  4. 每日一皮:用户永远不知道怎么用我们的产品...
  5. htm tt cite em u
  6. rest 接口怎么传list_如何设计一个优雅的RESTFUL的接口
  7. 大二上学期做的不入眼的导航系统。
  8. 20140923 cin.get() getline cin
  9. jenkins job config.xml结构
  10. C#学习笔记(十):反射
  11. throw new exception后程序不停止_Java之Exception剖析
  12. 使用Arcpy进行数据批处理-批量裁剪
  13. stm32 OV7670/摄像头模块颜色区域定位(腐蚀中心算法)
  14. idea添加scala环境_Scala篇:Scala环境及IDEA配置
  15. Linux 查看日志命令
  16. java有道翻译_java实现有道翻译爬虫
  17. pnuts系统的局限性_水平系统的局限性
  18. 状态码 https dns解析过程
  19. [转]将106键盘布局(日式键盘布局)改为101键盘布局(美式键盘布局)(
  20. 苹果发布蓝牙耳机新固件,耳机Find My功能越发普及

热门文章

  1. kali安装步骤失败
  2. 小程序下拉框自定义样式picker修改样式picker-view修改样式
  3. Failed connect to git hub.com:443; Connection refused解决方法
  4. 2022年质量员-装饰方向-通用基础(质量员)考试题库及模拟考试
  5. 机器学习与人脸识别3:人脸检测算法综述
  6. 华创期货:止损还是抗单决定亏损还是盈利
  7. 想裸辞的N个瞬间,裸辞后又陷入焦虑!
  8. mysql 周边x公里_mysql – 在“X”公里(或英里)内寻找城市
  9. 2021广联达暑期实习笔试C++
  10. 校运会计算机专业口号,大学运动会口号