引子

这段时间闲来无事学了下activitymq,具体的学习方法是看ActiveMQ_in_Action。并通过自己编码把书里面介绍的内容一一实现。大家有兴趣也可以这么去学,不会花很多的时间,但是会让自己能清晰的理解activitymq[ActiveMQ_in_Action下载](http://download.csdn.net/download/niclascage/9935318)。在这里我只简单的罗列一些需要注意的知识点,想要详细了解可以去看书,最后会附上代码。

基础知识

一. JMS 的可靠性机制

消息确认JMS 消息只有在被确认之后,才认为已经被成功地消费了。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认分三种情况:Session.AUTO_ACKNOWLEDGE:客户成功的从receive方法返回或者从MessageListener.onMessage方法成功返回的时候, 会话自动确认。Session.CLIENT_ACKNOWLEDGE:客户通过消息的 acknowledge 方法确认.在这种模式中,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已被会话消费的消息Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可能会导致一些重复的消息消息持久化PERSISTENT:指示 JMS provider 持久保存消息,以保证消息不会因为 JMSprovider 的失败而丢失。NON_PERSISTENT:不要求 JMS provider 持久保存消息。消息过期可以设置消息在一定时间后过期,默认是永不过期。持久订阅首先消息生产者必须使用 PERSISTENT 提交消息。客户可以通过会话上的createDurableSubscriber 方法来创建一个持久订阅,该方法的第一个参数必须是一个 topic。第二个参数是订阅的名称。JMS provider 会向客户发送客户处于非激活状态时所发布的消息。

二. Transport

VM Transport:TCP Transport:允许客户端通过 TCP socket 连接到远程的 brokerFailover Transport:Failover Transport 是一种重新连接的机制Discovery transport:

三、消息持久化

Kahadb:是activemq从版本5.4之后的默认消息存储引擎JDBC Persistence:目前支持的数据库有 Apache Derby, Axion, DB2, HSQL, Informix, MaxDB,MySQL, Oracle, Postgresql, SQLServer, Sybase。

四、clusters

Queue consumer clustersBroker clusters在一个网络内运行多个 brokers 或者 stand alone brokers 时存在一个问题,这就是消息在物理上只被一个 broker 持有,因此当某个 broker 失效,那么你只能等待直到它重启后,这个 broker 上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。Master Slave 背后的想法是,消息被复制到 slave broker,因此即使 master broker 遇到了像硬件故障之类的错误,你也可以立即切换到 slave broker 而不丢失任何消息。Pure Master Slave 的工作方式如下:Slave broker 消费 master broker 上所有的消息状态,例如消息、确认和事务状态等。只要 slave broker 连接到了 master broker,它不会(也不被允许)启动任何 network connectors 或者 transport connectors,所以唯一的目的就是复制 master broker 的状态。Master broker 只有在消息成功被复制到 slave broker 之后才会响应客户。例如,客户的 commit 请求只有在 master broker 和 slave broker都处理完毕 commit 请求之后才会结束。当 master broker 失效的时候,slave broker 有两种选择,一种是 slavebroker 启动所有的 network connectors 和 transport connectors,这允许客户端切换到 slave broker;另外一种是 slave broker 停止。这种情况下,slave broker 只是复制了 master broker 的状态。客户应该使用 failover transport 并且应该首先尝试连接 masterbroker。例如: failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false设 置randomize为false就可以让客户总是首先尝试连接master broker(slave broker 并不会接受任何连接,直到它成为了 master broker)。Pure Master Slave 具有以下限制:只能有一个 slave broker 连接到 master broker。在因 master broker 失效而导致 slave broker 成为 master 之后,之前的 master broker 只有在当前的 master broker(原 slave broker)停止后才能重新生效。

五、Shared File System Master Slave

如果你使用 SAN 或者共享文件系统,那么你可以使用 Shared File SystemMaster Slave。基本上,你可以运行多个 broker,这些 broker 共享数据目录。当第一个 broker 得到文件上的排他锁之后,其它的 broker 便会 在循环中等待获得这把锁。客户端使用 failover transport来连接到可用的 broker。当 masterbroker 失效的时候会释放这把锁,这时候其中一个 slave broker 会得到这把锁从而成为 master broker。

六、 JDBC Master Slave

JDBC Master Slave 的工作原理跟 Shared File System Master Slave 类似,只是采用了数据库作为持久化存储。

代码

如下是代码,上面都有注释:
获取连接:

package activntymq.base;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class MQConnectionConfig {private static ConnectionFactory connectionFactory = null;  /*** 初始化连接工厂*/private static void initConnectionFactory(){//connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616"); /***  failover:activitymq断线重连*  randomize:true-随机连接,可以有效地控制客户端在多个broker上的负载均衡。false-首先连接到主节点,并在主节点不可用时只连接到辅助备份代理*  priorityBackup=true:指定本地broker*  以上配置例子,客户端将尝试连接并保持连接到本地broker。 如果本地broker失败,它当然会故障转移到远程。 *  但是,由于使用priorityBackup参数,客户端将不断尝试重新连接到本地。 一旦客户端可以这样做,客户端将重新连接到它,而不需要任何手动干预*/connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"failover:(tcp://127.0.0.1:61616,tcp://192.168.0.104:61616)randomize=false");}/*** 获取连接* @return*/public static Connection getConnection(){if(connectionFactory == null){initConnectionFactory();}try {Connection connection = connectionFactory.createConnection();return connection;} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}return null;}/*** 关闭acitivity资源* @param producer* @param cosumer* @param session* @param connection*/public static void close(MessageProducer producer, MessageConsumer consumer, Session session, Connection connection){try {if(producer != null){producer.close();}if(consumer != null){consumer.close();}if(session != null){session.close();}if(connection != null){connection.close();}} catch (JMSException e) {e.printStackTrace();}}
}

队列:

package activntymq.queque;import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import activntymq.base.MQConnectionConfig;public class Producer {public void produce(){Connection connection = MQConnectionConfig.getConnection();Session session = null;MessageProducer producer = null;try {connection.start();/*** false:不支持事物,true:支持事物* 一、事物性会话中,事物提交的时候消息被确认* 二、非事务性回话中,消息何时被确认分三种情况:* 1、Session.AUTO_ACKNOWLEDGE:客户成功的从receive方法返回或者从 MessageListener.onMessage方法成功返回的时候, 会话自动确认* 2、Session.CLIENT_ACKNOWLEDGE:客户通过消息的 acknowledge 方法确认.在这种模式中,确认是在会话层上进行的,确认一个被消费的消息将自动确认所有已被会话消费的消息* 3、Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可能会导致一些重复的消息*/session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("FirstQueue");//消息队列producer = session.createProducer(destination);/*** PERSISTENT:消息持久化,默认消息持久,优点保证消息不丢失* NON_PERSISTENT:非持久化:优点性能更强*/producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息持久化sendMessage(session, producer);} catch (Exception e) {e.printStackTrace();} finally {MQConnectionConfig.close(producer, null, session, connection);}}public static void sendMessage(Session session, MessageProducer producer)  throws Exception {  int count = 0;while(true){ String msg = "hello activity mq " + count++;TextMessage message = session.createTextMessage(msg);  //message.acknowledge();System.out.println("Producer produce : " + msg);  producer.send(message);session.commit();Thread.sleep(2000);}  }  public static void main(String[] args) {Producer producer = new Producer();producer.produce();}
}package activntymq.queque;import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;import activntymq.base.MQConnectionConfig;public class Consumer implements MessageListener{public void cosume(){Connection connection = MQConnectionConfig.getConnection();Session session = null;MessageConsumer consumer = null;try {connection.start();//false:不支持事物,true:支持事物session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("FirstQueue");//消息队列consumer = session.createConsumer(destination);//异步消费消息:实现接口MessageListener,注册监听器 consumer.setMessageListener(this); //(异步接收) ,实现 onMessage方法//consumer.setMessageListener(this);reciveMsg(consumer, session);//同步消费消息} catch (JMSException e) {e.printStackTrace();}finally{//异步的时候去掉关闭资源的代码//MQConnectionConfig.close(null, consumer, session, connection);}}/*** 同步消费消息:适用于执行时间短的任务,这样即使是同步,线程也不容易被堵塞* @param consumer* @param session*/private void reciveMsg(MessageConsumer consumer, Session session) {try {while(true){//同步消费,两秒钟一次返回,如果recive方法不带参数,则会一直阻塞到消息到达TextMessage message = (TextMessage) consumer.receive(2000);if(message != null){System.out.println("Consumer sonsume : " + message.getText());}session.commit();}} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}  }/*** 异步消费消息:适用于执行时间长的任务,上下文切换不会太频繁,同步则容易被堵塞*/@Overridepublic void onMessage(Message message) {if(message != null){try {TextMessage txtMsg = (TextMessage) message; System.out.println("Consumer sonsume : " + txtMsg.getText());} catch (JMSException e) {e.printStackTrace();}}}public static void main(String[] args) {Consumer consumer = new Consumer();consumer.cosume();}
}

主题:

package activntymq.topic;import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;import activntymq.base.MQConnectionConfig;public class Producer {public void produce(){Connection connection = MQConnectionConfig.getConnection();Session session = null;MessageProducer producer = null;try {connection.start();//false:不支持事物,true:支持事物session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createTopic("myFirstTopic");//主题producer = session.createProducer(destination);sendMessage(session, producer);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {MQConnectionConfig.close(producer, null, session, connection);}}public static void sendMessage(Session session, MessageProducer producer)  throws Exception {  int count = 0;while(true){ String msg = "hello activity mq " + count++;TextMessage message = session.createTextMessage(msg);  System.out.println("Producer produce : " + msg);  producer.send(message);//session.commit();Thread.sleep(2000);}  }  public static void main(String[] args) {Producer producer = new Producer();producer.produce();}
}package activntymq.topic;import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import activntymq.base.MQConnectionConfig;public class Consumer {//持久订阅,客户端id设置成唯一private String clientId = "durableClient";public void cosume(){Connection connection = MQConnectionConfig.getConnection();Session session = null;MessageConsumer consumer = null;try {connection.setClientID(clientId);//持久订阅,需设置客户端idconnection.start();//false:不支持事物,true:支持事物session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createTopic("myFirstTopic");//主题//持久性订阅需设置消费者名称consumer = session.createDurableSubscriber((Topic) destination, clientId);//非持久性订阅//MessageConsumer consumer = session.createConsumer(destination);reciveMsg(consumer, session);} catch (JMSException e) {e.printStackTrace();}finally{MQConnectionConfig.close(null, consumer, session, connection);}}private void reciveMsg(MessageConsumer consumer, Session session) {try {while(true){TextMessage message = (TextMessage) consumer.receive(2000);if(message != null){System.out.println("Consumer sonsume : " + message.getText());}//session.commit();}} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}  }public static void main(String[] args) {Consumer consumer = new Consumer();consumer.cosume();}
}

总结

总的来说感觉学习activitymq的方法很简单。找一本书,然后把书上介绍的知识自己用代码一一实现。

ActivityMQ应用详解相关推荐

  1. 从命令行到IDE,版本管理工具Git详解(远程仓库创建+命令行讲解+IDEA集成使用)

    首先,Git已经并不只是GitHub,而是所有基于Git的平台,只要在你的电脑上面下载了Git,你就可以通过Git去管理"基于Git的平台"上的代码,常用的平台有GitHub.Gi ...

  2. JVM年轻代,老年代,永久代详解​​​​​​​

    秉承不重复造轮子的原则,查看印象笔记分享连接↓↓↓↓ 传送门:JVM年轻代,老年代,永久代详解 速读摘要 最近被问到了这个问题,解释的不是很清晰,有一些概念略微模糊,在此进行整理和记录,分享给大家.在 ...

  3. docker常用命令详解

    docker常用命令详解 本文只记录docker命令在大部分情境下的使用,如果想了解每一个选项的细节,请参考官方文档,这里只作为自己以后的备忘记录下来. 根据自己的理解,总的来说分为以下几种: Doc ...

  4. 通俗易懂word2vec详解词嵌入-深度学习

    https://blog.csdn.net/just_so_so_fnc/article/details/103304995 skip-gram 原理没看完 https://blog.csdn.net ...

  5. 深度学习优化函数详解(5)-- Nesterov accelerated gradient (NAG) 优化算法

    深度学习优化函数详解系列目录 深度学习优化函数详解(0)– 线性回归问题 深度学习优化函数详解(1)– Gradient Descent 梯度下降法 深度学习优化函数详解(2)– SGD 随机梯度下降 ...

  6. CUDA之nvidia-smi命令详解---gpu

    nvidia-smi是用来查看GPU使用情况的.我常用这个命令判断哪几块GPU空闲,但是最近的GPU使用状态让我很困惑,于是把nvidia-smi命令显示的GPU使用表中各个内容的具体含义解释一下. ...

  7. Bert代码详解(一)重点详细

    这是bert的pytorch版本(与tensorflow一样的,这个更简单些,这个看懂了,tf也能看懂),地址:https://github.com/huggingface/pytorch-pretr ...

  8. CRF(条件随机场)与Viterbi(维特比)算法原理详解

    摘自:https://mp.weixin.qq.com/s/GXbFxlExDtjtQe-OPwfokA https://www.cnblogs.com/zhibei/p/9391014.html C ...

  9. pytorch nn.LSTM()参数详解

    输入数据格式: input(seq_len, batch, input_size) h0(num_layers * num_directions, batch, hidden_size) c0(num ...

  10. Java集合详解之Map

    一.首先看看集合框架体系图 从图中可以看到,Map接口扩展了Iterator接口,关于Iterator接口详解请移步:Iterator接口详解 二.Map是什么? Map<k,v>使用键值 ...

最新文章

  1. cytoscape使用方法_信号通路分析工具教程——Cytoscape及OmniPath插件
  2. 2017-5-17 分析文本
  3. Windows下Zookeeper启动zkServer.cmd闪退问题的解决方案
  4. GitHub 上最受欢迎的 5 大 Java 项目
  5. linux两文件对比,linux对比两个文件的差异
  6. AttributeError: module ‘matplotlib‘ has no attribute ‘image‘
  7. java 随机抽取数组内容_工具类:随机抽取数组或集合中的几个不重复元素
  8. 2020 对自己的要求(专注力,执行力,心态)
  9. oracle odi 配置安装,ODI的安装和配置
  10. android中关于keytool 错误:java.lang.Exception:密钥库文件不存在: 解决步骤
  11. 毕业设计 高校排课系统
  12. Reporting Services 配置工具
  13. PHP初中级面试题收集
  14. Mac安装以及使用SVN提交代码教程
  15. 基于ArcGIS的Python——要素类至地理数据库
  16. 为什么我明明设置了 height: XX%,却不起作用?
  17. 【小伟哥AI之路】Jetson Nano之4针PWM风扇转速控制
  18. HIT_SC:实验回顾 - Lab3
  19. IData T1扫码终端H5 扫码实现
  20. Ubuntu 联想Y410P 博通Broadcom 43xx 无线网卡 安装驱动的方法

热门文章

  1. [php]php内存管理
  2. 这一年,这些书:2021年读书笔记
  3. 堆排序(Java语言实现)
  4. Axure使用图标字体
  5. Verilog 语言2选1数据选择器
  6. 彭国伦Fortran95学习笔记(一)第一至七章
  7. 游戏建模软件的ZBrush和Mudbox哪个好
  8. Java设计模式——策略模式
  9. windows网络编程 gethostbyname()
  10. 电脑qq语音连不到服务器,电脑问题:qq语音正常?