appollo消息服务器,Springboot 集成 MQTT —— web 服务端实现(apollo 客户端)-Go语言中文社区...
基于 MQTT 可以实现很多场景,例如现在使用比较多的物联网,还有消息的实时推送。联网的设备连接上 apollo 服务器以后,一直监听 apollo 推送过来的信令/消息即可。
1、web 服务端向联网的设备推送信令/消息,上述截图的流程(1.1-1.2)。
1.1、web 服务端向 apollo 服务器发送信令/消息。
1.2、联网的设备通过订阅的主题,收到 web 服务端推送的信令/消息。
2、联网的设备 1 向联网的设备 2 发送信令/消息,上述截图的流程(2.1-2.4)。
2.1、设备 1 向 apollo 服务器发送接收方为设备 2 的消息/信令。
2.2、设备 2 向 web 服务端发起登录。
2.3、设备 2 在 web 服务端登录成功后,设备 2 与 apollo 服务器建立长连接。
2.4、设备 2 通过订阅的主题,收到设备 1 推送的信令/消息。
现在,整体结构已经比较明显了,接下来就会介绍 web 服务端的实现
基于 MQTT 长连接的 web 端实现
基于《Springboot 集成 MQTT —— 搭建 apollo 服务器(Windows)》 一文中搭建的 apollo 服务器,web 端需要配置 apollo 的连接。
标记部分的内容如下
# 用户名
mqtt.username=admin
# 密码
mqtt.password=password
mqtt.url=tcp://127.0.0.1:61613
# 生产者客户端 ID
mqtt.send.clientId=mqttSendClient
# 消费者客户端 ID
mqtt.recv.clientId=mqttRecvClient
添加上述配置的时候,应该会发现,有些内容不会自动提示,我们需要手动的将上述配置信息配置到 MQTT 客户端的实例中。新增 MqttConfig 类。
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
@Configuration
public class MqttConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
public static final String CHANNEL_RECV = "recvMsgChannel"; // 订阅消息的信道
public static final String CHANNEL_SEND = "sendMsgChannel"; // 发布消息的信道
public static final String TOPIC = "topic";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.send.clientId}")
private String senderClientId;
@Value("${mqtt.recv.clientId}")
private String recverClientId;
// MQTT 客户端的连接器哦诶之
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 是否清空 session。false:服务器会保留客户端的连接记录,true:每次连接服务器都以新身份连接
options.setCleanSession(true);
options.setUserName(username); // 连接用户
options.setPassword(password.toCharArray()); // 连接密码
options.setServerURIs(url.split(",")); // 连接的服务器 url
options.setConnectionTimeout(10); // 超时时间(单位:s)
options.setKeepAliveInterval(20); // 保活心跳(单位:s),此方法没有重连机制
return options;
}
// 构造 MQTT 客户端
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
// MQTT 生产者客户端
@Bean
@ServiceActivator(inputChannel = CHANNEL_SEND)
public MessageHandler mqttMsgSend() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(senderClientId, mqttClientFactory());
messageHandler.setAsync(true);
return messageHandler;
}
// MQTT 消息订阅绑定(消费者)
@Bean
public MessageProducer mqttMsgRecv() {
// 可以订阅多个 Topic 的消息
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
recverClientId, mqttClientFactory(), TOPIC.split(","));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInboundChannel()); // 设置订阅通道
return adapter;
}
// MQTT信息通道(消费者)
@Bean(name = CHANNEL_RECV)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
// MQTT消息处理器(消费者,用于服务端自发自收的测试)
@Bean
@ServiceActivator(inputChannel = CHANNEL_RECV)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
LOGGER.error("msg:{}", message.getPayload());
}
};
}
}
新增一个消息发送接口 IMqttSender 。
import com.hosh.tech.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* MQTT生产者消息发送接口
* MessagingGateway要指定生产者的通道名称
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_SEND)
public interface IMqttSender {
/**
* 发送信息到MQTT服务器(实现发送全用户消息)
* @param data 消息内容
*/
void sendToMqtt(String data);
/**
* 发送信息到 MQTT 服务器(实现发送公告类消息——P2M)
* @param topic 主题
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/**
* 发送信息到 MQTT 服务器(实现发送点对点的消息)
* @param topic 主题
* @param qos 对消息处理的几种机制
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息
* 2 多了一次去重的动作,确保订阅者收到的消息有一次
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
测试 web 服务端的推送
web 服务端的发送消息的实现
为了方便测试,消息为自发自收,收消息的实现已经在 MqttConfig 中给出
通过 http 请求触发自发自收的过程,可以看到如下的打印信息
appollo消息服务器,Springboot 集成 MQTT —— web 服务端实现(apollo 客户端)-Go语言中文社区...相关推荐
- ssm配置socket_ssm框架中集成websocket实现服务端主动向客户端发送消息
找了很多配置文档及实例说明,也还是没能成功,最终在csdn博客中发现了基于stomp的消息推送的文章, 下面整理自csdn博客,https://blog.csdn.net/u013627689/art ...
- android mqtt服务器搭建,Mqtt从服务端到Android客户端搭建(mqtt服务端搭建)
一.简介 MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议. 此处不再引入官方文字描述,以个人开发认识浅谈一下 本文分为两部分: 1. M ...
- mongodb消息服务器,win10 MongoDB 3.6 服务端配置
MongoDB 3.6 开始,默认绑定 localhost ,如果要作为远程服务,需要做一些配置才行 开启临时 mongo 服务 进入 C:\Program Files\MongoDB\Server\ ...
- java+创建metaq生产者_微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置-Go语言中文社区...
参考资料 windows下配置rocketMQ 解压缩 系统环境变量配置 变量名:ROCKETMQ_HOME 变量值:MQ解压路径MQ文件夹名 启动NAMESERVER Cmd命令框执行进入至'MQ文 ...
- springboot集成mqtt(超级无敌详细)
springboot集成MQTT步骤 1. 引入pom依赖 <!-- mqtt --><dependency><groupId>org.springframewor ...
- Spring集成和Web服务
本文是我们名为" Spring Integration for EAI "的学院课程的一部分. 在本课程中,向您介绍了企业应用程序集成模式以及Spring Integration如 ...
- springboot集成mqtt相关配置+案例
sdk版本 spring 6.0 springboot 3.x 官网配置说明 spring 官方说明 https://docs.spring.io/spring-integration/referen ...
- Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件
场景 MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻 ...
- SpringBoot(23) 集成socket.io服务端和客户端实现通信
一.前言 websocket和socket.io区别? websocket 一种让客户端和服务器之间能进行双向实时通信的技术 使用时,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况 适合用于cli ...
最新文章
- Python+OpenCV实现AI人脸识别身份认证系统(3)—训练人脸识别模型
- 查找只有一个字母不相同的单词
- 科大讯飞刷新SQuAD 2.0问答榜纪录,机器阅读理解全面超越人类
- javascript高级教程
- Anaconda 默认环境
- linux删除mysql安装_Linux安装删除MySQL
- Codevs 3002 石子归并 3(DP四边形不等式优化)
- 计算机科学与技术专业导论_教育部最新公布!西安工业大学新增4个本科专业!...
- 使C#代码现代化——第四部分:类型
- Android移动端测试——adb、monkey
- 命令让手机临时root_Linux 最常用命令(简单易学,但能解决 95% 以上的问题)
- 4.7 ResNet CNN、tensorflow实现——python实战
- bzoj1574[Usaco2009 Jan]地震损坏Damage*
- WinForm自定义ListBox显示样式
- IDM如何设置深色模式
- HTML站内搜索引擎
- Vue单页面应用性能优化实践
- IDLE的介绍和使用
- [GXYCTF 2019]Ping Ping Ping
- 写给大学时期自己的寄语
热门文章
- 联邦学习 | 无处不在的隐私泄露!
- vscode之 wget下载文件报错:ERROR: cannot verify data.vision.ee.ethz.ch‘s certificate
- 抖音做我女朋友的 vbs 脚本
- 7000字详解数据指标体系如何从设计到落地
- aws php 上传文件 限制大小_php如何实现文件上传下载-PHP问题
- python1到100奇数相加_Python:从inpu将奇数相加
- 通用局部搜索算法之WALKSAT
- Mysql聚簇索引和非聚簇索引原理(数据库)
- 【R】R语言指定包安装目录
- 深度学习自学第四周:近几年的经典神经网络结构