springboot集成MQTT步骤

1. 引入pom依赖

   <!-- mqtt --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

2. application.yml

## MQTT##
mqtt:host: tcp://192.168.10.198:1883userName: rootpassWord: 123456qos: 1clientId: ClientId_local #ClientId_local必须唯一 比如你已经定了叫ABC  那你就一直叫ABC  其他地方就不要使用ABC了timeout: 10keepalive: 20topic1: A/pick/warn/#  #符号是代表整个warn下面的全部子主题 没有理解的话 可以百度仔细理解一下topic2: A/cmd/resptopic3: ABCFtopic4: ABCH

application.properties

## MQTT##
mqtt.host=tcp://192.168.10.198:1883
mqtt.clientId=ClientId_local
mqtt.username=admin
mqtt.password=123456
mqtt.timeout=10
mqtt.keepalive=20
mqtt.topic1=A/pick/warn/#

3. MqttConfiguration.java

import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/**1. @author WXY2. @date 2022/6/29 20:42*/
@Configuration
public class MqttConfiguration {private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);@Value("${mqtt.host}")String host;@Value("${mqtt.username}")String username;@Value("${mqtt.password}")String password;@Value("${mqtt.clientId}")String clientId;@Value("${mqtt.timeout}")int timeOut;@Value("${mqtt.keepalive}")int keepAlive;@Value("${mqtt.topic1}")String topic1;@Value("${mqtt.topic2}")String topic2;@Value("${mqtt.topic3}")String topic3;@Value("${mqtt.topic4}")String topic4;@Bean//注入springpublic MyMQTTClient myMQTTClient() {MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);for (int i = 0; i < 10; i++) {try {myMQTTClient.connect();//不同的主题//   myMQTTClient.subscribe(topic1, 1);//   myMQTTClient.subscribe(topic2, 1);//   myMQTTClient.subscribe(topic3, 1);//   myMQTTClient.subscribe(topic4, 1);return myMQTTClient;} catch (MqttException e) {log.error("MQTT connect exception,connect time = " + i);try {Thread.sleep(2000);} catch (InterruptedException e1) {e1.printStackTrace();}}}return myMQTTClient;}public String getTopic1() {return topic1;}public void setTopic1(String topic1) {this.topic1 = topic1;}public String getTopic2() {return topic2;}public void setTopic2(String topic2) {this.topic2 = topic2;}public String getTopic3() {return topic3;}public void setTopic3(String topic3) {this.topic3 = topic3;}public String getTopic4() {return topic4;}public void setTopic4(String topic4) {this.topic4 = topic4;}
}

4. MyMQTTClient.java

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**1. @author WXY2. @date 2022/6/29 20:43*/
public class MyMQTTClient {private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class);private static MqttClient client;private String host;private String username;private String password;private String clientId;private int timeout;private int keepalive;public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {this.host = host;this.username = username;this.password = password;this.clientId = clientId;this.timeout = timeOut;this.keepalive = keepAlive;}public static MqttClient getClient() {return client;}public static void setClient(MqttClient client) {MyMQTTClient.client = client;}/*** 设置mqtt连接参数** @param username* @param password* @param timeout* @param keepalive* @return*/public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);options.setCleanSession(true);options.setAutomaticReconnect(true);return options;}/*** 连接mqtt服务端,得到MqttClient连接对象*/public void connect() throws MqttException {if (client == null) {client = new MqttClient(host, clientId, new MemoryPersistence());client.setCallback(new MyMQTTCallback(MyMQTTClient.this));}MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);if (!client.isConnected()) {client.connect(mqttConnectOptions);} else {client.disconnect();client.connect(mqttConnectOptions);}LOGGER.info("MQTT connect success");//未发生异常,则连接成功}/*** 发布,默认qos为0,非持久化** @param pushMessage* @param topic*/public void publish(String pushMessage, String topic) {publish(pushMessage, topic, 0, false);}/*** 发布消息** @param pushMessage* @param topic* @param qos* @param retained:留存*/public void publish(String pushMessage, String topic, int qos, boolean retained) {MqttMessage message = new MqttMessage();message.setPayload(pushMessage.getBytes());message.setQos(qos);message.setRetained(retained);MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);if (null == mqttTopic) {LOGGER.error("topic is not exist");}MqttDeliveryToken token;//Delivery:配送synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充try {token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件token.waitForCompletion(1000L);} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}}/*** 订阅某个主题** @param topic* @param qos*/public void subscribe(String topic, int qos) {try {MyMQTTClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 取消订阅主题** @param topic 主题名称*/public void cleanTopic(String topic) {if (client != null && client.isConnected()) {try {client.unsubscribe(topic);} catch (MqttException e) {e.printStackTrace();}} else {System.out.println("取消订阅失败!");}}
}

