SpringBoot-ActiveMq

一个简单的整合案例, 初步实现 activemqqueue 队列模式 和 topic 订阅模式 (普通订阅, 持久订阅)

准备工作

  • springboot2.0.3
  • 如果要使用 activemq pool 需要另外加入 activemq-pool
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--  启用JMS 的池化, 就一定要加上这个 jar--><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId></dependency>

简介

spring-jms-*.RELEASE.jar 有提供 JmsMessagingTemplate 给我们使用, 只需要在配置类上加上 @EnableJms 即可,无特殊要求,我们直接拿来用即可.

queue模式 与 topic模式

**queue模式** 点对点的发送信息, 点对点通信,每个消息只有一个消费者,消息保证送达,离线消费者可以在下次上线后收到之前积压的消息。
案例: 如果生产者产生了100条消息,那么两个消费同时在的话,会分工合作来接收这100条消息。就是每个消费者接收到50条来处理。

**topic模式** 广播形式(主题模式) 当前有几个客户端在线,就发送几条广播给客户端。 案例 如果生产者产生了100条消息,消费者在还没有订阅这个主题之前,是不会接收到这100条消息的。
消费者只有在订阅了某个主题消息之后,生产者产生的消息 才会被接收处理。如果又两个消费者同时订阅了这个主题消息,生产者在产生100条消息时,两个消费者会同时分别接收到这100条消息。

发送

package com.example.dxp.web;import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import javax.jms.Destination;/*** @author carzy* @date 2018/08/01*/
@RestController
public class TestController {private JmsMessagingTemplate jmsTemplate;@Autowiredpublic void setJmsTemplate(JmsMessagingTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}@GetMapping("jms/queue")public void jmsQueueTemplate(@RequestParam String value) {Destination destination = new ActiveMQQueue("queueTest");this.jmsTemplate.convertAndSend(destination, value);}@GetMapping("jms/topic")public void jmsTopicTemplate(@RequestParam String value) {// 可以将以下步骤封装进service 层, 并暴露出一个 destinationName 和 message 出来Destination destination = new ActiveMQTopic("topicTest");this.jmsTemplate.convertAndSend(destination, value);}}

发送 queue模式 消息

JmsMessagingTemplate 通过 convertAndSend() 方法将消息发送至 activemq 中, 这个方法其中有一个重载方法需要我们穿入一个 Destination 接口对象,而这个 对象就决定的了我们的消息
发送到哪里, 如上代码,我们用的是 Destination 的一个实现类 ActiveMQQueue, 表示我们的消息将发送到 queues下. 代项目启动,发送成功了, 可在浏览器上访问在 activemq 服务的 8161 端口,
Queues 也签下可看见对应的 queue name, queueTest

发送 广播形式 消息

queue模式 类似, 无非是将 Destination 的实现类换成 ActiveMQTopic, 在浏览器的 Topics 也签也可看见名为 topicTest 的topic 通道

监听

系统默认采用的是 queue模式 模式, 所以我们需要做一下设置,来更改默认的配置

package com.example.dxp;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;/*** @author carzy* @date 2018/08/01*/
@Configuration
@EnableJms
public class JmsConfig {@Bean("jmsQueueListenerContainerFactory")public JmsListenerContainerFactory jmsQueueListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置连接数factory.setConcurrency("3-10");//重连间隔时间factory.setRecoveryInterval(1000L);factory.setPubSubDomain(false);return factory;}@Bean("jmsTopicListenerContainerFactory")public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//重连间隔时间factory.setPubSubDomain(true);return factory;}}

监听 queue模式 消息

我们定义了一个自己的 监听工厂, 并且设置它的 factory.setPubSubDomain(false), 表示它是queue模式下的监听工厂.然后我们在使用 @JmsListener 进行监听的时候,设置上 这个工厂即可

package com.example.dxp.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/*** @author carzy* @date 2018/08/01*/
@Component
public class QueueReceiver {@JmsListener(destination = "queueTest", containerFactory = "jmsQueueListenerContainerFactory")public void receive(String msg) {System.out.println("queue1 监听到的消息内容为: " + msg);}@JmsListener(destination = "queueTest", containerFactory = "jmsQueueListenerContainerFactory")public void receive2(String msg) {System.out.println("queue2 监听到的消息内容为: " + msg);}
}

监听 广播形式 消息

与上面相反的地方就是 factory.setPubSubDomain(true) 表示它是一个 topic模式下的监听工厂,在监听的时候,只需要将 @JmsListenercontainerFactory 指向它即可

package com.example.dxp.jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/*** @author carzy* @date 2018/08/01*/
@Component
public class TopicReceiver {@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory")public void receive(String msg) {System.out.println("topicTest1 监听到的消息内容为: " + msg);}@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory")public void receive2(String msg) {System.out.println("topicTest2 监听到的消息内容为: " + msg);}
}

监听 广播形式 消息, 持久订阅

