本文主要研究一下rocketmq的ConsumerManageProcessor

NettyRequestProcessor

rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java

public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws Exception;boolean rejectRequest();
}
  • NettyRequestProcessor接口定义了processRequest、rejectRequest方法

ConsumerManageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);private final BrokerController brokerController;public ConsumerManageProcessor(final BrokerController brokerController) {this.brokerController = brokerController;}@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.GET_CONSUMER_LIST_BY_GROUP:return this.getConsumerListByGroup(ctx, request);case RequestCode.UPDATE_CONSUMER_OFFSET:return this.updateConsumerOffset(ctx, request);case RequestCode.QUERY_CONSUMER_OFFSET:return this.queryConsumerOffset(ctx, request);default:break;}return null;}@Overridepublic boolean rejectRequest() {return false;}//......
}
  • ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false

getConsumerListByGroup

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {//......public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);final GetConsumerListByGroupRequestHeader requestHeader =(GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (consumerGroupInfo != null) {List<String> clientIds = consumerGroupInfo.getAllClientId();if (!clientIds.isEmpty()) {GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();body.setConsumerIdList(clientIds);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;} else {log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}} else {log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());return response;}//......
}
  • getConsumerListByGroup方法通过brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup())获取consumerGroupInfo

updateConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {//......private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);final UpdateConsumerOffsetRequestHeader requestHeader =(UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//......
}
  • updateConsumerOffset方法主要是执行brokerController.getConsumerOffsetManager().commitOffset

queryConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java

public class ConsumerManageProcessor implements NettyRequestProcessor {//......private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);final QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.readCustomHeader();final QueryConsumerOffsetRequestHeader requestHeader =(QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);long offset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());if (offset >= 0) {responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (minOffset <= 0&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}}return response;}//......
}
  • queryConsumerOffset方法主要是通过brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId())获取指定consumerGroup、topic及queueId的offset

小结

ConsumerManageProcessor实现了NettyRequestProcessor接口,其processRequest方法只处理code为RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET或者RequestCode.QUERY_CONSUMER_OFFSET的request;其中针对RequestCode.GET_CONSUMER_LIST_BY_GROUP执行getConsumerListByGroup方法,针对RequestCode.UPDATE_CONSUMER_OFFSET执行updateConsumerOffset方法,针对RequestCode.QUERY_CONSUMER_OFFSET执行queryConsumerOffset方法;其rejectRequest返回false

doc

  • ConsumerManageProcessor

聊聊rocketmq的ConsumerManageProcessor相关推荐

  1. 聊聊rocketmq的ProducerImpl

    序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java pu ...

  2. 聊聊rocketmq的RemotingException

    序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/Remoti ...

  3. 聊聊rocketmq的BrokerHousekeepingService

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下rocketmq的BrokerHousekeepingService BrokerHousekeepingServ ...

  4. 与顶级互联网公司技术大佬面对面聊聊RocketMQ

    作为由阿里巴巴捐赠的Apache顶级云原生消息中间件,RocketMQ 立足于在线交易链路,帮助企业实现异步解耦和削峰填谷以及 IoT 边缘数据以及 C 端用户行为数据采集传输和集成等众多功能.我们可 ...

  5. 聊聊rocketmq的ConsumerIdsChangeListener

    序 本文主要研究一下rocketmq的ConsumerIdsChangeListener ConsumerGroupEvent rocketmq-all-4.6.0-source-release/br ...

  6. 聊聊rocketmq的FileAppender

    序 本文主要研究一下rocketmq的FileAppender WriterAppender org/apache/rocketmq/logging/inner/LoggingBuilder.java ...

  7. 真香,聊聊 RocketMQ 5.0 的 POP 消费模式!

    大家好,我是君哥. 大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式. 不过,RocketMQ 的 ...

  8. 深入源码聊聊RocketMQ刷盘机制

    大家好,我是Leo. 今天聊一下RocketMQ的三种刷盘机制. 同步刷盘 异步刷盘(RocketMQ默认) 异步刷盘+缓冲区 出自微信公众号[欢少的成长之路] 本章概括 同步刷盘 整个同步刷盘策略由 ...

  9. 面试官:哥们,你们的系统架构中为什么要引入消息中间件?

    点击上方"蓝字", 右上角选择"设为星标" 周一至五早11点半!精品文章准时送上! 本文来自石杉的架构笔记 这篇文章开始,我们把消息中间件这块高频的面试题给大家 ...

最新文章

  1. 全球通信云市场爆发增长,RTC 技术普惠还有多远
  2. centos 如何登陆mysql_[CentOS 0010] CentOS 配置mysql允许远程登录
  3. Action framework - Table PPFTTRIGG
  4. vonic 环境配置_Vonic 2.0 全新文档站上线
  5. 【很好的分享】zookeeper系列
  6. [2018.10.10 T1] 餐馆
  7. usb, micro-usb card 损坏, 数据恢复
  8. 一些真正免费的API接口
  9. 数据可视化看板怎么搭建,这样做小白能看懂
  10. 【旧文章搬运】暴力的查进程方法
  11. 《生活中的魔法数学》读后感
  12. Page migration
  13. SAP APO 取订单函数(取计划订单数据一)
  14. TapTap 算法平台的 Serverless 探索之路
  15. vue开发微信公众号订阅消息踩坑记录
  16. CGTrader新赛CG Wildlife Challenge(CG野生生物竞赛)重磅推出
  17. OpenGL(QT平台)学习与实战(一)
  18. 线性代数 --- 线性代数基本定理下(四个基本子空间两两正交且互为正交补)
  19. Kubernetes PV和PVC 常见问题
  20. DBA运维工具-OLazy

热门文章

  1. RSS Feed Generator for PHP (兼有podcast rss - iTunes )
  2. redis——redis事务相关处理
  3. delare和typeset
  4. Linux kernel 4.x中的min和max宏
  5. LeetCode 374. Guess Number Higher or Lower
  6. JupyterNotebook配置远程登录
  7. Ubuntu 安装 Android-Studio
  8. requests-发送post请求
  9. 在细节消息中包含能够捕获失败的信息(63)
  10. 建站利器 | 阿里巴巴上线静态开源站点搭建工具 Docsite