5. MyMQTTCallback.java

import cn.hutool.core.util.CharsetUtil;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;/*** @author WXY* @date 2022/6/29 20:43*/
public class MyMQTTCallback implements MqttCallbackExtended {//手动注入private MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);private static final Logger log = LoggerFactory.getLogger(MyMQTTCallback.class);private MyMQTTClient myMQTTClient;public MyMQTTCallback(MyMQTTClient myMQTTClient) {this.myMQTTClient = myMQTTClient;}/*** 丢失连接,可在这里做重连* 只会调用一次** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());long reconnectTimes = 1;while (true) {try {if (MyMQTTClient.getClient().isConnected()) {//判断已经重新连接成功  需要重新订阅主题 可以在这个if里面订阅主题  或者 connectComplete(方法里面)  看你们自己选择log.warn("mqtt reconnect success end  重新连接  重新订阅成功");return;}reconnectTimes+=1;log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);MyMQTTClient.getClient().reconnect();} catch (MqttException e) {log.error("mqtt断连异常", e);}try {Thread.sleep(5000);} catch (InterruptedException e1) {}}}/*** @param topic* @param mqttMessage* @throws Exception* subscribe后得到的消息会执行到这里面*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));//发布消息主题if (topic.equals("embed/resp")){Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));//你自己的业务接口insertCmdResults(maps);}//接收报警主题if (topic.equals("embed/warn")){Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));//你自己的业务接口insertPushAlarm(maps);}}/***连接成功后的回调 可以在这个方法执行 订阅主题  生成Bean的 MqttConfiguration方法中订阅主题 出现bug *重新连接后  主题也需要再次订阅  将重新订阅主题放在连接成功后的回调 比较合理* @param reconnect* @param serverURI*/@Overridepublic  void  connectComplete(boolean reconnect,String serverURI){log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");//订阅主题myMQTTClient.subscribe(mqttConfiguration.topic1, 1);myMQTTClient.subscribe(mqttConfiguration.topic2, 1);myMQTTClient.subscribe(mqttConfiguration.topic3, 1);myMQTTClient.subscribe(mqttConfiguration.topic4, 1);}/*** 消息到达后* subscribe后,执行的回调函数** @param s* @param mqttMessage* @throws Exception*//*** publish后,配送完成后回调的方法** @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());}}

6. MqttMsg.java

/*** @author WXY* @date 2022/6/29 20:44*/
public class MqttMsg {private String name = "";private String content = "";private String time = "";public String getName() {return name;}public void setName(String name) {this.name = name;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public String getTime() {return time;}public void setTime(String time) {this.time = time;}@Overridepublic String toString() {return "MqttMsg{" +"name='" + name + '\'' +", content='" + content + '\'' +", time='" + time + '\'' +'}';}
}

7. MqttController.java

import com.gjwl.common.core.mqtt.MqttMsg;
import com.gjwl.common.core.mqtt.MyMQTTClient;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;/*** @author WXY* @date 2022/6/29 20:44*/
@RestController
@RequestMapping("/sun/mqtt")
public class MqttController {@Autowiredprivate MyMQTTClient myMQTTClient;@Value("${mqtt.topic1}")String topic1;@Value("${mqtt.topic2}")String topic2;@Value("${mqtt.topic3}")String topic3;@Value("${mqtt.topic4}")String topic4;Queue<String> msgQueue = new LinkedList<>();int count = 1;@PostMapping("/getMsg")@ResponseBodypublic synchronized void mqttMsg(MqttMsg mqttMsg) {System.out.println("队列元素数量:" + msgQueue.size());System.out.println("***************" + mqttMsg.getName() + ":" + mqttMsg.getContent() + "****************");//时间格式化SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String time = df.format(new Date());mqttMsg.setTime(time);mqttMsg.setContent(mqttMsg.getContent() + "——后台编号:" + count);count++;//map转jsonJSONObject json = JSONObject.fromObject(mqttMsg);String sendMsg = json.toString();System.out.println(sendMsg);//队列添加元素boolean flag = msgQueue.offer(sendMsg);if (flag) {//发布消息  topic2 是你要发送到那个通道里面的主题 比如我要发送到topic2主题消息myMQTTClient.publish(msgQueue.poll(), topic2);System.out.println("时间戳" + System.currentTimeMillis());}System.out.println("队列元素数量:" + msgQueue.size());}
}

8.SpringUtils.java

import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import com.gjwl.common.utils.StringUtils;/*** spring工具类 方便在非spring管理环境中获取bean* * @author wxy*/
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
{/** Spring应用上下文环境 */private static ConfigurableListableBeanFactory beanFactory;private static ApplicationContext applicationContext;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {SpringUtils.beanFactory = beanFactory;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringUtils.applicationContext = applicationContext;}/*** 获取对象** @param name* @return Object 一个以所给名字注册的bean的实例* @throws org.springframework.beans.BeansException**/@SuppressWarnings("unchecked")public static <T> T getBean(String name) throws BeansException{return (T) beanFactory.getBean(name);}/*** 获取类型为requiredType的对象** @param clz* @return* @throws org.springframework.beans.BeansException**/public static <T> T getBean(Class<T> clz) throws BeansException{T result = (T) beanFactory.getBean(clz);return result;}/*** 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true** @param name* @return boolean*/public static boolean containsBean(String name){return beanFactory.containsBean(name);}/*** 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)** @param name* @return boolean* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException{return beanFactory.isSingleton(name);}/*** @param name* @return Class 注册对象的类型* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static Class<?> getType(String name) throws NoSuchBeanDefinitionException{return beanFactory.getType(name);}/*** 如果给定的bean名字在bean定义中有别名,则返回这些别名** @param name* @return* @throws org.springframework.beans.factory.NoSuchBeanDefinitionException**/public static String[] getAliases(String name) throws NoSuchBeanDefinitionException{return beanFactory.getAliases(name);}/*** 获取aop代理对象* * @param invoker* @return*/@SuppressWarnings("unchecked")public static <T> T getAopProxy(T invoker){return (T) AopContext.currentProxy();}/*** 获取当前的环境配置,无配置返回null** @return 当前的环境配置*/public static String[] getActiveProfiles(){return applicationContext.getEnvironment().getActiveProfiles();}/*** 获取当前的环境配置,当有多个环境配置时,只获取第一个** @return 当前的环境配置*/public static String getActiveProfile(){final String[] activeProfiles = getActiveProfiles();return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;}
}

8.测试

发送和接收 springboot后台日志

springboot集成mqtt(超级无敌详细)相关推荐