    /*** 持久订阅*/@Bean("jmsTopicListenerContainerFactory2")public JmsListenerContainerFactory jmsTopicListenerContainerFactory2(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//重连间隔时间factory.setRecoveryInterval(1000L);factory.setPubSubDomain(true);// 给订阅者一个名字,并开启持久订阅factory.setClientId("client_id");factory.setSubscriptionDurable(true);return factory;}

jmsConfig 中 配置类中加入 bean, 与 普通订阅 基本相同, 出别在与 factory.setClientId("client_id"), 和 factory.setSubscriptionDurable(true). 未订阅者设置一个 客户端ID,并且开启持久订阅的功能,
这样在的话, 在你订阅某一个主题之后, mq服务会记住你的 ClientId, 下次即使你不在线, 消息会被持久化到硬盘, 等你上线后再发送给你. 监听如下:

@Component
public class Topic2Receiver {@JmsListener(destination = "topicTest", containerFactory = "jmsTopicListenerContainerFactory2")public void receive(String msg) {System.out.println("这是持久订阅: " + msg);}}

测试

普通订阅 和 列队模式

可通过浏览器访问 jms/queue?value=testjms/topic?value=test 来出发消息的发送,来观察控制台获取信息的情况

持久化的测试

  • 第一次启动, 会在浏览器 ActiveMQ 管理界面的 Subscribers 也签多出一个 ClientID 名为 client_id 的订阅者. Active Durable Topic Subscribers 表示当前持久订阅者在线, Offline Durable Topic Subscribers 表示当前
    持久订阅者不在线.
  • 关闭你的服务, 注释掉 Topic2Receiver 中订阅的方法, 再次启动项目. Offline Durable Topic Subscribers 中会出现你的 clientID, 表示这个订阅者下线了. 你通过 jms/topic?value=test 这个接口发送消息, 会发现普通订阅者 正常的收到消息
  • 关闭服务, 打开刚才注释调的代码. 重新启动项目, 会发现上一次你发送的消息, 这个时候才收到

springBoot2 整合activemq相关推荐

  1. SpringBoot2.0源码分析(二):整合ActiveMQ分析

    SpringBoot具体整合ActiveMQ可参考:SpringBoot2.0应用(二):SpringBoot2.0整合ActiveMQ ActiveMQ自动注入 当项目中存在javax.jms.Me ...

  2. SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.21 SpringBoot 整合 ActiveMQ

    SpringBoot [黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)] SpringBoot 开发实用篇 文章目录 Spring ...

  3. Spring整合ActiveMQ完成消息队列MQ编程

    <–start–> 第一步:新建一个maven,将工程命名为activeMQ_spring.在pom.xml文件中导入相关jar包. ①spring开发和测试相关的jar包: spring ...

  4. springboot2整合mysql5_SpringBoot2整合SSM框架详解

    SpringBoot2整合SSM框架详解 发布时间:2019-01-15 21:33, 浏览次数:1218 , 标签: SpringBoot SSM <>开发环境 * 开发工具:Eclip ...

  5. 实战07_SSM整合ActiveMQ支持多种类型消息

    接上一篇:企业实战06_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572147 1.S ...

  6. 实战06_SSM整合ActiveMQ支持多种类型消息

    接上一篇:企业实战05_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572129 1.S ...

  7. 实战05_SSM整合ActiveMQ支持多种类型消息

    接上一篇:实战04_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572124 1.Str ...

  8. 04_SSM整合ActiveMQ支持多种类型消息

    接上一篇:企业实战03_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572104 1.S ...

  9. 实战03_SSM整合ActiveMQ支持多种类型消息

    接上一篇:企业实战02_SSM整合ActiveMQ支持多种类型消息https://blog.csdn.net/weixin_40816738/article/details/100572053 1.S ...

最新文章

  1. OSPF-网络类型(ip ospf network ?)
  2. UVA 11255 Necklace
  3. Lucene从入门到进阶(6.6.0版本)
  4. 基于八叉树快速分类的Shear-Warp交互式体绘制算法
  5. Oracle拆分字符串函数与执行调用
  6. 获取存储过程返回值及代码中获取返回值
  7. putty连虚拟机中Linux出现Access Denied
  8. c语言学生对老师的评教系统,学生对老师的评价
  9. 网络管理员&MCSE2003之2:使用虚拟机Vmware建立多电脑网络环境
  10. 微软不允许用户卸载 Chromium Edge 浏览器
  11. 代码之美~利用构造方法实现模块的解耦
  12. webservice 教程学习系列(八)——wsdl文档深入分析
  13. nothing else left on those streets
  14. NOI 1.11(02)二分法求函数的零点
  15. 英伟达辟谣 RTX 3060 被破解传闻
  16. CDH安装指南——酒仙网技术
  17. Matlab数值分析编程:牛顿下山法解方程
  18. python数字2大写汉字转换(金额转换)
  19. 独孤思维:赚钱项目的内卷和躺平
  20. 2020年复旦大学计算机学院夏令营经历

热门文章

  1. Aleo隐私智能合约__火箭准备发射
  2. 项目管理中,几种工作量评估方法
  3. 苹果cmsv10首涂模板第十四套酷黑渐变特色自适应高端模板
  4. 红队渗透靶场之SickOs1.1
  5. asp.net 汉字转拼音的车祸现场
  6. springBoot实验填报系统
  7. 多媒体技术在大学计算机教学中,多媒体技术在计算机教学中的应用探究
  8. 2021中国西部CIO年会正式启动
  9. 啄木鸟Python文章链接
  10. mysql下载与安装教程