1. 持久订阅时, 客户端需要首先向JMS提供者注册一个表面自己身份的id(clientId)。这样当咱们这个客户端处于离线时, JMS提供者会为这个客户端保存所有发送到主题的消息。当客户端再次连接到JMS提供者时, JMS提供者根据这个客户端id, 把消息发送给它。

2. 创建持久订阅必须设置一个客户端id, 不然会报如下错误

3. 设置客户端id

3.1. 设置客户端id要紧跟在创建连接之后

// 1. 创建一个连接工厂
TopicConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
// 2. 创建连接
TopicConnection conn = cf.createTopicConnection();
// 3. 设置客户端id
conn.setClientID(clientId);

3.2. 如果设置客户端id没有紧跟在创建连接之后回报如下错误

4. 多个客户端设置clientID, clientID不能重复。如果已有一个活动的被clientID标识的客户端, 再出现一个重复clientID标识的客户端连接, 会报如下错误

5. 持久订阅的实现机制

5.1. 生产者发送消息给提供者, 如果此时提供者发现没有任何的消费者(包括在线/离线), 那么就会认为该消息无用, 不需要存储, 会直接删除。

5.2. 如果有在线的消费者, 那么提供者会将消息直接传送给在线的消费者, 因为这个时候连接是通的, 消息有传输的通道。

5.3. 如果有离线的消费者, 那么提供者会把属于该消费者的消息存储下来, 等消费者在线的时候, 再将保存的离线消息推送给它。对于持久订阅者, 提供者会在该消费者第一次登录在线的时候, 将它的身份信息记录下来。记录身份的关键就是clientID和主题名称。当持久订阅者又重新在线的时候, 提供者会根据当前连接的clientID和主题名称, 去查询属于它的离线消息, 并进行推送。

6. 例子

6.1. 新建一个名为JMSDurableSubscriber的Java项目, 同时拷入相关jar包

6.2. 编写MyProducer.java

package com.jmsapp.persistent;import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class MyProducer {// 默认连接用户名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默认用户密码private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 队列名称private static final String topicName = "persistentSubscriber";public static void main(String[] args) {// 1. 创建一个连接工厂TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 连接对象TopicConnection conn = null;// 会话对象TopicSession session = null;try {// 2. 创建连接conn = cf.createTopicConnection();// 3. 启动连接conn.start();// 4. 创建会话session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建消息目的地。如果是点对点, 那么它的实现是Queue; 如果是订阅模式, 那它的实现是Topic。这里我们创建一个名为persistentSubscriber的主题。Topic topic = session.createTopic(topicName);// 6. 消息生产者TopicPublisher publisher = session.createPublisher(topic);// 7. 创建文本消息和发送消息StreamMessage message = session.createStreamMessage();message.writeString("JMS中的持久订阅");publisher.publish(message);} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();}} catch (JMSException e1) {e1.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}}
}

6.3. 编写MyConsumer.java

package com.jmsapp.persistent;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class MyConsumer {// 默认连接用户名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默认用户密码private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 队列名称private static final String topicName = "persistentSubscriber";// 客户端idprivate static final String clientId = "rjbd";public static void main(String[] args) {// 1. 创建一个连接工厂TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 连接对象TopicConnection conn = null;// 会话对象TopicSession session = null;try {// 2. 创建连接conn = cf.createTopicConnection();// 3. 设置客户端idconn.setClientID(clientId);// 4. 创建会话session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建消息目的地。如果是点对点, 那么它的实现是Queue; 如果是订阅模式, 那它的实现是Topic。这里我们创建一个名为persistentSubscriber的主题。Topic topic = session.createTopic(topicName);// 6. 消息消费者TopicSubscriber subscriber = session.createDurableSubscriber(topic, clientId);// 7. 接收消息subscriber.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message msg) {try {StreamMessage message = (StreamMessage) msg;System.out.println("接收: " + message.readString());} catch (JMSException e) {e.printStackTrace();}}});// 8. 启动连接, 准备开始接收消息conn.start();} catch (JMSException e) {e.printStackTrace();}}
}

6.4. 运行MyConsumer.java, 接收端是一直处于运行状态的

6.5. 终止运行MyConsumer.java

6.6. 运行MyProducer.java