  1. SpringBoot 自动配置原理(超级无敌详细)-2

    SpringBoot 自动配置原理(超级无敌详细)-1 2.自动配置的实现 刚刚我们整体的过了一下主配置文件是如何实现的,但我们还没深入的研究如何实现自动装配功能.我们回到这个文件下,找一个具体的自动 ...

  2. appollo消息服务器,Springboot 集成 MQTT —— web 服务端实现(apollo 客户端)-Go语言中文社区...

    基于 MQTT 可以实现很多场景,例如现在使用比较多的物联网,还有消息的实时推送.联网的设备连接上 apollo 服务器以后,一直监听 apollo 推送过来的信令/消息即可. 1.web 服务端向联 ...

  3. hadoop +hbase+zookeeper 伪分布安装(超级无敌详细)

    hadoop +hbase+zookeeper 伪分布安装(超级无敌详细) hadoop 配置 图片打不开的可以点击下方链接直接去图床查看,辣鸡CSDN 安装jdk sudo apt update// ...

  4. SpringBoot集成163邮件发送详细配置,从163邮箱开始配置

    SpringBoot集成163邮件发送详细配置,从163邮箱开始配置 1.登录163邮箱 2.配置163邮箱 3.开始编写SpringBoot代码 1.创建SpringBoot项目然后引入依赖 2.编 ...

