<!--mqtt依赖包--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency>
# Mqtt配置
mqtt:serverURIs: tcp://localhost:1883username: adminpassword: publicclient:id: ${random.value}topic: topic_default

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
public class MqttConfig {public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";public static final String INPUT_CHANNEL = "mqttInputChannel";public static final String SUB_TOPICS = "PSimulation,Pressure,PSimulationPump,PSimulationPressure," +"PSimulationValve,PSimulationFlow,FSimulation,FSimulationPump,FSimulationPressure," +"FSimulationValve,FSimulationFlow,leak,blast";@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.serverURIs}")private String hostUrl;@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.topic}")private String defaultTopic;@PostConstructpublic void init() {log.debug("username:{} password:{} hostUrl:{} clientId :{} ",this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);}@Beanpublic MqttPahoClientFactory clientFactory() {final MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{hostUrl});options.setUserName(username);options.setPassword(password.toCharArray());final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(options);return factory;}@Bean(value = OUTBOUND_CHANNEL)public MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)public MessageHandler mqttOutbound() {final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());handler.setDefaultQos(1);handler.setDefaultRetained(false);handler.setDefaultTopic(defaultTopic);handler.setAsync(false);handler.setAsyncEvents(false);return handler;}/*** MQTT消息接收处理*///接收通道@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}//配置client,监听的topic@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", clientFactory(), SUB_TOPICS.split(","));adapter.setCompletionTimeout(3000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}//通过通道获取数据@Bean@ServiceActivator(inputChannel = INPUT_CHANNEL)public MessageHandler handler() {return message -> {String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();log.info("topic: {}", topic);String[] topics = SUB_TOPICS.split(",");for (String t : topics) {if (t.equals(topic)) {log.info("payload: {}", message.getPayload().toString());}}};}}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;@Configuration
public class IotMqttSubscriberConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs(mqttConfig.getHostUrl());return factory;}@Beanpublic MessageChannel iotMqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(iotMqttInputChannel());return adapter;}/*** @author xiaofu* @description 消息订阅* @date 2020/6/8 18:20*/@Bean@ServiceActivator(inputChannel = "iotMqttInputChannel")public MessageHandler handlerTest() {return message -> {try {String string = message.getPayload().toString();System.out.println("接收到消息:" + string);} catch (MessagingException ex) {//logger.info(ex.getMessage());}};}
}

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {// 向默认的 topic 发送消息void sendMessage2Mqtt(String payload);// 向指定的 topic 发送消息void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);// 向指定的 topic 发送消息,并指定服务质量参数void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class IotMqttProducerConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs(mqttConfig.getHostUrl());return factory;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "iotMqttInputChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory());messageHandler.setAsync(false);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return messageHandler;}
}
@PostMapping("/send")@ApiOperation(value = "发送消息到指定主题")public String send(String message,String topic) {// 发送消息到指定主题mqttGateway.sendMessage2Mqtt(topic, 1, message);return "send topic: " + topic+ ", message : " + message;}

EMQX下载:https://www.emqx.io/zh
EMQX登录界面:http://localhost:18083/
参考文章:https://mp.weixin.qq.com/s?__biz=MzAxNTM4NzAyNg==&mid=2247486353&idx=1&sn=02371acc8048cb15f29285f0505c4958&chksm=9b859b6cacf2127a2799d28830dc6ef0560764d4d29b5d3c742dd7489d66365691c662e70ce6&token=632378703&lang=zh_CN#rd;
https://blog.csdn.net/f4233441/article/details/121829743
对EMQX使用不熟练,未实现相应功能。

springboot集成MQTT协议实现消息实时推送(未实现版)相关推荐

  1. php消息实时推送技术,基于HTTP协议之WEB消息实时推送技术原理及实现

    很早就想写一些关于网页消息实时推送技术方面的文章,但是由于最近实在忙,没有时间去写文章.本文主要讲解基于 HTTP1.1 协议的 WEB 推送的技术原理及实现.本人曾经在工作的时候也有做过一些用到网页 ...

  2. java消息推送怎么实现_PHP实现的消息实时推送功能

    本文实例讲述了PHP实现的消息实时推送功能.分享给大家供大家参考,具体如下: 入口文件index.html <!DOCTYPE HTML> <html> <head> ...

  3. 消息推送服务器推pc,PC浏览器消息实时推送的解决方案 ——EPush推送平台

    原标题:PC浏览器消息实时推送的解决方案 --EPush推送平台 陈华 研发工程师,2014入职去哪儿网.参与研发的EPush推送平台,增强了订单推送的时效性,提高了酒店自助订单处理率.最近负责CEQ ...

  4. html站内消息列表,WebSocket实现站内消息实时推送

    关于WebSocket WebSocket是HTML5 开始提供的一种在单个TCP连接上进行全双工通讯的协议.什么是全双工?就是在同一时间可以发送和接收消息,实现双向通信,比如打电话.WebSocke ...

  5. RabbitMQ(九):RabbitMQ 延迟队列,消息延迟推送(Spring boot 版)

    应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持 ...

  6. 基于RabbitMQ 的 Web MQTT插件进行前端消息实时推送

    目录 RabbitMQ What is AMQP, MQTT, STOMP ? How to use RabbitMQ with MQTT ? 1. Docker 安装RabbitMQ 2. MQTT ...

  7. EventBridge 实践场景:流计算 Oceanus 告警消息实时推送

    导语 本文演示了如何捕获流计算 Oceanus (Flink) 集群状态变更,并通过事件总线(EventBridge)发送到企业微信或钉钉.飞书客户端. 背景介绍 监控与报警系统对于业务生产环境来说是 ...

  8. 飞鸽快信微信消息-LINUX服务器告警、状态监控预警、程序报警提醒等消息实时推送工具

    一.产品介绍: 飞鸽快信微信消息是一款基于微信的实时消息推送产品,可应用于服务器告警.LINUX服务器日志.状态监控异常提醒.阀值预警.程序报错报警提醒等场景,使用微信消息代替短信发送警告通知,实现精 ...

  9. php弹幕技术轮询,PHP实现长轮询消息实时推送功能代码

    入口文件index.html 反ajax推送 .send{color:#555;text-align: left;} .require{color:blue;text-align: right;} . ...

最新文章

  1. parse_str与http_build_query的使用
  2. 餐厅点餐系统:测试与部署
  3. python实现常见排序算法
  4. 将html表格导出到excel表格,table2excel-将HTML表格内容导出到Excel中_html/css_WEB-ITnose...
  5. 华师计算机基础在线作业秋,18秋华师《计算机基础》在线作业(20210408185935).pdf...
  6. RxHttp 一条链发送请求之强大的Param类(三)
  7. 【英语学习】【Daily English】U13 Holiday L04 How was it?
  8. 如何使用JMeter发送Post请求
  9. 关于TensorFlow的MNIST数据集下载脚本input_data.py的坑
  10. 一步一步手绘Spring AOP运行时序图(Spring AOP 源码分析)
  11. Hadoop面向行和面向列格式详解
  12. hdoj1159:Common Subsequence(dp基础题-最长公共子序列LCS)
  13. java 解析cron_Quartz 源码解析(六) —— 解析Cron表达式
  14. 头条 上传图片大小_遇到不会注册今日头条号,这么办?
  15. webpack5从零搭建一个项目
  16. 微信公众号二维码生成
  17. 以“实景+科幻三维建模渲染”,助力“实景三维中国建设”
  18. Jmeter中运行按钮点了没反应
  19. 关于获取安卓APP素材的方法
  20. Java基础面试题1:面向对象的思想

热门文章

  1. 学习汇编语言的重要性
  2. 下载spotify音乐_如何将Google Maps音乐控件用于Spotify,Apple Music或Google Play音乐
  3. 今日芯声 | 不寒而栗!针孔摄像头画质竟然是4K高清的
  4. mysql Can‘t open and lock privilege tables: Table ‘.\mysql\db‘ is marked as crashed .. repair failed
  5. 人际沟通与社交媒体(媒介沟通的优缺点)
  6. mybatis的原理详解
  7. win10读取不了U盘或者移动硬盘的解决方法
  8. 私有文件服务器,私有云文件服务器
  9. power bi PP页面(power bi Desktop)
  10. unrar - 解压rar文件