自顶向下学习 RocketMQ(九):回溯消费
定义
回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。
Demo
我们仍然是利用 Spring Cloud Stream 的编程模型 + Spring Cloud Alibaba RocketMQ 来实现。
理论
在消费时,可以设置一个字段 ConsumeFromWhere(源码位置在:org.apache.rocketmq.common.consumer.ConsumeFromWhere),从哪开始消费。可选参数,去掉 Deprecated 的,剩下的就是
public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_FIRST_OFFSET,CONSUME_FROM_TIMESTAMP,
}
- CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费
- CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费
- CONSUME_FROM_TIMESTAMP:从某个时间开始消费
我们需要设置从某个时间开始消费,即配置 CONSUME_FROM_TIMESTAMP
并设置好具体的时间点。
实现
首先还是看一下配置文件
server:port: 8080servlet:context-path: /mq-examplespring:application:name: mq-examplecloud:stream:bindings:input-backtracking:content-type: application/jsondestination: test-topic3group: backtracking-consumer-group# 定义 name 为 output 的 binding 生产output-order:content-type: application/jsondestination: test-topic3rocketmq:# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类binder:# 配置 rocketmq 的 nameserver 地址name-server: 127.0.0.1:9876group: rocketmq-groupbindings:output-order:# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类producer:#group: producer-group # 生产者分组sync: true # 是否同步发送消息,默认为 false 异步。input-backtracking: # 回溯消息配置# com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPropertiesconsumer:consumeFromWhere: CONSUME_FROM_TIMESTAMPconsumeTimestamp: 20220117110148enabled: true # 是否开启消费,默认为 truebroadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
这里我们仍然用之前的 ouput-order 作为生产者,生产消息。
消息者配置上主要注意 input-backtracking
节点中的属性配置:
- consumeFromWhere 即上文提到的从哪儿开始消费,这里我们指定时间消费
- consumeTimestamp 即指定的时间点
程序入口:
@SpringBootApplication
@EnableBinding({ MySource.class})
public class MqBootstrapApplication {public static void main(String[] args) {SpringApplication.run(MqBootstrapApplication.class);}}
要加上 @EnableBinding
MySource:
public interface MySource {@Output("output-order")MessageChannel output4Order();@Input("input-backtracking")MessageChannel inputBackTracking();}
controller 生产消息:
@GetMapping("/produce")public void produceMsg() {Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);headers.put(MessageConst.PROPERTY_TAGS, "test03");Order order = Order.builder().id(1L).desc("test").build();Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));mySource.output4Order().send(message);}
ReceiveService 消费消息:
@Service
@Log4j2
public class ReceiveService {@StreamListener("input-backtracking")public void receiveBackTrackingInput(String receiveMsg, GenericMessage message, @Headers Map headers) {log.info("接收到回溯消息:{}", receiveMsg);}}
测试
可以先调用 controller 生产消息,或者不用 Demo 中的生产者生产消息,找一个之前发过消息的 topic , 看一下它的消息轨迹,找到存储时间
如果你用之前发过消息的 topic 记得修改配置文件中的 topic名称 :
确认找到的这条消息已经被消费过(因为要测回溯,至少是二次消费),将 consumeTimestamp
的时间配置在 存储时间之后。
这时启动项目,观察 ReceiveService
的输出:
接收到回溯消息:{"id":1,"desc":"test"}
证明消息回溯消费成功。
参考
- https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new.adoc
- https://www.niewenjun.com/2020/05/09/fen-bu-shi/rocketmq/#toc-heading-29
自顶向下学习 RocketMQ(九):回溯消费相关推荐
- 源码分析RocketMQ顺序消息消费实现原理
本节目录 1.消息队列负载 2.消息拉取 3.消息顺序消息消费 3.1核心属性与构造函数 3.2 start方法 3.3 submitConsumeRequest 3.4 ConsumeMessage ...
- 《RocketMQ实战专栏》为什么是你学习RocketMQ的最佳资料
<RocketMQ实战与原理>专栏简介 简介 RocketMQ业界主流的消息中间件之一,承载公司核心业务消息的流转.对RocketMQ核心原理的理解与最佳实践成了开发与运维同学的必备技能. ...
- Maven学习总结(九)——使用Nexus搭建Maven私服
2019独角兽企业重金招聘Python工程师标准>>> Maven学习总结(九)--使用Nexus搭建Maven私服 一.搭建nexus私服的目的 为什么要搭建nexus私服,原因很 ...
- IOS学习笔记(九)之UIAlertView(警告视图)和UIActionSheet(操作表视图)基本概念和使用方法...
IOS学习笔记(九)之UIAlertView(警告视图)和UIActionSheet(操作表视图)基本概念和使用方法 Author:hmjiangqq Email:jiangqqlmj@163.com ...
- 从零开始学习jQuery (九) jQuery工具函数
本系列文章导航 从零开始学习jQuery (一) 开天辟地入门篇 从零开始学习jQuery (二) 万能的选择器 从零开始学习jQuery (三) 管理jQuery包装集 从零开始学习jQuery ( ...
- JavaScript学习总结(九)——Javascript面向(基于)对象编程
转载自 JavaScript学习总结(九)--Javascript面向(基于)对象编程 一.澄清概念 1.JS中"基于对象=面向对象" 2.JS中没有类(Class),但是它取了 ...
- python3.4学习笔记(九) Python GUI桌面应用开发工具选择
python3.4学习笔记(九) Python GUI桌面应用开发工具选择 Python GUI开发工具选择 - WEB开发者 http://www.admin10000.com/document/9 ...
- PyTorch框架学习十九——模型加载与保存
PyTorch框架学习十九--模型加载与保存 一.序列化与反序列化 二.PyTorch中的序列化与反序列化 1.torch.save 2.torch.load 三.模型的保存 1.方法一:保存整个Mo ...
- 吴恩达《机器学习》学习笔记九——神经网络相关(1)
吴恩达<机器学习>学习笔记九--神经网络相关(1) 一. 非线性假设的问题 二. 神经网络相关知识 1.神经网络的大致历史 2.神经网络的表示 3.前向传播:向量化表示 三. 例子与直觉理 ...
最新文章
- Java基础super关键字、final关键字、static关键字、匿名对象整理
- 阿里云Ubuntu 14.04 + Nginx + let's encrypt 搭建https访问
- vs2015 单元测试 linux,VS2015做单元测试
- jwt php tp5,TP5框架中使用JWT的方法示例
- Oracle中判断字段是否为数字
- H3C DHCP实验
- nginx、tomcat负载均衡
- 觉得清楚,跟说清楚写清楚,两回事
- 全国各地车牌号码查询表
- PDF与Base64的相互转换以及操作
- scipy库中的stats模块
- lqr算法 c语言,【CS229 lecture18】linear quadratic regulation(LQR) 線性二次型調節控制
- oracle数据文件头损坏6,恢复数据库时遇到数据文件头损坏 | 信春哥,系统稳,闭眼上线不回滚!...
- 蓝色简约的工业大学学校网站静态模板
- linux 命令杂集
- Python图形界面实现咖啡店点单系统
- openerp mysql_openerp 经典收藏 Openerp开发进销存系统完毕总结(转载)
- 编写仿supersu的权限管理工具(aosp11 root、实现aosp系统内置wifi、root管理apk)
- Mac 安装element-ui
- html个人中心网页,个人中心页面.html