1、Spring Boot 和 ActiveMQ 、RabbitMQ 简介

  最近因为公司的项目需要用到 Spring Boot , 所以自学了一下, 发现它与 Spring 相比,最大的优点就是减少了配置, 看不到 xml 文件的配置, 而是用 appplication.yml 或者 application.propertites 文件来代替 , 再也不用配置 tomcat 环境了, 因为 spring boot 已经将 tomcat 环境整合到里面了。入门可以去 http://spring.io 官网, 上面有一系列介绍 。

  本次项目开发中还用到了 ActiveMQ 和 RabbitMQ , 这是两个消息队列,我直到完成模块都不能真正理解消息队列。 关于消息队列的定义和使用场景这篇博客写得十分清楚:

https://blog.csdn.net/KingCat666/article/details/78660535,几个不同的消息队列之间的比较 : https://blog.csdn.net/linsongbin1/article/details/47781187。我负责的任务是 Spring Boot 监听 ActiveMQ 中特定的 topic,并将消息使用 RabbitMq 发布出去。

2、配置环境

  2.1 ·使用 maven 构建 Spring Boot 运行环境, 在 pom.xml 文件中加入如下依赖:

<properties>    <project.build.sourceEncoding>UTF8    </project.build.sourceEncoding>    <java.version>1.8</java.version></properties><dependencies>    <!-- Springboot -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>        <version>2.0.7.RELEASE</version>    </dependency>    <!-- rabbitmq -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>        <version>2.0.7.RELEASE</version>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-integration</artifactId>        <version>2.0.7.RELEASE</version>    </dependency>    <dependency>        <groupId>org.springframework.integration</groupId>        <artifactId>spring-integration-stream</artifactId>        <version>5.0.7.RELEASE</version>    </dependency>    <dependency>        <groupId>org.springframework.integration</groupId>        <artifactId>spring-integration-mqtt</artifactId>        <version>5.0.7.RELEASE</version>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter</artifactId>        <version>2.0.7.RELEASE</version>    </dependency>

    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <version>2.0.7.RELEASE</version>        <scope>test</scope>    </dependency>    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-test -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-test</artifactId>        <version>2.0.7.RELEASE</version>        <scope>test</scope>    </dependency>    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-maven-plugin -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-maven-plugin</artifactId>        <version>2.0.7.RELEASE</version>    </dependency>    <dependency>        <groupId>junit</groupId>        <artifactId>junit</artifactId>        <version>4.10</version>    </dependency>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-test</artifactId>    </dependency>    <dependency>        <groupId>org.springframework</groupId>        <artifactId>spring-test</artifactId>        <version>4.3.7.RELEASE</version>        <scope>compile</scope>    </dependency></dependencies>
<build>    <plugins>        <plugin>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-maven-plugin</artifactId>        </plugin>    </plugins></build>

  2.2 下载并安装配置 active mq 和 rabbitmq 的运行环境

    activemq下载地址如下 : http://activemq.apache.org/download-archives.html

rabbitmq 是使用 erlang 写的, 所以先安装 erlang 环境, 再安装 rabbitmq-server, 现在我将这三个文件整合到了一起, 方便下载 :

链接: https://pan.baidu.com/s/1qdzMpqFwxR78rW7-ABpbCA  提取码: 7aqf 。下载完成以后, 其中比较复杂的是安装 erlang ,安装完以后新建 ERLAGN_HOME 添加到环境变量。                                  将 %ERLANG_HOME%\bin 添加到 path,然后安装 rabbit-server.exe, 安装完以后在进入 rabbit-server\sbin 目录下, 进入命令行,输入 rabbitmq-plugins enable rabbitmq_management 完成安装,

打开 sbin 目录,双击rabbitmq-server.bat , 启动成功之后访问 http://localhost:15672,默认账号密码都属 guest 。

将下载的 activemq 解压到某个目录下,进入该目录输入 cmd ,敲击 bin\activemq start , 有可能会报错,具体错误查看 data\activemq.log 文件。环境搭建成功以后, 开始干!

3、构建项目

  3、1 新建配置文件:

    新建 application.yml 文件,输入:

com:mqtt:inbound:url: tcp://127.0.0.1:1883clientId: familyServerIntopics: hello,topicoutbound:urls: tcp://127.0.0.1:1883clientId: familyServerOuttopic: topic1spring:rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: rootvirtualHost: /listener:concurrency: 2max-concurrency: 2main:web-application-type: nonemqtt:username: admin
#MQTT-密码password: admin
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613url: tcp://127.0.0.1:1883
#MQTT-连接服务器默认客户端IDclient:id: mqttId
#MQTT-默认的消息推送主题,实际可在调用接口时指定default:topic: topic
#连接超时completionTimeout: 3000

  3.2 新建配置类 MQttSenderConfig.java

在这里主要配置了 connectionFactory 和 channelFactory , 值得注意的是在方法 handler() 里面通过监听信道 mqttOutboundChannel 获得了 topic 并将其转发给 RabbitMQ 队列中, topicSender.send(message.getPayload().toString()); 这一行代码将消息发送到 RabbitMQ 队列中 、/*

/*** 〈一句话功能简述〉<br> * 〈MQTT发送消息配置〉** @author root* @create 2018/12/20* @since 1.0.0*/
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;@Value("${spring.mqtt.completionTimeout}")private int completionTimeout ;   //连接超时
    @Autowiredprivate TopicSender topicSender;@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});mqttConnectOptions.setKeepAliveInterval(2);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}//mqttOutboundChannel
    @Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}//接收通道
    @Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}//配置client,监听的topic
    @Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),"topic","hello");adapter.setCompletionTimeout(completionTimeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttOutboundChannel());return adapter;}//通过通道获取数据
  @Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
//                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());if("hello".equalsIgnoreCase(topic)){System.out.println("hello,fuckXX," + message.getPayload().toString());topicSender.send(message.getPayload().toString());}else if("topic".equalsIgnoreCase(topic)){System.out.println("topic,fuckXX," + message.getPayload().toString());topicSender.send(message.getPayload().toString());}}};}}

  3.2 新建配置类 RabbitConfig.java

配置了两个队列 rabbittopic 和 rabbittopic.queue2 , 申明了消息交换器 topicExchange, 通过 key 来绑定, 关于 key 和 路由绑定参考这篇文章 : https://www.jianshu.com/p/04f443dcd8bd 。

@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {//声明队列
    @Beanpublic Queue queue1() {return new Queue("rabbitopic", true); // true表示持久化该队列
    }@Beanpublic Queue queue2() {return new Queue("rabbitopic.queue2", true);}//声明交互器
    @BeanTopicExchange topicExchange() {return new TopicExchange("topicExchange");}//绑定
    @Beanpublic Binding binding1() {return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");}@Beanpublic Binding binding2() {return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");}@Beanpublic DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();factory.setMessageConverter(new MappingJackson2MessageConverter());return factory;}//queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象
    @Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// factory.setPrefetchCount(5);//指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//MANUAL:将ACK修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());}
}

  3.3 新建MqttGateway.java

  新建 MqttGateWay 接口,设置默认的信道 。

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}

余下代码就不再一一往上贴了 : 具体 demo:https://github.com/blench/mqtt.git

