2019独角兽企业重金招聘Python工程师标准>>>

#1 前言 在上一篇文章《ActiveMQ - 初体验,探讨JMS通信模型》中引出了两个话题:

  • 在点对点通信模型中,消费者没有启动,消息生产者投递消息到队列queue中,这时MQ服务重启,最后再启动消息消费者,消费者能接收到MQ服务未重启之前由消息生产者发布的消息,这是为什么呢?

  • 在消息发布/订阅通信模型中,消息订阅者由于某种原因挂了,这时消息发布者通过MQ服务向所订阅的topic发布消息,但消息订阅者重启后接收不到前面发布的消息,如果是重要消息,那不是会造成难以估计的后果吗,有没有解决方法?

为了解决这些问题,JMS标准制定了两个概念,一个是持久化消息(Persistent messages),另一个是持久订阅(durable subscribers)。

#2 持久化消息与非持久化消息

消息的持久化,顾名思义,就是MQ服务在接收到消息后,把消息存储在文件或者数据库,而不是存储在计算机物理内存中,这样即时消息的消费者或者订阅者挂了重启后,也有可能获取到之前没有接收到的消息。那么,怎样控制消息发布者的投递方式呢?

控制消息投递方式有两种,一种是针对整个producer的(可以理解为全局的),另一种是针对消息对象的(可以理解为局部的)。在默认情况下,消息的投递方式都是采用持久化投递的。下面看一下两种投递方式是怎么设置的。

  • 对整个producer域设置投递方式
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);//发送一个持久信息producer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置持久投递
producer.send(session.createTextMessage(persistent_message_global));
  • 对单个消息message设置投递方式
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
//发送一个持久信息
TextMessage message = session.createTextMessage(persistent_message_single);
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);producer.send(message);

2.1 演示和结论

接下来的演示中,消息发送者将通过上面的两种投递方式,发送三条消息,分别是两条持久消息和一条非持久消息,改变一下producer、broker、consumer三者的启动顺序,看一下效果和总结结论。

先看下用于实验的代码。

** 消息生产者 **

public class Producer {public static final String QUEUE_NAME = "test-deliverymode-queue";public static void main(String[] args) {System.out.println("Producer started!");//构建非持久消息和持久消息的内容String non_persistent_message_global = "全局方式设置的非持久消息 : " + System.currentTimeMillis();String persistent_message_global = "全局方式设置的持久消息 : " + System.currentTimeMillis();String persistent_message_single = "针对单条消息设置的持久消息 : " + System.currentTimeMillis();try {Connection connection = ActiveMQManager.createConnection();connection.start();Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);MessageProducer producer = session.createProducer(queue);//发送一个持久信息producer.setDeliveryMode(DeliveryMode.PERSISTENT);producer.send(session.createTextMessage(persistent_message_global));//发送一个非持久信息producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);producer.send(session.createTextMessage(non_persistent_message_global));//发送一个持久信息TextMessage message = session.createTextMessage(persistent_message_single);message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);producer.send(message);System.out.println("成功发送消息:" + non_persistent_message_global);System.out.println("成功发送消息:" + persistent_message_global);System.out.println("成功发送消息:" + persistent_message_single);producer.close();session.close();connection.close();} catch (Exception e) {e.printStackTrace();}System.out.println("Producer end!");}
}

** 消息消费者 **

public class Consumer {public static void main(String[] args) throws IOException {System.out.println("Consumer started!");try {Connection connection = ActiveMQManager.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(Producer.QUEUE_NAME);MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println("Consumer 获取消息 ---->" + text);} catch (Exception e) {e.printStackTrace();}}});System.in.read();consumer.close();session.close();connection.stop();} catch (Exception e) {e.printStackTrace();}System.out.println("Consumer end!");}
}

还是和上一篇文章一样,为了更好的探讨持久消息,需要更换启动顺序。

** 启动顺序: **

