概述

ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下一批次待发送消息的位置,进行后续的发送操作。这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态。
因此,从ActiveMQ5.0.0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费者,当消费者处于不活跃状态下,切换使用Cursors来处理消息的发送。

当消息消费者处于活跃状态并且处理能力比较强时,被持久存储的消息直接被发送到与消费者关联的发送队列,见下图

当消息已经出现积压,消费者再开始活跃;或者消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,并发送与消费者关联的发送队列。见下图

Message Cursors分成三种类型

1: Store-based
2:VM

3:File-based

Store-based

从activemq5.0开始,默认使用此种类型的cursor,其能够满足大多数场景的使用要求。同时支持非持久消息的处理,Store-based内嵌了File-based的模式,非持久消息直接被Non-Persistent Pending Cursor所处理。工作模式见下图

VM

相关的消息引用存储在内存中,当满足条件时,消息直接被发送到消费者与之相关的发送队列,处理速度非常快,但出现慢消费者或者消费者长时间处于不活跃状态的情况下,无法适应。工作模式见下图 :

File-based

当内存设置达到设置的限制,消息被存储到磁盘中的临时文件中。工作模式见下图 :

配置使用

在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors,例如:
1:对Topic subscribers
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
                <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <deadLetterStrategy>
                <individualDeadLetterStrategy topicPrefix="Test.DLQ." />
            </deadLetterStrategy>
            <pendingSubscriberPolicy>
                <vmCursor />
            </pendingSubscriberPolicy>
            <pendingDurableSubscriberPolicy>
                <vmDurableCursor/>
            </pendingDurableSubscriberPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>

</destinationPolicy>

配置说明:有效的Subscriber类型是vmCursor和fileCursor,缺省是store based cursor。有效的持久化Subscriber的cursor types是storeDurableSubscriberCursor, vmDurableCursor 和fileDurableSubscriberCursor,缺省是store based cursor。
对于Queues的配置
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue="org.apache.>">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
                </deadLetterStrategy>
                <pendingQueuePolicy>
                    <vmQueueCursor />
                </pendingQueuePolicy>
        </policyEntry>
    </policyEntries>
</policyMap>
</destinationPolicy>

配置说明:有效的类型是storeCursor, vmQueueCursor 和 fileQueueCursor

Async Sends 异步发送

AciveMQ支持异步和同步发送消息,是可以配置的。通常对于快的消费者,是直接把消息同步发送过去,但对于一个Slow Consumer,你使用同步发送消息可能出现Producer堵塞等现象,慢消费者适合使用异步发送。
配置使用
1:ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false
2:在Connection URI级别来配置使用Async Send
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
3:在ConnectionFactory级别来配置使用Async Send
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
4:在Connection级别来配置使用Async Send

((ActiveMQConnection)connection).setUseAsyncSend(true);

Dispatch Policies

严格顺序分发策略(Strict Order Dispatch Policy)
通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。
Strict order dispatch policy 会保证每个topic consumer会以相同的顺序接收消息,代价是性能上的损失。以下是一个配置例子:
<policyEntry topic="ORDERS.>">
    <dispatchPolicy>
        <strictOrderDispatchPolicy />
    </dispatchPolicy>
</policyEntry>
对于Queue的配置为:

<policyEntry queue=">" strictOrderDispatch="false" />

轮询分发策略(Round Robin Dispatch Policy)
ActiveMQ的prefetch缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大。而且缺省的dispatch policies会尝试尽可能快的填满prefetch缓冲。
然而在有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这些少量的消息总是倾向于被分发到个别的consumer上。这样就会因为负载的不均衡分配而导致处理时间的增加。
Round robin dispatch policy会尝试平均分发消息,以下是一个例子:
<policyEntry topic="ORDERS.>">
    <dispatchPolicy>
        <roundRobinDispatchPolicy/>
    </dispatchPolicy>

</policyEntry>

Optimized Acknowledgement

ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。如果希望在应用程序中禁止经过优化的确认方式,那么可以采用如下方法:
1:在Connection URI 上启用Optimized Acknowledgements
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=true");
2:在ConnectionFactory 上启用Optimized Acknowledgements
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(true);
3:在Connection上启用Optimized Acknowledgements
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

4:5.6以后的版本,还可以在Connection URI上设置setOptimizeAcknowledgeTimeOut参数,默认值为300ms,你可以设置自己要用的值,0表示禁用。

Producer Flow Control

