定义

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

Demo

我们仍然是利用 Spring Cloud Stream 的编程模型 + Spring Cloud Alibaba RocketMQ 来实现。

理论

在消费时,可以设置一个字段 ConsumeFromWhere(源码位置在:org.apache.rocketmq.common.consumer.ConsumeFromWhere),从哪开始消费。可选参数,去掉 Deprecated 的,剩下的就是

public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_TIMESTAMP,
}
  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费
  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费

我们需要设置从某个时间开始消费,即配置 CONSUME_FROM_TIMESTAMP 并设置好具体的时间点。

实现

首先还是看一下配置文件

server:port: 8080servlet:context-path: /mq-examplespring:application:name: mq-examplecloud:stream:bindings:input-backtracking:content-type: application/jsondestination: test-topic3group: backtracking-consumer-group# 定义 name 为 output 的 binding 生产output-order:content-type: application/jsondestination: test-topic3rocketmq:# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类binder:# 配置 rocketmq 的 nameserver 地址name-server: 127.0.0.1:9876group: rocketmq-groupbindings:output-order:# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类producer:#group: producer-group # 生产者分组sync: true # 是否同步发送消息,默认为 false 异步。input-backtracking: # 回溯消息配置# com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPropertiesconsumer:consumeFromWhere: CONSUME_FROM_TIMESTAMPconsumeTimestamp: 20220117110148enabled: true # 是否开启消费,默认为 truebroadcasting: false # 是否使用广播消费,默认为 false 使用集群消费

这里我们仍然用之前的 ouput-order 作为生产者,生产消息。

消息者配置上主要注意 input-backtracking 节点中的属性配置:

  • consumeFromWhere 即上文提到的从哪儿开始消费,这里我们指定时间消费
  • consumeTimestamp 即指定的时间点

程序入口:

@SpringBootApplication
@EnableBinding({ MySource.class})
public class MqBootstrapApplication {public static void main(String[] args) {SpringApplication.run(MqBootstrapApplication.class);}}

要加上 @EnableBinding

MySource:

public interface MySource {@Output("output-order")MessageChannel output4Order();@Input("input-backtracking")MessageChannel inputBackTracking();}

controller 生产消息:

@GetMapping("/produce")public void produceMsg() {Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);headers.put(MessageConst.PROPERTY_TAGS, "test03");Order order = Order.builder().id(1L).desc("test").build();Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));mySource.output4Order().send(message);}

ReceiveService 消费消息:


@Service
@Log4j2
public class ReceiveService {@StreamListener("input-backtracking")public void receiveBackTrackingInput(String receiveMsg, GenericMessage message, @Headers Map headers) {log.info("接收到回溯消息:{}", receiveMsg);}}

测试

可以先调用 controller 生产消息,或者不用 Demo 中的生产者生产消息,找一个之前发过消息的 topic , 看一下它的消息轨迹,找到存储时间

如果你用之前发过消息的 topic 记得修改配置文件中的 topic名称 :

确认找到的这条消息已经被消费过(因为要测回溯,至少是二次消费),将 consumeTimestamp 的时间配置在 存储时间之后。

这时启动项目,观察 ReceiveService 的输出:

 接收到回溯消息:{"id":1,"desc":"test"}

证明消息回溯消费成功。

参考

  • https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc
  • https://www.niewenjun.com/2020/05/09/fen-bu-shi/rocketmq/#toc-heading-29