  5. 【超级无敌详细的黑马前端笔记!即时更新~】

    [超级无敌详细的黑马前端笔记!即时更新~] 这个笔记,是我自己在同步学习黑马前端使用的,不可以商用哦 学习路径 基础概念铺垫(了解) 认识网页 五大浏览器和渲染引擎 Web标准 HTML初体验 HTM ...

  6. 小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!

    大家好,我是雄雄,欢迎关注微信公众号雄雄的小课堂 现在是:2023年3月5日19:03:49 前言 在上一篇文章中,我介绍了如何在服务器中安装emqx消息服务器,这是在操作mqtt协议的时候必不可少的 ...

  7. SpringBoot 自动配置原理(超级无敌详细)-1

    Spring Boot @SpringBootApplication 该注解标注在 某个类上, 说明该类为 SpringBoot的主配置类,SpringBoot 就应该运行这个类的main()方法来启 ...

  8. 【MQTT】SpringBoot集成MQTT

    写在前面的话: 计划梳理MQTT集成至Java.Vue的系列文档,详见收录专栏. 该示例文章,已将相关方法封装至工具类,已实现断线重连,已将相关参数提取至配置文件. -- 真积力久则入 目录 一.前情 ...

  9. springboot集成mqtt

    文章目录 一.MQTT说明 1.1.mqtt文档 1.2.MQTT消息服务质量 1.1.1.归纳 二.MQTT环境搭建 三.boot集成原生mqtt 1.1.项目结构 1.2.依赖 1.3.appli ...

最新文章

  1. java查看文件夹下文件夹大小,java 获取文件夹大小,文件大小,文件个数
  2. [NHibernate]基本配置与测试
  3. 让PasswordRecovery控件使用Email地址找回密码
  4. 某谷 P1654 OSU!
  5. 天凉了,大家多穿衣服
  6. VirtualBox安装完Linux却进不了系统
  7. SSH+Oracle10G抛Disabling contextual LOB creation as createClob() m
  8. 计算机网络---ICMP、IGMP协议
  9. 右手残疾学计算机学什么专业好,我是右手和右脚残疾 左手和左脚好的 可以学残疾人驾照吗...
  10. fiddler显示服务器IP
  11. 数据库面试题(答案)
  12. php框架 tp laravel,TP框架和Laravel框架的区别是什么
  13. org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.itheima.
  14. win7 共享wifi 手机如何上网
  15. U盘格式化导致存储空间变小的解决方法汇总
  16. delphi显示jpg、png、gif图片
  17. Node.js脚本项目合集(一):Node.js+FFmpeg实现批量从B站导出离线缓存视频到mp4格式,mp4转mp3,实现听歌自由
  18. 如何加入到 wuhan2020 开源项目,打赢这场没有硝烟的战争?
  19. 每日一题 No.4 男女搭配干活不累
  20. python本科毕设_关于本科毕设选题请教问题

热门文章

  1. 学习OpenCV:滤镜系列(11)——高反差保留 (6.30修改版)
  2. 全网最简约的Anaconda+Python3.7安装教程Win10(百分百成功)
  3. LODOP纸张/打印机/份数/打印方向/双面打印 简短问答
  4. 【小程序项目开发-- 京东商城】uni-app开发之配置 tabBar 窗口样式
  5. mysql删除数据后释放磁盘空间
  6. 大数据分析建模思路技巧和算法的特征
  7. android reset无命令,wiping_手机出现wiping data无命令然后就关不了机了
  8. html链接外部css代码,html如何调用外部css?
  9. Firefox 31~34远程命令执行漏洞的分析
  10. 客如云×OceanBase:分布式云升级助力客如云降本增效