基于 MQTT 可以实现很多场景,例如现在使用比较多的物联网,还有消息的实时推送。联网的设备连接上 apollo 服务器以后,一直监听 apollo 推送过来的信令/消息即可。

1、web 服务端向联网的设备推送信令/消息,上述截图的流程(1.1-1.2)。

1.1、web 服务端向 apollo 服务器发送信令/消息。

1.2、联网的设备通过订阅的主题,收到 web 服务端推送的信令/消息。

2、联网的设备 1 向联网的设备 2 发送信令/消息,上述截图的流程(2.1-2.4)。

2.1、设备 1 向 apollo 服务器发送接收方为设备 2 的消息/信令。

2.2、设备 2 向 web 服务端发起登录。

2.3、设备 2 在 web 服务端登录成功后,设备 2 与 apollo 服务器建立长连接。

2.4、设备 2 通过订阅的主题,收到设备 1 推送的信令/消息。

现在,整体结构已经比较明显了,接下来就会介绍 web 服务端的实现

基于 MQTT 长连接的 web 端实现

基于《Springboot 集成 MQTT —— 搭建 apollo 服务器(Windows)》 一文中搭建的 apollo 服务器,web 端需要配置 apollo 的连接。

标记部分的内容如下

# 用户名

mqtt.username=admin

# 密码

mqtt.password=password

mqtt.url=tcp://127.0.0.1:61613

# 生产者客户端 ID

mqtt.send.clientId=mqttSendClient

# 消费者客户端 ID

mqtt.recv.clientId=mqttRecvClient

添加上述配置的时候,应该会发现,有些内容不会自动提示,我们需要手动的将上述配置信息配置到 MQTT 客户端的实例中。新增 MqttConfig 类。

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.core.MessageProducer;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.MessageHandler;

import org.springframework.messaging.MessagingException;

@Configuration

public class MqttConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);

public static final String CHANNEL_RECV = "recvMsgChannel"; // 订阅消息的信道

public static final String CHANNEL_SEND = "sendMsgChannel"; // 发布消息的信道

public static final String TOPIC = "topic";

@Value("${mqtt.username}")

private String username;

@Value("${mqtt.password}")

private String password;

@Value("${mqtt.url}")

private String url;

@Value("${mqtt.send.clientId}")

private String senderClientId;

@Value("${mqtt.recv.clientId}")

private String recverClientId;

// MQTT 客户端的连接器哦诶之

@Bean

public MqttConnectOptions getMqttConnectOptions() {

MqttConnectOptions options = new MqttConnectOptions();

// 是否清空 session。false:服务器会保留客户端的连接记录,true:每次连接服务器都以新身份连接

options.setCleanSession(true);

options.setUserName(username); // 连接用户

options.setPassword(password.toCharArray()); // 连接密码

options.setServerURIs(url.split(",")); // 连接的服务器 url

options.setConnectionTimeout(10); // 超时时间(单位:s)

options.setKeepAliveInterval(20); // 保活心跳(单位:s),此方法没有重连机制

return options;

}

// 构造 MQTT 客户端

@Bean

public MqttPahoClientFactory mqttClientFactory() {

DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

factory.setConnectionOptions(getMqttConnectOptions());

return factory;

}

// MQTT 生产者客户端

@Bean

@ServiceActivator(inputChannel = CHANNEL_SEND)

public MessageHandler mqttMsgSend() {

MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(senderClientId, mqttClientFactory());

messageHandler.setAsync(true);

return messageHandler;

}

// MQTT 消息订阅绑定(消费者)

@Bean

public MessageProducer mqttMsgRecv() {

// 可以订阅多个 Topic 的消息

MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(

recverClientId, mqttClientFactory(), TOPIC.split(","));

adapter.setCompletionTimeout(5000);

adapter.setConverter(new DefaultPahoMessageConverter());

adapter.setQos(2);

adapter.setOutputChannel(mqttInboundChannel()); // 设置订阅通道

return adapter;

}

// MQTT信息通道(消费者)

@Bean(name = CHANNEL_RECV)

public MessageChannel mqttInboundChannel() {

return new DirectChannel();

}

// MQTT消息处理器(消费者,用于服务端自发自收的测试)

@Bean

@ServiceActivator(inputChannel = CHANNEL_RECV)

public MessageHandler handler() {

return new MessageHandler() {

@Override

public void handleMessage(Message> message) throws MessagingException {

LOGGER.error("msg:{}", message.getPayload());

}

};

}

}

新增一个消息发送接口 IMqttSender 。

import com.hosh.tech.config.MqttConfig;

import org.springframework.integration.annotation.MessagingGateway;

import org.springframework.integration.mqtt.support.MqttHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

/**

* MQTT生产者消息发送接口

* MessagingGateway要指定生产者的通道名称

*/

@Component

@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_SEND)

