activimq java集成_Java消息队列-Spring整合ActiveMq
1、概述
首先和大家一起回顾一下Java 消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:
消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现。
优势:异步、可靠
消息模型:点对点,发布/订阅
JMS中的对象
然后在另一篇博客《Java消息队列-ActiveMq实战》中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解:
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax
支持与Axis的整合
可以很容易得调用内嵌JMS provider,进行测试
在接下来的这篇博客中,我会和大家一起来整合Spring 和ActiveMq,这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。
2、目录结构
2.1 项目目录
IDE选择了IDEA(建议大家使用),为了避免下载jar 的各种麻烦,底层使用maven搭建了一个项目,整合了Spring 和ActiveMq
2.2 pom.xml
View Code
因为这里pom.xml 文件有点长,就不展开了。
我们可以看到其实依赖也就几个,1、Spring 核心依赖 2、ActiveMq core和pool(这里如果同学们选择导入jar,可以直接导入我们上一篇博客中说道的那个activemq-all 这个jar包)3、java servlet 相关依赖
这里面我们选择的ActiveMq pool 的依赖版本会和之后的dtd 有关系,需要版本对应,所以同学们等下配置activemq 文件的时候,需要注意dtd 版本选择
2.3 web.xml
web.xml 也大同小异,指定Spring 配置文件,springMvc 命名,编码格式
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
Archetype Created Web Application
contextConfigLocation
classpath:applicationContext*.xml;
org.springframework.web.context.ContextLoaderListener
springMVC
org.springframework.web.servlet.DispatcherServlet
contextConfigLocation
classpath:spring-mvc.xml
1
springMVC
/
characterEncodingFilter
org.springframework.web.filter.CharacterEncodingFilter
encoding
UTF-8
forceEncoding
true
characterEncodingFilter
/*
2.4 SpringMvc 和applicationContext.xml
这里面的SpringMVC没什么特别,有需要的同学可以参考一下:
View Code
applicationContext.xml 主要使用来装载Bean,我们项目中并没有什么特别的Java Bean,因此只用来指出包扫描路径:
View Code
2.5 applicationContext-ActiveMQ.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>
brokerURL="tcp://192.168.148.128:61616"
userName="admin"
password="admin" />
class="org.springframework.jms.connection.CachingConnectionFactory">
Jaycekon
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在ActiveMQ官网中的查看。
1、ActiveMq 中的DTD,我们在声明相关配置之前,我们需要先导入ActiveMq 中的DTD,不然Spring 并不理解我们的标签是什么意思。
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd
我们在pom.xml 文件中有配置了activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的dtd
2、amq:connectionFactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择tcp连接而不是http连接
3、jmsTemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式
3、项目结构
3.1 ProducerService
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
* Created by Administrator on 2017/1/5.
*/
@Service
public class ProducerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination,final String msg){
System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
public void sendMessage(final String msg){
String destination = jmsTemplate.getDefaultDestinationName();
System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
}
将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用ProducerService实例中的sendMessage 方法就可以向默认目的发送一个消息。
这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。
有兴趣的同学可以和我上一篇文章《ActiveMq实战》中ActiveMq 发送消息的方式对比一下,可以发现一些不同。
3.2 ConsumerService
package com.Jayce.Service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
@Service
public class ConsumerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
public TextMessage receive(Destination destination){
TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
try{
System.out.println("从队列" + destination.toString() + "收到了消息:\t"
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
return textMessage;
}
}
因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。
再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。
3.3 MessageController
package com.Jayce.Controller;
import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
@Controller
public class MessageController {
private Logger logger = LoggerFactory.getLogger(MessageController.class);
@Resource(name = "demoQueueDestination")
private Destination destination;
//队列消息生产者
@Resource(name = "producerService")
private ProducerService producer;
//队列消息消费者
@Resource(name = "consumerService")
private ConsumerService consumer;
@RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
@ResponseBody
public void send(String msg) {
logger.info(Thread.currentThread().getName()+"------------send to jms Start");
producer.sendMessage(msg);
logger.info(Thread.currentThread().getName()+"------------send to jms End");
}
@RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
@ResponseBody
public Object receive(){
logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
TextMessage tm = consumer.receive(destination);
logger.info(Thread.currentThread().getName()+"------------receive from jms End");
return tm;
}
}
控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。
现在服务层和控制层都好了,接下来我们就进行一个简单的测试
4、项目测试
4.1 启动ActiveMq
先确定你的ActiveMQ服务已经开启。
4.2 启动项目
项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。
org.apache.tomcat.maven
tomcat7-maven-plugin
8080
/
4.3 发送消息
这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!
我们发送了一个post 请求之后,看一下服务器的效果:
我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:
我们可以看到,一条消息已经成功发送到了ActiveMq中。
4.4 接收消息
使用get请求访问服务器后台:
服务的输出:
ActiveMq服务器状态:
我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。
4.5 监听器
在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。
不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。
4.5.1 applicationContext-ActiveMQ.xml 配置
在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
4.5.2 MessageListener
我们需要创建一个类实现MessageListener 接口:
package com.Jayce.Filter;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/1/5.
*/
public class QueueMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("QueueMessageListener监听到了文本消息:\t"
+ tm.getText());
//do something ...
} catch (JMSException e) {
e.printStackTrace();
}
}
}
实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。
4.5.3 测试
和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:
再看看ActiveMQ服务器的状态:
我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。
这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。
4.6 压力测试
这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:
package com.Jaycekon.test;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2017/1/5.
*/
public class Client {
@Test
public void test() {
HttpClient httpClient = new HttpClient();
new Thread(new Sender(httpClient)).start();
}
}
class Sender implements Runnable {
public static AtomicInteger count = new AtomicInteger(0);
HttpClient httpClient;
public Sender(HttpClient client) {
httpClient = client;
}
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
post.addParameter("msg", "Hello world!");
httpClient.executeMethod(post);
System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());
} catch (IOException e) {
e.printStackTrace();
}
}
}
这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。
activimq java集成_Java消息队列-Spring整合ActiveMq相关推荐
- java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解
本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...
- Java消息队列-Spring整合ActiveMq
1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个或多个程序之间的耦合,底层由Jav ...
- Spring整合ActiveMQ完成消息队列MQ编程
<–start–> 第一步:新建一个maven,将工程命名为activeMQ_spring.在pom.xml文件中导入相关jar包. ①spring开发和测试相关的jar包: spring ...
- 【Java集成小米消息推送(海外版)】
Java集成小米消息推送(海外版) 注册登录: 小米推送运营中心 提交需要的信息,一般为包名等信息(或直接让Android开发提交) 需要的配置信息 1.App Secret 2.Package na ...
- ActiveMQ —— Spring 整合 ActiveMQ
前文 消息中间件 -- 简介 ActiveMQ 下载.安装 ActiveMQ -- Java 连接 ActiveMQ(点对点) ActiveMQ -- Java 连接 ActiveMQ(发布订阅 To ...
- java 消息队列_java消息队列
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...
- SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- 【无废话】SpringBoot集成Kafka消息队列
0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...
最新文章
- Android如何防止apk程序被反编译
- docker基本操作
- sphinx配置文件继承
- 10.24T3 解方程 取模意义下运算+秦九韶算法
- 如何跨域取到response额外的的headers
- 30岁学python编程_朋友问我,你都30岁了学编程来得及吗
- PAT_B_1012 数字分类 (有待改进)
- 女博士年薪156万入职华为!实力演绎美貌与智慧并重
- For the completeness of the story
- Javascript ES6 Set、Map、Proxy、Symbol
- 这样去分析大盘才能稳赚不亏best
- Python Re 模块超全解读
- Iocomp for .net/wpf the Crack
- 2345好压去热点广告
- 左连接的表中有多条数据如何只取一条(最大或最小)
- 当天邀请的饭局要参加吗?别说“我有安排”,高手都懂这3个礼数
- BUUCTF 每日打卡 2021-4-5
- python dataframe 写入到doc文件_将Python Pandas DataFrame写入Word文档
- 研究生计算机专业的方向有哪些?
- 2019年上海市数学建模讲座笔记(1)竞赛真题讲解
热门文章
- antvue 有赞布局_UI大全:前端UI框架集合(持续更新,当前32个)
- Linux查找树莓派ip地址,让树莓派“说”出自己的IP地址
- 威纶通HMI常见问题
- 计算机科学与技术学了会秃头吗,计算机科学与技术vs 信息与计算科学,秃头专业了解一下?...
- mysql中修改表的还原命令_MySQL的增、删、改、查和备份、恢复的命令
- ElasticSearch权威指南学习(索引管理)
- #题目:GCD XOR UVA - 12716
- linux/unix编程手册-6_10
- Java 的 ArrayList 的底层数据结构
- 2013 8.20 ip地址的计算总结