生产者流量控制(Producer Flow Control)
流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞直到资源可以继续使用,或者抛出一个JMSException,可以通过<systemUsage>来配置。
同步发送消息的producer会自动使用producer flow control ;对于异步发送消息的producer,要使用producer flow control,你先要为connection配置一个ProducerWindowSize参数,如下:
((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);
ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前, 能够发送消息的最大字节数
可以禁用producer flow control,以下是ActiveMQ配置文件的一个例子
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.>" producerFlowControl="false"/>
        </policyEntries>
    </policyMap>

</destinationPolicy>

注意,自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。
结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>,示例如下:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
    <pendingQueuePolicy>
        <vmQueueCursor/>
    </pendingQueuePolicy>
</policyEntry>
上面的配置可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为1Mb配置客户端的异常

为了应对代理空间不足,而导致不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端。下面是一个配置的示例:

<systemUsage>
    <systemUsage sendFailIfNoSpace="true">
        <memoryUsage>
            <memoryUsage limit="20 mb"/>
        </memoryUsage>
    </systemUsage>
</systemUsage>
这么配置的好处是,客户端可以捕获javax.jms.ResourceAllocationException异常,稍等一下,并重试send()操作,而不是无限期地傻等下去。
从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性被加了进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待了指定时间之后才触发。如果在配置的等待时间过去之后,代理上的空间仍然没有被释放,仅当这个时候send()方法才会失败,并且在客户端抛出异常。示例:
<systemUsage>
    <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
        <memoryUsage>
            <memoryUsage limit="20 mb"/>
        </memoryUsage>
    </systemUsage>
</systemUsage>
定义超时的单位是毫秒

System usage
可以通过<systemUsage>元素的一些属性来减慢生产者,如下例子:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
你可以为非持久化的消息设置内存限制,为持久化消息设置磁盘空间,以及为临时消息设置总的空间,broker将在减慢生产者之前使用这些空间。使用上述的默认设置,broker将会一直阻塞send()方法的调用,直至一些消息被消费,有了可用的空间。

ActiveMQ Message Cursors、Async Sends、Optimized Acknowledgement、Producer Flow Control相关推荐

  1. ActiveMq生产者流量控制(Producer Flow Control)

    最近学习ActiveMq,昨日查看其配置文件activemq.xml的时候,被一行注释引到了http://activemq.apache.org/producer-flow-control.html页 ...

  2. 小程序高级电商前端第2周深入理解REST API开发规范 开启三端分离编程之旅<二>----scroll-view组件的灵活应用、async和await问题探讨、spu-scroll自定义组件

    前言: 转眼距离上一次写博文又过去一个月了,今年的博文节奏已经彻底被打破了: 真的是有心无力了,其原因在之前也提到过,组织架构调整,各种考核(跨领域性质的考核)实行末尾淘汰制,说不出的酸楚,不过换个心 ...

  3. 【转】异步编程系列(Thread、Task、async/await、ajax等)

    序 经过一番努力,我写的异步编程系列也算有头有尾,当然不是说这个系列已经更新完毕,这个头尾只是表示新旧知识点都有简单涉及到,接下去我还会丰富这一系列并且有机会整个小应用(愿景是弄一个开源组件吧,结合s ...

  4. ActiveMQ 消息游标(Message Cursors)

    Message Cursors 消息游标 A common problem in previous versions of ActiveMQ was running out of RAM buffer ...

  5. ES6中的promise、async、await用法详解

    <!DOCTYPE html> <html> <head><title>Promise.async.await</title> </h ...

  6. 那些年我们一起追逐的多线程(Thread、ThreadPool、委托异步调用、Task/TaskFactory、Parallerl、async和await)

    一. 背景 在刚接触开发的头几年里,说实话,根本不考虑多线程的这个问题,貌似那时候脑子里也有没有多线程的这个概念,所有的业务都是一个线程来处理,不考虑性能问题,当然也没有考虑多线程操作一条记录存在的并 ...

  7. 详解promise、async和await的执行顺序

    说明: 本文摘自 详解 promise.async和await的执行顺序. 1.题目和答案 一道题题目:下面这段promise.async和await代码,请问控制台打印的顺序? async func ...

  8. promise 、async/await 的原理及实现

    前言 事件循环机制 由于 javascript 引擎是采用单线程运行机制,执行耗时过大的操作时会造成页面的阻塞,为了解决页面的阻塞问题,js 将任务分为 同步任务.异步任务,随之而来的是异步带来的执行 ...

  9. 微信小程序全栈开发实践 第三章 微信小程序开发常用的API介绍及使用 -- 3.5 网络接口简介(五)基于Promise+await、async关键字改写登录模块

    零.回顾 在上节课我们主要实践练习了Promise的三个方法,包括any.all.race. 现在我们对Promise变成已经有了一个大致的了解. 这节课我们尝试将登录模块使用Promise编程方式进 ...

最新文章

  1. 以前5年只专注于.net,现今开始学习java.
  2. CreateWindow创建指定宽和高的client区域窗口的方法
  3. Unichar, char, wchar_t
  4. qam调制与解调matlab,QAM调制解调中如何加入软解调算法程序
  5. fixdown down,一个时代的终结。
  6. U盘文件目录损坏且无法读取
  7. 萌新记录自己刷过的题
  8. 阿木动态 | 助力机器人教育!一站式智能无人机专业课程建设方案!
  9. 【管理】需求分析与软件设计|需求分析报告和需求规格说明书的区别
  10. @mpx/cli 脚手架源码解析
  11. 1-13 格式化输出
  12. 前端基础第二天——HTML5基础
  13. NDK编译时NDK_PROJECT_PATH = null
  14. 看完张一鸣近十年微博,我总结了这些成长特质
  15. 吉他入门:吉他音阶训练入门教程
  16. 【2023年更新】1000个大数据/人工智能毕设选题推荐
  17. 诛仙服务器显示横线,诛仙手游字变颜色怎么弄_诛仙手游字体颜色代码大全_快吧手游...
  18. 向老板汇报,如何写好PPT?
  19. react性能监控根据工具_高性能React:3个新工具可加快您的应用程序
  20. 深度Linux 安装英伟达闭源驱动,deepin20 安装英伟达闭源驱动

热门文章

  1. 511遇见易语言乐玩插件找字扩展FindStrEx和快速找字扩展FindStrFastEx
  2. 【Active Learning - 03】Adaptive Active Learning for Image Classification
  3. OKHTTP 实现流式传输上传文件
  4. 计算机视觉英文翻译,计算机视觉(机器视觉)系统,computer vision (machine vision)system,音标,读音,翻译,英文例句,英语词典...
  5. IO流(文件流 , 缓冲流 , 对象流, 字符流 )
  6. OBS Studio的安装与使用
  7. STC15W单片机防丢语音报警GPS北斗定位测距双机LORA无线手持可充电
  8. Unity资源热更新--资源管理、Addressable
  9. WordPress Gutenberg Block扩展开发案例
  10. 股票中的做T的是什么意思?