2019独角兽企业重金招聘Python工程师标准>>>

基础配置:使用 Spring Integration 模块集成 MQTT 协议通道

applicationContext-mqtt.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration wiring for the MQTT channels.

  Each <int:channel> is paired with an <int:outbound-channel-adapter> that
  forwards every message sent to that channel to the handler bean named by
  "ref". The handler beans (mqttInitMsg, sendCtrlMsg, sendHostMsg) are not
  declared in this file; they must be provided by the importing configuration.

  Note: the "mqtt" prefix is bound to the same namespace URI as "int" and is
  not used by any element below.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:mqtt="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd">

    <!-- Initial / status message channel -->
    <int:channel id="initMsgChannel" />
    <int:outbound-channel-adapter ref="mqttInitMsg" channel="initMsgChannel" />

    <!-- Control message channel -->
    <int:channel id="ctrlMsgChannel" />
    <int:outbound-channel-adapter ref="sendCtrlMsg" channel="ctrlMsgChannel" />

    <!-- Host message channel -->
    <int:channel id="hostMsgChannel" />
    <int:outbound-channel-adapter ref="sendHostMsg" channel="hostMsgChannel" />

</beans>

mqtt客户端连接工厂,实现初始化配置类接口,目的是能够在启动时加载

MqttClientFactoryBean.java

package youway.service.mqtt;import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;public class MqttClientFactoryBean implements InitializingBean,FactoryBean<IMqttClient>{private static String TCP_PROTOCOL = "tcp://";private static String SSL_PROTOCOL = "ssl://";private String protocol = TCP_PROTOCOL;private boolean useSsl = false;private String host;private int port = 1883;private String clientId = buildClientId();private MqttClientPersistence mqttClientPersistence;private String username, password;private MqttConnectOptions mqttConnectOptions;private Boolean cleanSession = null;public MqttClientFactoryBean() {}public MqttClientFactoryBean(String host) {setup(host, this.username, this.password);}public MqttClientFactoryBean(String host, String u, String p) {setup(host, u, p);}public MqttClientFactoryBean(String host, int port, String u, String p) {setup(host, u, p);this.setPort(port);}public void setup(String h, String u, String p) {setHost(h);setUsername(u);setPassword(p);}public void setCleanSession(boolean cleanSession) {this.cleanSession = cleanSession;}public void setPassword(String p) {this.password = p;}public void setUsername(String u) {this.username = u;}public void setMqttConnectOptions(MqttConnectOptions mqttConnectOptions) {this.mqttConnectOptions = mqttConnectOptions;}public void setClientId(String c) {this.clientId = c;}public void setProtocol(String protocol) {this.protocol = protocol;}public void setUseSsl(boolean useSsl) {this.useSsl = useSsl;}public void setHost(String host) {this.host = host;}public String getHost() {return host;}public void setPort(int port) {this.port = port;}public void setMqttClientPersistence(MqttClientPersistence mqttClientPersistence) {this.mqttClientPersistence = mqttClientPersistence;}@Overridepublic IMqttClient getObject() throws Exception {String serverUri = buildServerUri();MqttClient client = this.mqttClientPersistence == null ?new MqttClient(serverUri, clientId) :new MqttClient(serverUri, clientId, mqttClientPersistence);MqttConnectOptions 
connectOptions = this.buildMqttConnectionOptions();if (null != connectOptions) {client.connect(connectOptions);} else {client.connect();}return client;}@Overridepublic Class<?> getObjectType() {return IMqttClient.class;}@Overridepublic boolean isSingleton() {return true;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.hasText(this.protocol, String.format("you must specify a non-null protocol value (either %s or %s)", SSL_PROTOCOL, TCP_PROTOCOL));Assert.isTrue(this.protocol.equalsIgnoreCase(SSL_PROTOCOL) || this.protocol.equalsIgnoreCase(TCP_PROTOCOL), "");Assert.hasText(this.clientId, "your clientId must be non-null");Assert.hasText(this.host, "you must specify a valid host");Assert.isTrue(this.port > 0, "you must specify a valid port");boolean connectionOptionsAreCorrectlySpecified =this.mqttConnectOptions != null && weShouldCreateConnectionOptions();Assert.isTrue(!connectionOptionsAreCorrectlySpecified,String.format("you must specify an instance of %s for the 'buildMqttConnectionOptions' attribute" +" OR any of the following options ('cleanSession', 'username', 'password'), but not both!", MqttConnectOptions.class.getName()));}protected String buildServerUri() {if (this.useSsl) {this.protocol = SSL_PROTOCOL;}return this.protocol + this.host + ":" + this.port;}protected boolean weShouldCreateConnectionOptions() {return (this.cleanSession != null || StringUtils.hasText(this.username) || StringUtils.hasText(this.password));}protected String buildClientId() {String user = System.getProperty("user.name");int totalLength = 23;int userLength = user.length();if (userLength > 10) {user = user.substring(0, 10);}String clientId = user + System.currentTimeMillis();Assert.isTrue(clientId.length() <= totalLength);return clientId;}protected MqttConnectOptions buildMqttConnectionOptions() {MqttConnectOptions connectOptions = null;if (weShouldCreateConnectionOptions()) {connectOptions = new 
MqttConnectOptions();connectOptions.setCleanSession(this.cleanSession);connectOptions.setUserName(this.username);connectOptions.setPassword(this.password.toCharArray());} else if (this.mqttConnectOptions != null) {connectOptions = this.mqttConnectOptions;}return connectOptions;}
}