(1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者

** 结论:不管是否是持久消息,消费者都能正常接收和处理。 **

(2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者

** 结论:持久消息可以被消费者正常接收。但是,非持久消息消费者没有接收到,这是因为非持久消息存储在内存,在MQ服务停止后,内存中的消息都会被清除掉,自然就不会再被接收到了。 **

(3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者

** 结论:所有的消息可以被消费者正常接收。**

#3 非持久订阅与持久订阅 前面留了这么一个关于发布/订阅模型的问题,就是在默认情况下消息订阅者是不能获取到离线消息的,不管这个消息是持久消息还是非持久消息,为了解决这个问题,activemq对消息订阅者划分为非持久订阅和持久订阅这两种状态。默认情况下,消息订阅者是非持久订阅的。要注意的是,持久订阅者和非持久订阅者针对的域Domain是Pub/Sub,而不是P2P模型。

3.1 非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和 activemq broker 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。

非持久订阅的实现代码如下:

Connection connection = ActiveMQManager.createConnection();
connection.setClientID(CLIENT_ID); //持久化订阅要设置
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(topic, "DurableTopicSubscriber"); 持久化订阅要使用这种方式创建consumer//监听消息
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);//监听消息
consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println("NonDurableTopicSubscriber 获取消息 ---->" + text);} catch (Exception e) {e.printStackTrace();}}
});

3.2 持久订阅

消息订阅者为持久订阅时,客户端向activemq broker注册一个自己身份的ID,当这个客户端处于离线时,broker会为这个ID 保存所有发送到主题的消息,当客户再次连接到broker时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。

要建立持久订阅,必须满足以下条件:

  1. 为连接connection设置一个客户 ID,如果ID以前已经被占用了,将会抛出异常。
Connection connection = ActiveMQManager.createConnection();
connection.setClientID(CLIENT_ID); //持久化订阅要设置
connection.start();
  1. 为订阅的主题指定一个订阅名称,连接ID和订阅名两者组合必须唯一,否则会抛异常
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME); //持久订阅需要以这种方式创建订阅者

下面通过实验给大家一个更加深刻的理解。在消息发布者端发布两条消息,一条是持久消息,另一条是非持久消息。

消息发布:

public class Producer {public static final String TOPIC_NAME = "test-durable-topic";public static void main(String[] args) {System.out.println("Producer started!");//构建非持久消息和持久消息的内容String message_persistent = "持久的消息 : " + System.currentTimeMillis();String message_non_persistent = "非持久的消息 : " + System.currentTimeMillis();try {Connection connection = ActiveMQManager.createConnection();connection.start();Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);Topic topic = session.createTopic(TOPIC_NAME);MessageProducer producer = session.createProducer(topic);//发送一个非持久信息TextMessage non_persistent_message = session.createTextMessage(message_non_persistent);non_persistent_message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);producer.send(non_persistent_message);//发送一个持久信息TextMessage persistent_message = session.createTextMessage(message_persistent);persistent_message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);producer.send(persistent_message);System.out.println("成功发送消息:" + message_non_persistent);System.out.println("成功发送消息:" + message_persistent);producer.close();session.close();connection.close();} catch (Exception e) {e.printStackTrace();}System.out.println("Producer end!");}
}

消息订阅:

/*** 持久订阅*/
public class DurableTopicSubscriber {public static final String CLIENT_ID = "client_id";public static final String SUBSCRIBER_NAME = "subscriber_name";public static void main(String[] args) throws IOException {System.out.println("DurableTopicSubscriber started!");try {Connection connection = ActiveMQManager.createConnection();connection.setClientID(CLIENT_ID); //持久化订阅要设置connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(Producer.TOPIC_NAME);//订阅者要使用这种方式创建MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME); //监听消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println("DurableTopicSubscriber 获取消息 ---->" + text);} catch (Exception e) {e.printStackTrace();}}});// 线程一直等待System.in.read();consumer.close();session.close();connection.stop();} catch (Exception e) {e.printStackTrace();}System.out.println("DurableTopicSubscriber end!");}
}

通过实验顺序的不同,不一样的小结论

(1)通过管理后台清空所有的订阅者 -> MQ启动 -> 消息发布 -> 启动持久订阅者

结论:消息仍然没有被订阅者接收到,这是因为订阅者在消息发布之前没有在MQ服务中注册,告诉MQ服务自己是订阅某个主题的消息的,你要在我不在线的时候帮我保存这个消息,直到我取出这个消息为止

(2)MQ启动 ->启动持久订阅者 -> 持久订阅者下线 -> 消息发布 -> 持久订阅者重新上线

结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到

(3)MQ启动 -> 启动持久订阅者 -> 持久订阅者下线 -> 持久消息和非持久同时发布 -> MQ下线后重启 -> 持久订阅者重新上线

结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到

代码

http://git.oschina.net/thinwonton/activemq-showcase

转载于:https://my.oschina.net/thinwonton/blog/888654

ActiveMQ - 持久化消息与持久主题订阅相关推荐

