1  JMS消息中间件

2.1 消息中间件介绍

什么是中间件?

redis缓存服务器就是一个中间件。独立于系统之外的一个服务器

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java DatabaseConnectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等实现高性能,高可用,可伸缩和最终一致架构。

使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ, Kafka,MetaMQ,RocketMQ

也叫作消息服务中间件,消息容器!

使用java消息服务解决的问题?

主要是在分布式系统中,通过消息服务系统解决高并发访问的情况处理,提高系统处理效率。

2.2 消息队列应用场景

MQ Message Queue 就是消息队列。

以下介绍消息队列在实际应用中常用的四个使用场景

A.  异步处理,

B.  应用解耦,

C.  流量削锋

D.  消息通讯

2.2.1   异步处理

场景说明:

用户注册后,需要发注册邮件和注册短信。

传统的做法有两种:

1串行的方式;

2.并行方式

(1)     串行方式

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

(2)     并行方式.

将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。

小结

如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下

(3)     消息队列方式

用户等待的响应时间55ms,大大提升。

使用消息队列提高响应速度

2.2.2   应用解耦

场景说明:

用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图

传统模式的缺点:

l  假如库存系统无法访问,则订单减库存将失败,从而导致订单失败

如何解决以上问题呢?

l  订单系统与库存系统耦合

引入应用消息队列后的方案,如下图:

l  订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

l  库存系统订阅下单的消息,采用pub/sub(发布/订阅)的方式,获取下单信息,库存系统根据下单信息,进行库存操作

l  假如在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.2.3   流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:

秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个回题,一般需要在应用前端加入消息队列。

模型图

模型图说明

l  可以控制活动的人数

l  可以缓解短时间内高流量压垮应用

l  用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面

l  秒杀业务根据消息队列中的请求信息,再做后续处理

2.2.4   日志处理

目志处理是指将消息队列用在目志处理中,比如Kafka的应用,解决大量日志传输的问题。

架构简化如下

日志采集客户端,负责目志数据采集,定时写受写入Kafka队列

Kafka消息队列,负责目志数据的接收,存储和转发

目志处理应用:订阅并消费kafka队列中的目志数据

(1)     Kafka :接收用户目志的消息队列

(2)     Logstash:做目志解析,统一成JSON输出给Elasticsearch

(3)     Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能

(4)     Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因

ELK平台:

E: Elasticsearch搜索服务

L:  Logstash 日志解析

Kibana  可视化视图组件

ELK平台部署参考:    《开源实时日志分析ELK平台部署.docx》

2.2.5   消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊大室等。

(1)     点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

(2)     聊大室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊大室效果。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

3  JMS消息服务相关概念

消息队列的JAVAEE规范JMS。JMS (Java MessageService,java消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

3.1 消息模型

3.1.1 概述

在JMS标准中,有两种消息模型:

(1)    P2P (Point to Point)   点对点模型(Queue队列模型)

(2)    Publish/Subscribe(Pub/Sub) 发布/订阅模型(Topic主题模型)

3.1.2 P2P 模型

l  P2P 模型图

l  涉及概念

(1)     消息队列(Queue),

(2)     发送者(Sender),

(3)     接收者(Receiver).  消费者。

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

l  P2P的特点

(1)     每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

(2)     发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

(3)     接收者在成功接收消息之后需向队列应答成功

(4)     如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

3.1.3 Pub/Sub模式

l  Pub/Sub模式图

l  涉及概念

(1)     主题(Topic),

(2)     发布者(Publisher),

(3)     订阅者(Subscriber)

多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

l  Pub/Sub的特点

(1)     每个消息可以有多个消费者

(2)     发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者, 它必须创建一个订阅者之后,才能消费发布者的消息

(3)     为了消费消息,订阅者必须保持运行的状态

(4)     为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

(5)     如果希望发送的消息可以不被做任何处理、以及只被一个消息者处或者被多个消费者处理的话,那么可以采用Pub/Sub模型。(群发)

3.1.4 小结

3.2 消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

(1)     同步

订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻寨;

消费者等待消费: 如果队列/主题中没有消息,那就一直等待。知道有消息消费就结束。

(2)     异步

订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

消费者监听消息。

JNDI:java命名和目录接口是一种标准的]ava命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

JNDI 在JMS中起到查找和访回发送目标或消息来源的作用。

3.3 JMS编程模型 API

