大家好,我是雄雄,欢迎关注微信公众号雄雄的小课堂

现在是:2023年3月5日19:03:49

前言

在上一篇文章中,我介绍了如何在服务器中安装emqx消息服务器,这是在操作mqtt协议的时候必不可少的步骤,今天我们就来看看如何将mqtt服务集成到springboot项目中。

刚开始在集成的时候,也在网上看了些资料,也遇到了些坑,最后参考的是这篇文章,然后加上自己的简单修改,以及博主的悉心指导,最后终于实现了我预期的效果。
参考文章连接:点击这里

注意,在实现mqtt的时候,一定要先启动emqx消息服务器的服务,关于emqx的安装与使用,可以移步到这里

点击这里
下面我们来看看实现代码。


springboot项目中集成mqtt服务

为了模拟的更加真实点儿,我这边做了两个端,分别是客户端服务端,代码基本都一样,客户端就是将服务端复制过来改了下关键部分。

服务端

一、在pom文件中引入所需依赖。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.20</version></dependency><!--工具类--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.3</version></dependency><!--mqtt相关配置--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>5.5.9</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>6.0.2</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>5.3.20</version></dependency><!-- 集成redis依赖  --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

除了springboot 的依赖,其他的都有注释,如果mqtt的依赖引入报错的话,在重新引入一下就行.

二、在application.yml文件中加入mqtt的配置。

## MQTT配置
mqtt:host: tcp://127.0.0.1:1883userName: admin1passWord: 1234567qos: 1clientId: servetimeout: 10keepalive: 20

三、我这边为了后期编码方便,将一些公共部分都封装成了工具类,分别有redis的,ResponseResult的以及ResultCode

1.redis工具类:

package com.hookapi.common;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** @author: muxiongxiong* @date: 2023年02月22日 下午 5:48* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: redis工具类*/
@Component
public class RedisUtil {@Autowiredprivate RedisTemplate redisTemplate;/*** 给一个指定的 key 值附加过期时间** @param key* @param time* @return*/public boolean expire(String key, long time) {return redisTemplate.expire(key, time, TimeUnit.SECONDS);}/*** 根据key 获取过期时间** @param key* @return*/public long getTime(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 根据key 获取过期时间** @param key* @return*/public boolean hasKey(String key) {return redisTemplate.hasKey(key);}/*** 移除指定key 的过期时间** @param key* @return*/public boolean persist(String key) {return redisTemplate.boundValueOps(key).persist();}//- - - - - - - - - - - - - - - - - - - - -  String类型 - - - - - - - - - - - - - - - - - - - -/*** 根据key获取值** @param key 键* @return 值*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 将值放入缓存** @param key   键* @param value 值* @return true成功 false 失败*/public void set(String key, String value) {redisTemplate.opsForValue().set(key, value);}/*** 将值放入缓存并设置时间** @param key   键* @param value 值* @param time  时间(秒) -1为无期限* @return true成功 false 失败*/public void set(String key, String value, long time) {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {redisTemplate.opsForValue().set(key, value);}}/*** 批量添加 key (重复的键会覆盖)** @param keyAndValue*/public void batchSet(Map<String, String> keyAndValue) {redisTemplate.opsForValue().multiSet(keyAndValue);}/*** 批量添加 key-value 只有在键不存在时,才添加* map 中只要有一个key存在,则全部不添加** @param keyAndValue*/public void batchSetIfAbsent(Map<String, String> keyAndValue) {redisTemplate.opsForValue().multiSetIfAbsent(keyAndValue);}/*** 对一个 key-value 的值进行加减操作,* 如果该 key 不存在 将创建一个key 并赋值该 number* 如果 key 存在,但 value 不是长整型 ,将报错** @param key* @param number*/public Long increment(String key, long number) {return redisTemplate.opsForValue().increment(key, number);}/*** 对一个 key-value 的值进行加减操作,* 如果该 key 不存在 将创建一个key 并赋值该 number* 如果 key 存在,但 value 不是 纯数字 ,将报错** @param key* @param number*/public Double increment(String key, double number) {return redisTemplate.opsForValue().increment(key, number);}//- - - - - - - - - - - - - - - - - - - - -  set类型 - - - - - - - - - - - - - - - - - - - -/*** 将数据放入set缓存** @param key 键* @return*/public void sSet(String key, String value) {redisTemplate.opsForSet().add(key, value);}/*** 获取变量中的值** @param key 键* @return*/public Set<Object> members(String key) {return redisTemplate.opsForSet().members(key);}/*** 随机获取变量中指定个数的元素** @param key   键* @param count 值* @return*/public void randomMembers(String key, long count) {redisTemplate.opsForSet().randomMembers(key, count);}/*** 随机获取变量中的元素** @param key 键* @return*/public Object randomMember(String key) {return redisTemplate.opsForSet().randomMember(key);}/*** 弹出变量中的元素** @param key 键* @return*/public Object pop(String key) {return redisTemplate.opsForSet().pop("setValue");}/*** 获取变量中值的长度** @param key 键* @return*/public long size(String key) {return redisTemplate.opsForSet().size(key);}/*** 根据value从一个set中查询,是否存在** @param key   键* @param value 值* @return true 存在 false不存在*/public boolean sHasKey(String key, Object value) {return redisTemplate.opsForSet().isMember(key, value);}/*** 检查给定的元素是否在变量中。** @param key 键* @param obj 元素对象* @return*/public boolean isMember(String key, Object obj) {return redisTemplate.opsForSet().isMember(key, obj);}/*** 转移变量的元素值到目的变量。** @param key     键* @param value   元素对象* @param destKey 元素对象* @return*/public boolean move(String key, String value, String destKey) {return redisTemplate.opsForSet().move(key, value, destKey);}/*** 批量移除set缓存中元素** @param key    键* @param values 值* @return*/public void remove(String key, Object... values) {redisTemplate.opsForSet().remove(key, values);}/*** 通过给定的key求2个set变量的差值** @param key     键* @param destKey 键* @return*/public Set<Set> difference(String key, String destKey) {return redisTemplate.opsForSet().difference(key, destKey);}//- - - - - - - - - - - - - - - - - - - - -  hash类型 - - - - - - - - - - - - - - - - - - - -/*** 加入缓存** @param key 键* @param map 键* @return*/public void add(String key, Map<String, String> map) {redisTemplate.opsForHash().putAll(key, map);}/*** 获取 key 下的 所有  hashkey 和 value** @param key 键* @return*/public Map<Object, Object> getHashEntries(String key) {return redisTemplate.opsForHash().entries(key);}/*** 验证指定 key 下 有没有指定的 hashkey** @param key* @param hashKey* @return*/public boolean hashKey(String key, String hashKey) {return redisTemplate.opsForHash().hasKey(key, hashKey);}/*** 获取指定key的值string** @param key  键* @param key2 键* @return*/public String getMapString(String key, String key2) {return redisTemplate.opsForHash().get("map1", "key1").toString();}/*** 获取指定的值Int** @param key  键* @param key2 键* @return*/public Integer getMapInt(String key, String key2) {return (Integer) redisTemplate.opsForHash().get("map1", "key1");}/*** 弹出元素并删除** @param key 键* @return*/public String popValue(String key) {return redisTemplate.opsForSet().pop(key).toString();}/*** 删除指定 hash 的 HashKey** @param key* @param hashKeys* @return 删除成功的 数量*/public Long delete(String key, String... hashKeys) {return redisTemplate.opsForHash().delete(key, hashKeys);}/*** 给指定 hash 的 hashkey 做增减操作** @param key* @param hashKey* @param number* @return*/public Long increment(String key, String hashKey, long number) {return redisTemplate.opsForHash().increment(key, hashKey, number);}/*** 给指定 hash 的 hashkey 做增减操作** @param key* @param hashKey* @param number* @return*/public Double increment(String key, String hashKey, Double number) {return redisTemplate.opsForHash().increment(key, hashKey, number);}/*** 获取 key 下的 所有 hashkey 字段** @param key* @return*/public Set<Object> hashKeys(String key) {return redisTemplate.opsForHash().keys(key);}/*** 获取指定 hash 下面的 键值对 数量** @param key* @return*/public Long hashSize(String key) {return redisTemplate.opsForHash().size(key);}//- - - - - - - - - - - - - - - - - - - - -  list类型 - - - - - - - - - - - - - - - - - - - -/*** 在变量左边添加元素值** @param key* @param value* @return*/public void leftPush(String key, Object value) {redisTemplate.opsForList().leftPush(key, value);}/*** 获取集合指定位置的值。** @param key* @param index* @return*/public Object index(String key, long index) {return redisTemplate.opsForList().index("list", 1);}/*** 获取指定区间的值。** @param key* @param start* @param end* @return*/public List<Object> range(String key, long start, long end) {return redisTemplate.opsForList().range(key, start, end);}/*** 把最后一个参数值放到指定集合的第一个出现中间参数的前面,* 如果中间参数值存在的话。** @param key* @param pivot* @param value* @return*/public void leftPush(String key, String pivot, String value) {redisTemplate.opsForList().leftPush(key, pivot, value);}/*** 向左边批量添加参数元素。** @param key* @param values* @return*/public void leftPushAll(String key, String... values) {//        redisTemplate.opsForList().leftPushAll(key,"w","x","y");redisTemplate.opsForList().leftPushAll(key, values);}/*** 向集合最右边添加元素。** @param key* @param value* @return*/public void leftPushAll(String key, String value) {redisTemplate.opsForList().rightPush(key, value);}/*** 向左边批量添加参数元素。** @param key* @param values* @return*/public void rightPushAll(String key, String... values) {//redisTemplate.opsForList().leftPushAll(key,"w","x","y");redisTemplate.opsForList().rightPushAll(key, values);}/*** 向已存在的集合中添加元素。** @param key* @param value* @return*/public void rightPushIfPresent(String key, Object value) {redisTemplate.opsForList().rightPushIfPresent(key, value);}/*** 向已存在的集合中添加元素。** @param key* @return*/public long listLength(String key) {return redisTemplate.opsForList().size(key);}/*** 移除集合中的左边第一个元素。** @param key* @return*/public void leftPop(String key) {redisTemplate.opsForList().leftPop(key);}/*** 移除集合中左边的元素在等待的时间里,如果超过等待的时间仍没有元素则退出。** @param key* @return*/public void leftPop(String key, long timeout, TimeUnit unit) {redisTemplate.opsForList().leftPop(key, timeout, unit);}/*** 移除集合中右边的元素。** @param key* @return*/public void rightPop(String key) {redisTemplate.opsForList().rightPop(key);}/*** 移除集合中右边的元素在等待的时间里,如果超过等待的时间仍没有元素则退出。** @param key* @return*/public void rightPop(String key, long timeout, TimeUnit unit) {redisTemplate.opsForList().rightPop(key, timeout, unit);}
}

2.ResponseResult统一返回结果类

package com.hookapi.common;import lombok.Data;import java.util.HashMap;
import java.util.Map;import static com.hookapi.common.ResultCode.*;/**
* @Description: TODO
* @author: 穆雄雄
* @date: 2023/3/5 下午 7:20
* @Return: 统一返回结果类
*/
@Data
public class ResponseResult {/*** 消息内容*/private String message;/*** 响应码:参考`ResultCode`*/private Integer code;/*** 响应中的数据*/private Object data;private Map<String,Object> extra = new HashMap<>();public ResponseResult putExtra(String key, Object value) {this.extra.put(key, value);return this;}public static ResponseResult error(String message) {return new ResponseResult(FAILURE.getCode(), message, null);}public static ResponseResult error() {return new ResponseResult(FAILURE.getCode(), ERROR.getDesc(), null);}public static ResponseResult error(Integer code, String message) {return new ResponseResult(code, message, null);}public static ResponseResult success() {return new ResponseResult(SUCCESS.getCode(), SUCCESS.getDesc(), null);}public static ResponseResult success(Object data) {return new ResponseResult(SUCCESS.getCode(),SUCCESS.getDesc(), data);}public static ResponseResult success(String message, Object data) {return new ResponseResult(SUCCESS.getCode(), message, data);}public static ResponseResult success(Integer code, String message, Object data) {return new ResponseResult(code, message, data);}public static ResponseResult success(Integer code, String message) {return new ResponseResult(code, message,null);}public ResponseResult(Integer code, String msg, Object data) {this.code = code;this.message = msg;this.data = data;}
}

3.ResultCode响应码枚举 - 可参考HTTP状态码的语义

package com.hookapi.common;/**
* @Description: TODO
* @author: 穆雄雄
* @date: 2023/3/5 下午 7:20
* @Return: 响应码枚举 - 可参考HTTP状态码的语义
*/
public enum ResultCode {//成功SUCCESS( 200, "SUCCESS" ),//失败FAILURE( 400, "FAILURE" ),/*** qq登录错误*/QQ_LOGIN_ERROR(53001, "qq登录错误"),/*** 微博登录错误*/WEIBO_LOGIN_ERROR(53002, "微博登录错误"),GITEE_LOGIN_ERROR(53002, "gitee登录错误"),// 系统级别错误码ERROR(-1, "操作异常"),ERROR_DEFAULT(500,"系统繁忙,请稍后重试"),NOT_LOGIN(401, "请先登录!"),NO_PERMISSION(-7,"无权限"),ERROR_PASSWORD(-8,"用户帐号或者密码错误!"),DISABLE_ACCOUNT(-9,"帐号已被禁用!"),EMAIL_DISABLE_LOGIN(-12,"该邮箱账号已被管理员禁止登录!"),// 服务层面EMAIL_ERROR(-10,"邮箱格式不对,请检查后重试!"),EMAIL_IS_EXIST(-11,"该邮箱已注册,请直接登录!"),PASSWORD_ILLEGAL(-13,"密码格式不合法!"),ERROR_EXCEPTION_MOBILE_CODE(10003,"验证码不正确或已过期,请重新输入"),FILE_UPLOAD_WAY_ERROR(10004,"文件上传方式不合法"),FILE_UPLOAD_ERROR(10005,"上传文件失败"),ERROR_USER_NOT_EXIST(10009, "用户不存在"),ERROR_MUST_REGISTER(10017,"请先注册帐号!"),PARAMS_ILLEGAL(10018,"参数不合法!!"),CATEGORY_IS_EXIST(10019,"该分类名称已存在!"),CATEGORY_IS_TOP(10020,"该分类已经在顶端!!"),DATA_TAG_IS_EXIST(10021,"该数据标签已存在!"),CRAWLING_ARTICLE_FAILED(10022,"抓取文章失败!"),ARTICLE_NOT_EXIST(10023,"数据库未存在该文章!");public int code;public String desc;ResultCode(int code, String desc) {this.code = code;this.desc = desc;}public int getCode() {return code;}public void setCode(int code) {this.code = code;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}
}

好了,上面这些都是些工具类,如果对于你来说没用,那就不用管,继续下面的。

四、封装的业务实体类:MqttMsg

package com.hookapi.entity;import lombok.Data;/*** @author: muxiongxiong* @date: 2023年02月18日 下午 3:09* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: 实体类*/
@Data
public class MqttMsg {/*** 名称*/private String name = "";/*** 内容*/private String content = "";/*** 时间*/private String time = "";/*** 主题*/private String topicName = "";/*** 发送的qos数字* QoS0,At most once,至多一次;* QoS1,At least once,至少一次;* QoS2,Exactly once,确保只有一次。*/private int qos ;}

五、mqtt配置类MqttConfiguration

package com.hookapi.mqtt;import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: muxiongxiong* @date: 2023年02月18日 下午 3:04* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: 类的描述*/
@Configuration
public class MqttConfiguration {private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);@Value("${mqtt.host}")String host;@Value("${mqtt.username}")String username;@Value("${mqtt.password}")String password;@Value("${mqtt.clientId}")String clientId;@Value("${mqtt.timeout}")int timeOut;@Value("${mqtt.keepalive}")int keepAlive;/*** 注入spring*/@Beanpublic MyMQTTClient myMQTTClientBean() {MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);for (int i = 0; i < 10; i++) {try {myMQTTClient.connect();return myMQTTClient;} catch (MqttException e) {log.error("MQTT connect exception,connect time = " + i);try {Thread.sleep(2000);} catch (InterruptedException e1) {e1.printStackTrace();}}}return myMQTTClient;}}

