总结:

1. activemq有2种消息传递语义:queue点对点 以及 topic 发布订阅

2. 消息发送到broker,consumer之后才连接,如果是queue还能消费到,如果是topic则消费不到。

3. 如果设置false,并且ack设置成AUTO_ACKNOWLEDGE 或者 DUPS_OK_ACKNOWLEDGE ,session非事务执行,消息
  static final int AUTO_ACKNOWLEDGE = 1; 客户端发送和接受消息不需要额外工作,不需要ack;
  static final int CLIENT_ACKNOWLEDGE = 2; 客户端确认,客户端接受消息,调用javax.jmx.Message的acknowledge方法,jms服务器才会删除消息;
  static final int DUPS_OK_ACKNOWLEDGE = 3; 类似auto ack, 自动批量确认消息,具有延迟发送ack的特点,ActiveMq内部实现积累一定数量自动确认.
static final int SESSION_TRANSACTED = 0; 这个是给creatSeession(int ack)方法使用,直接设置ack参数,那么前面的true/false则为true.

4. queue中的消息被消费后会被删除(如果是CLIENT_ACKNOWLEDGE 需要提交message.ack....()),多个消费者默认轮询,每人一条均分;

topic 中的消费会广播给所有consumer,每个cosumer都会收到一份。

5. 消息持久化用在activemq服务重启使用,有如下两种:
    static final int NON_PERSISTENT = 1;

static final int PERSISTENT = 2;

集群模式

Queue consumer clusters
消费者集群:如果订阅消息的任何一个消息者A宕机,未处理的消息自动发送到另一个订阅
此消息队列的消息者B
通过failover:// transport 协议实现

Broker clusters
代理集群:有多个代理A、B、C进行集群,消费者连接上A,如果A宕机,自动切换到B上。
通过failover:// protocol 方式实现
各代理间互不通信,如果某个代理上没有消费者,消息将在此代理上累积。

Discovery of brokers
支持自动发现机制:客户端自动发现和连接到一个可用的代理上,以及代理自动侦测和连接到另一个代理上。

Networks of brokers
代理组成的网络:多个代理组成代理网络,如果某个代理上消息没有被处理,通过存储和转发机制推送到另一个代理上处理,避免单点代理上消息的累积。
这种方式允许客户端连接到任一个代理上,而且代理宕机,自动连接到另一个代理上。
同时支持可申缩的大量客户端数量,同时有需要可以按需增加代理的数量。
可以认为这是客户端集群连接到代理集群,带有自动灾备和发现机制的简单易用的消息结构。

Master Slave
主从配置:消息在主从代理之间复制,如果主代理宕机,可以没有任何消息丢失的自动切换到从代理上。
可用于单独的代理,或由代理组成的网络中。

Replicated Message Stores
消息存储复制:通过SNA等共享存储等方式,多个代理共享消息存储文件,如果一个代理宕机,另一个代理直接可以使用同样的存储文件提供服务,提高代理可用性。

具体代码:

queue使用事务发送消费,以及同步消费,异步使用MessageListener接口即可:

public class ActivemqQueueProducer {private Session session;private MessageProducer producer;private Connection connection;private Queue queue;public void initialize() {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://dc2:61616");try {connection = connectionFactory.createConnection();// 事务执行如果时true,那么会开启事务,acknowledgeMode参数就会忽略;//  如果设置false,那么acknowledgeMode需要设置session = connection.createSession(true, Session.DUPS_OK_ACKNOWLEDGE);queue = session.createQueue("hlw_queue");producer = session.createProducer((Destination) queue);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();} catch (JMSException e) {e.printStackTrace();}}public void sendText(String message) {TextMessage textMessage = null;try {textMessage = session.createTextMessage(message);producer.send(textMessage);System.out.println("Sending message" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}public void submit() {try {session.commit();} catch (JMSException e) {e.printStackTrace();}}public void close() {System.out.println("Producer:->Closeing connnection");try {if (producer != null) {producer.close();}if (session != null) {session.close();}if (connection != null) {connection.close();}} catch (JMSException e) {e.printStackTrace();}}}

同步消费:

public class ActivemqQueueConsumer {private String name = "";private String subject = "TOOL.DEFAULT";private Destination destination;private Connection connection;private Session session;private MessageConsumer consumer;public ActivemqQueueConsumer(String name) {this.name = name;}public void initialize() throws JMSException {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://dc2:61616");connection = connectionFactory.createConnection();//session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);destination = (Destination) session.createQueue("hlw_queue");consumer =  session.createConsumer(destination);connection.start();}public void receive() throws JMSException {initialize();System.out.println("Consumer(" + name + "):->Begin listening...");int count = 0;while (true) {TextMessage message = (TextMessage) consumer.receive();System.out.println("consumer recive:" + message.getText());//message.acknowledge();count++;System.out.println(count);}}public void submit() throws JMSException {session.commit();}public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null) {consumer.close();}if (session != null) {session.close();}if (connection != null) {connection.close();}}
}

topic 事务发送:

ublic class ActivemqTopicProducer {private TopicSession session;private TopicPublisher producer;private Connection connection;private Topic topic;public void initialize() {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://dc2:61616");
//        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("amqp://dc2:5672");
//        JmsConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://dc2:5672");
//        JmsConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://dc2:61616");try {TopicConnection connection =  connectionFactory.createTopicConnection();session = connection.createTopicSession(true,Session.AUTO_ACKNOWLEDGE);topic = session.createTopic("hlw_topic");producer =  session.createPublisher(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();} catch (JMSException e) {e.printStackTrace();}}public void sendText(String message) {TextMessage textMessage = null;try {textMessage = session.createTextMessage(message);producer.send(textMessage);System.out.println("Sending message"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}public void submit(){try {session.commit();} catch (JMSException e) {e.printStackTrace();}}public void close(){System.out.println("Producer:->Closeing connnection");try {if (producer!=null){producer.close();}if (session!=null){session.close();}if(connection!=null){connection.close();}} catch (JMSException e) {e.printStackTrace();}}}

topic异步消费:

public class ActivemqTopicConsumerAsyn implements MessageListener {private String name = "";private Connection connection;private Session session;private MessageConsumer consumer;public ActivemqTopicConsumerAsyn(String name) {this.name = name;}public void initialize() throws JMSException {
//        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://dc2:61616");
//        connection = connectionFactory.createConnection();
//        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//        Topic topic = session.createTopic("hlw_topic");
//        consumer = session.createConsumer(topic);ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://dc2:61616");connection = connectionFactory.createConnection();connection.setClientID(name);session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("hlw_topic");ActiveMQTopicSubscriber activeMQTopicSubscriber = (ActiveMQTopicSubscriber)session.createDurableSubscriber(topic,"hlw_topic_subscriber");consumer = activeMQTopicSubscriber;connection.start();}public void receive() {try {initialize();System.out.println("Consumer(" + name + "):->Begin listening...");consumer.setMessageListener(this);} catch (JMSException e) {e.printStackTrace();}}@Overridepublic void onMessage(Message message) {try {if(message instanceof TextMessage){System.out.println("consumer("+name+")异步 receive:"+((TextMessage)message).getText());
//                Thread.sleep(500);}} catch (Exception e) {e.printStackTrace();}}public void submit() throws JMSException {session.commit();}public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null) {consumer.close();}if (session != null) {session.close();}if (connection != null) {connection.close();}}
}

ActiveMq createSession DUPS_OK_ACKNOWLEDGE相关推荐

  1. activemq 消息阻塞优化和消息确认机制优化

    一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...

  2. 提高ActiveMQ工作性能

    2019独角兽企业重金招聘Python工程师标准>>> 提高ActiveMQ工作性能 博客分类: MQ (接上文<架构设计:系统间通信(22)--提高ActiveMQ工作性能( ...

  3. 【转】ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...

  4. JMS ActiveMQ研究文档

    1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务 ...

  5. activemq - 浅析消息确认模式

    2019独角兽企业重金招聘Python工程师标准>>> 前言 JMS的消息确认模式,定义了客户端(消息发送者或者消费者)与broker确认消息的方式,可以认为是客户端与Broker之 ...

  6. ActiveMQ消息队列的使用及应用

    目录:  一:JMQ的两种消息模式1.1:点对点的消息模式1.2:订阅模式 二:点对点的实现代码2.1:点对点的发送端2.2:点对点的接收端 三:订阅/发布模式的实现代码3.1:订阅模式的发送端3.2 ...

  7. ActiveMQ入门系列二:入门代码实例(点对点模式)

    在上一篇<ActiveMQ入门系列一:认识并安装ActiveMQ(Windows下)>中,大致介绍了ActiveMQ和一些概念,并下载.安装.启动他,还访问了他的控制台页面. 这篇,就用代 ...

  8. 分布式--ActiveMQ 消息中间件(一) https://www.jianshu.com/p/8b9bfe865e38

    1. ActiveMQ 1). ActiveMQ ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message ...

  9. ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...

最新文章

  1. 2021年春季学期-信号与系统-第七次作业参考答案-第四小题
  2. 都在说GPT-3和AlphaFold,2020没点别的AI技术突破了?
  3. make: *** [ext/fileinfo/libmagic/apprentice.lo] Er
  4. DelayQueue源码
  5. SAP CRM webclient ui drop down list key mode
  6. 卫星导航定位 -- 坐标系统与时间系统
  7. linux+tar怎样解压,如何在Linux上使用tar命令解压和压缩文件
  8. Nagios 监控平台快速安装
  9. jmeter 跳板机_Jmeter接口测试进阶
  10. MPush开源实时消息推送系统
  11. Android开发笔记之视频录制
  12. 数据结构的基本概念(ADT 抽象数据类型 数据结构三要素)
  13. 各种群体寻优算法的比较
  14. 阿里云服务器镜像操作系统如何选择?阿里云镜像注意事项
  15. 智能反射面(IRS)在无线通信安全领域应用的论文复现
  16. 亚马逊高管为何频繁离职
  17. linux 拼图游戏,2020经典宝石拼图
  18. 友盟php接入统计,ionic2 接入友盟统计
  19. Win11设备管理器在哪里打开?
  20. [考试]20141027

热门文章

  1. 10Easyx图形编程
  2. 城市区域二手房信息python爬取、保存和初步分析—笔记
  3. android命令大全 pdf,android调试桥(adb)常用命令.pdf
  4. 安装程序遇到错误:0x80240037 尝试打开时出错 - WSUSSCAN.cab 错误: 0x80070002。WSUSSCAN.cab文件 是什么?cab 是什么文件?
  5. SpringMVC 406
  6. 运筹学基础【三】 之 决策
  7. 以拼音输入法(自然语言处理)为例,简单理解隐含马尔可夫模型
  8. 【最全】ISTQB- FL模拟题(含答案)
  9. c语言共有几种运算符_C语言的运算符和表达式有哪些
  10. A Monocular SLAM System Leveraging Structural Regularity in Manhattan World