6.7. 再次运行MyConsumer.java, 接收到消息(非持久化订阅, 这样操作就接收不到消息)

007_JMS中的持久订阅相关推荐

  1. JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系...

    一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent deliver ...

  2. JMS学习(六)--提高非持久订阅者的可靠性 以及 订阅恢复策略

    一,非持久订阅者 和 实时消费消息 在这篇文章中区分了Domain为Pub/Sub.Destination为Topic时,消费者有两种:持久订阅者 和 非持久订阅者. 对于持久订阅者而言,只要订阅了某 ...

  3. JMS学习七(ActiveMQ之Topic的持久订阅)

    非持久化订阅持续到它们订阅对象的生命周期.这意味着,客户端只能在订阅者活动时看到相关主题发布的消息.如果订阅者不活动,它会错过相关主题的消息.如果花费较大的开销,订阅者可以被定义为durable(持久 ...

  4. activemq 持久订阅_ActiveMQ群集,持久订阅者和虚拟主题可助您一臂之力

    activemq 持久订阅 因此,您希望使用ActiveMQ跨分布式主题进行发布-订阅,并且要可靠. 您可以使用永久订阅,对吗? 可以,但是,如果您将群集与ActiveMQ一起使用,则可能会遇到意外的 ...

  5. ActiveMQ群集,持久订阅者和虚拟主题可助您一臂之力

    因此,您希望跨分布式主题使用ActiveMQ进行发布-订阅,并且要可靠. 您可以只使用永久订阅,对不对? 可以,但是,如果将群集与ActiveMQ一起使用,则可能会遇到意外行为. 我最近在一个客户端上 ...

  6. ActiveMQ持久订阅设置

    在JMS中,Topic实现publish和subscribe语义.一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝.但是在消息代理接收到消 ...

  7. 在React Native和Node.js中验证iOS订阅收据

    交易收据:订阅管理的关键要素 (Transaction receipts: the key element of subscription management) Transaction receip ...

  8. ActiveMq持久订阅必须指定clientId以及设置clientId源码分析

    创建持久订阅时的代码: @Overridepublic TopicSubscriber createDurableSubscriber(Topic topic, String name, String ...

  9. SQL Server 2008空间数据应用系列十一:Bing Maps中呈现GeoRSS订阅的空间数据

    友情提示,您阅读本篇博文的先决条件如下: 1.本文示例基于Microsoft SQL Server 2008 R2调测. 2.具备 Transact-SQL 编程经验和使用 SQL Server Ma ...

最新文章

  1. 配置linux-Fedora系统下iptables防火墙
  2. android-Service和Thread的区别
  3. Faster RCNN网络简介
  4. scroll-view如何自适应页面剩余高度
  5. 自己写的计算时间坐标的代码
  6. Cloud for Customer的work center显示逻辑
  7. ubuntu 下groovy 安装配置
  8. 华为Mate 50系列明年初发布:麒麟990/骁龙8 Gen1加持
  9. 如何在正则表达式中使用变量?
  10. c语言练习题库网站,c语言练习题库网站_0.doc
  11. 在将计算机技术应用于会计工作的初期,所开发的会计核算软件主要用于,2013年会计从业考试《电算化》会计核算软件...
  12. 学习数据库(1)——初始数据库
  13. python pytz_关于python:找不到符合pytz要求的版本
  14. 作为空降领导,该如何做?
  15. 天马行空 | 假如上网装X需要花钱?
  16. SQL 模拟生成商品订单表
  17. android 包命名不管你怎么命,千万不要用下面几个
  18. 法航AF447失事,机上有228人
  19. MySQL修改数据库名字
  20. 关于编写“AUTORUN.inf”一点心得!!!

热门文章

  1. 借助 Evolution Linux 的帮助来轻松安装 Arch Linux
  2. php cli模式下获取参数的方法
  3. 只用jsp实现同样的Servlet功能
  4. 信息安全系统设计基础学习总结第二周
  5. 使用SD-WAN策略与传统路由器的OFFICE 365配置
  6. struts导出txt文件
  7. Windows Server 2008防火墙问题及Sql Server2005用户登录问题
  8. MACD 的数学解释
  9. arm开发板6410/2440上mjpg-streamer网络视频服务器移植
  10. 使用代理网络配置maven,显示导入失败的原因