(1)     ConnectionFactory

创建connection对象的工厂,针对两种不同的jms消息模型,分别有 QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找 ConnectionFactory对象。

(2)     Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic),对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象· Queue、Topic可以通过JNDI来查找 Destination。

(3)     Connection

connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。

Connection可以产生一个或多个Session,跟ConnectionFactory一样,Connection 也有两种类型:Queueconnection和TopicConnection

(4)     session

session是操作消息的接口。可以通过session创建生产者、消费者、消息等。session 提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession.

(5)     消息的生产者

消息生产者由session创建,并用于将消息发送到Destination.同样,消息生产者分两种类型:QueueSender和TopicPublisher可以调用消息生产者的方法(send或publish 方法)发送消息。

(6)      消息消费者

消息消费者由session创建,用于接收被发送到Destination的消息。两种类型 QueueReceiver和TopicSubscriber可分别通过session的createReceiver(Queue) 或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

(7)     MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage 方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

4  消息队列ActiveMQ

一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,RabbitMQ,Zero MQ,Kafka)以及他们的特点。

4.1 ActiveMQ

ActiveMQ是Apache出品最流行的,能力强劲的开源消息总线。

ActiveMQ是一个完全支持]JMS1.1和j2EE 1 4规范的JMSProvider实现。尽管JMS规范出台己经是很久的事情了,但是JMS在当今的] 2EE应用中间仍然扮演着特殊的地位。

MQ全称为MessageQueue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。

ActiveMQ特性如下.

(1)     多种语言和协议编写客户端。

(2)     语言:Java,C,C++,C#,Ruby,Perl,Python,PHP

(3)     应用协议OpenWire,StompREST,WS Notification,XMPP,AMQP

(4)     完全支持JSM1.1和J2EE 1 4规范(持久化,XA消息,事务)

(5)     对Spring 的支持,ActiveMQ可以很容易内嵌到使用spring的系统里面去,而且也支持Spring2.0及更高版本的特性

(6)     通过了常见J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA1.5 resourceadaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE1.4的商业服务器上。

(7)     持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA

(8)     支持通过JDBC和journal提供高速的消息持久化

(9)     从设计上保证了高性能的集群,客户端-服务器,点对点

(10)    支持Ajax

(11)    支持与Axis的整合

(12)    可以很容易得调用内嵌JMS provider,进行测试

4.2 ActivieMQ 下载安装

点击download,

l  下载:

apache-activemq-5.14.5-bin.zip

l  解压:

apache-activemq-5.14.5\bin\win64\activemq.bat

l  启动访问:

账号密码都是admin

http://localhost:8161/

4.2.1 依赖

ilcps_parent 中的pom.xml配置文件以及包含下面的坐标

4.3  Queue 消息发送与接收

4.3.1 Queue消息发送

package cn.itcast.jms.test;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 发送消息到Queue队列中。

* Queue名称是: hello

*/

publicclass Queue_1_sender {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

// 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。

// 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器)

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

//interface javax.jms.Queue extends javax.jms.Destination

Queue queue = session.createQueue("hello");

//5. 创建消息

TextMessage msg = session.createTextMessage();

msg.setText("发送Queue消息到hello队列中.....");

//6. 消息生产者

MessageProducer messageProducer = session.createProducer(queue);

//7. 发送消息

messageProducer.send(msg);

// 关闭

session.close();

conn.close();

}

}

4.3.2 Queue消息消费

4.3.2.1    方式1:调用receive() 方法

package cn.itcast.jms.test;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 消费Queue队列中的消息:

* Queue名称是: hello

*/

publicclass Queue_2_consumer {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

// 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。

// 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器)

Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

//interface javax.jms.Queue extends javax.jms.Destination

Queue queue = session.createQueue("hello");

//6. 创建消费者

MessageConsumer messageConsumer = session.createConsumer(queue);

//7. 消费消息  (同步)

// receive() 调用这个方法,如果容器中没有消息,线程处于阻塞状态。直到有消息才结束当前线程。

// receive(5000)  从容器中如果没有拿到消息的等待时间 5秒

Message message = messageConsumer.receive(5000);

if (message != null) {

TextMessage msg = (TextMessage) message;

System.out.println("----->" + msg.getText());

}

// 提交事务、关闭

session.commit();

session.close();

conn.close();

}

}

 

4.3.2.2    方式2:监听器方式消费消息

