1、概述

首先和大家一起回顾一下Java 消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:

  1. 消息服务:一个中间件,用于解决两个或多个程序之间的耦合,底层由Java 实现。
  2. 优势:异步、可靠
  3. 消息模型:点对点,发布/订阅
  4. JMS中的对象

然后在另一篇博客《Java消息队列-ActiveMq实战》中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

在接下来的这篇博客中,我会和大家一起来整合Spring 和ActiveMq,这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

2、目录结构


  2.1 项目目录

IDE选择了IDEA(建议大家使用),为了避免下载jar 的各种麻烦,底层使用maven搭建了一个项目,整合了Spring 和ActiveMq

    2.2 pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
< project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd" >
   < modelVersion >4.0.0</ modelVersion >
   < groupId >Crawl-Page</ groupId >
   < artifactId >Crawl-Page</ artifactId >
   < packaging >war</ packaging >
   < version >1.0-SNAPSHOT</ version >
   < name >Crawl-Page Maven Webapp</ name >
   < url >http://maven.apache.org</ url >
   <!-- 版本管理 -->
   < properties >
     < springframework >4.1.8.RELEASE</ springframework >
   </ properties >
   < dependencies >
     < dependency >
       < groupId >junit</ groupId >
       < artifactId >junit</ artifactId >
       < version >4.10</ version >
       < scope >test</ scope >
     </ dependency >
   <!-- JSP相关 -->
   < dependency >
     < groupId >jstl</ groupId >
     < artifactId >jstl</ artifactId >
     < version >1.2</ version >
   </ dependency >
   < dependency >
     < groupId >javax.servlet</ groupId >
     < artifactId >servlet-api</ artifactId >
     < scope >provided</ scope >
     < version >2.5</ version >
   </ dependency >
     <!-- spring -->
     < dependency >
       < groupId >org.springframework</ groupId >
       < artifactId >spring-core</ artifactId >
       < version >${springframework}</ version >
     </ dependency >
     < dependency >
       < groupId >org.springframework</ groupId >
       < artifactId >spring-context</ artifactId >
       < version >${springframework}</ version >
     </ dependency >
     < dependency >
       < groupId >org.springframework</ groupId >
       < artifactId >spring-tx</ artifactId >
       < version >${springframework}</ version >
     </ dependency >
     < dependency >
       < groupId >org.springframework</ groupId >
       < artifactId >spring-webmvc</ artifactId >
       < version >${springframework}</ version >
     </ dependency >
     < dependency >
       < groupId >org.springframework</ groupId >
       < artifactId >spring-jms</ artifactId >
       < version >${springframework}</ version >
     </ dependency >
     <!-- xbean 如<amq:connectionFactory /> -->
     < dependency >
       < groupId >org.apache.xbean</ groupId >
       < artifactId >xbean-spring</ artifactId >
       < version >3.16</ version >
     </ dependency >
     <!-- activemq -->
     < dependency >
       < groupId >org.apache.activemq</ groupId >
       < artifactId >activemq-core</ artifactId >
       < version >5.7.0</ version >
     </ dependency >
     < dependency >
       < groupId >org.apache.activemq</ groupId >
       < artifactId >activemq-pool</ artifactId >
       < version >5.12.1</ version >
     </ dependency >
     <!-- 自用jar包,可以忽略-->
     < dependency >
       < groupId >commons-httpclient</ groupId >
       < artifactId >commons-httpclient</ artifactId >
       < version >3.1</ version >
     </ dependency >
   </ dependencies >
   < build >
     < finalName >Crawl-Page</ finalName >
     < plugins >
       < plugin >
         < groupId >org.apache.tomcat.maven</ groupId >
         < artifactId >tomcat7-maven-plugin</ artifactId >
         < configuration >
           < port >8080</ port >
           < path >/</ path >
         </ configuration >
       </ plugin >
     </ plugins >
   </ build >
</ project >

因为这里pom.xml 文件有点长,就不展开了。

