activeMQ初识到使用(二)
上一篇总结到基本环境的准备,接下来以个人见解一起来说说activeMQ的使用。
四、生产者与消费者
既然是消息的桥梁,那么就要有消息的发送者(生产者)与接收者(消费者)。这里先来介绍一下生产者:
生产者配置:
1、连接工厂配置(ConnectionFactory)
ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。本案例中使用CachingConnectionFactory,配置如下:
<bean id="myConnectionFactory1" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --><property name="sessionCacheSize" value="10" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --><property name="brokerURL" value="tcp://localhost:61616"/> </bean> </property> </bean>
2、确定发送的目的地(destination)
在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。即点对点,只能发送到众多消费者者当中的一个(经过我的测试,要是有两个消费者,每一次都会轮换接收),订阅/发布模式,可以发送到每一个消费者。这里采取订阅/发布模式。
<bean id="MessageListenerDestination" class="org.apache.activemq.command.ActiveMQTopic"><!-- 设置消息主题的名字 --><constructor-arg> <value>messageListener</value> </constructor-arg> </bean>
3、JMS模版的配置
配置JmsTemplate,即发送消息的配置。首先是注入连接(ConnectionFactory),其次是设置发布模式(destination),以及开启发布订阅模式,如下:
<bean id="MessageJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="myConnectionFactory1" /> <property name="defaultDestination" ref="MessageListenerDestination" /> <!-- 订阅发布模式 --> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> <property name="explicitQosEnabled" value="true" />
</bean>
生产者java类:
package com.lzt.MessageListener;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;@Component
public class MessageListenerSend {@Autowired@Qualifier("MessageJmsTemplate")private JmsTemplate MessageJmsTemplate;public void MessageListener(){MessageJmsTemplate.send("messageListener",new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage message = session.createTextMessage();message.setText("hello activemq");System.out.println("hello activemq");return message;}});}}
消费者配置:
1、连接工厂配置(ConnectionFactory),同上(多了一个clientId)。
clientId持久订阅,标识消费者,下面具体详细说。
<!-- 配置JMS连接工厂 receive --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> <property name="clientId" value="client_111" /><property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="tcp://localhost:61616" /> <!-- 是否异步发送 --> <property name="useAsyncSend" value="true" /> </bean> </property> </bean>
2、三种监听方式配置(根据实际取其一即可):
消费者配置也就三个配置:配置接收目的地,与生产者的目的地对应;监听器即监听类的确认配置;监听容器的配置。下面总结一下三种监听方式的使用。
(a)MessageListener
监听类实现MessageListener接口,重写onMessage 。xml配置如下:
<!-- 测试MessageListener 监听 开始--><bean id="MessageListenerDestination" class="org.apache.activemq.command.ActiveMQTopic"><!-- 设置消息主题的名字 --><constructor-arg> <value>messageListener</value> </constructor-arg> </bean><!-- 接收消息配置 (自己定义) --> <bean id="MessageListener" class="com.lzt.MessageListener.MessageListenerReceive" /> <!-- 消息监听容器 --><bean id="MessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <!-- 发布订阅模式 --><property name="pubSubDomain" value="true" /> <!-- 消息持久化 --> <property name="subscriptionDurable" value="true" /> <property name="receiveTimeout" value="10000" /><property name="destination" ref="MessageListenerDestination" /> <property name="messageListener" ref="MessageListener" /> </bean>
消费者java类:
package com.lzt.MessageListener;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;import org.springframework.stereotype.Component;public class MessageListenerReceive implements MessageListener{public void onMessage(Message message) {try {System.out.println("1:接收消息");System.out.println("messageListener接收到消息:"+((TextMessage)message).getText());} catch (JMSException e) {e.printStackTrace();}}}
测试:
Controller里调用生产者:
package com.lzt.controller;/*** *lizitao*/import javax.annotation.Resource;import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;import com.lzt.MessageListener.MessageListenerSend;
import com.lzt.MessageListenerAdapter.SimpleJMSSender;
import com.lzt.SessionAwareMessageListener.SessionAwareSend;@Controller
public class ActiveMqController {@Resourceprivate SimpleJMSSender simpleJMSSender;@Resourceprivate SessionAwareSend sessionAwareSend;@Resourceprivate MessageListenerSend messageListenerSend;@RequestMapping("/test.do")public String test01(){return "test";}//测试监听器是MessageListener@RequestMapping("/MessageTest.do")public void MessageListener(){messageListenerSend.MessageListener();}//测试监听器是MessageListenerAdapter@RequestMapping("/AdapteTest.do")public void MessageListenerAdapteTest(){simpleJMSSender.MessageListenerAdapter();}//测试监听器是SessionAwareMessageListener@RequestMapping("/SessionAwareTest.do")public void SessionAwareMessageListenerTest(){sessionAwareSend.AwareSend();}}
运行程序,打开浏览器输入http://localhost:8080/ActiveMQ_Spring/MessageTest.do
可以看到控制台输出:
(b)SessionAwareMessageListener
SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。总结上面一大串话就是说,消费者实现这个接口,可以进行消息的回复,也就是消费者接收到消息后可以发一句给生产者,我已经收到你的消息啦。
xml配置与MessageListener一样,不同的在于还需要指定消息回复的公共监听器,这里指定CommonDestination。配置公共的监听器与上面的messageLsitener一样。
<!-- 可以获取session的MessageListener --> <bean id="consumerSessionAwareMessageListener" class="com.lzt.SessionAwareMessageListener.SessionAwareReceive"> <property name="destination" ref="CommonDestination"/> </bean>
java类的实现:
package com.lzt.SessionAwareMessageListener;import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.listener.SessionAwareMessageListener;public class SessionAwareReceive implements SessionAwareMessageListener<Message>{private Destination destination; public void onMessage(Message message, Session session) throws JMSException {System.out.println("接收到一条消息!");System.out.println("消息名字是:"+((TextMessage)message).getText());//返回接收到消息的信号MessageProducer producer = session.createProducer(destination); Message textMessage = session.createTextMessage("已接收到信息。。。"); producer.send(textMessage);}public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; }
}
可以看到与MessageListener最大的不同在于可以快速的回复消息,(当然也可以在MessageListener的接收类里再写一个发送代码实现完成与此监听器的同样功能,有兴趣的可以试试 )。
测试:浏览器输入:http://localhost:8080/ActiveMQ_Spring/SessionAwareTest.do
看到控制台输出:
(c)MessageListenerAdapter
MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。当然这个监听器也会进行消息的回复。
MessageListenerAdapter会把接收到的消息做如下转换:
TextMessage转换为String对象;
BytesMessage转换为byte数组;
MapMessage转换为Map对象;
ObjectMessage转换为对应的Serializable对象。
与上面两种监听器的配置不同的在于,配置的监听器不一样,上面只需指定监听类即可,而这个需要进行一些设置,毕竟是把监听交给一个普通java类进行处理:
值得注意的是当有消息回复的时候一定要把消息转换给注视了,我在测试的时候就被坑了,一直收不到消息回复,手贱加了一个这种配置,删了最好。默认的时候是当有消息回复就会进行自动转换并监听。defaultListenerMethod设置该类监听的方法是receive,不设置默认是handleMessage方法。defaultResponseDestination设置的是接收消息回复的监听器,这里设置为公共回复监听器。(这是消息回复的一种设置方法)
<!-- 消费消息配置 (自己定义) --> <bean id="myTopicConsumer" class="com.lzt.MessageListenerAdapter.SimpleJMSReceiver" />
<!-- MessageListenerAdapter消息监听器 --> <bean id="myTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="myTopicConsumer" /> <!-- 接收消息的方法名称 当不设置该项的时候默认是调用类中的handleMessage方法--> <property name="defaultListenerMethod" value="receive" /> <!-- 设置回复的目的地 --><!-- <property name="defaultResponseDestination" ref="CommonDestination"/> --><!-- 不进行消息转换 --> <!-- <property name="messageConverter"> <null /> </property> --></bean>
发送java类:
消息回复直接在发送的类方法里设置:msg.setJMSReplyTo(commonDestination);指定一个接收回复的监听器。与上面的设置2选其一即可,两个都配置优先级是这个高。
package com.lzt.MessageListenerAdapter;import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;@Component("simpleJMSSender")
public class SimpleJMSSender {@Autowired@Qualifier("myJmsTemplate")private JmsTemplate jmsTemplate;@Autowired@Qualifier("CommonDestination")private Destination commonDestination;public void MessageListenerAdapter() {jmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage msg = session.createTextMessage();// 设置消息内容msg.setText("Hello World activeMq!");System.out.println("Adapter发送消息:Send Hello World activeMq");//当接收消息的消费者有返回数据时,指定该返回的消息的目的地msg.setJMSReplyTo(commonDestination);return msg;}});}}
接收java类,当有返回消息回复的时候返回一个字符串:
package com.lzt.MessageListenerAdapter;import javax.jms.JMSException;
import org.springframework.jms.JmsException;
import org.springframework.stereotype.Component;@Component
public class SimpleJMSReceiver{/*public void receive(TextMessage message) throws JmsException,JMSException, InterruptedException {Thread.sleep(1000);System.out.println("监听到消息:"+message.getText());}*///测试该种监听器返回消息回复public String receive(String message) throws JmsException,JMSException, InterruptedException {System.out.println("Adapter receive监听到消息:"+message);return "lzt";}}
测试:浏览器输入http://localhost:8080/ActiveMQ_Spring/AdapteTest.do
控制台输出
填一下上面说到的clientId的坑,持久订阅,当设置了clientid后,在监听容器里必须设置<!-- 消息持久化 -->
<property name="subscriptionDurable" value="true" />
<property name="receiveTimeout" value="10000" />
二者缺一不可,缺了clentId,消息监听不到,缺了消息持久化,就不是持久订阅,当消费者不在线的时候监听不到,上线了也监听不到。
测试代码链接:
http://download.csdn.net/detail/lztizfl/9913622
activeMQ初识到使用(二)相关推荐
- 初识MIMO(二):MIMO的信道容量及其仿真
初识MIMO(二):MIMO的信道容量 一. SVD简介 SVD可以将一个矩阵分解为UΣVHU\Sigma V^HUΣVH的形式,U是大小为NRXN_{RX}NRX的方阵,V是大小为NTXN_{TX ...
- HTML标签 初识笔记(二)
HTML标签 初识笔记(二) hello world 我是前小白,我是一名平凡的小白级前端工程师. HTML 元素介绍 一.meta 元素 charset:网页字符编码 < meta chars ...
- 初识FPGA(二)(FPGA与ASIC及CPLD的对比)
目录 简述 ASIC和FPGA之间的比较 FPGA和CPLD之间的比较 简述 ASIC是英文Application Specific Integrated Circuits的缩写,即专用集成电路,是指 ...
- [ActiveMQ]初识ActiveMQ
初识ActiveMQ ActiveMQ介绍 官方网站:http://activemq.apache.org/ 最新版本:ActiveMQ 5.14.1(2016-10-28) 最新版本下载链接:htt ...
- 初识V4l2(二)-------浅析video_register_device
在V4l2初识(一)中,我们已经知道当插上一个摄像头的时候,在uvc_driver.c中最终会调用函数video_register_device函数.接下来我们就简要分析这个函数做了哪些事情,揭开其神 ...
- activeMq初识 - 2
activeMq简单实例: package com.gordon;import org.apache.activemq.ActiveMQConnectionFactory;import javax.j ...
- linux 文件夹大小_技能“慧”|初识Linux(二)
上期我们对Ubuntu的界面有了简单的认识,以及如何与本机之间进行文件传输,今天我们就来带大家一起了解一下命令行. 去可视化--命令行 虽然Ubuntu有着漂亮的图形界面,但是我们使用更多的还是命令行 ...
- 初识Python(二)
一.变量 1.声明变量 #!/usr/bin/env python # -*- coding: utf-8 -*- num = 100 上述代码声明了一个变量,变量名为:num,变量num的值为:10 ...
- ActiveMQ学习笔记(二十三)Comsumer高级特性2
一.Message Selectors 二.Redelivery Policy 测试: 重传次数,默认为6. 用最简单的QueueSender和QueueReceiver进行测试,QueueRecei ...
- 初识Seata(二)
目录 1.背景 2.环境 3.业务模型 4.数据库准备 1)订单库 2)库存库 3)账户库 5.微服务准备 1)pom.xml 2)注册中心准备 3)订单服务 1.背景 书接上回:初识Seata(一) ...
最新文章
- 从GNOME切换到KDE了
- 24 年前的 IE 仍能在 Win10中运行,这无敌兼容性与你的代码比比?
- 计算机网络:Socket网络通信底层数据传输
- 超级计算机为什么快,演讲视频_为什么这台超级计算机如此快? (有声) _沪江英语...
- SAP BI工具的优缺点
- 实录 | 平安人寿资深算法工程师谢舒翼:智能问答系统探索与实践
- loadrunner参数化excel数据
- linux安装svn(yum安装)
- 矩形波导中TE波和TM波的截止波数截止波长和截止频率
- 个人所得税java程序怎么编写_个人所得税Java实现代码
- 基于SuperMap iDesktop制作天地图1--10级详细说明
- html5中将图片的绝对路径转换成文件对象
- mysql 计算信度_因子数超过15个的组合信度和AVE计算工具
- 小程序登录问题--登录函数getUserInfo()写在app.js中,首次加载无法获取后台处理过的用户的信息,刷新一次后就可以获取的解决
- RK3399 Android7.1如何查看屏幕分辨率
- java计算机毕业设计会员商城管理系统源码+mysql数据库+系统+lw文档+部署
- Unity3D网络游戏《僵尸星球》
- Python语言程序设计(北京理工大学MOOC)1-5周
- Android 应用链接详解
- Material Design Icons