package cn.itcast.jms.test;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 消费Queue队列中的消息:

* Queue名称是: hello

*/

publicclass Queue_2_consumer2_listener {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

// 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。

// 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器)

Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

//interface javax.jms.Queue extends javax.jms.Destination

Queue queue = session.createQueue("hello");

//6. 创建消费者

MessageConsumer messageConsumer = session.createConsumer(queue);

//7. 消费消息  (监听器消费消息,异步)

messageConsumer.setMessageListener(new MessageListener() {

publicvoid onMessage(Message message) {

TextMessage msg = (TextMessage) message;

try {

System.out.println(msg.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

});

// 保持监听器的运行

while(true){}

/* 提交事务、关闭

session.commit();

session.close();

conn.close();

*/

}

}

4.3.3 Topic 消息发送与接收

4.3.4   Topic消息发送

/**

* 发送消息到Topic主题消息中

*/

publicclass Topic_1_sender {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

Topic topic = session.createTopic("hello2");

//5. 创建消息

TextMessage msg = session.createTextMessage();

msg.setText("发送Topic消息到hello2队列中.....");

//6. 消息生产者

MessageProducer messageProducer = session.createProducer(topic);

//7. 发送消息

messageProducer.send(msg);

// 关闭

session.close();

conn.close();

}

}

4.3.5   Topic消息接收

/**

* 消费消息

*/

publicclass Topic_2_consumer {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

Topic topic = session.createTopic("hello2");

//5. 创建消息消费者

MessageConsumer messageConsumer = session.createConsumer(topic);

//6. 消费消息

messageConsumer.setMessageListener(new MessageListener() {

@Override

publicvoid onMessage(Message message) {

try {

TextMessage msg = (TextMessage) message;

System.out.println("消费消息成功---->" + msg.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

});

// 保持监听器运行

while(true){}

}

}

4.4 多个消费者测试

4.4.1 队列消息

l  需求:有10个Queue队列消息,2个消费者,

l  每个消费者消费多少个消息?

5个。  消息容器会确保消息平均分配给每个消费者。

1.发送10个消息

/**

* 发送多个消息测试

*/

publicclass Queue_1_sender {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

// 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。

// 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器)

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

//interface javax.jms.Queue extends javax.jms.Destination

Queue queue = session.createQueue("hello");

//6. 消息生产者

MessageProducer messageProducer = session.createProducer(queue);

//7. 发送消息

for (inti=1; i<=10; i++) {

MapMessage msg = session.createMapMessage();

msg.setString("email", i+"610731230@qq.com");

msg.setString("phone", i+"18665591009");

// 发送多个消息

messageProducer.send(msg);

}

// 关闭

session.close();

conn.close();

}

}

2.消费者1

/**

* 消费者1

*/

publicclass Queue_2_consumer1 {

// 发送消息

publicstaticvoid main(String[] args) throws Exception {

//1. 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

//2. 创建连接

Connection conn = connectionFactory.createConnection();

// 开启连接

conn.start();

//3. 创建session

// 参数1:是否需要事务环境,如果为true表示需要事务环境,最后发送消息后需要提交事务。

// 参数2:自动应答机制(表示从容器消费消息后自动通知消息容器)

Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4. 创建消息发送的目的地对象

//interface javax.jms.Queue extends javax.jms.Destination

Queue queue = session.createQueue("hello");

//6. 创建消费者

MessageConsumer messageConsumer = session.createConsumer(queue);

//7. 消费消息  (监听器消费消息,异步)

messageConsumer.setMessageListener(new MessageListener() {

publicvoid onMessage(Message message) {

MapMessage msg = (MapMessage) message;

try {

String email = msg.getString("email");

String phone = msg.getString("phone");

System.out.println(email + "," + phone);

} catch (JMSException e) {

e.printStackTrace();

}

}

});

// 保持监听器的运行

while(true){}

}

}

3. 消费者2

同消费者1代码一样。

4. 测试

(先运行消费者)

4.4.2   主题消息

l  需求:有10个Topic主题消息,2个消费者,

l  每个消费者消费多少个消息?

10个。 Topic主题消息特点,一个消息可以被多个消费者消费。

1.发送10个消息

2.消费者1

3. 消费者2

4. 测试

(先运行消费者)

5  Spring整合ActiveMQ  (重要)

5.1  整合配置,实现发送消息

5.1.1 发送消息配置

l  关键点

n  创建ActiveMQ连接工厂

n  创建缓存工厂

n  创建JmsTemplate

l  配置实现

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"

xmlns:amq="http://activemq.apache.org/schema/core"

xmlns:jms="http://www.springframework.org/schema/jms"

xsi:schemaLocation="

        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd

        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd

        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd

        http://www.springframework.org/schema/data/jpa

        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd

        http://www.springframework.org/schema/jms

        http://www.springframework.org/schema/jms/spring-jms.xsd

        http://activemq.apache.org/schema/core

        http://activemq.apache.org/schema/core/activemq-core.xsd">

<!--

Spring整合ActiveMQ,实现消息发送

   1. 创建ActiveMQ连接工厂

           2. 创建缓存工厂

           3. 创建JmsTemplate

-->

<!-- 1. 创建ActiveMQ连接工厂 -->

<amq:connectionFactory id="amqConnectionFactory"

userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

<!-- 2. 创建缓存工厂 -->

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- 注入连接工厂-->

<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>

<!-- session缓存数目 -->

<property name="sessionCacheSize" value="5"></property>

</bean>

<!-- 3. 创建JmsTemplate(发送消息的模板工具类对象) -->

<!-- 3.1发送Queue队列消息 -->

<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">

<!-- 注入缓存工厂 -->

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

<!-- 默认值 -->

<property name="pubSubDomain" value="false"></property>

</bean>

<!-- 3.2发送Topic主题消息 -->

<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">

<!-- 注入缓存工厂 -->

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

<!-- 设置消息模型为主题消息 -->

<property name="pubSubDomain" value="true"></property>

</bean>

</beans>

5.1.2 发送消息代码

l  通过JmsTemplate,实现消息发送

package cn.itcast.jms.test3_spring;

import javax.annotation.Resource;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.Session;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**

* Spring整合ActiveMQ发送消息

*/

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("classpath:applicationContext-mq-send.xml")

publicclass Sender {

// 注入发送Queue、Topic消息的模板对象

@Resource

private JmsTemplate jmsQueueTemplate;

@Resource

private JmsTemplate jmsTopicTemplate;

@Test

publicvoid sender() throws Exception {

// 发送Queue队列消息

jmsQueueTemplate.send("email", new MessageCreator() {

@Override

public Message createMessage(Session session) throws JMSException {

MapMessage mapMessage = session.createMapMessage();

mapMessage.setString("email", "610731230@qq.com");

returnmapMessage;

}

});

// 发送Topic主题消息

jmsTopicTemplate.send("phone", new MessageCreator() {

@Override

public Message createMessage(Session session) throws JMSException {

MapMessage mapMessage = session.createMapMessage();

mapMessage.setString("phone", "18665591009");

returnmapMessage;

}

});

}

}

 

5.2  整合配置,实现消费消息

5.2.1 创建ilcps_jms 消息处理系统

l  继承父项目

5.2.2 写监听器类

/**

* 消息消费监听器,监听Queue队列中的email信息

* @author Administrator

*

*/

publicclass EmailListener implements MessageListener{

@Override

publicvoid onMessage(Message msg) {

try {

// 1. 转换

MapMessage mapMessage =  (MapMessage) msg;

// 2. 根据key,获取消息中的数据

String email = mapMessage.getString("email");

// 3. 业务处理(发邮件)

System.out.println("消息处理成功--->" + email);

} catch (JMSException e) {

e.printStackTrace();

}

}

}

/**

* 消息消费监听器,监听Topic主题中的phone信息

* @author Administrator

*

*/

publicclass PhoneListener implements MessageListener{

@Override

publicvoid onMessage(Message msg) {

try {

// 1. 转换

MapMessage mapMessage =  (MapMessage) msg;

// 2. 根据key,获取消息中的数据

String phone = mapMessage.getString("phone");

// 3. 业务处理(发短信)

System.out.println("消息处理成功--->" + phone);

} catch (JMSException e) {

e.printStackTrace();

}

}

}

5.2.3 Spring整合ActiveMQ 消费消息配置

5.2.3.1    参考

<!--

配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。

jms:listener-container

destination-type 监听的JMS消息类型(queue、topic)

connection-factory Spring的缓存连接工厂

jms:listener

destination 对应MQ中队列名称或主题名称

rel         消息监听器类(实现MessageListener接口)

-->

<jms:listener-container destination-type="queue" connection-factory="cachingFactory">

<jms:listener destination="love" ref=""/>

</jms:listener-container>

l  2. 配置说明

属性

描述

container-type

监听器容器的类型。可用的选项是: default、simple、default102 或者 simple102 (默认值是 'default')。

connection-factory

JMS ConnectionFactory Bean的引用(默认的Bean名称是 'connectionFactory')。

task-executor

JMS监听器调用者Spring TaskExecutor 的引用。

destination-resolver

DestinationResolver 策略的引用,用以解析JMS Destinations。

message-converter

MessageConverter 策略的引用,用以转换JMS Messages 成监听器方法的参数。默认值是 SimpleMessageConverter。

destination-type

监听器的JMS目的地类型。可用的选项包含: queue、topic 或者 durableTopic (默认值是 'queue')。

client-id

这个监听器容器在JMS客户端的id。

acknowledge

本地JMS应答模式。可用的选项包含: auto、client、dups-ok 或者 transacted (默认值是 'auto')。 'transacted' 的值可激活本地事务性 Session。 也可以通过指定下面介绍的 transaction-manager 属性。

transaction-manager

Spring PlatformTransactionManager 的引用。

concurrency

每个监听器可激活的Session最大并发数。

prefetch

加载进每个Session的最大消息数。记住增加这个值会造成并发空闲。

5.2.3.2    完整配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"

xmlns:amq="http://activemq.apache.org/schema/core"

xmlns:jms="http://www.springframework.org/schema/jms"

xsi:schemaLocation="

        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd

        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd

        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd

        http://www.springframework.org/schema/data/jpa

        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd

        http://www.springframework.org/schema/jms

        http://www.springframework.org/schema/jms/spring-jms.xsd

        http://activemq.apache.org/schema/core

        http://activemq.apache.org/schema/core/activemq-core.xsd">

<!--

Spring整合ActiveMQ,实现消息消费

   1. 创建ActiveMQ连接工厂

           2. 创建缓存工厂

           3. 创建监听器

-->

<!-- 1. 创建ActiveMQ连接工厂 -->

<amq:connectionFactory id="amqConnectionFactory"

userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

<!-- 2. 创建缓存工厂 -->

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- 注入连接工厂-->

<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>

<!-- session缓存数目 -->

<property name="sessionCacheSize" value="5"></property>

</bean>

<!-- 3. 监听器配置 -->

<!-- 创建监听器对象 -->

<bean id="emailListener" class="cn.itcast.listener.EmailListener"></bean>

<bean id="phoneListener" class="cn.itcast.listener.PhoneListener"></bean>

<!--

配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。

jms:listener-container

destination-type 监听的JMS消息类型(queue、topic)

connection-factory Spring的缓存连接工厂

jms:listener

destination 对应MQ中队列名称或主题名称

rel         消息监听器类(实现MessageListener接口)

-->

<!-- 3.1 监听指定名称(email)的队列中的消息-->

<jms:listener-container destination-type="queue" connection-factory="cachingConnectionFactory">

<jms:listener destination="email" ref="emailListener"/>

</jms:listener-container>

<!-- 3.2 监听指定名称(email)的主题中的消息 -->

<jms:listener-container destination-type="topic" connection-factory="cachingConnectionFactory">

<jms:listener destination="phone" ref="phoneListener"/>

</jms:listener-container>

</beans>

5.2.4 消费消息测试

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("classpath:applicationContext-mq-send.xml")

publicclass ConsumeApp {

// 保持监听器运行

@Test

publicvoid run() throws Exception {

while(true){}

}

}

6  小结

(1)  Jms Java 消息服务

作用: 在分布式系统中,通过java的消息服务,把一些占用系统资源的独立的业务抽取出去,通过消息处理系统完成这块业务的实现。从而提供系统的处理能力。

(2)  核心概念

a)    消息模型

Queue 队列消息模型

Topic 主题消息模型

b)    消息消费的2种方式