订阅方法类

MqttMessageHandler.java

package youway.service.mqtt;import java.io.UnsupportedEncodingException;import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.joda.time.DateTime;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.util.Assert;import youway.service.mqtt.msg.IMsgHandle;
import youway.service.mqtt.msg.impl.AaaMsgHandle;
import youway.service.mqtt.msg.impl.BbbMsgHandle;import com.fuzhi.util.SpringContextUtils;public class MqttMessageHandler extends AbstractMessageHandler implementsMqttCallback {private IMqttClient client;private String topic;private boolean messagesRetained;private QualityOfService qualityOfService = QualityOfService.AT_LEAST_ONCE;public MqttMessageHandler() {}public MqttMessageHandler(IMqttClient client, String topic) {setClient(client);setTopic(topic);}@Overrideprotected void onInit() throws Exception {Assert.notNull(this.client, String.format("you must specify a valid %s instance! ",MqttClient.class.getName()));Assert.hasText(this.topic, "you must specify a 'topic'");Assert.notNull(this.qualityOfService, String.format("you must specify a non-null instance of the %s enum.",QualityOfService.class.getName()));}public void setClient(IMqttClient client) {this.client = client;}public void setQualityOfService(QualityOfService qualityOfService) {this.qualityOfService = qualityOfService;}public void setMessagesRetained(boolean messagesRetained) {this.messagesRetained = messagesRetained;}public void setTopic(String topic) {this.topic = topic;}public String getTopic() {return topic;}public IMqttClient getClient() {return client;}@Overrideprotected void handleMessageInternal(Message<?> message) throws Exception {Object payload = message.getPayload();Assert.isTrue(payload instanceof byte[], String.format("the payload for %s must be of type byte[]", getClass().getName()));byte[] payloadOfBytes = (byte[]) payload;client.publish(this.topic, payloadOfBytes,this.qualityOfService.ordinal(), this.messagesRetained);// client.subscribe(MqttHeaders.TOPIC);client.subscribe("a");client.subscribe("b");client.setCallback(this);}@Overridepublic void connectionLost(Throwable arg0) {// 处理重连logger.debug("开始重连......");String time = (new DateTime()).toString();while (true) {try {client.connect();break;} catch (MqttSecurityException e) {e.printStackTrace();} catch (MqttException e) 
{e.printStackTrace();}}MqttService.initMessage("重连:"+ time);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// TODO Auto-generated method stub}@Overridepublic void messageArrived(String topic, MqttMessage msg) {try {String content = new String(msg.getPayload(), "UTF-8");logger.debug("主题:" + topic + "  内容:" + content);IMsgHandle msgHandle = getHandle(topic);msgHandle.handle(topic, content);logger.debug("..................消息处理完成................");} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}/*** 根据接收的topic,生成对应的消息处理对象* * @param topic* @return*/public IMsgHandle getHandle(String topic) {IMsgHandle msgHandle = null;switch (topic) {case "/a":msgHandle = SpringContextUtils.getBean("AaaMsgHandle",AaaMsgHandle.class);break;case "/b":msgHandle = SpringContextUtils.getBean("BbbMsgHandle",BbbMsgHandle.class);break;return msgHandle;}
}

