ActiveMQ——消息的生产和消费
一、ActiveMQ中消息的管理机制
使用ActiveMQ的目的必然是处理消息,大体步骤如下:
①通过ConnectionFactory连接到ActiveMQ服务器
②通过ConnectionFactory创建Connection
③通过Connection获取Session
④通过Session创建消息的目的地,即队列(Queue)或主题(Topic)
⑤通过Session创建消息生产者,生产的目的地(即④中创建的Queue或Topic)
⑥通过Session创建消息
⑦通过消息生产者将消息发送至消息的目的地
⑧消费者在相同的消息目的地消费消息
至于消息的目的地应该是队列(Queue)还是主题(Topic),需要看该消息是只能被消费一次、还是需要被消费多次,也就是说Queue中的消息适合点对点的模式,而Topic中的消息则适合发布/订阅模式
二、队列(Queue)中消息的发送和接收
1、消息的生产
public class MQProducer {//MQ服务的地址,ActiveMQ使用的是tcp协议private static final String MQ_URL = "tcp://192.168.2.107:61616";//队列的名称private static final String QUEUE_NAME = "queue01";public static void main(String args[]) throws JMSException {//连接MQ服务ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);//创建连接Connection connection = factory.createConnection();//开启连接connection.start();//创建sessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建队列Queue queue = session.createQueue(QUEUE_NAME);//创建消息生产者MessageProducer producer = session.createProducer(queue);for (int i = 0; i < 3; i++) {//生产消息TextMessage textMessage = session.createTextMessage("msg" + i);//发送消息producer.send(textMessage);}//关闭资源producer.close();session.close();connection.close();}
}
Tip
:Queue和Topic都是Destination接口的实现
2、消息的消费,有三种方式
①receive():
public class MQConsumer {private static final String MQ_URL = "tcp://192.168.2.107:61616";//队列名称要和生产者一致private static final String QUEUE_NAME = "queue01";public static void main(String args[]) throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageConsumer consumer = session.createConsumer(queue);while (true) {Message receive = consumer.receive();//阻塞TextMessage message = (TextMessage) receive;String text = message.getText();System.out.println(">>>" + text);if (null != text && !"".equals(text)) {System.out.println(text);} else {System.out.println("null");break;}}//释放资源consumer.close();session.close();connection.close();}
}
Tip
:consumer.receive()是一个阻塞方法,在接收不到队列中的消息时就一直阻塞直到接收到消息为止,因此while循环是不可能被中断的,下面的资源也不会得到释放,就会一直和ActiveMQ保持连接。
②receive(Long l):设置超时时间
public class MQConsumer {private static final String MQ_URL = "tcp://192.168.2.107:61616";//队列名称要和生产者一致private static final String QUEUE_NAME = "queue01";public static void main(String args[]) throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageConsumer consumer = session.createConsumer(queue);while (true) {Message receive = consumer.receive(4000l);//设置超时时间,非阻塞TextMessage message = (TextMessage) receive;if (null != message) {String text = message.getText();System.out.println(">>>" + text);if (null != text && !"".equals(text)) {System.out.println(text);}}else{break;}}//超时后会释放资源,断开和ActiveMQ的连接consumer.close();session.close();connection.close();}
}
③采用消息监听器:messageListener
public class MQConsumer {private static final String MQ_URL = "tcp://192.168.2.107:61616";//队列名称要和生产者一致private static final String QUEUE_NAME = "queue01";public static void main(String args[]) throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);Connection connection = factory.createConnection();connection.start();System.out.println(">>主:" + Thread.currentThread().getName());Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;String text = null;try {text = msg.getText();} catch (JMSException e) {e.printStackTrace();}System.out.println(">>子:" + Thread.currentThread().getName());System.out.println(text);}});//释放资源//consumer.close();//session.close();//connection.close();}
}
Tip
:
1️⃣使用消息监听(即MessageListener)的方式也是非阻塞的,在监听不到队列中的消息时不会一直等待
2️⃣使用消息监听的方式在监听到有消息时会开启一个新的线程处理消息,因此如果我们在主线程中释放了资源就可能会看不到onMessage()方法中的打印结果
3️⃣使用消息监听的方式一旦我们释放了连接等资源,在生产者再次向监听的队列中发送消息时就监听不到新发送的消息了,这就失去了监听的意义,那么问题来了:使用消息监听的方式到底该不该释放连接等资源呢?释放了就会监听不到新的消息,不就失去了监听的意义了吗?不释放又会占用系统资源,应该怎么办呢?答案是不释放,因为MQ是用在分布式系统内部的,用于各个系统之间的消息通信(所有系统的总量其实并不会很多),对于消费者端而言基本上一个队列或者一个Topic的连接数最多是分布式子系统的个数(在集群环境下最多是子系统的个数的n倍,n的值并不会很大,也就是每个子系统都要消费所有队列和订阅所有Topic的消息时的所需的连接数是最多的),因此连接数并不会很多(估算的话最多是:队列的个数 * 系统的总个数 * 集群的倍数 + Topic的个数 * 系统的总个数 * 集群的倍数),所以没必要考虑负载和系统资源占用的问题。虽然是这样,但我们并不能随便使用MQ,不能仅仅为了一个异步处理就开启并占用一个MQ的连接,在异步处理的时候可以采用开启线程的方式
4️⃣ActiveMQ队列中的消息在消费时采用的是轮询负载的机制,即每个消费者轮流消费消息,当然负载机制是可以配置的
三、主题(Topic)的消息的发送和接收
Topic模式适用于发布/订阅,生产者将消息发布到Topic中,每个消息都可能有多个消费者,属于1:N的关系,生产者和消费者之间有时间上的相关性,订阅某一个主题的消费者只能消费自它订阅之后生产者发布的消息,如果生产者在发布消息时没有订阅者,那发布的消息就是一条垃圾消息,因此生产者在发布消息时应该检查当前是否有订阅者。
JMS规范还允许客户创建持久订阅,这在一定程度上放松了时间上的相关性,持久订阅允许消费者消费它在未处于激活状态时生产者发送到主题的消息。
1、生产者发布消息到主题:仅仅是将消息的目的地变为了Topic
public class MQProducer {//MQ服务的地址private static final String MQ_URL = "tcp://192.168.2.107:61616";//主题的名称private static final String TOPIC_NAME = "topic01";public static void main(String args[]) throws JMSException {//连接MQ服务ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);//创建连接Connection connection = factory.createConnection();//开启连接connection.start();//创建sessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建主题Topic topic = session.createTopic(TOPIC_NAME);//创建消息生产者MessageProducer producer = session.createProducer(topic);for (int i = 0; i < 3; i++) {//生产消息TextMessage textMessage = session.createTextMessage("msg" + i);//发送消息producer.send(textMessage);}//关闭资源producer.close();session.close();connection.close();}
}
2、订阅者消费消息:注意要先启动消费者端
public class MQConsumer {private static final String MQ_URL = "tcp://192.168.2.107:61616";//主题名称要和生产者一致private static final String TOPIC_NAME = "topic01";public static void main(String args[]) throws JMSException {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(MQ_URL);Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic queue = session.createTopic(TOPIC_NAME);MessageConsumer consumer = session.createConsumer(queue);//使用Lambda表达式consumer.setMessageListener((message -> {TextMessage msg = (TextMessage) message;String text = null;try {text = msg.getText();} catch (JMSException e) {e.printStackTrace();}System.out.println(">>子:" + Thread.currentThread().getName());System.out.println(text);}));// consumer.close();
// session.close();
// connection.close();}
}
四、两种模式的对比
Topic中的消息默认不会被持久化,在ActiveMQ服务重启后Topic中的消息就会丢失,但Queue中的消息会被持久化,重启服务后不会丢失。
ActiveMQ——消息的生产和消费相关推荐
- ActiveMQ使用线程池实现消息的生产与消费
1. 首先先引入相关的lib包,重点需引用activemq-client-5.8.0.jar,activemq-core-5.7.0.jar,activemq-pool-5.8.0.jar,activ ...
- RocketMQ事务消息从生产到消费原理详解(包括回查过程)
名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...
- JMS学习三(ActiveMQ消息的可靠性)
下面我们来学习一下消息接受确认和发送持久化消息.消息的过期.消息的选择器和消息的优先级. 一.消息接收确认 1.jms消息只有在被确认之后才认为成功消费了这条消息.消息的成功消费通常包括三个步骤:(1 ...
- ActiveMQ消息消费流程及优化
ActiveMQ默认情况下是开启事务的吗? MQ默认情况下是没有开启事务的 , MQ将消息推送给消费者后, 消费者会立即向服务器确认该消息已被消费,之后才进行该条消息的处理逻辑 为什么在应用中有事务开 ...
- Kafka实现消息生产和消费
文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...
- 【转】ActiveMQ消息传送机制以及ACK机制详解
2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...
- 用redis实现消息队列(实时消费+ack机制)【转】
用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...
- ActiveMQ消息传送机制以及ACK机制详解
2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...
- activemq消息丢失_Kafka or RabbitMQ:消息中间件选型深入分析
消息中间件选型深入分析 --从Kafka与RabbitMQ的对比来看全局 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说 ...
最新文章
- 自动化早已不是那个自动化
- 谈周六晚上的毕业典礼
- python3 ssl.CertificateError: hostname manifest.googlevideo.com doesn t match either
- 9月份个人:windows系统的DNS服务器配置
- mysql建表以及列属性
- java 记事本换行_[求助]记事本自动换行
- SDN精华问答 | 为什么会出现SDN?
- 入职新公司,如何快速上手公司业务?
- Linux系统编程17:进程控制之进程等待为什么进程需要被等待wait方法和waitpid方法阻塞和非阻塞等待
- win7旗舰版+caffe+vs2013+matlab2014b(无GPU版)
- Java基本数据类型和String类型的转化
- Jmeter数据库及接口测试
- 《DSP using MATLAB》示例Example4.2
- linux网络管理员认证考试,红帽认证系统管理员 (RHCSA) 考试
- Ecshop小京东支付插件【小京东个人支付宝即时到帐支付插件支持PC电脑版+手机版】
- VMware ESXi安装mac os
- 3天实现暴力涨粉500+的引流话术整理!
- IT人,自我营销,你懂吗?
- UVM基础-Sequence、Sequencer(二)
- 大屏页面使用transform属性scale进行缩放,高德地图点击事件失效,地图展示模糊