自顶向下学习 RocketMQ(九):回溯消费相关推荐

  1. 源码分析RocketMQ顺序消息消费实现原理

    本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...

  2. 《RocketMQ实战专栏》为什么是你学习RocketMQ的最佳资料

    <RocketMQ实战与原理>专栏简介 简介 RocketMQ业界主流的消息中间件之一,承载公司核心业务消息的流转.对RocketMQ核心原理的理解与最佳实践成了开发与运维同学的必备技能. ...

  3. Maven学习总结(九)——使用Nexus搭建Maven私服

    2019独角兽企业重金招聘Python工程师标准>>> Maven学习总结(九)--使用Nexus搭建Maven私服 一.搭建nexus私服的目的 为什么要搭建nexus私服,原因很 ...

  4. IOS学习笔记(九)之UIAlertView(警告视图)和UIActionSheet(操作表视图)基本概念和使用方法...

    IOS学习笔记(九)之UIAlertView(警告视图)和UIActionSheet(操作表视图)基本概念和使用方法 Author:hmjiangqq Email:jiangqqlmj@163.com ...

  5. 从零开始学习jQuery (九) jQuery工具函数

    本系列文章导航 从零开始学习jQuery (一) 开天辟地入门篇 从零开始学习jQuery (二) 万能的选择器 从零开始学习jQuery (三) 管理jQuery包装集 从零开始学习jQuery ( ...

  6. JavaScript学习总结(九)——Javascript面向(基于)对象编程

    转载自  JavaScript学习总结(九)--Javascript面向(基于)对象编程 一.澄清概念 1.JS中"基于对象=面向对象" 2.JS中没有类(Class),但是它取了 ...

  7. python3.4学习笔记(九) Python GUI桌面应用开发工具选择

    python3.4学习笔记(九) Python GUI桌面应用开发工具选择 Python GUI开发工具选择 - WEB开发者 http://www.admin10000.com/document/9 ...

  8. PyTorch框架学习十九——模型加载与保存

    PyTorch框架学习十九--模型加载与保存 一.序列化与反序列化 二.PyTorch中的序列化与反序列化 1.torch.save 2.torch.load 三.模型的保存 1.方法一:保存整个Mo ...

  9. 吴恩达《机器学习》学习笔记九——神经网络相关(1)

    吴恩达<机器学习>学习笔记九--神经网络相关(1) 一. 非线性假设的问题 二. 神经网络相关知识 1.神经网络的大致历史 2.神经网络的表示 3.前向传播:向量化表示 三. 例子与直觉理 ...

最新文章

  1. Java基础super关键字、final关键字、static关键字、匿名对象整理
  2. 阿里云Ubuntu 14.04 + Nginx + let's encrypt 搭建https访问
  3. vs2015 单元测试 linux,VS2015做单元测试
  4. jwt php tp5,TP5框架中使用JWT的方法示例
  5. Oracle中判断字段是否为数字
  6. H3C DHCP实验
  7. nginx、tomcat负载均衡
  8. 觉得清楚,跟说清楚写清楚,两回事
  9. 全国各地车牌号码查询表
  10. PDF与Base64的相互转换以及操作
  11. scipy库中的stats模块
  12. lqr算法 c语言,【CS229 lecture18】linear quadratic regulation(LQR) 線性二次型調節控制
  13. oracle数据文件头损坏6,恢复数据库时遇到数据文件头损坏 | 信春哥,系统稳,闭眼上线不回滚!...
  14. 蓝色简约的工业大学学校网站静态模板
  15. linux 命令杂集
  16. Python图形界面实现咖啡店点单系统
  17. openerp mysql_openerp 经典收藏 Openerp开发进销存系统完毕总结(转载)
  18. 编写仿supersu的权限管理工具(aosp11 root、实现aosp系统内置wifi、root管理apk)
  19. Mac 安装element-ui
  20. html个人中心网页,个人中心页面.html

热门文章

  1. 数字加千分位分隔符,加货币符号,数字转百分数
  2. 带目录计算机专业论文,计算机专业论文格式及目录系统
  3. 【竞争】SAP副总裁九华山庄的发言和真相报道(转:网易财经)
  4. OpenResty学习——第七章 Web开发实战2——商品详情页
  5. 涂鸦 opengl简单应用1
  6. 3D成像方法 汇总(原理解析)— 双目视觉、激光三角、结构光、ToF、光场、全息...
  7. Python 小写数字转为大写
  8. 甘超波:NLP检定语言模式
  9. Win2000请求拨号路由服务详解
  10. 阿里巴巴云原生网关三位一体的选择与实践