  1. ActiveMQ持久化消息的三种方式

    本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle.下面逐一介绍. A:持久化为文件 这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了.涉及到的配置和代码有 < ...

  2. 想让用户“一见钟情”,你需要主题订阅消息精准推送

    现今智能终端App越来越多,各个App推送的信息量急速扩张,如何让自己推送的信息精准抓住用户的视线,在市场竞争中占据高地?其中一个答案就是--主题订阅消息推送. 主题订阅消息推送可以根据用户习惯或让用 ...

  3. ActiveMQ 持久化讯息数据库信息

    www.MyException.Cn   发布于:2012-11-10 10:48:50   浏览:0次 ActiveMQ 持久化消息数据库信息 最近有网友问我,ActiveMQ持久化的中表结构是什么 ...

  4. JMS学习七(ActiveMQ之Topic的持久订阅)

    非持久化订阅持续到它们订阅对象的生命周期.这意味着,客户端只能在订阅者活动时看到相关主题发布的消息.如果订阅者不活动,它会错过相关主题的消息.如果花费较大的开销,订阅者可以被定义为durable(持久 ...

  5. JMS学习九 ActiveMQ的消息持久化到Mysql数据库

    1.将连接Mysql数据库驱动包,放到ActiveMQ的lib目录下 2,修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式 2.1  修改原来的kshadb的持久化 ...

  6. ActiveMQ持久化方式(转)

    消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息 中心重新启动后仍然可以将消息发送出去,如果把这种持久化 ...

  7. ActiveMQ持久化方式

    ActiveMQ持久化方式 发表于8个月前(2014-09-04 15:55)   阅读(686) | 评论(0) 17人收藏此文章, 我要收藏 赞1 慕课网,程序员升职加薪神器,点击免费学习 摘要  ...

  8. RabbitMQ系列笔记主题订阅模式

    导语 昨天的内容主要讲了RabbitMQ的发布订阅模式和路由模式,都很好的满足了我们的日志打印,但是如果说,我对日志的打印,希望可以过滤掉一些内容呢,比如说,在打印错误日志的时候,只打印login时的 ...

  9. PHP中使用ActiveMQ实现消息队列

    2019独角兽企业重金招聘Python工程师标准>>> PHP中使用ActiveMQ实现消息队列前面我们已经学了如何部署ActiveMQ, 我们知道通过ActiveMQ的一个管理后台 ...

  10. SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)

    场景 Windows上Mqtt服务器搭建与使用客户端工具MqttBox进行测试: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/1 ...

最新文章

  1. 十行代码实现网页标题滚动效果!
  2. Boost:BOOST_CURRENT_FUNCTION的测试程序
  3. 钉钉授权第三方WEB网站扫码登录
  4. ux和ui_如何为您的UX / UI设计选择正确的原型制作工具
  5. html语言书写注意事项,HTML注意事项(学习笔记)
  6. 调用远程摄像头进行人脸识别_工地安全:AI如何实现安全帽检测与人脸识别?...
  7. Oracle Goldengate在HP平台裸设备文件系统OGG-01028处理
  8. 复杂网络社区划分方法综述
  9. unity3d 资源打包加密 整理
  10. 【macOS付费软件推荐】第6期:Reeder
  11. sublime text3怎么运行python代码_Sublime Text3配置在可交互环境下运行python快捷键
  12. 读Google MapReduce后有感
  13. 医院挂号系统代码_智慧医院中心是怎样做的?分诊叫号系统如何正确使用!
  14. python追加_python追加写
  15. ARCHPR(暴力破解压缩包密码软件)
  16. 用AndServer做安卓手机服务器
  17. vue 汉字转拼音字母
  18. U3D_Shader编程(第二篇:基础夯实篇)
  19. e4a mqtt 类库 开发 安卓 mqtt apk 安卓mqtt客户端开发
  20. 一步一步学习Redis——使用config命令查看或设置配置项

热门文章

  1. size()计算jquery对象中元素的个数
  2. 判断URL的HTTP状态
  3. JavaScript学习笔记(七)
  4. RestClient测试
  5. 当前五大浏览器内核及简史
  6. 【网络流24题】餐巾计划问题(费用流)
  7. [python]设计模式
  8. 记在thinkPHP中一个创建模型的小错误
  9. 彼得林奇:赚钱密码(1990年一次演讲)
  10. Android进阶篇-Http协议