这几天弄了下mqtt ,发现有很多问题,网上搜不到什么解决办法,所以自己记录下来,也让初识mqtt的人少走一些坑,关于我写的不对的也希望看到的人能指出来互相学习下

安装

说到mqtt,首先肯定要安装了,安装什么的地址:http://activemq.apache.org/ap...
我本地是Windows的环境,所以装的是Windows版本,这里是第一个注意的地方,因为后面使用的时候windows和linux的有一些不同

下载完成之后就是解压安装了,这里解压完成之后进入bin目录下,自己用cmd或者直接进去在此处打开命令窗口也行,然后运行apollo.cmd 创建一个服务实例我的实例名称是mybroker所以命令是 apollo.cmd create mybroker,这个名称自己可以随便指定

创建完实例后发现bin 目录下多了一个文件夹,这个文件夹就是你实例名称,进入文件夹运行
.apollo-broker.cmd run 命令

这样就启动成功了

启动成功可以去http://localhost:61680/console/index.html看看,登录账号和密码在mybrokeretcusers.properties文件中找到输入就可以进去了

页面上有连接信息和订阅主题的一些对应信息,有兴趣的自己看下,后面也会讲到的

使用

安装成功接下来就是使用了,首先创建一个maven工程,引入配置

    <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

由于我们后面处理订阅消息的消费者打印的日志是用了slf4j为了方便也引入了lombok的配置 :

    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>

引入完成以后就可以开始准备开始使用mqtt了
这里为了方便维护和配置我把一些配置参数放在了properties文件里面:

#MQTT配置信息spring.mqtt.username=adminspring.mqtt.password=passwordspring.mqtt.url=tcp://localhost:61613spring.mqtt.client.id=clientIdspring.mqtt.server.id=serverIdspring.mqtt.default.topic=topic

这里我遇到了一个坑,专门注释了,就是订阅端订阅消息的id 和 发布端发布消息的id 一定不能一样,这样会导致mqtt识别到两个一样的id,消息一发就断开连接了,订阅端总是收不到消息,这个问题我找了好长时间都不知道问题出在哪,刚接触的很容易搞错,第二个问题就是mqtt的服务器连接地址,在Windows和linux下tcp的端口是不一样的,在启动的apollo的日志中可以看出来

监听的tcp端口是61613,看别人很多的demo上都是1883,如果一直连不上,原因可能是因为这个

接下来就是spring.mqtt.default.topic 配置了,这个是mqtt订阅和推送的消息主题,既然你想发消息那么订阅消息的主题和发布消息的主题一致才能收到消息,和rabbitmq一样

然后就是客户端

@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});mqttConnectOptions.setKeepAliveInterval(2);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}}  