我们可以看到其实依赖也就几个,1、Spring 核心依赖 2、ActiveMq core和pool(这里如果同学们选择导入jar,可以直接导入我们上一篇博客中说道的那个activemq-all 这个jar包)3、java servlet 相关依赖

这里面我们选择的ActiveMq pool 的依赖版本会和之后的dtd 有关系,需要版本对应,所以同学们等下配置activemq 文件的时候,需要注意dtd 版本选择

    2.3 web.xml

web.xml 也大同小异,指定Spring 配置文件,springMvc 命名,编码格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<? xml version = "1.0" encoding = "UTF-8" ?>
< web-app xmlns = "http://java.sun.com/xml/ns/javaee"
          xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
          version = "3.0" >
   < display-name >Archetype Created Web Application</ display-name >
   <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
   < context-param >
     < param-name >contextConfigLocation</ param-name >
     < param-value >
       classpath:applicationContext*.xml;
     </ param-value >
   </ context-param >
   < listener >
     < listener-class >org.springframework.web.context.ContextLoaderListener</ listener-class >
   </ listener >
   < servlet >
     < servlet-name >springMVC</ servlet-name >
     < servlet-class >org.springframework.web.servlet.DispatcherServlet</ servlet-class >
     < init-param >
       < param-name >contextConfigLocation</ param-name >
       < param-value >classpath:spring-mvc.xml</ param-value >
     </ init-param >
     < load-on-startup >1</ load-on-startup >
   </ servlet >
   < servlet-mapping >
     < servlet-name >springMVC</ servlet-name >
     < url-pattern >/</ url-pattern >
   </ servlet-mapping >
   <!-- 处理编码格式 -->
   < filter >
     < filter-name >characterEncodingFilter</ filter-name >
     < filter-class >org.springframework.web.filter.CharacterEncodingFilter</ filter-class >
     < init-param >
       < param-name >encoding</ param-name >
       < param-value >UTF-8</ param-value >
     </ init-param >
     < init-param >
       < param-name >forceEncoding</ param-name >
       < param-value >true</ param-value >
     </ init-param >
   </ filter >
   < filter-mapping >
     < filter-name >characterEncodingFilter</ filter-name >
     < url-pattern >/*</ url-pattern >
   </ filter-mapping >
</ web-app >

    2.4 SpringMvc 和applicationContext.xml

这里面的SpringMVC没什么特别,有需要的同学可以参考一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<? xml version = "1.0" encoding = "UTF-8" ?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
< beans xmlns = "http://www.springframework.org/schema/beans"
        xmlns:aop = "http://www.springframework.org/schema/aop"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:mvc = "http://www.springframework.org/schema/mvc"
        xmlns:tx = "http://www.springframework.org/schema/tx"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
     <!-- 启用MVC注解 -->
     < mvc:annotation-driven />
     <!-- 指定Sping组件扫描的基本包路径 -->
     < context:component-scan base-package = "com.Jayce" >
         <!-- 这里只扫描Controller,不可重复加载Service -->
         < context:include-filter type = "annotation" expression = "org.springframework.stereotype.Controller" />
     </ context:component-scan >
     <!-- JSP视图解析器-->
     < bean class = "org.springframework.web.servlet.view.InternalResourceViewResolver" >
         < property name = "prefix" value = "/WEB-INF/views/" />
         < property name = "suffix" value = ".jsp" />
         <!--  定义其解析视图的order顺序为1 -->
         < property name = "order" value = "1" />
     </ bean >
</ beans >

applicationContext.xml 主要使用来装载Bean,我们项目中并没有什么特别的Java Bean,因此只用来指出包扫描路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:amq = "http://activemq.apache.org/schema/core"
        xmlns:jms = "http://www.springframework.org/schema/jms"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:mvc = "http://www.springframework.org/schema/mvc"
        xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd">
     < bean class = "org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor" />
     <!-- 配置扫描路径 -->
     < context:component-scan base-package = "com.Jayce" >
         <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
        < context:exclude-filter type = "annotation" expression = "org.springframework.stereotype.Controller" />
     </ context:component-scan >
</ beans >

    2.5 applicationContext-ActiveMQ.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:amq = "http://activemq.apache.org/schema/core"
        xmlns:jms = "http://www.springframework.org/schema/jms"
        xmlns:context = "http://www.springframework.org/schema/context"
        xmlns:mvc = "http://www.springframework.org/schema/mvc"
        xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>
     < context:component-scan base-package = "com.Jayce" />
     < mvc:annotation-driven />
     < amq:connectionFactory id = "amqConnectionFactory"
                            brokerURL = "tcp://192.168.148.128:61616"
                            userName = "admin"
                            password = "admin" />
     <!-- 配置JMS连接工长 -->
     < bean id = "connectionFactory"
           class = "org.springframework.jms.connection.CachingConnectionFactory" >
         < constructor-arg ref = "amqConnectionFactory" />
         < property name = "sessionCacheSize" value = "100" />
     </ bean >
     <!-- 定义消息队列(Queue) -->
     < bean id = "demoQueueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
         <!-- 设置消息队列的名字 -->
         < constructor-arg >
             < value >Jaycekon</ value >
         </ constructor-arg >
     </ bean >
     <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
     < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >
         < property name = "connectionFactory" ref = "connectionFactory" />
         < property name = "defaultDestination" ref = "demoQueueDestination" />
         < property name = "receiveTimeout" value = "10000" />
         <!-- true是topic,false是queue,默认是false,此处显示写出false -->
         < property name = "pubSubDomain" value = "false" />
     </ bean >
     <!-- 配置消息队列监听者(Queue) -->
     < bean id = "queueMessageListener" class = "com.Jayce.Filter.QueueMessageListener" />
     <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
     < bean id = "queueListenerContainer"
           class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
         < property name = "connectionFactory" ref = "connectionFactory" />
         < property name = "destination" ref = "demoQueueDestination" />
         < property name = "messageListener" ref = "queueMessageListener" />
     </ bean >
</ beans >

这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在ActiveMQ官网中的查看。

1、ActiveMq 中的DTD,我们在声明相关配置之前,我们需要先导入ActiveMq 中的DTD,不然Spring 并不理解我们的标签是什么意思。

http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

我们在pom.xml 文件中有配置了activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的dtd

2、amq:connectionFactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择tcp连接而不是http连接

3、jmsTemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式

3、项目结构


  3.1 ProducerService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
  * Created by Administrator on 2017/1/5.
  */
