springBoot2 整合activemq
SpringBoot-ActiveMq
一个简单的整合案例, 初步实现 activemq
的 queue
队列模式 和 topic
订阅模式 (普通订阅, 持久订阅)
准备工作
- springboot2.0.3
- 如果要使用
activemq pool
需要另外加入activemq-pool
包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 启用JMS 的池化, 就一定要加上这个 jar--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency>
简介
spring-jms-*.RELEASE.jar
有提供 JmsMessagingTemplate
给我们使用, 只需要在配置类上加上 @EnableJms
即可,无特殊要求,我们直接拿来用即可.
queue模式 与 topic模式
**queue模式**
点对点的发送信息
, 点对点通信,每个消息只有一个消费者,消息保证送达,离线消费者可以在下次上线后收到之前积压的消息。
案例:
如果生产者产生了100条消息,那么两个消费同时在的话,会分工合作来接收这100条消息。就是每个消费者接收到50条来处理。
**topic模式**
广播形式(主题模式)
当前有几个客户端在线,就发送几条广播给客户端。 案例
如果生产者产生了100条消息,消费者在还没有订阅这个主题之前,是不会接收到这100条消息的。
消费者只有在订阅了某个主题消息之后,生产者产生的消息 才会被接收处理。如果又两个消费者同时订阅了这个主题消息,生产者在产生100条消息时,两个消费者会同时分别接收到这100条消息。
发送
package com.example.dxp.web;import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.jms.Destination;/*** @author carzy* @date 2018/08/01*/
@RestController
public class TestController {private JmsMessagingTemplate jmsTemplate;@Autowiredpublic void setJmsTemplate(JmsMessagingTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}@GetMapping("jms/queue")public void jmsQueueTemplate(@RequestParam String value) {Destination destination = new ActiveMQQueue("queueTest");this.jmsTemplate.convertAndSend(destination, value);}@GetMapping("jms/topic")public void jmsTopicTemplate(@RequestParam String value) {// 可以将以下步骤封装进service 层, 并暴露出一个 destinationName 和 message 出来Destination destination = new ActiveMQTopic("topicTest");this.jmsTemplate.convertAndSend(destination, value);}}
发送 queue模式
消息
JmsMessagingTemplate
通过 convertAndSend()
方法将消息发送至 activemq
中, 这个方法其中有一个重载方法需要我们穿入一个 Destination
接口对象,而这个 对象就决定的了我们的消息
发送到哪里, 如上代码,我们用的是 Destination
的一个实现类 ActiveMQQueue
, 表示我们的消息将发送到 queues
下. 代项目启动,发送成功了, 可在浏览器上访问在 activemq
服务的 8161
端口,
在 Queues
也签下可看见对应的 queue name, queueTest
发送 广播形式
消息
与 queue模式
类似, 无非是将 Destination
的实现类换成 ActiveMQTopic
, 在浏览器的 Topics
也签也可看见名为 topicTest
的topic 通道
监听
系统默认采用的是 queue模式
模式, 所以我们需要做一下设置,来更改默认的配置
package com.example.dxp;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;/*** @author carzy* @date 2018/08/01*/
@Configuration
@EnableJms
public class JmsConfig {@Bean("jmsQueueListenerContainerFactory")public JmsListenerContainerFactory jmsQueueListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("3-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setPubSubDomain(false);return factory;}@Bean("jmsTopicListenerContainerFactory")public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//重连间隔时间factory.setPubSubDomain(true);return factory;}}
监听 queue模式
消息
我们定义了一个自己的 监听工厂
, 并且设置它的 factory.setPubSubDomain(false)
, 表示它是queue模式下的监听工厂.然后我们在使用 @JmsListener
进行监听的时候,设置上 这个工厂即可
package com.example.dxp.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/*** @author carzy* @date 2018/08/01*/
@Component
public class QueueReceiver {@JmsListener(destination = "queueTest", containerFactory = "jmsQueueListenerContainerFactory")public void receive(String msg) {System.out.println("queue1 监听到的消息内容为: " + msg);}@JmsListener(destination = "queueTest", containerFactory = "jmsQueueListenerContainerFactory")public void receive2(String msg) {System.out.println("queue2 监听到的消息内容为: " + msg);}
}
监听 广播形式
消息
与上面相反的地方就是 factory.setPubSubDomain(true)
表示它是一个 topic模式下的监听工厂,在监听的时候,只需要将 @JmsListener
的 containerFactory
指向它即可
package com.example.dxp.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/*** @author carzy* @date 2018/08/01*/
@Component
public class TopicReceiver {@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory")public void receive(String msg) {System.out.println("topicTest1 监听到的消息内容为: " + msg);}@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory")public void receive2(String msg) {System.out.println("topicTest2 监听到的消息内容为: " + msg);}
}
监听 广播形式
消息, 持久订阅
/*** 持久订阅*/@Bean("jmsTopicListenerContainerFactory2")public JmsListenerContainerFactory jmsTopicListenerContainerFactory2(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//重连间隔时间factory.setRecoveryInterval(1000L);factory.setPubSubDomain(true);// 给订阅者一个名字,并开启持久订阅factory.setClientId("client_id");factory.setSubscriptionDurable(true);return factory;}
在 jmsConfig
中 配置类中加入 bean
, 与 普通订阅
基本相同, 出别在与 factory.setClientId("client_id")
, 和 factory.setSubscriptionDurable(true)
. 未订阅者设置一个 客户端ID,并且开启持久订阅的功能,
这样在的话, 在你订阅某一个主题之后, mq服务会记住你的 ClientId
, 下次即使你不在线, 消息会被持久化到硬盘, 等你上线后再发送给你. 监听如下:
@Component
public class Topic2Receiver {@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory2")public void receive(String msg) {System.out.println("这是持久订阅: " + msg);}}
测试
普通订阅 和 列队模式
可通过浏览器访问 jms/queue?value=test
和 jms/topic?value=test
来出发消息的发送,来观察控制台获取信息的情况
持久化的测试
- 第一次启动, 会在浏览器
ActiveMQ
管理界面的Subscribers
也签多出一个ClientID
名为client_id
的订阅者.Active Durable Topic Subscribers
表示当前持久订阅者在线,Offline Durable Topic Subscribers
表示当前
持久订阅者不在线. - 关闭你的服务, 注释掉
Topic2Receiver
中订阅的方法, 再次启动项目.Offline Durable Topic Subscribers
中会出现你的clientID
, 表示这个订阅者下线了. 你通过jms/topic?value=test
这个接口发送消息, 会发现普通订阅者
正常的收到消息 - 关闭服务, 打开刚才注释调的代码. 重新启动项目, 会发现上一次你发送的消息, 这个时候才收到
springBoot2 整合activemq相关推荐
- SpringBoot2.0源码分析(二):整合ActiveMQ分析
SpringBoot具体整合ActiveMQ可参考:SpringBoot2.0应用(二):SpringBoot2.0整合ActiveMQ ActiveMQ自动注入 当项目中存在javax.jms.Me ...
- SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.21 SpringBoot 整合 ActiveMQ
SpringBoot [黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)] SpringBoot 开发实用篇 文章目录 Spring ...
- Spring整合ActiveMQ完成消息队列MQ编程
<–start–> 第一步:新建一个maven,将工程命名为activeMQ_spring.在pom.xml文件中导入相关jar包. ①spring开发和测试相关的jar包: spring ...
- springboot2整合mysql5_SpringBoot2整合SSM框架详解
SpringBoot2整合SSM框架详解 发布时间:2019-01-15 21:33, 浏览次数:1218 , 标签: SpringBoot SSM <>开发环境 * 开发工具:Eclip ...
- 实战07_SSM整合ActiveMQ支持多种类型消息
接上一篇:企业实战06_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572147 1.S ...
- 实战06_SSM整合ActiveMQ支持多种类型消息
接上一篇:企业实战05_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572129 1.S ...
- 实战05_SSM整合ActiveMQ支持多种类型消息
接上一篇:实战04_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572124 1.Str ...
- 04_SSM整合ActiveMQ支持多种类型消息
接上一篇:企业实战03_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572104 1.S ...
- 实战03_SSM整合ActiveMQ支持多种类型消息
接上一篇:企业实战02_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572053 1.S ...
最新文章
- OSPF-网络类型(ip ospf network ?)
- UVA 11255 Necklace
- Lucene从入门到进阶(6.6.0版本)
- 基于八叉树快速分类的Shear-Warp交互式体绘制算法
- Oracle拆分字符串函数与执行调用
- 获取存储过程返回值及代码中获取返回值
- putty连虚拟机中Linux出现Access Denied
- c语言学生对老师的评教系统,学生对老师的评价
- 网络管理员&MCSE2003之2:使用虚拟机Vmware建立多电脑网络环境
- 微软不允许用户卸载 Chromium Edge 浏览器
- 代码之美~利用构造方法实现模块的解耦
- webservice 教程学习系列(八)——wsdl文档深入分析
- nothing else left on those streets
- NOI 1.11(02)二分法求函数的零点
- 英伟达辟谣 RTX 3060 被破解传闻
- CDH安装指南——酒仙网技术
- Matlab数值分析编程:牛顿下山法解方程
- python数字2大写汉字转换(金额转换)
- 独孤思维:赚钱项目的内卷和躺平
- 2020年复旦大学计算机学院夏令营经历