这里有点问题,如果你是复制我的代码的话MessageHandler 这个类是没有的需要自己手动导包,看了源码发现这里需要的是一个消息处理的handler需要是org.springframework.messaging.MessageHandler的实现,直接导入这个包就行了

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {void sendToMqtt(String data);void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

这个是消息发送接口,需要发送消息的时候直接调用就行了,提供了几个重载方法payload或者data是发送消息的内容
topic是消息发送的主题,这里可以自己灵活定义,也可以使用默认的主题,就是配置文件的主题,qos是mqtt 对消息处理的几种机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
当然,这三种模式下的性能肯定也不一样,qos=0是最好的,2是最差的 ,有兴趣的可以去详细了解我在这不多赘述

上面就完成了消息的发送,可以去http://localhost:61680/console/index.html看看消息的记录,这里我写了一个接口调用sendToMqtt方法发送一条消息

会看到收到有两个主题,我的是因为我订阅了两个主题所以上面显示的是两个,我的刚才发布消息的主题是too所以打开会看到too有消息送达过来

如果你还没写订阅方的话consumers是没有的,现在显示我发了7条消息,证明发送成功了

接下来就是订阅方,为了方便我就直接写在启动类上了,没有用到所有的配置

@SpringBootApplication
@EnableAutoConfiguration
public class MytestApplication {public static void main(String[] args) {SpringApplication.run(MytestApplication.class, args);}@Value("${spring.mqtt.server.id}")private String serverId;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs("tcp://localhost:61613");factory.setUserName("admin");factory.setPassword("password");return factory;}//     consumer 订阅者监听消息@Beanpublic IntegrationFlow mqttInFlow() {return IntegrationFlows.from(mqttInbound()).transform(p -> p + ", received from MQTT").handle(logger()).get();}private LoggingHandler logger() {LoggingHandler loggingHandler = new LoggingHandler("INFO");loggingHandler.setLoggerName("siSample");return loggingHandler;}@Beanpublic MessageProducerSupport mqttInbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,mqttClientFactory(), "too");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);return adapter;}}

这里订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
重新启动项目,发送消息,会发现控制台已经打印出消息

代表订阅方已经成功收到消息,同时

也显示消息订阅方和记录,至此一个完整的消息发送和订阅完成,比较简单,但是一不留神很容易出现问题,希望能帮助到新入门的人

关于spring boot集成MQTT的一写新人问题相关推荐

  1. Spring Boot集成Swagger导入YApi@无界编程

    接口APi开发现状 现在开发接口都要在类似YApi上写文档,这样方便不同的团队之间协作,同步更新接口,提高效率. 但是如果接口很多,你一个个手工在YApi去录入无疑效率很低. 如果是使用Spring ...

  2. spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例...

    本文介绍spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例 集成swagger--对于做前后端分离的项目,后端只需要提供接口访问,swagger提供了接口 ...

  3. 6.3 Spring Boot集成mongodb开发

    6.3 Spring Boot集成mongodb开发 本章我们通过SpringBoot集成mongodb,Java,Kotlin开发一个极简社区文章博客系统. 0 mongodb简介 Mongo 的主 ...

  4. Spring Boot集成Hazelcast实现集群与分布式内存缓存

    2019独角兽企业重金招聘Python工程师标准>>> Hazelcast是Hazelcast公司开源的一款分布式内存数据库产品,提供弹性可扩展.高性能的分布式内存计算.并通过提供诸 ...

  5. Spring Boot 集成 Swagger 生成 RESTful API 文档

    原文链接: Spring Boot 集成 Swagger 生成 RESTful API 文档 简介 Swagger 官网是这么描述它的:The Best APIs are Built with Swa ...

  6. Spring Boot集成CKFinder

    2019独角兽企业重金招聘Python工程师标准>>> Spring Boot集成CKFinder,实现浏览功能. 前言 上一篇记录了Spring Boot集成CKEditor,这里 ...

  7. Spring Boot 集成 Druid 监控数据源

    关注"Java后端技术全栈" 回复"面试"获取全套大厂面试资料 Druid 介绍 Druid 是阿里巴巴开源平台上的一个项目,整个项目由数据库连接池.插件框架和 ...

  8. Spring Boot集成Redis缓存之RedisTemplate的方式

    前言 Spring Boot 集成Redis,将自动配置 RedisTemplate,在需要使用的类中注入RedisTemplate的bean即可使用 @Autowired private Redis ...

  9. spring boot—集成log4j2日志框架

    文章目录 市场上的日志框架 spring boot日志框架关系 移除默认日志框架 切换为log4j2日志框架 市场上的日志框架   1)日志门面最常用的是slf4j   2)日志实现最常用的是logb ...

最新文章

  1. 蓝桥杯练习系统习题-算法训练6
  2. Log4net 配置使用总结(一)
  3. 查看ubuntu linux开放的端口以及控制端口范围
  4. 基于java 工单管理_实训任务工单1-2(编写规范Java代码) 实训任务工单1-2(编写规范Java代码).docx_学小易找答案...
  5. LeetCode MySQL 1112. 每位学生的最高成绩
  6. 从wait_type入手模拟SQL Server Lock
  7. 提高django model效率的几个小方法
  8. linux内核disabled,Linux内核关闭IPv6协议的方式
  9. 10.深入分布式缓存:从原理到实践 --- EVCache探秘
  10. centos7 安装ftp服务
  11. 深入理解JVM虚拟机读书笔记——垃圾回收器
  12. Windows桌面美化——记录我的设置
  13. 前女友闺蜜给我发了一个压缩包,居然还带密码?暴力破解ZIP加密文件的密码!
  14. Android中半圆形背景
  15. Android 删除图片后刷新媒体库
  16. matlab怎么加采样开关,开关量采集模块怎么使用?
  17. 记录日记软件哪个好用
  18. esxi云虚拟服务器如何搭建,如何搭建esxi环境?
  19. C语言之memset函数
  20. dart和C语言计算CRC32结果不同

热门文章

  1. 牛津临床和实验室调查手册 Oxford Handbook of Clinical and Laboratory Investigation
  2. python 计算时间重叠_Python基于时间信息(即时、间隔)计算项目之间的相似性...
  3. 启动activemq_浅谈ActiveMQ与使用
  4. python不等式编程_python-指定大于scipy中的不等式
  5. 机器人学习--双目视觉测距
  6. 机器人学习--ROS/AMCL实现初始化粒子撒满整张地图和分步收敛
  7. 北斗导航 | 从存储的log日志(NMEA0183标准输出)中获取经纬度并在地图中绘制坐标轨迹(附Matlab源代码)
  8. 使用宝塔面板进行wordpress建站
  9. 语音识别中强制对齐_语音识别中的标注问题和嵌入式训练
  10. 怎么在页面中使用mixins_模压化粪池使用过程中怎么管理?