客户端总体调用处理

MqttService.java

package youway.service.mqtt;import static org.slf4j.LoggerFactory.getLogger;import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.slf4j.Logger;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;public class MqttService {private static final Logger logger = getLogger(MqttService.class);private static AnnotationConfigApplicationContext context1 = new AnnotationConfigApplicationContext(MqttConfiguration.class);private static AnnotationConfigApplicationContext context2 = new AnnotationConfigApplicationContext(MqttConfiguration.class);private static AnnotationConfigApplicationContext context3 = new AnnotationConfigApplicationContext(MqttConfiguration.class);private static final String TOPIC_CTRL_MSG = "/ctrlMsg";private static final String TOPIC_HOST_MSG = "/hostMsg";public static void initMessage(String content) {reConnect(context1); //重连logger.debug("------------initMessage start---------------");MessageChannel messageChannel = context1.getBean("initMsgChannel",MessageChannel.class);messageChannel.send(MessageBuilder.withPayload(content.getBytes()).build());}/*** 发送控制方案信息通道* * @param content* @throws Exception*/public static void sendCtrlMsg(String content) {reConnect(context2); //重连logger.debug("------------sendCtrlMsg ---------------");MessageChannel messageChannel = context2.getBean("ctrlMsgChannel",MessageChannel.class);messageChannel.send(MessageBuilder.withPayload(content.getBytes()).build());}/*** 发送主机信息通道* * @param content* @throws Exception*/public static void sendHostMsg(String content) {reConnect(context3); //重连logger.debug("------------sendHostMsg ---------------");MessageChannel messageChannel = context3.getBean("hostMsgChannel",MessageChannel.class);messageChannel.send(MessageBuilder.withPayload(content.getBytes()).build());}@Configuration@ImportResource("classpath*:/applicationContext-mqtt.xml")public static class MqttConfiguration {@Beanpublic MqttClientFactoryBean mqttClientFactoryBean() {return new MqttClientFactoryBean("mqttx.gzdfbz.com"); }@Beanpublic MqttMessageHandler mqttInitMsg(IMqttClient client) {return new MqttMessageHandler(client, "/status");}@Beanpublic 
MqttSendingMessageHandler sendCtrlMsg(IMqttClient client) {return new MqttSendingMessageHandler(client, TOPIC_CTRL_MSG);}@Beanpublic MqttSendingMessageHandler sendHostMsg(IMqttClient client) {return new MqttSendingMessageHandler(client, TOPIC_HOST_MSG);}}/*** 重连* @throws InterruptedException*/private static void reConnect(AnnotationConfigApplicationContext context){while(context==null){logger.debug("-----reConnect()--------");context = new AnnotationConfigApplicationContext(MqttConfiguration.class);try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

在监听器中加入启动方法

WebRootPathListener.java

package youway.web.listener;import javax.servlet.ServletContextEvent;import org.joda.time.DateTime;
import org.springframework.web.context.ContextLoaderListener;import youway.service.mqtt.MqttService;
import youway.util.IConstants;
/*** 获得webroot的物理路径.* @author youway**/
public class WebRootPathListener extends ContextLoaderListener {public void contextDestroyed(ServletContextEvent sce) {  }public void contextInitialized(ServletContextEvent sce) {  String webRootPath = sce.getServletContext().getRealPath("/");  System.setProperty("webRoot.path" , webRootPath);  try {System.out.println("MQTT线程启动......");String time = new DateTime().toString("yyyy-MM-dd HH:mm");MqttService.initMessage(time);} catch (Exception e) {e.printStackTrace();}}
}

转载于:https://my.oschina.net/youway/blog/521197

在spring web中启动mqtt相关推荐

  1. Spring Boot(号称Java当前最流行的开发框架) 中启动HTTPS

    Spring Boot(号称Java当前最流行的开发框架) 中启动HTTPS 说实话啊,这个框架是比较简单,但是数据库操作还是那么恶心,好比16岁的花姑娘配了一个80岁的老头,关于这一块,我会单独发布 ...

  2. java 方式配置ssm,关于SSM以及Spring boot中对于Spring MVC配置的问题

    SSM中 Spring MVC配置 传统的web.xml配置 web.xml contextConfigLocation classpath*:applicationContext.xml org.s ...

  3. Spring Cloud 中文文档

    Spring Cloud 官方文档 Spring Cloud为开发人员提供了用于快速构建分布式系统中某些常见模式的工具(例如,配置管理,服务发现,断路器,智能路由,微代理,控制总线).分布式系统的协调 ...

  4. spring 定时器注释_带注释的控制器– Spring Web / Webflux和测试

    spring 定时器注释 Spring Webflux和Spring Web是两个完全不同的Web堆栈. 但是, Spring Webflux继续支持基于注释的编程模型 使用这两个堆栈定义的端点可能看 ...

  5. 带注释的控制器– Spring Web / Webflux和测试

    Spring Webflux和Spring Web是两个完全不同的Web堆栈. 但是, Spring Webflux继续支持基于注释的编程模型 使用这两个堆栈定义的端点可能看起来相似,但是测试该端点的 ...

  6. Spring Boot中Web应用的统一异常处理

    为什么80%的码农都做不了架构师?>>>    我们在做Web应用的时候,请求处理过程中发生错误是非常常见的情况.Spring Boot提供了一个默认的映射:/error,当处理中抛 ...

  7. 模拟Spring如何在WEB中运行

    Spring在web中配置和普通的Java程序中有所区别,总结一下主要表现在以下几个方面: ①jar包不同,需要引入两个web的jar包 ②需要考虑IOC容器创建的时间 非 WEB 应用在 main ...

  8. spring boot中servlet启动原理

    启动过程及原理 1 spring boot 应用启动运行run方法 StopWatch stopWatch = newStopWatch();stopWatch.start();Configurabl ...

  9. Spring Boot————Web应用启动时自动执行ApplicationListener用法

    原文:<web服务启动spring自动执行ApplicationListener的用法> 引言 我们知道,一般来说一个项目启动时需要加载或者执行一些特殊的任务来初始化系统,通常的做法就是用 ...

最新文章

  1. 高速电路中的AC耦合电容
  2. android文字广告的循环滚动,android怎样写一个循环文字滚动的TextView
  3. 深入浅出解释FFT(七)——fft求频谱图和功率谱密度图
  4. Java 14 Hotspot 虚拟机垃圾回收调优指南!
  5. 一发模拟水题但是RE,暑假抽个时间改一改、、
  6. byte 类型比较_Java Grammar:数据类型
  7. iptv直播源m3u_Padavan 单线复用实现拨号上网加IPTV 操作记录
  8. linux 添加链接与删除链接(ln命令的用法)
  9. JAVA 获取系统环境变量
  10. Microsoft Caffe(msCaffe)无GPU快速配置
  11. WIN10和WIN11修改C盘用户文件夹名称
  12. mac启动台(launchpad)图标大小调整
  13. 23.1 智能DNS
  14. c++win32项目 如何显示后再删除一个绘图_50个CAD绘图小技巧,来get成倍提高绘图效率...
  15. 如何让你的程序员不要厌倦工作?
  16. 解决Linux服务器时差问题
  17. (result, consumed) = self._buffer_decode(data, self.errors, final)
  18. python数字金额转换为中文大写金额(角、分)
  19. 如何学好c++,还是好好看书籍吧!
  20. rz cz命令未找到

热门文章

  1. android selector下的设置背景属性值
  2. 简单说一下什么是回流和重绘
  3. Alpha发布用户使用报告
  4. stick footers布局
  5. 双人五子棋对战(需要EasyX图像库)
  6. 《Android开发艺术探索》读书笔记 (3) 第3章 View的事件体系
  7. linux7安装haproxy,Centos7 源码编译安装haproxy
  8. java actor akka_Actor 模型及Akka简介
  9. ajax的几种格式,jQuery-----jQuery的几种ajax获取json格式数据的方法
  10. java简单的面试题目_简单的面试题目,大跌眼镜的结果