欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


欢迎跳转到本文的原文链接:https://honeypps.com/mq/spring-activemq-quick-start/

通过前一篇《ActiveMQ简述》大概对ActiveMQ有了一个大概的认识,本篇所阐述的是如何通过Spring继承ActiveMQ进而更有效、更灵活的运用ActiveMQ.

Spring和ActiveMQ整合需要在项目中包含以下这几个jar包(缺一不可):activeio-core-3.1.4.jar,activemq-all-5.13.2.jar,activemq-pool-5.13.2.jar,commons-pool2-2.4.2.jar,这些jar可以在ActiveMQ的安装包中的/lib/optional/下找到,或者从这里下载。

##配置ConnectionFactory
ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,可以缓存Session, MessageProducer和MessageConsumer。

Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是JMS服务厂商提供的,并且需要把它注入到Spring提供的ConnectionFactory中,这里使用ActiveMQ提供的ConnectionFactory,所以定义如下(10.10.195.187是博主的测试ip地址):

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory"/></bean>

ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将Connection, Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时这样定义:

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  ><property name="connectionFactory" ref="targetConnectionFactory" /><property name="maxConnections" value="10"/></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="poolConnectionFactory"/></bean>

这里也可以去掉spring提供的SingleConnectionFactory,类似这样:

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://10.10.195.187:61616" /></bean><bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  ><property name="connectionFactory" ref="targetConnectionFactory" /><property name="maxConnections" value="10"/></bean>

##配置消息发送(生产)者
配置好ConnectionFactory之后我们就需要配置生产者。生产者负责生产消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。

spring配置文件中添加:

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="poolConnectionFactory"/></bean>

在Java相关处理文件中添加(这里用的是@Inject注解,当然也可以用@Autowired):

    @Inject@Named("jmsTemplate")private JmsTemplate jmsTemplate;

这样就可以通过jmsTemplate对象来处理相关信息,譬如:

    jmsTemplate.convertAndSend("sqlQueue",sql);

真正利用JmsTemplate进行消息发送的时候,我们需要知道消息发送的目的地,即Destination。在Jms中有一个用来表示目的地的Destination接口,它里面没有任何方法定义,只是用来做一个标志而已。当我们在使用JmsTemplate进行消息发送时没有指定destination的时候将使用默认的Destination。默认Destination可以通过在定义jmsTemplate bean对象时通过属性defaultDestination或defaultDestinationName来进行诸如,defaultDestinationName对于的就是一个普通字符串。在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅-发布模式的ActiveMQTopic。

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>sqlQueue</value></constructor-arg></bean><bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg><value>topic</value></constructor-arg></bean>

对面上面的那个例子可以在Java程序中添加:

    @Inject@Named("queueDestination")private Destination queueDestination;

进而【jmsTemplate.convertAndSend(“sqlQueue”,sql);】可以改为【jmsTemplate.convertAndSend(queueDestination,sql);】
也可以这样修改jmsTemplate:

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestination" ref="queueDestination"/></bean>

或者:

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory"/><property name="defaultDestinationName" value="sqlQueue"/></bean>

进而在java程序中这样调用:jmsTemplate.convertAndSend(sql);


##配置消息接收(消费)者
生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?这是通过Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听那个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory,一个是表示监听什么的Destination,一个是接收到消息以后进行消息处理的MessageListener.

Spring为我们听过了两种类型的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。MessageListenerContainer:SimpleMessageListenerContainer会在一开始的时候就创建一个会话Session和消费者Consumer,并且会适用标准的JMS的MessageConsumer.setMessageListener()方法注册监听器让JMS提供调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容J2EE的JMS限制。大多数情况下,我们还是使用DefaultMessageListenerContainer,跟MessageListenerContainer:SimpleMessageListenerContainer想比,它会动态的适应运行时的需求,并且能够参与外部的事务管理。它很好的平衡了JMS提供者要求低,先进功能如事务参与和兼容J2EE环境。

spring配置文件中添加:

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>sqlQueue</value></constructor-arg></bean><bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="poolConnectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /></bean>

其中的jmsQueueReceiver代码如下:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;import org.springframework.stereotype.Component;@Component("jmsQueueReceiver")
public class JmsQueueReceiver implements MessageListener
{@Overridepublic void onMessage(Message messgae){if(messgae instanceof TextMessage){TextMessage textMessage = (TextMessage) messgae;try{String text = textMessage.getText();System.out.println("######################["+text+"]######################");}catch (JMSException e){e.printStackTrace();}}}
}

##事务管理
Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对他们进行相应的操作。

在J2EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session.

JmsTemplate可以利用JtaTransactionManager和能够进行分布式的JMS ConnectionFactory处理分布式事务。

在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对于的消息监听容器时指定其sessionTransacted属性为true(默认为false):

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /><property name="sessionTransacted" value="true"/></bean>

这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚。这里需要注意的是对于其他操作如数据库等访问不属于该事务控制。

可以在JmsQueueReceiver中修改一下代码从而检测到发送异常时是否会进行事务回滚:

@Component("jmsQueueReceiver")
public class JmsQueueReceiver implements MessageListener
{@Overridepublic void onMessage(Message messgae){if(messgae instanceof TextMessage){TextMessage textMessage = (TextMessage) messgae;try{String text = textMessage.getText();System.out.println("######################["+text+"]######################");if(true){throw new RuntimeException("Error");}}catch (JMSException e){e.printStackTrace();}}}
}

你可以通过向Destination发送一条信息,通过JmsQueueReceiver处理后,发送异常进而事务混滚。可以通过http://10.10.195.187:8161/admin/queues.jsp查看相关信息(ip地址是博主的ActiveMQ服务器所在id,根据实际情况修改)。

如果想要接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="messageListener" ref="jmsQueueReceiver" /><property name="destination" ref="queueDestination" /><property name="sessionTransacted" value="true"/><property name="transactionManager" ref="jtaTransactionManager"/></bean><bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

当给小时监听容器指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

到这里大概的Spring+ActiveMQ整合告一段落,所有代码经博主亲测,确保有效性及正确性。如有疑问,可在下方留言。


参考资料:

  1. Spring整合JMS(一)——基于ActiveMQ实现
  2. Spring+ActiveMQ+Mysql 配置JMS
  3. ActiveMQ简述

欢迎跳转到本文的原文链接:https://honeypps.com/mq/spring-activemq-quick-start/

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


Sping+ActiveMQ整合相关推荐

  1. 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

    转载:http://blog.csdn.net/jiuqiyuliang/article/details/48758203 第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模 ...

  2. ActiveMQ整合spring

    主要讲解点: 使用ActiveMQ完成发短信功能(重点) 2.Kindeditor上传图片及图片管理器功能的实现(对应后台代码是重点, Kindeditor参照demo会用就行) 宣传活动的保存功能( ...

  3. JMS学习(4):--Spring和ActiveMQ整合的完整实例

    前言 这篇博文,我们基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PU ...

  4. Spring和ActiveMQ整合的完整实例

     Spring和ActiveMQ整合的完整实例 前言 这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了 ...

  5. ActiveMQ整合spring结合项目开发流程(生产者和消费者)总结

    1 一:生产者代码编写: 2 1.配置pom.xml引入相关坐标 3 <dependencies> 4 <!-- spring开发测试 --> 5 <dependency ...

  6. SpringBoot ActiveMQ 整合使用

    介绍 ActiveMQ 它是 Apache 出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 ...

  7. Spring mvc4 + ActiveMQ 整合

    转载:http://www.cnblogs.com/leiOOlei/p/5075402.html 一.配置部分 二.代码部分 三.页面部分 四.Controller控制器 五.效果展示 六.加入监听 ...

  8. ActiveMQ相关存储介绍

    ActiveMQ 的存储: ActiveMQ 在 queue 中存储 Message 时,采用先进先出顺序(FIFO)存储.同一时间一个消息被分派给单个消费者,且只有当 Message 被消费并确认时 ...

  9. Spring知识点提炼

    原文出处: 朱小厮 1. Spring框架的作用 轻量:Spring是轻量级的,基本的版本大小为2MB 控制反转:Spring通过控制反转实现了松散耦合,对象们给出它们的依赖,而不是创建或查找依赖的对 ...

最新文章

  1. Exchange环境搭建心得
  2. Linux配置最基础的命令
  3. Semaphore 里面居然有这么一个大坑!
  4. Python paho-mqtt 模块使用(转)
  5. linux msgsend 头文件,Unix/Linux进程间通信
  6. 树莓派 rfid_技术 | 对恶意树莓派设备的取证分析
  7. java实现音频播放小程序_微信小程序实现音频文件播放进度的实例代码
  8. oracle 修改列类型6,Oracle用户、权限、角色管理 编辑
  9. 服务器虚拟计算节点,什么是云服务器计算节点
  10. 制作可保存配置的U盘版BT4(BackTrack4 )
  11. 74LS138译码器实现举重裁判电路-QuartusII 软件仿真
  12. 文件右键点击打开方式后没有始终使用此应用打开该文件的勾选框
  13. 开发文件管理器-2015年9月19日
  14. 推荐系统-协同过滤在Spark中的实现
  15. golang vendor目录
  16. DSP TMS320C5509A 控制DDS AD9854芯片驱动
  17. 5-ipv6服务器之-dns
  18. 一个强迫症的电脑上(桌面篇)
  19. js 变量、函数重复声明和变量提升浅析
  20. matlab图像类论文,基于matlab图形图像处理技术毕设论文.doc

热门文章

  1. Django 2 零基础 - 待办清单网站
  2. 200915阶段一C++模板
  3. IDEA报错:Loading class `com.mysql.jdbc.Driver‘. This is deprecated. The new driver class is `com.mysql
  4. PCB线宽与电流的关系
  5. 什么?ES6 中还有 Tail Calls!
  6. 34.任务计划cron chkconfig systemctl管理服务 unit target
  7. How to think positively 如何培养正念
  8. CodeForces - 1455E Four Points(数学+几何)
  9. HDU - 5306 Gorgeous Sequence(吉司机线段树)
  10. CodeForces - 1313C2 Skyscrapers (hard version)(单调栈+dp/分治)