文章目录

  • 一、MQTT说明
    • 1.1、mqtt文档
    • 1.2、MQTT消息服务质量
      • 1.1.1、归纳
  • 二、MQTT环境搭建
  • 三、boot集成原生mqtt
    • 1.1、项目结构
    • 1.2、依赖
    • 1.3、application.properties配置
    • 1.4、实体类
    • 1.5、mqtt配置类
    • 1.6、mqtt发布接口
    • 1.7、mqtt接收消息
    • 1.8、集成Swagger2配置
    • 1.9、mqtt测试类
    • 1.10、测试效果

一、MQTT说明

1.1、mqtt文档

官网:https://mqtt.org/
仅供参考:https://www.emqx.com/zh/mqtt

1.2、MQTT消息服务质量

MQTT规定了3种消息等级

  1. QoS 0:
    消息最多传递一次,不需要客户端给与回复,如果当时客户端不可用,则会丢失该消息。

  2. QoS 1:
    a、消息传递至少 1 次,发布者会发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内要收到接收者的应答,发布者若没收到应答,会将消息的 DUP 置为 1 并重发消息
    b、所以Qos 1消息级别取决于接受者在规定时间内给与发布者反馈,若没有反馈,则会再次接受到消息。

  3. QoS
    a、消息仅传送一次,发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。
    b、发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
    c、QoS 2 消息的核心是接收者给发布者反馈两次接收结果,相当于一次接收,一次确认接收

1.1.1、归纳

  1. QoS 0 消息只发一次,不在乎是否被别的客户端收到,只要发送了就算结束。
  2. QoS 1 消息需要消息接收者在规定时间内给予反馈,结束的标志是在发送后规定时间内收到反馈,否则就会一直发送。
  3. QoS 2 消息需要发送者和接收者双方互相进行消息确认,只要有一方没有确定就不会结束。

二、MQTT环境搭建

有2种方式
1、原生mqtt
2、rabbitmq的mqtt插件


第一种:centos、Ubuntu 安装mqtt和使用https://blog.csdn.net/qq_44413835/article/details/120606097

mqtt客户端下载
我是使用MQTTBox: https://dl.pconline.com.cn/download/1323304.html

mqttx下载:https://mqttx.app/zh

第二种:安装rabbitmq在开启mqtt插件-好处rabbitmq有web管理平台
注明:如果不会使用rabbitmq查看我的消息队列的专栏,里面有集成篇

docker安装rabbitmq
https://blog.csdn.net/qq_44413835/article/details/123648048

进入docker-rabbitmq容器

docker exec  -it rabbitmq  /bin/bash

安装后开启mqq插件

# 打开rabbitmq_mqtt
rabbitmq-plugins enable rabbitmq_mqtt
#打开rabbitmq_web_mqtt
rabbitmq-plugins enable rabbitmq_web_mqtt

如图:

三、boot集成原生mqtt

1.1、项目结构

版本boot:2.3.6.RELEASE、web工程

1.2、依赖

 <!--集成MQTT-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--开启流支持-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency><!--gson序列化工具-->
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId>
</dependency><!--lombok依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency><!--Swagger-UI API文档生产工具-->
<dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.7.0</version>
</dependency>
<dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.7.0</version>
</dependency><!--健康检查-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency><!--SpringBoot配置处理器,自定义配置项-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional>
</dependency>

1.3、application.properties配置

spring.application.name=mqtt_demo
server.port=8080
# --------------mqtt配置-----------------------------
# 默认接受消息的主题--指定多个多级主题【物联网数据主题、对话主题-聊天室】
mqtt.receiver.defaultTopic=receive_iot_topic/#,receive_chat_topic/#
# 默认发送消息的主题
mqtt.sender.defaultTopic=test_send
# mqtt发送者的id
mqtt.sender.clientId=mqttProducer
# mqtt接收者的id-随机id来拼串
mqtt.receiver.clientId=${random.value}
# 地址和用户名密码
mqtt.url=tcp://服务器ip地址:1883
mqtt.username=用户名
mqtt.password=密码

1.4、实体类

IotData

