PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息。需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应topic的所欲消息队列),需要过滤用的tag(可以为空),以及消费队列所在的进度,以及这次取消息的最大量。

调用了DefaultMQPullConsumer的Pull()方法之后会直接调用DefaultMQPullConsumerImpl的pull()方法。最后会调用DefaultMQPullConsumerImpl的pullSyncImpl()来主动拉消息(同步方式)。

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException {this.makeSureStateOK();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}this.subscriptionAutomatically(mq.getTopic());int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);SubscriptionData subscriptionData;try {subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}long timeoutMillis =block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//mq, // 1subscriptionData.getSubString(), // 20L, // 3offset, // 4maxNums, // 5sysFlag, // 60, // 7this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8timeoutMillis, // 9CommunicationMode.SYNC, // 10null// 11);return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}

这里可以看到,除了tag之外其他所有参数都是必不可少的,在检查完所有的参数之后,会调用subscriptionAutomatically()方法来检查传入的消息队列的topic是否有相应的订阅topic数据存在。如果不存在,则会尝试重新建立新的topic订阅数据。

private void subscriptionAutomatically(final String topic) {if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {try {SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);} catch (Exception e) {}}
}

接下来构造这次拉消息操作的标志量sysFlag,并且根据消费者组名,topic,subExpression来构造这次的SubscriptionData。在确认了本次拉取消息的timeout之后,将会调用pullAPIWrapper的pullKerenImpl()来拉取消息。

pullAPIWrapper的pullKerenImpl()来拉取消息。
public PullResult pullKernelImpl(//final MessageQueue mq,// 1final String subExpression,// 2final long subVersion,// 3final long offset,// 4final int maxNums,// 5final int sysFlag,// 6final long commitOffset,// 7final long brokerSuspendMaxTimeMillis,// 8final long timeoutMillis,// 9final CommunicationMode communicationMode,// 10final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//brokerAddr,//requestHeader,//timeoutMillis,//communicationMode,//pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

在尝试向broker拉取消息的第一步,首选in当然是要获得向哪个broker发送消息的地址,通过消息队列所对应的brokerName,以及brokerId通过客户端的findBrokerAddressInSubscribe()来获取相应的broker的地址。Broker地址只是简单的从客户端所保存的broker地址表中获取相应的broker地址。如果没有找到,则会尝试从地址服务器更新相应topic的地址数据,再尝试找一次。

在根据之前的数据封装完这次请求的requestHeader之后,将会调用MqClientAPIImpl的 pullMessage()方法将这个请求通过netty客户端发送给broker。

public PullResult pullMessage(//final String addr,//final PullMessageRequestHeader requestHeader,//final long timeoutMillis,//final CommunicationMode communicationMode,//final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null;
}

这里由于是同步的方式,这里会选择调用相应的方式。

private PullResult pullMessageSync(//final String addr,// 1final RemotingCommand request,// 2final long timeoutMillis// 3
) throws RemotingException, InterruptedException, MQBrokerException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processPullResponse(response);
}

这里在同步方式下,会等待netty发送请求后,同步等待结果返回,并通过processPullResponse()对返回的结果进行处理。

private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

在这里会根据broker返回结果头所包含的信息,以及结果返回码生成结果的抽象类PullResultExt,PullResultExt继承自PullRequest。

private final PullStatus pullStatus;
private final long nextBeginOffset;
private final long minOffset;
private final long maxOffset;
private List<MessageExt> msgFoundList;

以上是PullRequest的成员组成,可以见得,在从broker中返回的结果中,下一次消费的起始偏移量,最大最小偏移量都存在这里,用以对确定下一次从Broker拉取消息时的开始位置。

private final long suggestWhichBrokerId;
private byte[] messageBinary;

而具体的消息信息,则保存在PullResultExt中。

在生成了PullResultExt之后,最后回到了DefaultMQPullConsumerImpl的pullSyncImpl()上,在获取了DefaultMQClient将从Broker那里得到的返回的结果之后包装生成的PullRequestExt之后,调用processPullRequest()方法对拉回来的结果进行处理。

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}pullResultExt.setMsgFoundList(msgListFilterAgain);}pullResultExt.setMessageBinary(null);return pullResult;
}

在这个方法中,对消息的具体字节流进行了反序列化,并且根据订阅的tag在这里完成对于消息的过滤。

RocketMQ源码解析-PullConsumer取消息(1)相关推荐

  1. RocketMQ源码解析-PullConsumer取消息(2)

    如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl( ...

  2. RocketMQ源码解析之延迟消息实现原理

    原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...

  3. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  4. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  5. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  6. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  7. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  8. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  9. 消息中间件RocketMQ源码解析-- --调试环境搭建

    1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...

最新文章

  1. C/C++各种数据类型转换汇总
  2. vim 寄存器,剪切板
  3. java解码base64的png图片_使用PHP对图片进行base64编码和解码(png、jpg,声音、视频)...
  4. 若依前后端分离版手把手教你本地搭建环境并运行项目
  5. 【面试招聘】算法岗通关宝典 | 社招一年经验,字节5轮、阿里7轮
  6. 开放下载!《阿里云存储白皮书》全面解读阿里云存储二十年的技术演进
  7. matlab rem和mod,Matlab的mod和rem
  8. 动态规划 —— 背包问题 P02 —— 完全背包
  9. kafka经典面试题
  10. java 扫描jar包_java 扫描指定包(包括jar包)
  11. phpstorm ctrl 鼠标左键 无效_击败无聊的办公室重复操作,用 Python 控制鼠标和键盘...
  12. Android开发 入门篇(一)
  13. 如何简单快速的写出幸运转盘抽奖
  14. 怎样将图片转换成word文字
  15. Origin画法——简单的分布图画法
  16. php网络图片拼接,图片处理-PHP图片拼接如何高效的实现
  17. 小白学 Python 数据分析(17):Matplotlib(二)基础操作
  18. 分类器评估指标——混淆矩阵 ROC AUC KS AR PSI Lift Gain
  19. NIUSHOP wap端广告页面设置
  20. mac怎么查node版本_Mac更新node版本和npm版本

热门文章

  1. C++之使用IO库输入输出
  2. 代码里配置java代理
  3. springboot整合redis,推荐整合和使用案例(2021版)
  4. oracle 052 题库更新,OCP题库升级,新版052考试题及答案整理-18
  5. CAD编辑工具中如何设置图纸的比例
  6. Shell命令-系统信息及显示之dmesg、uptime
  7. memcache服务应用实践
  8. 权限修饰符(public、protected、default、private)权限验证
  9. App-V轻量级应用程序虚拟化之三客户端测试
  10. 阿里云再降价 数据库产品降20%