i.         receive()

ii.         MessageListener()

c)     Jms编程模型(Api)

包: javax.jms.*

i.         ConnectionFactroy

ii.         Destination

iii.         Session

创建生产者、消费者、目的地、消息

iv.         Connection

v.         MessageProducer

vi.         MessageConsumer

vii.         Message

(3)  发送消费Queue队列消息

(4)  发送消费Topic主题消息

(5)  测试:多个消费者

(6)  Spring整合ActiveMQ

a)    发消息

i.         配置

ConnectionFactory、CachingConnectionFactory、JmsTemplate

ii.         测试

b)    消费消息

i.         写监听器(获取消息Message内容)

ii.         配置

<jms:listener-container>

iii.         测试

启动tomcat服务器,监听器运行,就可以监听容器中消息。

JMS ActiveMQ相关推荐

  1. java spring boot jms_spring boot整合JMS(ActiveMQ实现)

    一.安装ActiveMQ 具体的安装步骤,请参考我的另一篇博文: 二.新建spring boot工程,并加入JMS(ActiveMQ)依赖 三.工程结构 pom依赖如下: xsi:schemaLoca ...

  2. goldengate java_配置Goldengate向JMS(ActiveMQ)发布消息

    配置Goldengate向JMS(ActiveMQ)发布消息 通过Goldengate的Application Adpater可以方便的向JMS队列发送消息,其他应用程序通过订阅JMS消息 来对数据作 ...

  3. JMS ActiveMQ研究文档

    1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务 ...

  4. JMS ActiveMQ案例

    创建一个web工程 导入ActiveMQ依赖的jar包  activemq-all-5.9.jar 写一个生产者(send)servlet package com.sun.jms; import ja ...

  5. JMS ActiveMQ 简介

    一.JMS简介 全称:Java Message Service 中文:Java消息服务 JMS是Java的一套API标准,最初的目的是为了使应用程序能够访问现有的MOM系统(MOM是Message O ...

  6. 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

    转载:http://blog.csdn.net/jiuqiyuliang/article/details/48758203 第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模 ...

  7. JMS学习(4):--Spring和ActiveMQ整合的完整实例

    前言 这篇博文,我们基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PU ...

  8. jms activemq_带有ActiveMQ的JMS

    jms activemq 带有ActiveMQ的JMS JMS是Java消息服务的缩写,它提供了一种以松散耦合,灵活的方式集成应用程序的机制. JMS以存储和转发的方式跨应用程序异步传递数据. 应用程 ...

  9. Activemq Jms 简单示例

    Activemq Jms 简单示例 简介     简单的 Activemp JMS 示例代码 activemq 运行     简单使用docker启动一个: docker run -dit --nam ...

最新文章

  1. JavaScript之充实文档的内容
  2. 如何完全卸载Java
  3. 计算机房装修对门的要求,防火门尺寸要求有哪些 防火门尺寸规范
  4. 【每周CV论文】初学实例分割需要读哪些文章?
  5. ASP.NET MVC 环境配置,从1.0到2.0的转换和学习资源等
  6. 亲历2013年TED大会:全球最潮灵感大会
  7. JS中的call()和apply()方法(转)
  8. python读取oracle数据到hvie parquet_创建Hive表来从parquet / avro模式读取parquet文件
  9. ASP.NET页面解析(3)
  10. jumpserver(0.3.2版本)开源跳板机系统部署
  11. 将servlet[login]标记为不可用_30分钟上手OUTLOOK - 邮件标记
  12. django 学习-13 Django文件上传
  13. putty-不输入密码直接登陆
  14. [渝粤教育] 广东-国家-开放大学 21秋期末考试土力学与地基基础10445k1
  15. bochs怎么运行Linux系统,Ubuntu上使用Bochs
  16. 基于php的人力资源管理系统,基于thinkPHP框架的人力资源管理系统
  17. 大国崛起(三) 大不列颠及北爱尔兰联合王国(英国)
  18. Ipad IOS 蓝牙键盘快捷键
  19. helm开发环境部署gitea
  20. IT十年人生过客-二十四-转型互联网

热门文章

  1. 【web项目】前端生日礼物--注册页面篇
  2. matlab矩阵检索、嵌套,矩阵操作笔记
  3. 大型支付系统后台对账系统的控制和管理
  4. 一文讲清支付资金清算系统的功能
  5. java c 速度_Java和C#运行速度对比:Java比C#快约3倍
  6. 一键清除本地缓存的所有无用的docker镜像命令
  7. 《第一行代码》总结之网络、服务(五)
  8. [转帖]关于 /dev/urandom 的流言终结 | Linux 中国
  9. Altera PDN 设计和 FPGA 收发器性能
  10. uniapp人脸识别解决方案