4、遇到的错误及解决办法

    1、发送数据后 rabbitmq  一直在接收数据,原因是监听 RabbitMQ 队列消息的方法写错了, 例如:

 @RabbitListener(queues = "rabbitopic")public void processMessage1(String msg) {
//        Message message = rabbitTemplate.receive(10000);System.out.println(" 接收到来自rabbitopic队列的消息:" + msg);return;}

接收监听的方法不能有返回值, 只能为 void .

  2、配置错误, 中途有一次启动失败,是由于代码的配置问题。

最后启动项目, 在 active mq 中新建 topic 和 hello 主题 , 添加测试内容发送。 控制台下可打印出相应的消息 。

5、总结

  虽然这次匆匆忙忙写完了代码,但是对于 RabbitMQ 和 ActiveMQ 只是有了初步的了解, 未来的工作中还会继续学习的 。

参考文档:

https://www.jianshu.com/p/6ca34345b796

https://www.jianshu.com/p/db8391dc1f63

http://blog.sina.com.cn/s/blog_7479f7990100zwkp.html

转载于:https://www.cnblogs.com/zhuixun/p/10149288.html

Spring Boot 监听 Activemq 中的特定 topic ,并将数据通过 RabbitMq 发布出去相关推荐

  1. Spring Boot监听事件同步和异步使用

    废话前言: 代码环境:WIN7+IDEA+JAD1.8+Spring Boot 2.0 首先说一下我为什么使用事件,比如现在创建一个订单但是我创建成功后要给客户发送一条短信和一个邮件提醒,本身没创建订 ...

  2. Spring Boot 监听 Redis Key 失效事件实现定时任务

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:彭超 https://antoniopeng.com 业务场 ...

  3. Spring5源码 - 11 Spring事件监听机制_源码篇

    文章目录 pre 事件监听机制的实现原理[观察者模式] 事件 ApplicationEvent 事件监听者 ApplicationEvent 事件发布者 ApplicationEventMultica ...

  4. Spring5源码 - 13 Spring事件监听机制_@EventListener源码解析

    文章目录 Pre 概览 开天辟地的时候初始化的处理器 @EventListener EventListenerMethodProcessor afterSingletonsInstantiated 小 ...

  5. Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析

    文章目录 Pre 实现原理 应用 配置类 Event事件 事件监听 EventListener 发布事件 publishEvent 源码解析 (反推) Spring默认的事件广播器 SimpleApp ...

  6. Spring事件监听原理

    1 简述Spring的生命周期 不论是Spring的监听机制原理还是Spring AOP的原理,都是依托于Spring的生命周期,所以要了解Spring的监听机制原理就需要先了解Spring的生命周期 ...

  7. spring 事件监听

    用一个简单的例子来实现spring事件监听的功能 这个例子主要功能是,记录那些用户是第一次登入系统,如果用户是第一次登入系统,则调用spring的事件监听,记录这些用户. 主要用到的spring的类和 ...

  8. vue 监听对象里的特定数据

    2019独角兽企业重金招聘Python工程师标准>>> vue  监听对象里的特定数据变化 通常是这样写的,只能监听某一个特定数据 watch: {params: function( ...

  9. Vue.js开发记录--用watch监听对象中属性的变化

    监听对象中所有属性 存在对象obj,若想要监听其中所有的值的变化 watch: {obj: {handler (val) {// coding},deep: true}, } 监听对象中某个属性 如果 ...

  10. Spring Boot Web应用程序中注册 Servlet 的方法实例

    Spring Boot Web应用程序中注册 Servlet 的方法实例 本文实例工程源代码:https://github.com/KotlinSpringBoot/demo1_add_servlet ...

最新文章

  1. (C++)高精度整数的存储、读入、比较和四则运算
  2. python opencv生成 html5 支持的mp4
  3. markdown分享
  4. 考勤系统的业务概念图
  5. URI 和 URL 的区别
  6. 计算机视觉基础-图像处理(边缘检测)cpp+python
  7. GCC、VS对C++标准的支持情况总结(转载)
  8. [C] C语言中的布尔值
  9. 《从0到1:CTFer成长之路》书籍配套题目-[第二章 web进阶]死亡ping命令
  10. jenkins构建报错: ssh: connect to host github.com port 22: Connection timed out
  11. 分享20个增长黑客经典案例。
  12. 华为机试---Word Maze迷宫游戏
  13. 字节跳动 Go RPC 框架 KiteX 性能优化实践
  14. Shiro实现多域名登录界面
  15. 水果店圈子:水果店坏水果应该怎么处理,水果店卖剩下的水果如何处理
  16. 我怎么就被一张照片出卖了?可怕!
  17. SpiffWorkflow定制工作流
  18. 深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析
  19. (你也可以像别人一样对框架底层源码来去自如)23种设计模式之外观模式
  20. uniapp微信小程序引入第三方字体库

热门文章

  1. web前端--面试题
  2. python实现whois查询_python3实现域名查询和whois查询
  3. Qt QLineEdit自带右键菜单的翻译
  4. 谷歌怎么设置下载位置
  5. 工具系列 | FPM进程管理器详解
  6. 如果生活将我们拆散了
  7. 搜狗云输入法,实现原理.
  8. 人口流向数据_中国人口流动数据挖掘分析云平台
  9. 欧洲杯第一周的比赛闲聊
  10. 移动端rem适配(375)设计稿