Spring framework(10):集成 JMS 异步消息队列(ActiveMQ)
JMS 和 Apache-ActiveMQ 简介
JMS 具有以下优势
- 通信的异步性,客户端获取信息不需要主动发送请求,由 JMS 中间件自动推送信息;
- 消息发送的准确性,JMS 中间件可以保证信息只会发送一次,不会发送重复信息;
JMS 的消息传输模型有以下 2 种
- 每一个消息只有一个接收者(当多个接收者使用同一个消息队列时,接收消息是竞争式的);
- 发布者和接收者没有时间依赖,当消息发送者发送消息时,即使但是接收者不运行,当只要接收者上线,就可以接收到信息;
- 当接收者接收到消息时,会发送确认通知;
- 一个消息可以传递给多个消息接收者(当多个接收者接收到同一个消息时,每一个接收者都会接收到一份该消息的拷贝);
- 发布者和接收者有时间依赖,只有当客户端创建订阅后,才可以开始接收到消息(而且是当前开始消息发送者发布的消息),同时订阅者需要一直保持活动状态才可以接收消息;
- JMS 允许发布订阅者创建一个可持久化的订阅,这样即使订阅者没有上线,也可以在上线后接收到消息;
ActiveMQ的安装和部署
<broker>
....
<!--增加用户认证-->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="assad" password="assad123" groups="admins" />
</users>
</simpleAuthenticationPlugin>
</plugins>
</broker>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://127.0.0.1:23333?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
......
</transportConnectors>
- TextMessager:传输文本对象;
- MapMessager:传输 Map 类型对象;
- ObjectMessager:传输其他 Object,调用该 Object 的序列化方法序列化为二进制字节传输;
Spring 集成 JMS(ActiveMQ)
异步模型快速示例(queue/topic)
//spring_sample 核心依赖
compile 'org.springframework:spring-core:4.3.11.RELEASE'
compile 'org.springframework:spring-beans:4.3.11.RELEASE'
//Apache-ActiveMQ ( JMS API 的一种思想) 依赖,Spring 相关支持依赖
compile 'org.apache.activemq:activemq-all:5.15.3'
compile 'org.apache.activemq:activemq-pool:5.15.3'
compile 'org.springframework:spring-jms:4.3.11.RELEASE'
服务端
<!--ActiveMQ 相关配置-->
<!--第三方工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://127.0.0.1:23333"
p:userName="assad"
p:password="assad123"
p:trustAllPackages="true"
p:useAsyncSend="true"/>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
p:connectionFactory-ref="targetConnectionFactory"
p:maxConnections="100" />
<!--spring 管理真正 ConnectionFactory 的连接工厂,使用PooledConnectionFactory 封装过的 targetConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:targetConnectionFactory-ref="pooledConnectionFactory"/>
<!--目的地配置: queue(point-to-point模式) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!--目的地配置:Topic(发布/订阅模式)-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>
<!--配置 Spring jsmTemplate 模板,用于发送和接收消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory">
<!--配置默认目的地(queue 或 topic)-->
<property name="defaultDestination" ref="destinationQueue" />
<!--使用spring提供的默认消息转换器,也可以装载自己实现的消息转化器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
package basic_sample.server;
import bean.User;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
//演示各种类型消息的发送
public class MessageProducer {
private static JmsTemplate jmsTemplate = null;
private static final Logger log = LogManager.getLogger();
public static void main(String[] args){
//获取 JmsTemplate 对象
jmsTemplate = new ClassPathXmlApplicationContext("basic_sample/appContext-server.xml").getBean("jmsTemplate",JmsTemplate.class);
//发送文本信息
String message = "basic_sample.server: Hello world!";
jmsTemplate.convertAndSend(message);
log.debug(message);
//发送 Map 类型信息
Map<String,Integer> info = new HashMap<>();
info.put("Guangdong",64815);
info.put("Jiangshu",62604);
info.put("Shangdong",54868);
info.put("Zhijiang",36958);
jmsTemplate.convertAndSend(info);
log.debug(info.keySet().stream().map(key -> key+":"+info.get(key)).collect(Collectors.joining(", ")));
//发送 Object 类型信息(User对象)
User user = new User();
user.setId(20137);
user.setName("assad");
user.setCity("Guangzhou");
jmsTemplate.convertAndSend(user);
log.debug(user);
}
}
客户端
<!--ActiveMQ 相关配置-->
<!--第三方工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://127.0.0.1:23333"
p:userName="assad"
p:password="assad123"
p:trustAllPackages="true" />
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
p:connectionFactory-ref="targetConnectionFactory"
p:maxConnections="100" />
<!--spring 管理真正 ConnectionFactory 的连接工厂,使用PooledConnectionFactory 封装过的 targetConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:targetConnectionFactory-ref="pooledConnectionFactory"/>
<!--目的地配置: queue(point-to-point模式) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!--目的地配置:Topic(发布/订阅模式)-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>
<!--配置 Spring jsmTemplate 模板,用于发送和接收消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory">
<!--配置默认目的地(queue 或 topic)-->
<property name="defaultDestination" ref="destinationQueue" />
<!--使用spring提供的默认消息转换器,也可以装载自己实现的消息转化器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
package basic_sample.client;
import bean.User;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
//演示接收各种类型的消息
public class MessageConsumer {
private static JmsTemplate jmsTemplate = null;
private static final Logger log = LogManager.getLogger();
public static void main(String[] args){
//创建 JmsTemplate 对象
jmsTemplate = new ClassPathXmlApplicationContext("basic_sample/appContext-client.xml").getBean("jmsTemplate",JmsTemplate.class);
//接收文本对象
String msg = (String) jmsTemplate.receiveAndConvert();
log.debug(msg);
//接收 Map 对象
Map<String,Integer> info = (HashMap<String,Integer>) jmsTemplate.receiveAndConvert();
log.debug(info.keySet().stream().map(key -> key+":"+info.get(key)).collect(Collectors.joining(", ")));
//接收 Object 类型对象
User user = (User) jmsTemplate.receiveAndConvert();
log.debug(user);
}
}
客户端使用监听器的方式接收信息
package persistence_sample.client;
import javax.jms.*;
public class MyListener implements MessageListener {
private static final Logger log = LogManager.getLogger();
@Override
public void onMessage(Message message) {
try {
//接收文本对象
String msg = (String) jmsTemplate.receiveAndConvert();
log.debug(msg);
//接收 Map 对象
Map<String,Integer> info = (HashMap<String,Integer>) jmsTemplate.receiveAndConvert();
log.debug(info.keySet().stream().map(key -> key+":"+info.get(key)).collect(Collectors.joining(", ")));
//接收 Object 类型对象
User user = (User) jmsTemplate.receiveAndConvert();
log.debug(user);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
<!--ActiveMQ 相关配置-->
<!--第三方工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://127.0.0.1:23333"
p:userName="assad"
p:password="assad123"
p:trustAllPackages="true" />
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
p:connectionFactory-ref="targetConnectionFactory"
p:maxConnections="100" />
<!--spring 管理真正 ConnectionFactory 的连接工厂,使用PooledConnectionFactory 封装过的 targetConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:targetConnectionFactory-ref="pooledConnectionFactory"/>
<!--目的地配置: queue(point-to-point模式) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!--装载自定义消息监听器-->
<bean id="myTopicListener" class="persistence_sample.client.TopicListener" />
<!--消息监听容器-->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="destinationQueue"
p:receiveTimeout="1000"
p:messageListener-ref="myTopicListener">
<!--messageListener-ref 配置消息监听器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
public class StartReceiver {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("persistence_sample/appContext-client.xml");
while(true){
}
}
}
Topic 消息持久化配置
服务端
<!--ActiveMQ 相关配置-->
<!--第三方工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://127.0.0.1:23333"
p:userName="assad"
p:password="assad123"
p:trustAllPackages="true" />
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
p:connectionFactory-ref="targetConnectionFactory"
p:maxConnections="100" />
<!--spring 管理真正 ConnectionFactory 的连接工厂,使用PooledConnectionFactory 封装过的 targetConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:targetConnectionFactory-ref="pooledConnectionFactory"/>
<!--目的地配置:queue(发布/订阅模式)-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic-persistence" />
</bean>
<!--配置 Spring jsmTemplate 模板,配置为 Topic 模式 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="connectionFactory"
p:defaultDestination-ref="destinationTopic"
p:pubSubDomain="true"
p:sessionAcknowledgeMode="1"
p:explicitQosEnabled="true"
p:deliveryMode="2"
p:receiveTimeout="10000">
<!--以上配置中,explicitQosEnabled用于生效deliveryMode配置
deliveryMode 为发送模式,1 为非持久,2 为持久-->
<!--使用spring提供的默认消息转换器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
ackage persistence_sample.server;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
public class MessageProducer {
private static JmsTemplate jmsTemplate ;
private static final Logger log = LogManager.getLogger();
public static void main(String[] args) throws InterruptedException {
jmsTemplate = new ClassPathXmlApplicationContext("persistence_sample/appContext-server.xml").getBean("jmsTemplate",JmsTemplate.class);
//发送20条信息
for(int i=1; i<=20; i++){
String message = String.format("Message@<%d>",i);
jmsTemplate.convertAndSend(message);
log.debug(message);
Thread.sleep(900);
}
}
}
客户端
<!--ActiveMQ 相关配置-->
<!--第三方工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://127.0.0.1:23333"
p:userName="assad"
p:password="assad123"
p:trustAllPackages="true" />
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"
p:connectionFactory-ref="targetConnectionFactory"
p:maxConnections="100" />
<!--spring 管理真正 ConnectionFactory 的连接工厂,使用PooledConnectionFactory 封装过的 targetConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"
p:clientId="client_001"
p:targetConnectionFactory-ref="pooledConnectionFactory"/>
<!--目的地配置:Topic(发布/订阅模式)-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic-persistence" />
</bean>
<!--装载自定义消息监听器-->
<bean id="myTopicListener" class="persistence_sample.client.TopicListener" />
<!--消息监听容器-->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="destinationTopic"
p:pubSubDomain="true"
p:subscriptionDurable="true"
p:clientId="client_001"
p:durableSubscriptionName="client_001"
p:receiveTimeout="1000"
p:messageListener-ref="myTopicListener">
<!--pubSubDomain开启订阅模式,subscriptionDurable设置允许持久化,
clientId、durableSubscriptionName 设置接收者ID-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
package persistence_sample.client;
import javax.jms.*;
public class TopicListener implements MessageListener {
private static final Logger log = LogManager.getLogger();
@Override
public void onMessage(Message message) {
try {
String msg = ((TextMessage)message).getText();
log.debug(msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package persistence_sample.client;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
public class StartReceiver {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("persistence_sample/appContext-client.xml");
while(true){
}
}
}
ActiveMQ 的配置
- AMQ 文件储存;
- KahaDB 文件储存;
- LevelDB 储存
- JDBC 数据库持久化;
<broker>
....
<!--持久化配置-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
....
</broker>
Spring framework(10):集成 JMS 异步消息队列(ActiveMQ)相关推荐
- 八.利用springAMQP实现异步消息队列的日志管理
经过前段时间的学习和铺垫,已经对spring amqp有了大概的了解.俗话说学以致用,今天就利用springAMQP来完成一个日志管理模块.大概的需求是这样的:系统中有很多地方需要记录操作日志,比如登 ...
- 异步消息队列Celery
Celery是异步消息队列, 可以在很多场景下进行灵活的应用.消息中包含了执行任务所需的的参数,用于启动任务执行, suoy所以消息队列也可以称作 在web应用开发中, 用户触发的某些事件需要较长事件 ...
- C#实现异步消息队列
C#实现异步消息队列 原文:C#实现异步消息队列 拿到新书<.net框架设计>,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使 ...
- redis stream java消息队列_Redis 异步消息队列与延时队列
消息中间件,大家都会想到 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能.这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力.但是这种属 ...
- Redis异步消息队列
一.异步消息队列介绍 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦.所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列.同时由于使用了消 ...
- 消息队列 - ActiveMQ
消息队列 - ActiveMQ 一.入门概述 1.在什么场景下使用消息中间件,为什么使用 2. 消息队列是什么 3. 去哪下 二.ActiveMQ的安装和控制台 1.Linux安装 2.Apache ...
- CentOS源码安装消息队列ActiveMQ
消息队列ActiveMQ介绍 JMS全称:Java Message Service中文:Java消息服务.JMS是java的一套API标准,最初的目的是为了是应用程序能够访问现有的MOM系统(MOM是 ...
- java分布式面试题之消息队列ActiveMQ部分
java分布式面试题之消息队列ActiveMQ部分 java分布式面试题之消息队列ActiveMQ部分 1.如何使用ActiveMQ解决分布式事务? 在互联网应用中,基本都会有用户注册的功能.在注册的 ...
- Spring Boot:使用Rabbit MQ消息队列
综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以 ...
最新文章
- 百度搜索 带网页特效的关键词
- Linux配置静态IP地址
- oracle 控制文件作用是什么,Oracle控制文件(controlfile)作用
- 大于等于符号_英语标点符号怎么读,这下全知道了!
- 判断iframe是否加载完毕的方法(兼容ie和Firefox)
- HDU5875 - Function
- 科沃斯扫地机器人风扇模块_扫地机器人不能开机,不能关机,风扇不转
- arduino esp8266_Arduino-httpupdate-OTA-esp8266升级探险记
- opencv python3 找图片不同_使用OpenCV和Python查找图片差异
- 计算器html js php代码,html+js实现简单的计算器代码(加减乘除)
- 昆仑通态MCGS与三菱FX3U 485BD方式通讯案例 实现昆仑通态触摸屏与三菱FX3U的485BD板通过485方式进行连接
- 线性表——顺序表——时间复杂度计算
- ISIS协议基础知识
- 开集识别(Open Set Recognition)
- 利用神经网络识别12306验证码—(六)模型应用以及12306实战测试
- 【python】根据图片链接(地址)抓取图片
- 合肥工业大学的计算机专业的导师,合肥工业大学计算机与信息学院硕士生导师:程运安副教授...
- 书单 | 做数字化转型,离不开这10本书!
- Python-OpenCV 实现美图秀秀视频剪辑效果【转场】
- win7笔记本外接显示器html,window7笔记本外接显示器只显示一个屏幕怎么设置
热门文章
- 微信小程序自定义菜单tabbar后初次进入小程序会出现两个tabbar
- IDEA代码字体颜色恢复为默认颜色
- 如何通过腾讯云短信实现发送验证码并校验验证码以实现登录功能
- vue 移动端与PC端的响应式布局整理
- 进销存 SPU和SKU
- 微信公众号迁移丨如何迁移微信公众号 最详细公众号迁移流程和方法
- java登录密码验证失败_java – Spring Security:如果身份验证失败,则重定向到登录页面...
- 抖音上的python课程_用 Python 下载抖音无水印视频
- 重庆青年建立个人网站 月收入约5万元
- Android 打包32位和64位兼容包