@Service
public class ProducerService {
     @Resource (name= "jmsTemplate" )
     private JmsTemplate jmsTemplate;
     public void sendMessage(Destination destination, final String msg){
         System.out.println(Thread.currentThread().getName()+ " 向队列" +destination.toString()+ "发送消息---------------------->" +msg);
         jmsTemplate.send(destination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return session.createTextMessage(msg);
             }
         });
     }
     public void sendMessage( final String msg){
         String destination = jmsTemplate.getDefaultDestinationName();
         System.out.println(Thread.currentThread().getName()+ " 向队列" +destination+ "发送消息---------------------->" +msg);
         jmsTemplate.send( new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return session.createTextMessage(msg);
             }
         });
     }
}

将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用ProducerService实例中的sendMessage 方法就可以向默认目的发送一个消息。

这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。

有兴趣的同学可以和我上一篇文章《ActiveMq实战》中ActiveMq 发送消息的方式对比一下,可以发现一些不同。

   3.2 ConsumerService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
  * Created by Administrator on 2017/1/5.
  */
@Service
public class ConsumerService {
     @Resource (name= "jmsTemplate" )
     private JmsTemplate jmsTemplate;
     public TextMessage receive(Destination destination){
         TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
         try {
             System.out.println( "从队列" + destination.toString() + "收到了消息:\t"
                     + textMessage.getText());
         } catch (JMSException e) {
             e.printStackTrace();
         }
         return textMessage;
     }
}

因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

   3.3 MessageController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.Jayce.Controller;
import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;
/**
  * Created by Administrator on 2017/1/5.
  */
