ACK简介

在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?

RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

DEMO

当然,如果消费者不告知发送者我这边消费信息异常,那么发送者是不会知道的,所以消费者在设置监听的时候需要给个回调,具体代码如下:

consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {  for (MessageExt messageExt : msgs) {    String messageBody = new String(messageExt.getBody());    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容    }    //返回消费状态  //CONSUME_SUCCESS 消费成功  //RECONSUME_LATER 消费失败,需要稍后重新消费  return ConsumeConcurrentlyStatus.RECONSUME_LATER;  }  }); 

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

修改重试时间

重试的时间默认如下,这个可以通过查看broker的日志,

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

当然,这个重复的时间间隔是可以在配置文件内设置的,由于我这边配置的双master模式,所以在128服务器的broker-a.properties和129的broker-b.properties中分别配置,如下图,设置好后务必将之前的数据清理,具体查看RocketMQ双Master环境部署

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s

重试消息的处理

一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。

package com.gwd.rocketmq;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;/**
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:49:39
* 修改历史:
* 时间           作者          版本        描述
*====================================================
*
*/
public class Consumer {public static void main(String[] args) throws MQClientException {  //声明并初始化一个consumer  //需要一个consumer group名字作为构造方法的参数,这里为consumer1  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  //同样也要设置NameServer地址  consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");  //这里设置的是一个consumer的消费策略  //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息  //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍  //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  //设置consumer所订阅的Topic和Tag,*代表全部的Tag  consumer.subscribe("TopicTest", "*");  //设置一个Listener,主要进行消息的逻辑处理  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {  for (MessageExt messageExt : msgs) {    if(messageExt.getReconsumeTimes()==3) {//可以将对应的数据保存到数据库,以便人工干预System.out.println(messageExt.getMsgId()+","+messageExt.getBody());return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}       //返回消费状态  //CONSUME_SUCCESS 消费成功  //RECONSUME_LATER 消费失败,需要稍后重新消费  return ConsumeConcurrentlyStatus.RECONSUME_LATER;  }  });  //调用start()方法启动consumer  consumer.start();  System.out.println("Consumer Started.");  }
}

所以任何异常都要捕获返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字,如下图:

注意点

1.如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。

2.当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

测试案例

(1)producer

package com.gwd.rocketmq;import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;/**
* @FileName Producer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:48:37
* 修改历史:
* 时间           作者          版本        描述
*====================================================
*
*/
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {  //声明并初始化一个producer  //需要一个producer group名字作为构造方法的参数,这里为producer1  DefaultMQProducer producer = new DefaultMQProducer("producer1");  //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔  //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里  producer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");  //     producer.setVipChannelEnabled(false);//3.2。6的版本没有该设置,在更新或者最新的版本中务必将其设置为false,否则会有问题  //调用start()方法启动一个producer实例  producer.start();  //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值  for (int i = 0; i < 5; i++) {  try {  Message msg = new Message("TopicTest",// topic  "TagA",// tag  ("Hello RocketMQ " + i).getBytes("utf-8")// body  );          //调用producer的send()方法发送消息  //这里调用的是同步的方式,所以会有返回结果  SendResult sendResult = producer.send(msg);  //打印返回结果,可以看到消息发送的状态以及一些相关信息  System.out.println(sendResult);  } catch (Exception e) {  e.printStackTrace();  Thread.sleep(1000);  }  }  //发送完消息之后,调用shutdown()方法关闭producer  producer.shutdown();  }
}

(2)Consumer

package com.gwd.rocketmq;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;/**
* @FileName Consumer.java
* @Description:
* @author gu.weidong
* @version V1.0
* @createtime 2018年6月25日 上午9:49:39
* 修改历史:
* 时间           作者          版本        描述
*====================================================
*
*/
public class Consumer {public static void main(String[] args) throws MQClientException {  //声明并初始化一个consumer  //需要一个consumer group名字作为构造方法的参数,这里为consumer1  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");  //同样也要设置NameServer地址  consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");  //这里设置的是一个consumer的消费策略  //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息  //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍  //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  //设置consumer所订阅的Topic和Tag,*代表全部的Tag  consumer.subscribe("TopicTest", "*");  //设置一个Listener,主要进行消息的逻辑处理  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {  for (MessageExt messageExt : msgs) {    String messageBody = new String(messageExt.getBody());    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容    }    //返回消费状态  //CONSUME_SUCCESS 消费成功  //RECONSUME_LATER 消费失败,需要稍后重新消费  return ConsumeConcurrentlyStatus.RECONSUME_LATER;  }  });  //调用start()方法启动consumer  consumer.start();  System.out.println("Consumer Started.");  }
}

(3)测试结果如下

RocketMQ——ack机制保证消费成功相关推荐

  1. RocketMQ重试机制(ACK确认机制)

    大家好,我是一个喜欢诗词的java研发赛亚人,感谢您的关注~ ┗( ▔, ▔ )┛.微信搜索[程序猿卡卡罗特],后续有更多硬核文章哦~ 今日诗词:少年恃险若平地,独倚长剑凌清秋. – [唐·顾况]&l ...

  2. ack strom 保证只有一次_Storm容错机制(一):ACK机制

    前言 好久没有写文章了,然后一连就写了三篇, 前两篇文章 Storm入门(一):编程模型 Storm入门(二):架构模型和集群部署 都是一些比较简单的入门教程,这一篇我们来聊一聊稍微高级点的话题, 关 ...

  3. 用redis实现消息队列(实时消费+ack机制)【转】

    用redis实现消息队列(实时消费+ack机制) java queue 消息队列 redis 消息队列 首先做简单的引入. MQ主要是用来: 解耦应用. 异步化消息 流量削峰填谷 目前使用的较多的有A ...

  4. kafkaspot在ack机制下如何保证内存不溢

    storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送:如果不实现ack机制, ...

  5. RocketMQ消费失败如何处理?如何保证消费消息的幂等性?

    文章目录 1. 消息消费失败如何处理? 2. 如何保证消费消息的幂等性? 1. 消息消费失败如何处理? 当消费者从Broker获取到消息后会进行消费,并返回消费状态.如下代码所示 //broker推消 ...

  6. RabbitMQ如何保证消息发送、消费成功

    好记忆不如烂笔头,能记下点东西,就记下点,有时间拿出来看看,也会发觉不一样的感受. 目录 1.发送确认机制设置 2.消息丢失.非信任或失败 3.消息重复消费 4.消费成功通知 5.总结 消息因为其:削 ...

  7. RocketMQ消息ACK机制

    1 RocketMQ是以consumer group+queue来确认消息消费进度,通过gruop+offset来标记一个goroup在queue上消费进度,消费成功之后都会返回一个ack消息告之br ...

  8. rdkafka线程过多_Kafka/RocketMQ 多线程消费时如何保证消费顺序?

    上两篇文章都在讨论顺序消息的一些知识,看到有个读者的留言如下: 这个问题问得非常棒,由于在之前的文章中并没有提及到,因此我在这篇文章中单独讲解,本文将从消费顺序性这个问题出发,深度剖析 Kafka/R ...

  9. RocketMQ 重试机制

    2019独角兽企业重金招聘Python工程师标准>>> RocketMQ 重试机制 消息重试分为2种:Producer端重试和Consumer端重试. Producer端重试 生产者 ...

最新文章

  1. 怎么理解ASM中的Failgroup
  2. PHP处理Oracle的CLOB
  3. Hiding Images in Plain Sight: Deep Steganography 于众目睽睽之下隐藏图像:深度隐写术
  4. C#泛型简化代码量示例
  5. 《蒙福人生》读后感作文2900字
  6. nginx获得response自定义的header
  7. concat合并的数组会有顺序么_超全的JS常用数组方法整理
  8. 平行空间怎么设置32位_每周推荐 | 空间占用减小32%,具有高级功能的16位逻辑封装...
  9. 一行行地读取输入行,将把最长的行打印出来
  10. Hibernate3.x异常No row with the given identifier exists 解决方法
  11. 【Python-3.5】win7安装Pygame
  12. arch linux u盘安装,安装 ArchLinux 到U盘(四)安装Archlinux
  13. Kubernetes(K8s) 1.14.3 单机版配置 node 节点 是 taint 时解决方法
  14. Python可视化深度图
  15. android 画扇形进度条,css绘制扇形进度条
  16. 基于Opencv实现车牌图片识别系统
  17. JavaScript高级程序设计闭包学习理解
  18. PS零基础自学笔记:常见操作方法记录(去水印、抠图、调色调)
  19. 华为模拟器eNSP下载与安装教程
  20. 软件工程基础知识-软件质量

热门文章

  1. RK3566-平板电脑主板方案
  2. Elasticsearch基本概念
  3. 第三方支付平台的状态表
  4. 企业邮箱哪个品牌好?
  5. 永硕E盘:很实用的网络硬盘和网址收藏夹
  6. 【OnMyWay】我为啥弃用了Ubuntu
  7. word中如何在一段文字中部分为标题,其余为正文
  8. 使用阿里云来搭建视频直播服务
  9. SpringBoot结合redis解决PV、UV亿级流量
  10. 直付通,支付宝白名单D0,接口申请及对接流程