public interface IMqttSender {

/**

* 发送信息到MQTT服务器(实现发送全用户消息)

* @param data 消息内容

*/

void sendToMqtt(String data);

/**

* 发送信息到 MQTT 服务器(实现发送公告类消息——P2M)

* @param topic 主题

* @param payload 消息内容

*/

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,

String payload);

/**

* 发送信息到 MQTT 服务器(实现发送点对点的消息)

* @param topic 主题

* @param qos 对消息处理的几种机制

* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失

* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息

* 2 多了一次去重的动作,确保订阅者收到的消息有一次

* @param payload 消息内容

*/

void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,

@Header(MqttHeaders.QOS) int qos,

String payload);

}

测试 web 服务端的推送

web 服务端的发送消息的实现

为了方便测试,消息为自发自收,收消息的实现已经在 MqttConfig 中给出

通过 http 请求触发自发自收的过程,可以看到如下的打印信息

appollo消息服务器,Springboot 集成 MQTT —— web 服务端实现(apollo 客户端)-Go语言中文社区...相关推荐

  1. ssm配置socket_ssm框架中集成websocket实现服务端主动向客户端发送消息

    找了很多配置文档及实例说明,也还是没能成功,最终在csdn博客中发现了基于stomp的消息推送的文章, 下面整理自csdn博客,https://blog.csdn.net/u013627689/art ...

  2. android mqtt服务器搭建,Mqtt从服务端到Android客户端搭建(mqtt服务端搭建)

    一.简介 MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议. 此处不再引入官方文字描述,以个人开发认识浅谈一下 本文分为两部分: 1. M ...

  3. mongodb消息服务器,win10 MongoDB 3.6 服务端配置

    MongoDB 3.6 开始,默认绑定 localhost ,如果要作为远程服务,需要做一些配置才行 开启临时 mongo 服务 进入 C:\Program Files\MongoDB\Server\ ...

  4. java+创建metaq生产者_微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置-Go语言中文社区...

    参考资料 windows下配置rocketMQ 解压缩 系统环境变量配置 变量名:ROCKETMQ_HOME 变量值:MQ解压路径MQ文件夹名 启动NAMESERVER Cmd命令框执行进入至'MQ文 ...

  5. springboot集成mqtt(超级无敌详细)

    springboot集成MQTT步骤 1. 引入pom依赖 <!-- mqtt --><dependency><groupId>org.springframewor ...

  6. Spring集成和Web服务

    本文是我们名为" Spring Integration for EAI "的学院课程的一部分. 在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如 ...

  7. springboot集成mqtt相关配置+案例

    sdk版本 spring 6.0 springboot 3.x 官网配置说明 spring 官方说明 https://docs.spring.io/spring-integration/referen ...

  8. Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件

    场景 MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻 ...

  9. SpringBoot(23) 集成socket.io服务端和客户端实现通信

    一.前言 websocket和socket.io区别? websocket 一种让客户端和服务器之间能进行双向实时通信的技术 使用时,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况 适合用于cli ...

最新文章

  1. Python+OpenCV实现AI人脸识别身份认证系统(3)—训练人脸识别模型
  2. 查找只有一个字母不相同的单词
  3. 科大讯飞刷新SQuAD 2.0问答榜纪录,机器阅读理解全面超越人类
  4. javascript高级教程
  5. Anaconda 默认环境
  6. linux删除mysql安装_Linux安装删除MySQL
  7. Codevs 3002 石子归并 3(DP四边形不等式优化)
  8. 计算机科学与技术专业导论_教育部最新公布!西安工业大学新增4个本科专业!...
  9. 使C#代码现代化——第四部分:类型
  10. Android移动端测试——adb、monkey
  11. 命令让手机临时root_Linux 最常用命令(简单易学,但能解决 95% 以上的问题)
  12. 4.7 ResNet CNN、tensorflow实现——python实战
  13. bzoj1574[Usaco2009 Jan]地震损坏Damage*
  14. WinForm自定义ListBox显示样式
  15. IDM如何设置深色模式
  16. HTML站内搜索引擎
  17. Vue单页面应用性能优化实践
  18. IDLE的介绍和使用
  19. [GXYCTF 2019]Ping Ping Ping
  20. 写给大学时期自己的寄语

热门文章

  1. 联邦学习 | 无处不在的隐私泄露!
  2. vscode之 wget下载文件报错:ERROR: cannot verify data.vision.ee.ethz.ch‘s certificate
  3. 抖音做我女朋友的 vbs 脚本
  4. 7000字详解数据指标体系如何从设计到落地
  5. aws php 上传文件 限制大小_php如何实现文件上传下载-PHP问题
  6. python1到100奇数相加_Python:从inpu将奇数相加
  7. 通用局部搜索算法之WALKSAT
  8. Mysql聚簇索引和非聚簇索引原理(数据库)
  9. 【R】R语言指定包安装目录
  10. 深度学习自学第四周:近几年的经典神经网络结构