@Controller
public class MessageController {
     private Logger logger = LoggerFactory.getLogger(MessageController. class );
     @Resource (name = "demoQueueDestination" )
     private Destination destination;
     //队列消息生产者
     @Resource (name = "producerService" )
     private ProducerService producer;
     //队列消息消费者
     @Resource (name = "consumerService" )
     private ConsumerService consumer;
     @RequestMapping (value = "/SendMessage" , method = RequestMethod.POST)
     @ResponseBody
     public void send(String msg) {
         logger.info(Thread.currentThread().getName()+ "------------send to jms Start" );
         producer.sendMessage(msg);
         logger.info(Thread.currentThread().getName()+ "------------send to jms End" );
     }
     @RequestMapping (value= "/ReceiveMessage" ,method = RequestMethod.GET)
     @ResponseBody
     public Object receive(){
         logger.info(Thread.currentThread().getName()+ "------------receive from jms Start" );
         TextMessage tm = consumer.receive(destination);
         logger.info(Thread.currentThread().getName()+ "------------receive from jms End" );
         return tm;
     }
}

控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

现在服务层和控制层都好了,接下来我们就进行一个简单的测试

4、项目测试


  4.1 启动ActiveMq

先确定你的ActiveMQ服务已经开启。

  4.2 启动项目

项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。

1
2
3
4
5
6
7
8
9
10
< plugins >
       < plugin >
         < groupId >org.apache.tomcat.maven</ groupId >
         < artifactId >tomcat7-maven-plugin</ artifactId >
         < configuration >
           < port >8080</ port >
           < path >/</ path >
         </ configuration >
       </ plugin >
</ plugins >

  4.3 发送消息

这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

我们发送了一个post 请求之后,看一下服务器的效果:

我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

我们可以看到,一条消息已经成功发送到了ActiveMq中。

  4.4 接收消息

使用get请求访问服务器后台:

服务的输出:

ActiveMq服务器状态:

我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

  4.5 监听器

在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

   4.5.1 applicationContext-ActiveMQ.xml 配置

在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

1
2
3
4
5
6
7
8
9
10
<!-- 配置消息队列监听者(Queue) -->
     < bean id = "queueMessageListener" class = "com.Jayce.Filter.QueueMessageListener" />
     <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
     < bean id = "queueListenerContainer"
           class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
         < property name = "connectionFactory" ref = "connectionFactory" />
         < property name = "destination" ref = "demoQueueDestination" />
         < property name = "messageListener" ref = "queueMessageListener" />
     </ bean >

   4.5.2 MessageListener

我们需要创建一个类实现MessageListener 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.Jayce.Filter;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
  * Created by Administrator on 2017/1/5.
  */
public class QueueMessageListener implements MessageListener {
     public void onMessage(Message message) {
         TextMessage tm = (TextMessage) message;
         try {
             System.out.println( "QueueMessageListener监听到了文本消息:\t"
                     + tm.getText());
             //do something ...
         } catch (JMSException e) {
             e.printStackTrace();
         }
     }
}

实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。

   4.5.3 测试

和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

再看看ActiveMQ服务器的状态:

我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

  4.6 压力测试

这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.Jaycekon.test;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
  * Created by Administrator on 2017/1/5.
  */
