本文以ActiveMQ最新的5.10版本为准。

大家知道,JMS规范中,Message消息头接口中有setJMSRedelivered(boolean redelivered)和getJMSRedelivered()方法,用于设置和获取消息的重发标志,当然set方法主要是MOM来调用的,我们客户端使用的是get方法。

还记得当时阿里的电话面试曾问过我,你知道ActiveMQ中的消息重发时间间隔和重发次数吗?我当时尴尬了,只知道会重发,还真没去了解过其中的细节,所以最后被完美的“淘汰了”。

后来有时间了就去网上看了下官方的文档,所以现在把ActiveMQ中的重发机制和大家一起分享一下。

首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。

1.如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发。需要注意的是,如果采用非事务持久化消息加Session.CLIENT_ACKNOWLEDGE应答模式,当消费者在处理完消息后没有主动调用Message#acknowledge()方法时,MOM不会主动重发,如果这时候MOM宕机了,当重启MOM后,将消费者机器也重启后MOM才会重发消息,但此时的消息不会有重发标记,因为MOM都不记得自己有宕机过,也不知道这些消息被发送过。

2.如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发。

3.如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。

说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。

我们可以来对ActiveMQ的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:

a) collisionAvoidanceFactor :碰撞躲避因数,默认值是0.15,这个参数是为了躲避高并发的重发带来的问题,我们查看org.apache.activemq.RedeliveryPolicy类的源代码,

// +/-15% for a 30% spread -cgs
    protected double collisionAvoidanceFactor = 0.15d;
    protected long initialRedeliveryDelay = 1000L;

可以发现,该默认值带来的变动范围是正负百分之15,也就是有30%的范围,也就是说,如果延迟发送时间(也就是initialRedeliveryDelay 默认值)是1000毫秒,则该条消息第一次有可能被拖延850毫秒到1150毫秒之间后被发送,如果有第二次重发,基数就不是1000毫秒了,而是以上一次重发拖延时间为基础来算。源代码如下:

public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
                // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }
        }

if (useCollisionAvoidance) {
            /*
             * First random determines +/-, second random determines how far to
             * go in that direction. -cgs
             */
            Random random = getRandomNumberGenerator();
            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
            nextDelay += nextDelay * variance;

        }

return nextDelay;
    }

b)maximumRedeliveries :最大重发次数,默认值是6,如果你想不限次数重发,可以设置成-1。同样是org.apache.activemq.RedeliveryPolicy类中的代码:

public static final int NO_MAXIMUM_REDELIVERIES = -1;
    public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6;

protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES;

我们探究一下maximumRedeliveries 的get方法,可以发现有org.apache.activemq.ActiveMQSession和org.apache.activemq.ActiveMQMessageConsumer两个类中有用到:

其中ActiveMQSession中的代码如下:

// Figure out how long we should wait to resend this message.
           long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
           for (int i = 0; i < redeliveryCounter; i++) {

// 每次重发拖延时间都是以上一次重发拖延时间来算,所以这里for循环来取得最新的拖延时间
                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
            }

// 交给定时任务重发
            connection.getScheduler().executeAfterDelay(new Runnable() {

public void run() {
                              ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                     }
             }, redeliveryDelay);

ActiveMQMessageConsumer中的代码类似。

c)maximumRedeliveryDelay :重发最大拖延时间,默认为-1,表示没有最大拖延时间,此参数只有当useExponentialBackOff 为true时起效。同样是RedeliveryPolicy中的代码:

protected long maximumRedeliveryDelay = -1;

public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
              
  // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }

        }

。。。。。

}

看源代码就显而易见了。

d)initialRedeliveryDelay :第一次重发的拖延时间基础,默认是1000,单位为毫秒,前面讲collisionAvoidanceFactor 属性时已经提到过,这里不再多说。

e)redeliveryDelay :如果initialRedeliveryDelay 为0,则使用redeliveryDelay ,默认也是1000。RedeliveryPolicy中源代码如下:

protected long initialRedeliveryDelay = 1000L;
    protected long redeliveryDelay = initialRedeliveryDelay;

f)useCollisionAvoidance :消息重发时是否采用前面提到的碰撞避免collisionAvoidanceFactor 参数,默认是false,不采用。源代码上面也给出了,这里不再多说。

g)useCollisionAvoidance :是否使用成倍增加拖延,默认为false,如果我们希望重发的拖延时间一次比一次大很多,则可以设置它为true。上面已经给出过源代码,这里再次给出:

protected boolean useExponentialBackOff;
    protected double backOffMultiplier = 5.0;

public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
                // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }
        }

if (useCollisionAvoidance) {
            /*
             * First random determines +/-, second random determines how far to
             * go in that direction. -cgs
             */
            Random random = getRandomNumberGenerator();
            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
            nextDelay += nextDelay * variance;
        }

return nextDelay;
    }
可以看出,成倍拖延是将上一次拖延时间乘以backOffMultiplier来实现的,而 backOffMultiplier默认为5.

h)backOffMultiplier :成倍拖延时间的倍率,默认为5,上面已经提到了,这里不再多说。

那么接下来我们讨论下该如何配置上面所说的几项,我们可以通过Java代码,也就是JMS API来配置,也可以通过Spring来配置,当然也可以通过连接器的URL来配置:

如果直接使用JMS API来使用ActiveMQ,我们可以如下配置(代码来自ActiveMQ的官方说明):

 ActiveMQConnection connection ...  // Create a connection
 RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
 queuePolicy.setInitialRedeliveryDelay(0);
 queuePolicy.setRedeliveryDelay(1000);
 queuePolicy.setUseExponentialBackOff(false);
 queuePolicy.setMaximumRedeliveries(2);
 
 RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
 topicPolicy.setInitialRedeliveryDelay(0);
 topicPolicy.setRedeliveryDelay(1000);
 topicPolicy.setUseExponentialBackOff(false);
 topicPolicy.setMaximumRedeliveries(3);
 
 // Receive a message with the JMS API
 RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
 map.put(new ActiveMQTopic("topic1"), topicPolicy);
 map.put(new ActiveMQQueue("queue1"), queuePolicy);
注意,从ActiveMQ5.7开始,我们可以给每个目的地(Destination)配置不同的重发策略。
ActiveMQConnection类中有一个成员变量    private RedeliveryPolicyMap redeliveryPolicyMap;,用来给不同的目的地配置不同的重发策略。
至于如何在连接器的URL上配置,可以参考官方文档:http://activemq.apache.org/connection-configuration-uri.html

ActiveMQ的消息重发机制相关推荐

  1. ActiveMQ的消息重发策略和DLQ处理

    2019独角兽企业重金招聘Python工程师标准>>> ActiveMQ的消息重发策略和DLQ处理 博客分类: MQ 在以下三种情况中,ActiveMQ消息会被重发给客户端/消费者: ...

  2. RabbitMQ消息确认机制和消息重发机制

    一.机制 首先我们要知道一条消息的传递过程. 生产者 -> 交换机 ->  队列 我们的生产者生产消息,生产完成的消息发送到交换机,由交换机去把这个消息转发到对应的队列上.这其中我们可能在 ...

  3. 【转】ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> 本文转载自 http://shift-alt-ctrl.iteye.com/blog/2020182 AcitveMQ是作为一 ...

  4. ActiveMQ消息传送机制以及ACK机制详解

    2019独角兽企业重金招聘Python工程师标准>>> AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全 ...

  5. springboot整合activemq加入会签,自动重发机制,持久化

    消费者客户端成功接收一条消息的标志是:这条消息被签收. 消费者客户端成功接收一条消息一般包括三个阶段:          1.消费者接收消息,也即从MessageConsumer的receive方法返 ...

  6. Java短信确认机制_JAVA 消息确认机制之 ACK 模式

    JAVA 消息确认机制之 ACK 模式 CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, ...

  7. MQ 入门(四)—— 消息确认机制Ack

    一.ACK机制简介 ACK (Acknowledgement),即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. JMS API中约定了Client端可以 ...

  8. 方法~作用于对象~失败_消息三:ActiveMQ Topic 消息失败重发

    3.1 JMS消息确认机制 在 Session 接口中定义的几个常量: AUTO_ACKNOWLEDGE = 1 自动确认 CLIENT_ACKNOWLEDGE = 2 客户端手动确认 DUPS_OK ...

  9. activemq 消息阻塞优化和消息确认机制优化

    一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...

最新文章

  1. java中Collections的接口及类层次图
  2. 让delphi程序不受WINDOWS日期格式的影响
  3. 一种User Mode下访问物理内存及Kernel Space的简单实现
  4. 浪费了4年后,公司的产品小哥去快手搞 Java 了
  5. Mac安装mysql数据库【亲测有用】
  6. JAVA面试要点002_Git中fetch和pull的区别
  7. 中国历史上成功的两人合作, 改进, 提高的例子
  8. MVCC(Multiversion concurrency control)
  9. dcs world f15c教学_【温故知新】DCS如何操作?看这篇就全懂了!
  10. 图像处理: 五种 插值法
  11. IC Compiler II(ICC II)后端设计流程——超详细
  12. python实时监控电脑运行信息(邮件预警)
  13. win10此计算机未连接到网络,win10提示无法连接到此网络是怎么回事 怎么办
  14. 和平精英灵敏度分享码服务器没有响应,和平精英2021最稳灵敏度分享码完整推荐...
  15. FastDFS文件系统单机环境搭建
  16. Charles抓包的使用步骤
  17. 王者荣耀测试自己本命英雄软件,王者荣耀在哪测本命英雄
  18. 使用 HTML 5 Canvas 和 Raycasting 创建伪 3D 游戏
  19. 2021-2027全球及中国酚醛内衬瓶盖行业研究及十四五规划分析报告
  20. Excel函数的使用和参数

热门文章

  1. 蓝桥杯 试题 算法训练 进击的青蛙(C++)
  2. Dota2 参议院java
  3. 记录uni.app开发微信小程序中地图的使用,以及项目中的解决办法
  4. 移动OA系统,联动企业协作让办公高效无间断
  5. 咱们码农可以从曾国藩身上学到点什么呢(一)
  6. selenium,geckodriver与Firefox版本不匹配
  7. 数据结构无向图简单路径
  8. 为什么要做个伸手党?程序员基础素养
  9. 重力感应器G—sensor 驱动分析
  10. 公安大学c语言真题,【图片】公安技术-网安 考研专业课 历年真题回忆 !干货!【中国人民公安大学吧】_百度贴吧...