六、mqtt回调类:MyMQTTCallback,该类主要是在执行完操作之后进行回调,比如订阅消息、发送消息等都会回调此类。

package com.hookapi.mqtt;import cn.hutool.core.util.CharsetUtil;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Map;/*** @author: muxiongxiong* @date: 2023年02月18日 下午 3:07* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: 回调类*/
public class MyMQTTCallback  implements MqttCallbackExtended {/*** 手动注入*///private final MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);private static final Logger log = LoggerFactory.getLogger(MyMQTTCallback.class);private MyMQTTClient myMQTTClient;public MyMQTTCallback(MyMQTTClient myMQTTClient) {this.myMQTTClient = myMQTTClient;}/*** 丢失连接,可在这里做重连* 只会调用一次** @param throwable*/@Overridepublic void connectionLost(Throwable throwable) {log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());long reconnectTimes = 1;while (true) {try {if (MyMQTTClient.getClient().isConnected()) {//判断已经重新连接成功  需要重新订阅主题 可以在这个if里面订阅主题  或者 connectComplete(方法里面)  看你们自己选择log.warn("mqtt reconnect success end  重新连接  重新订阅成功");return;}reconnectTimes+=1;log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);MyMQTTClient.getClient().reconnect();} catch (MqttException e) {log.error("mqtt断连异常", e);}try {Thread.sleep(5000);} catch (InterruptedException e1) {}}}/*** @param topic* @param mqttMessage* @throws Exception* subscribe后得到的消息会执行到这里面* 订阅者收到消息之后执行*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("服务端发完消息之后调用");log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));}/***连接成功后的回调 可以在这个方法执行 订阅主题  生成Bean的 MqttConfiguration方法中订阅主题 出现bug*重新连接后  主题也需要再次订阅  将重新订阅主题放在连接成功后的回调 比较合理* @param reconnect* @param serverURI*/@Overridepublic  void  connectComplete(boolean reconnect,String serverURI){log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");//订阅主题(可以在这里订阅主题)}/*** * 消息到达后* subscribe后,执行的回调函数* publish后,配送完成后回调的方法** @param iMqttDeliveryToken*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());}}

七、创建具体的业务实现类MyMQTTClient,此类中主要包含了订阅主题创建连接去掉订阅发送消息等操作。

package com.hookapi.mqtt;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @author: muxiongxiong* @date: 2023年02月18日 下午 3:05* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: 客户端*/
@Slf4j
public class MyMQTTClient {private static MqttClient client;private String host;private String username;private String password;private String clientId;private int timeout;private int keepalive;public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {this.host = host;this.username = username;this.password = password;this.clientId = clientId;this.timeout = timeOut;this.keepalive = keepAlive;}public static MqttClient getClient() {return client;}public static void setClient(MqttClient client) {MyMQTTClient.client = client;}/*** 设置mqtt连接参数**/public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);options.setCleanSession(true);options.setAutomaticReconnect(true);return options;}/*** 连接mqtt服务端,得到MqttClient连接对象*/public void connect() throws MqttException {if (client == null) {client = new MqttClient(host, clientId, new MemoryPersistence());client.setCallback(new MyMQTTCallback(MyMQTTClient.this));}MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);if (!client.isConnected()) {client.connect(mqttConnectOptions);} else {client.disconnect();client.connect(mqttConnectOptions);}//未发生异常,则连接成功log.info("MQTT connect success");}/*** 发布,默认qos为0,非持久化**/public void publish(String pushMessage, String topic,int qos) {publish(pushMessage, topic, qos, false);}/*** 发布消息** @param pushMessage* @param topic* @param qos* @param retained:留存*/public void publish(String pushMessage, String topic, int qos, boolean retained) {MqttMessage message = new MqttMessage();message.setPayload(pushMessage.getBytes());message.setQos(qos);message.setRetained(retained);MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);if (null == mqttTopic) {log.error("主题没有找到");}//Delivery:配送MqttDeliveryToken token;//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充synchronized (this) {try {//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件token = mqttTopic.publish(message);token.waitForCompletion(1000L);} catch (MqttException e) {e.printStackTrace();}}}/*** 订阅某个主题** @param topic* @param qos*/public void subscribe(String topic, int qos) {try {MyMQTTClient.getClient().subscribe(topic, qos);log.info("订阅主题"+topic+"成功!");} catch (MqttException e) {e.printStackTrace();}}/*** 取消订阅主题** @param topic 主题名称*/public void cleanTopic(String topic) {if (client != null && client.isConnected()) {try {client.unsubscribe(topic);} catch (MqttException e) {e.printStackTrace();}} else {log.info("取消订阅主题失败!");}}
}