public class Client {
     @Test
     public void test() {
         HttpClient httpClient = new HttpClient();
         new Thread( new Sender(httpClient)).start();
     }
}
class Sender implements Runnable {
     public static AtomicInteger count = new AtomicInteger( 0 );
     HttpClient httpClient;
     public Sender(HttpClient client) {
         httpClient = client;
     }
     public void run() {
             try {
                 System.out.println(Thread.currentThread().getName()+ "---Send message-" +count.getAndIncrement());
                 PostMethod post = new PostMethod( "http://127.0.0.1:8080/SendMessage" );
                 post.addParameter( "msg" , "Hello world!" );
                 httpClient.executeMethod(post);
                 System.out.println(Thread.currentThread().getName()+ "---Send message Success-" +count.getAndIncrement());
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }

这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。

5、项目源码

github:https://github.com/jaycekon/Crawl-Page

转载于:https://www.cnblogs.com/wanghuaijun/p/6768095.html

Java消息队列-Spring整合ActiveMq相关推荐

  1. java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解

    本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...

  2. activimq java集成_Java消息队列-Spring整合ActiveMq

    1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Jav ...

  3. Spring整合ActiveMQ完成消息队列MQ编程

    <–start–> 第一步:新建一个maven,将工程命名为activeMQ_spring.在pom.xml文件中导入相关jar包. ①spring开发和测试相关的jar包: spring ...

  4. ActiveMQ —— Spring 整合 ActiveMQ

    前文 消息中间件 -- 简介 ActiveMQ 下载.安装 ActiveMQ -- Java 连接 ActiveMQ(点对点) ActiveMQ -- Java 连接 ActiveMQ(发布订阅 To ...

  5. Java 消息队列、缓存、同步(个人理解:空谈)

    Java 消息队列.缓存.同步. 消息队列 我的理解:消息队列,将消息存入消息队列,然后就OK了. 系统之间原先调用通用接口,但引入了消息队列后,系统之间调用MQ消息队列. 好处:响应快,能累积请求, ...

  6. Java消息队列--ActiveMq 初体验

    1.下载安装ActiveMQActiveMQ官网下载地址:http://activemq.apache.org/download.htmlActiveMQ 提供了Windows 和Linux.Unix ...

  7. Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...

  8. java 消息队列服务_ActiveMQ 消息队列服务

    1 ActiveMQ简介 1.1 ActiveMQ是什么 ActiveMQ是一个消息队列应用服务器(推送服务器).支持JMS规范. 1.1.1 JMS概述 全称:Java Message Servic ...

  9. 消息队列技术介绍 : ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ

    一. 消息队列概述 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合.异步消息.流量削锋等问 ...

最新文章

  1. C++ 笔记(12)— 判断(if/if...else/switch、条件运算符)
  2. idea代码区分成两屏显示
  3. div超出不换行_一日一技:XPath不包含应该怎么写?
  4. 一个高成熟度组织的规程和指南目录
  5. StringWriter/PrintWriter在Java输出异常信息中的作用
  6. Java黑皮书课后题第7章:7.8(求数组的平均值)使用下面的方法头编写两个重载的方法,返回数组的平均数。编写一个测试程序,提示用户输入10个double型值,然后调用这个方法显示平均值
  7. SAP Spartacus 定义在app.module.ts里的providers依赖注入元数据何时得到处理
  8. javascript原型_JavaScript的原型:古怪,但这是它的工作原理
  9. (36)FPGA面试技能提升篇(FPGA行业产品)
  10. mysql swarm_【Docker】 Swarm简单介绍
  11. win7眼睛保护色设置方法
  12. 苹果商店打不开怎么办_苹果手机迅雷版,iOS迅雷Beta内测版企业版安装下载
  13. 大端模式和小端模式之“终极记忆”
  14. 如何把root登陆的shell改为csh?
  15. WPFLoading遮层罩
  16. 再论iPhone Push Notification
  17. 偶现BUG的处理方式
  18. unity获取麦克风音量_深入探究Valve Index的耳机、麦克风设计过程
  19. 币圈人警惕!5大错误足以摧毁你的一切
  20. Ant Design of Vue 中 日期时间控件 禁止选中的(日期——)设置

热门文章

  1. 程序员软考刷题笔记——软件开发和运行维护基础知识
  2. CSS中最全的换行处理方式
  3. Nginx以及通过Nginx实现tomcat集群配置与负载均衡
  4. 演讲笔记 适合所有人的实用程序生成 PCG
  5. 20180416-D · Global Mortality · gt 包用于生成表 · R 语言数据可视化 案例 源码
  6. html 引用全局变量,全局变量在整个源程序文件中都有效
  7. 使用主定理求时间复杂度
  8. 2022-顺丰科技智慧物流校园技术挑战赛题解
  9. 连接远程应用服务器appserver出错,Windows Server 2008 RemoteApp(一)---部署远程桌面服务器...
  10. 陌陌终于入局直播答题!如何在激烈巷战中杀出血路?