聊聊rocketmq的ConsumerManageProcessor
序
本文主要研究一下rocketmq的ConsumerManageProcessor
NettyRequestProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;boolean rejectRequest();
}
- NettyRequestProcessor接口定义了processRequest、rejectRequest方法
ConsumerManageProcessor
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private final BrokerController brokerController;public ConsumerManageProcessor(final BrokerController brokerController) {this.brokerController = brokerController;}@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.GET_CONSUMER_LIST_BY_GROUP:return this.getConsumerListByGroup(ctx, request);case RequestCode.UPDATE_CONSUMER_OFFSET:return this.updateConsumerOffset(ctx, request);case RequestCode.QUERY_CONSUMER_OFFSET:return this.queryConsumerOffset(ctx, request);default:break;}return null;}@Overridepublic boolean rejectRequest() {return false;}//......
}
- ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false
getConsumerListByGroup
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor {//......public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);final GetConsumerListByGroupRequestHeader requestHeader =(GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (consumerGroupInfo != null) {List<String> clientIds = consumerGroupInfo.getAllClientId();if (!clientIds.isEmpty()) {GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();body.setConsumerIdList(clientIds);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;} else {log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}} else {log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());return response;}//......
}
- getConsumerListByGroup方法通过brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup())获取consumerGroupInfo
updateConsumerOffset
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor {//......private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);final UpdateConsumerOffsetRequestHeader requestHeader =(UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//......
}
- updateConsumerOffset方法主要是执行brokerController.getConsumerOffsetManager().commitOffset
queryConsumerOffset
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
public class ConsumerManageProcessor implements NettyRequestProcessor {//......private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);final QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.readCustomHeader();final QueryConsumerOffsetRequestHeader requestHeader =(QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);long offset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());if (offset >= 0) {responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (minOffset <= 0&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}}return response;}//......
}
- queryConsumerOffset方法主要是通过brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId())获取指定consumerGroup、topic及queueId的offset
小结
ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false
doc
- ConsumerManageProcessor
聊聊rocketmq的ConsumerManageProcessor相关推荐
- 聊聊rocketmq的ProducerImpl
序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...
- 聊聊rocketmq的RemotingException
序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...
- 聊聊rocketmq的BrokerHousekeepingService
为什么80%的码农都做不了架构师?>>> 序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...
- 与顶级互联网公司技术大佬面对面聊聊RocketMQ
作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...
- 聊聊rocketmq的ConsumerIdsChangeListener
序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...
- 聊聊rocketmq的FileAppender
序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...
- 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!
大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...
- 深入源码聊聊RocketMQ刷盘机制
大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...
- 面试官:哥们,你们的系统架构中为什么要引入消息中间件?
点击上方"蓝字", 右上角选择"设为星标" 周一至五早11点半!精品文章准时送上! 本文来自石杉的架构笔记 这篇文章开始,我们把消息中间件这块高频的面试题给大家 ...
最新文章
- 全球通信云市场爆发增长,RTC 技术普惠还有多远
- centos 如何登陆mysql_[CentOS 0010] CentOS 配置mysql允许远程登录
- Action framework - Table PPFTTRIGG
- vonic 环境配置_Vonic 2.0 全新文档站上线
- 【很好的分享】zookeeper系列
- [2018.10.10 T1] 餐馆
- usb, micro-usb card 损坏, 数据恢复
- 一些真正免费的API接口
- 数据可视化看板怎么搭建,这样做小白能看懂
- 【旧文章搬运】暴力的查进程方法
- 《生活中的魔法数学》读后感
- Page migration
- SAP APO 取订单函数(取计划订单数据一)
- TapTap 算法平台的 Serverless 探索之路
- vue开发微信公众号订阅消息踩坑记录
- CGTrader新赛CG Wildlife Challenge(CG野生生物竞赛)重磅推出
- OpenGL(QT平台)学习与实战(一)
- 线性代数 --- 线性代数基本定理下(四个基本子空间两两正交且互为正交补)
- Kubernetes PV和PVC 常见问题
- DBA运维工具-OLazy