package sqy.bean;import com.google.gson.annotations.SerializedName;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Date;@Component
public class IotData implements Serializable {@SerializedName("deviceid")String deviceid;//设备id@SerializedName("sensorid")String sensorid;//数据id@SerializedName("types")String types;//设备来源@SerializedName("loraid")String loraid;//loraid硬件的id@SerializedName("createtime")Date createtime;//创建时间@SerializedName("temp")float temp;//温度@SerializedName("humi")float humi;//湿度@SerializedName("light")float light;//光敏//get/set/tostring省略...

api响应的实体类

package sqy.rvo;/*** @author suqinyi* @Date 2022/4/15* 通用返回对象*/
public class ApiResult<T> {private long code;private String message;private T data;private final static long SUCCESS_CODE=1000;private final static long FAIL_CODE=2000;protected ApiResult() {}protected ApiResult(long code, String message, T data) {this.code = code;this.message = message;this.data = data;}/*** 成功返回结果*/public static <T> ApiResult<T> success(T data, String message) {return new ApiResult<T>(SUCCESS_CODE, message, data);}/*** 失败返回结果*/public static <T> ApiResult<T> failed(String message) {return new ApiResult<T>(FAIL_CODE, message, null);}//get/set/tostring省略...}

1.5、mqtt配置类

MqttConfig

package sqy.config.mqtt;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;
import sqy.service.mqtt.MqttCaseServiceImpl;/*** @author suqinyi* @Date 2022/4/15* mqtt的配置类*/
@Configuration
public class MqttConfig {/*** 发布的bean名称*/public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";public static final String CHANNEL_NAME_IN = "mqttInboundChannel";// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息private static final byte[] WILL_DATA;static {WILL_DATA = "offline".getBytes();}@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String url;@Value("${mqtt.sender.clientId}")private String clientsId;@Value("${mqtt.sender.defaultTopic}")private String defaultsTopic;@Value("${mqtt.receiver.clientId}")private String clientcId;@Value("${mqtt.receiver.defaultTopic}")private String defaultcTopic;/*** MQTT连接器选项*/@Beanpublic MqttConnectOptions getSenderMqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();// 设置连接的用户名System.out.println(username);if (!username.trim().equals("")) {options.setUserName(username);}// 设置连接的密码options.setPassword(password.toCharArray());// 设置连接的地址options.setServerURIs(new String[]{url});// 设置超时时间 单位为秒options.setConnectionTimeout(100);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线// 但这个方法并没有重连的机制options.setKeepAliveInterval(30);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。options.setWill("willTopic", WILL_DATA, 2, false);return options;}/*** MQTT客户端*/@Beanpublic MqttPahoClientFactory senderMqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getSenderMqttConnectOptions());return factory;}/*** MQTT消息处理器(生产者)*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientsId, senderMqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultsTopic);return messageHandler;}/*** MQTT信息通道(生产者)*/@Bean(name = CHANNEL_NAME_OUT)public MessageChannel mqttOutboundChannel() {DirectChannel channel = new DirectChannel();return channel;}/*** MQTT信息通道(消费者)*/@Bean(name = CHANNEL_NAME_IN)public MessageChannel mqttInboundChannel() {DirectChannel channel = new DirectChannel();return channel;}/*** MQTT消息订阅绑定(消费者)*/@Beanpublic MessageProducer inbound() {String[] receiverTopics = StringUtils.split(defaultcTopic, ",");// 可以同时消费(订阅)多个TopicMqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("re" + clientcId, senderMqttClientFactory(),receiverTopics);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);// 设置订阅通道adapter.setOutputChannel(mqttInboundChannel());return adapter;}/*** MQTT消息处理器(消费者)*/@Bean@ServiceActivator(inputChannel = CHANNEL_NAME_IN)public MessageHandler handler() {MqttCaseServiceImpl service = new MqttCaseServiceImpl();return service;}
}

1.6、mqtt发布接口

IMqttSender

package sqy.mqtt;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import sqy.config.mqtt.MqttConfig;    /*** MQTT生产者消息发送接口* 通过接口将数据传递到集成流*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic   主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送信息到MQTT服务器** @param topic   主题* @param qos     对消息处理的几种机制。*                0 表示的是订阅者没收到消息不会再次发送,消息会丢失。*                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。*                2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}

1.7、mqtt接收消息

MqttCaseServiceImpl

package sqy.service.mqtt;import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
import sqy.bean.IotData;/*** MQTT接收消息*/
@Service
public class MqttCaseServiceImpl implements MessageHandler {/*** MessageHeaders:* public static final String PREFIX = "mqtt_";* public static final String QOS = "mqtt_qos";* public static final String ID = "mqtt_id";* public static final String RECEIVED_QOS = "mqtt_receivedQos";* public static final String DUPLICATE = "mqtt_duplicate";* public static final String RETAINED = "mqtt_retained";* public static final String RECEIVED_RETAINED = "mqtt_receivedRetained";* public static final String TOPIC = "mqtt_topic";* public static final String RECEIVED_TOPIC = "mqtt_receivedTopic";* public static final String MESSAGE_EXPIRY_INTERVAL = "mqtt_messageExpiryInterval";* public static final String TOPIC_ALIAS = "mqtt_topicAlias";* public static final String RESPONSE_TOPIC = "mqtt_responseTopic";* public static final String CORRELATION_DATA = "mqtt_correlationData";*/@AutowiredGson gson;@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = (String) message.getHeaders().get("mqtt_receivedTopic");String payload = (String) message.getPayload();System.out.println("headers:" + topic + " 接收的数据:" + payload);if (topic.contains("receive_iot_topic")) {System.out.println("硬件的信息的主题");IotData entity = gson.fromJson(payload, IotData.class);if (entity!=null){//不是心跳数据if (!entity.getTypes().equals("heartbeat")) {//判断硬件来源if (entity.getTypes().equals("esp32")) {System.out.println("来着esp32的数据");//写入数据库 ...}}}else {System.out.println("序列化失败");}}if (topic.contains("receive_chat_topic")) {System.out.println("对话的主题");//...构建聊天室.....//...相互订阅发送消息就可以了....//...逻辑代码...}}
}

1.8、集成Swagger2配置

Swagger2Config

package sqy.config.swagger;import io.swagger.annotations.Api;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;/*** @author suqinyi* @Date 2022/4/15* Swagger2API文档的配置* http://localhost:8081/swagger-ui.html*/
@Configuration
@EnableSwagger2
public class Swagger2Config {@Beanpublic Docket createRestApi(){return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()//为当前包下controller生成API文档.apis(RequestHandlerSelectors.basePackage("sqy.controller"))//为有@Api注解的Controller生成API文档.apis(RequestHandlerSelectors.withClassAnnotation(Api.class)).paths(PathSelectors.any()).build();}private ApiInfo apiInfo() {return new ApiInfoBuilder().title("SwaggerUI演示").description("mqtt-demo").contact("sqy").version("1.0").build();}
}

1.9、mqtt测试类

MqttController

package sqy.controller;import com.google.gson.Gson;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import sqy.bean.IotData;
import sqy.mqtt.IMqttSender;
import sqy.rvo.ApiResult;/*** @author suqinyi* @Date 2022/4/15* MQTT测试接口*/
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {@AutowiredIMqttSender mqttSender;@AutowiredGson gson;//这个是外面配置文件里面的设置的接收主题之一private final static String SEND_TOPIC_PREFIX = "receive_iot_topic/";@ApiOperation("向指定主题发送消息")@PostMapping("/sendToTopic")public ApiResult sendToTopic(String topic, String payload) {/*** 想接收方方法消息-主题:receive_iot_topic/#,receive_chat_topic/#*/mqttSender.sendToMqtt(topic,payload);System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);return ApiResult.success(null, "发送成功");}/*** 127.0.0.1:8081/mqtt/control_command* post、json* {*   "createtime": "2022-04-17T07:02:23.707Z",*   "deviceid": "001设备",*   "humi": 30,*   "light": 55,*   "loraid": "r001",*   "sensorid": "123456789",*   "temp": 100,*   "types": "esp32"* }*/@ApiOperation("模拟硬件发送的数据或控制指令")@PostMapping("/control_command")public ApiResult controlCommand(@RequestBody IotData iotData) {String deviceId = iotData.getDeviceid();// 前缀 + 设备号String topic = SEND_TOPIC_PREFIX + deviceId;String payload=gson.toJson(iotData);mqttSender.sendToMqtt( topic,payload);System.out.println("发送成功=>" + "主题:" + topic + "  载荷:" + payload);return ApiResult.success(null, "发送成功");}}

1.10、测试效果

  1. 登入swagger测试:http://localhost:8081/swagger-ui.html#/
  2. 或用post测试

或者使用postman测试:

127.0.0.1:8081/mqtt/control_command

{"deviceid": "001设备","humi": 30,"light": 55,"loraid": "r001","sensorid": "123456789","temp": 100,"types": "esp32","createtime": "2022-04-17T07:02:23.707Z"
}

如图:

postman测试图:

后台打印

mqttbox订阅接收

swagger测试gif图:

springboot集成mqtt相关推荐

  1. appollo消息服务器,Springboot 集成 MQTT —— web 服务端实现(apollo 客户端)-Go语言中文社区...

    基于 MQTT 可以实现很多场景,例如现在使用比较多的物联网,还有消息的实时推送.联网的设备连接上 apollo 服务器以后,一直监听 apollo 推送过来的信令/消息即可. 1.web 服务端向联 ...

  2. springboot集成mqtt(超级无敌详细)

    springboot集成MQTT步骤 1. 引入pom依赖 <!-- mqtt --><dependency><groupId>org.springframewor ...

  3. 【MQTT】SpringBoot集成MQTT

    写在前面的话: 计划梳理MQTT集成至Java.Vue的系列文档,详见收录专栏. 该示例文章,已将相关方法封装至工具类,已实现断线重连,已将相关参数提取至配置文件. -- 真积力久则入 目录 一.前情 ...

  4. springboot集成mqtt相关配置+案例

    sdk版本 spring 6.0 springboot 3.x 官网配置说明 spring 官方说明 https://docs.spring.io/spring-integration/referen ...

  5. springboot集成MQTT协议实现消息实时推送(未实现版)

    <!--mqtt依赖包--><dependency><groupId>org.springframework.integration</groupId> ...

  6. paho架构_基于paho springboot集成mqtt实现消息的发布订阅 - 人人都是架构师

    1.首先添加pom依赖 org.springframework.boot spring-boot-starter-integration org.springframework.integration ...

  7. 支付宝小程序集成MQTT

    支付宝小程序集成MQTT 1. 前言 ​ 由于支付宝只支持websocket连接,在尝试了很多npm安装mqtt亦或是使用paho-mqtt.js.重新编译过后的mqtt.js多方无果后,最终决定自己 ...

  8. 小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!

    大家好,我是雄雄,欢迎关注微信公众号雄雄的小课堂 现在是:2023年3月5日19:03:49 前言 在上一篇文章中,我介绍了如何在服务器中安装emqx消息服务器,这是在操作mqtt协议的时候必不可少的 ...

  9. Springboot 集成 mosquito MQTT服务

    Springboot 集成 mosquito MQTT服务 实现消息订阅与发布. pom依赖 <dependency><groupId>org.springframework. ...

最新文章

  1. 《OpenCV3编程入门》学习笔记5 Core组件进阶(一)访问图像中的像素
  2. svn 目录结构 trunk java_如何彻底删除SVN中的文件和文件夹(附恢复方法)
  3. 【Spring实战】—— 5 设值注入
  4. java xml 学习_java学习(四)xml
  5. python断点续传代码
  6. PyCharm如何集成PyQt
  7. Web Dynpro ABAP---ALV控件的使用
  8. android studio 连不上设备,Android Studio-设备已连接但“脱机”
  9. 一刀传世网页破天服务器同步,一刀传世破天1844服开服时间表_一刀传世新区开服预告_第一手游网手游开服表...
  10. Linux手机适配,nginx同时适配PC版和手机移动版
  11. html5 canvas 图像预览,html5-canvas 加载并显示图像
  12. matplotlib制作多张图
  13. crontab导致磁盘空间满问题的解决
  14. python进阶学习之路
  15. SpringBoot——springboot SPI原理与实战
  16. Linux驱动开发|音频驱动
  17. 依次计算一系列给定字符串的字母值,字母值为字符串中每个字母对应的编号值(A对应1,B对应2,以此类推,不区分大小写字母,非字母字符对应的值为0)的总和
  18. 【CSS3】Advanced1:Rounded Corners
  19. 手绘图说电子元器件-集成电路
  20. 解决chm文档字体太小的问题

热门文章

  1. java没有manifest.mf_java - IntelliJ IDEA中错误的Manifest.mf创建.j
  2. matlab怎样查看图像的动态范围,图像处理之动态范围压缩
  3. c++ std::move详解
  4. RPC框架项目的学习
  5. 最佳Linux防病毒软件推荐!
  6. 集群Slurm使用教程
  7. jQuery.parseJSON() 函数详解
  8. 抖音旅游营销,如何分析你的竞争对手?
  9. AUTOCAD 常用命令
  10. ViSP学习笔记(一):Ubuntu环境下ViSP安装