八、最后我们写个控制器测试一下:MqttController

package com.hookapi.controller;import com.hookapi.common.RedisUtil;
import com.hookapi.common.ResponseResult;
import com.hookapi.entity.MqttMsg;
import com.hookapi.mqtt.MyMQTTClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;/*** @author: muxiongxiong* @date: 2023年02月18日 下午 3:12* 公众号:雄雄的小课堂* 博客:https://blog.csdn.net/qq_34137397* 个人站:http://www.穆雄雄.com* 个人站:http://www.muxiongxiong.cn* @Description: 类的描述*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {/*** 客户端*/@Autowiredprivate MyMQTTClient myMQTTClient;/*** redis*/@Autowiredprivate RedisUtil redisUtil;/*** 创建主题* @param topicName* @return*/@PostMapping("/createTopic")public ResponseResult createTopic(String user,String topicName){//直接将主题放在缓存中,用的时候从缓存中取出来redisUtil.set(user,topicName);return ResponseResult.success("创建成功,主题为:"+topicName);}/*** 根据用户获取主题* @param user* @return*/@PostMapping("/getTopic")public ResponseResult getTopic(String user){String topicName = redisUtil.get(user).toString();return ResponseResult.success(topicName);}/*** 订阅主题*/@PostMapping("/subscribeTopic")public ResponseResult subscribeTopic(String user){String topicName = redisUtil.get(user).toString();myMQTTClient.subscribe(topicName,1);return ResponseResult.success("订阅"+topicName+"主题成功");}/*** 取消订阅主题*/@PostMapping("/cleanSubscribeTopic")public ResponseResult cleanSubscribeTopic(String user){String topicName = redisUtil.get(user).toString();myMQTTClient.cleanTopic(topicName);return ResponseResult.success("取消订阅"+topicName+"主题成功");}/*** 发送消息*/@PostMapping("/sendMsg")@ResponseBodypublic  synchronized ResponseResult sendMsg(@RequestBody MqttMsg mqttMsg){String result = "给主题:"+mqttMsg.getTopicName()+"发送成功";//发送消息myMQTTClient.publish(mqttMsg.getContent(),mqttMsg.getTopicName(),mqttMsg.getQos());return ResponseResult.success(result);}}

至此,服务端已经全部完成,现在我们实现一下客户端。

客户端

客户端很简单,我们直接将服务端复制一份出来,相当于是两个项目,一个服务端,一个客户端。
然后我们需要改一下下面几个地方。
一、yml文件的配置:

mqtt:host: tcp://127.0.0.1:1883userName: adminpassWord: 123456qos: 1clientId: clienttimeout: 10keepalive: 20

注意一定要改clientId,不然启动的时候会报错连接失败,然后一直在重新连接。

还有端口也记得改哈,不然端口就被占用了,我把端口改成了8093

二、改一下回调类MyMQTTCallback中的messageArrived方法:

  /*** @param topic* @param mqttMessage* @throws Exception* subscribe后得到的消息会执行到这里面* 订阅者收到消息之后执行*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("客户端接收到消息之后调用");log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));}

别的都不用动。

在apipost中调用接口测试。

1.创建主题:
http://localhost:8092/mqtt/createTopic

2.根据用户获取主题:http://localhost:8092/mqtt/getTopic

我们可以看到获取到的主题,就是我们创建的主题mxx

3.服务端订阅主题:http://localhost:8092/mqtt/subscribeTopic

4.客户端也需要订阅主题,不然服务端发送了消息,客户端收不到:http://localhost:8093/mqtt/subscribeTopic

5.服务端发送消息:http://localhost:8092/mqtt/sendMsg

然后我们看看控制台,消息有没有发送过来。

客户端:

服务端:

因为我们服务端和客户端都订阅了mxx主题,所以,发送的消息都可以收到。

小白版的springboot中集成mqtt服务(超级无敌详细),实现不了掐我头!!!相关推荐

  1. SpringBoot 自动配置原理(超级无敌详细)-2

    SpringBoot 自动配置原理(超级无敌详细)-1 2.自动配置的实现 刚刚我们整体的过了一下主配置文件是如何实现的,但我们还没深入的研究如何实现自动装配功能.我们回到这个文件下,找一个具体的自动 ...

  2. SpringBoot 自动配置原理(超级无敌详细)-1

    Spring Boot @SpringBootApplication 该注解标注在 某个类上, 说明该类为 SpringBoot的主配置类,SpringBoot 就应该运行这个类的main()方法来启 ...

  3. java中分割字符串总结 - 超级无敌详细版本。不仅要熟悉各种方法还要做到灵活运用。

    目录 1.split() 2.indexof() 3.lastIndexOf() 4.substing() 小技巧 1.split() 可有两个参数 只有第一个参数,就是以这个符号来分割 例如: St ...

  4. SpringBoot如何集成MQTT消息推送

    1.需求分析 近期笔者项目需要用到mqtt实现消息推送,笔者选择emq作为mqtt服务器载体,上篇笔者讲解了如何在linux中安装mqtt服务:https://blog.csdn.net/zhangx ...

  5. Elasticsearch实践(二)在Springboot微服务中集成搜索服务

    关于如何用Docker搭建Elasticsearch集群环境可以参考前一篇:Elasticsearch实践(一)用Docker搭建Elasticsearch集群.本文主要介绍,如果在Springboo ...

  6. springboot的jsp应该放在哪_在springboot中集成jsp开发

    springboot就是一个升级版的spring.它可以极大的简化xml配置文件,可以采用全注解形式开发,一个字就是很牛. 在springboot想要使用jsp开发,需要集成jsp,在springbo ...

  7. 【苹果imessage群发苹果推位置推】软件安装在系统中集成 USBMuxd 服务

    推荐内容IMESSGAE相关 作者推荐内容 iMessage苹果推软件 *** 点击即可查看作者要求内容信息 作者推荐内容 1.家庭推内容 *** 点击即可查看作者要求内容信息 作者推荐内容 2.相册 ...

  8. vscode中使用git,超级无敌简单

    vscode中使用git,超级无敌简单 一.复制远端地址 https://gitee.com/zhaojia77/react-foot.git 二.打开vscode 1.点击源代码管理--2.点击克隆 ...

  9. springboot集成mqtt(超级无敌详细)

    springboot集成MQTT步骤 1. 引入pom依赖 <!-- mqtt --><dependency><groupId>org.springframewor ...

最新文章

  1. python 如何查看模块所有方法-Python 查看模块的帮助文档,方法和帮助信息
  2. 三维空间碰撞问题;空间中两直线的最短距离及最近点
  3. 计算机职称在线考试报名系统,2020年计算机职称考试网上如何报名
  4. 26条C++的经典语录,哪几句戳中你的心!
  5. 五十个小技巧提高PHP执行效率
  6. Cannot add foreign key constraint 错误
  7. real-time RGB-D camera relocalization
  8. python:遍历文件夹下的所有文件
  9. matlab 图像保存为视频教程,山东大学《数字图像处理(MATLAB)》江铭炎视频教程
  10. C++坦克大战源代码
  11. FFmpeg mxf扩展hdr、bt2020
  12. 软件图标显示不正常的问题
  13. ES6 模板字符串基本用法
  14. AI资源对接需求汇总: 第4期
  15. 诱人福利:猎豹移动雇游轮带全员一块儿航海
  16. c语言 计算平均分
  17. android生成将布局生成海报保存并分享
  18. sdn主要包含哪些接口_解读SDN的东西、南北向接口
  19. 网页制作的形式美的规则
  20. js将HTML导出生成word文档

热门文章

  1. 印度区块链项目Matic Network的应用场景分析
  2. 大数据快速发展,离不开互联网、大计算和云数据的支持
  3. 做算法是屠龙,做工程是狩猎,做数据是养猪!
  4. 公司注册步骤包含的内容,公司注册步骤
  5. 手机连接谷歌浏览器进行联调_如何让任何人将手机连接到您的Google Home
  6. toolbar wpf 按钮带文字_Tob设计:中台设计组件按钮
  7. OPenCV 图像透视变换矫正
  8. 聊天没有表情包被嘲讽,程序员直接用python爬取了十万张表情包
  9. Geography和 Geometry 的区别
  10. gta5线上模式进不去云服务器,gta5ol线上连不上服务器|云端存档同步发生错误