RocketMQ源码解析-PullConsumer取消息(1)
PullConsumer取消息需要自己手动调用Consumer的pull方法主动拉取消息。需要的参数有具体的消息队列(调用消费者的fetchSubscibeMessageQueue()可以得到相应topic的所欲消息队列),需要过滤用的tag(可以为空),以及消费队列所在的进度,以及这次取消息的最大量。
调用了DefaultMQPullConsumer的Pull()方法之后会直接调用DefaultMQPullConsumerImpl的pull()方法。最后会调用DefaultMQPullConsumerImpl的pullSyncImpl()来主动拉消息(同步方式)。
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums,boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException {this.makeSureStateOK();if (null == mq) {throw new MQClientException("mq is null", null);}if (offset < 0) {throw new MQClientException("offset < 0", null);}if (maxNums <= 0) {throw new MQClientException("maxNums <= 0", null);}this.subscriptionAutomatically(mq.getTopic());int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);SubscriptionData subscriptionData;try {subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//mq.getTopic(), subExpression);} catch (Exception e) {throw new MQClientException("parse subscription error", e);}long timeoutMillis =block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//mq, // 1subscriptionData.getSubString(), // 20L, // 3offset, // 4maxNums, // 5sysFlag, // 60, // 7this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8timeoutMillis, // 9CommunicationMode.SYNC, // 10null// 11);return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
}
这里可以看到,除了tag之外其他所有参数都是必不可少的,在检查完所有的参数之后,会调用subscriptionAutomatically()方法来检查传入的消息队列的topic是否有相应的订阅topic数据存在。如果不存在,则会尝试重新建立新的topic订阅数据。
private void subscriptionAutomatically(final String topic) {if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {try {SubscriptionData subscriptionData =FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//topic, SubscriptionData.SUB_ALL);this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);} catch (Exception e) {}}
}
接下来构造这次拉消息操作的标志量sysFlag,并且根据消费者组名,topic,subExpression来构造这次的SubscriptionData。在确认了本次拉取消息的timeout之后,将会调用pullAPIWrapper的pullKerenImpl()来拉取消息。
pullAPIWrapper的pullKerenImpl()来拉取消息。
public PullResult pullKernelImpl(//final MessageQueue mq,// 1final String subExpression,// 2final long subVersion,// 3final long offset,// 4final int maxNums,// 5final int sysFlag,// 6final long commitOffset,// 7final long brokerSuspendMaxTimeMillis,// 8final long timeoutMillis,// 9final CommunicationMode communicationMode,// 10final PullCallback pullCallback// 11
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(//brokerAddr,//requestHeader,//timeoutMillis,//communicationMode,//pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
在尝试向broker拉取消息的第一步,首选in当然是要获得向哪个broker发送消息的地址,通过消息队列所对应的brokerName,以及brokerId通过客户端的findBrokerAddressInSubscribe()来获取相应的broker的地址。Broker地址只是简单的从客户端所保存的broker地址表中获取相应的broker地址。如果没有找到,则会尝试从地址服务器更新相应topic的地址数据,再尝试找一次。
在根据之前的数据封装完这次请求的requestHeader之后,将会调用MqClientAPIImpl的 pullMessage()方法将这个请求通过netty客户端发送给broker。
public PullResult pullMessage(//final String addr,//final PullMessageRequestHeader requestHeader,//final long timeoutMillis,//final CommunicationMode communicationMode,//final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);switch (communicationMode) {case ONEWAY:assert false;return null;case ASYNC:this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);return null;case SYNC:return this.pullMessageSync(addr, request, timeoutMillis);default:assert false;break;}return null;
}
这里由于是同步的方式,这里会选择调用相应的方式。
private PullResult pullMessageSync(//final String addr,// 1final RemotingCommand request,// 2final long timeoutMillis// 3
) throws RemotingException, InterruptedException, MQBrokerException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processPullResponse(response);
}
这里在同步方式下,会等待netty发送请求后,同步等待结果返回,并通过processPullResponse()对返回的结果进行处理。
private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {PullStatus pullStatus = PullStatus.NO_NEW_MSG;switch (response.getCode()) {case ResponseCode.SUCCESS:pullStatus = PullStatus.FOUND;break;case ResponseCode.PULL_NOT_FOUND:pullStatus = PullStatus.NO_NEW_MSG;break;case ResponseCode.PULL_RETRY_IMMEDIATELY:pullStatus = PullStatus.NO_MATCHED_MSG;break;case ResponseCode.PULL_OFFSET_MOVED:pullStatus = PullStatus.OFFSET_ILLEGAL;break;default:throw new MQBrokerException(response.getCode(), response.getRemark());}PullMessageResponseHeader responseHeader =(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
在这里会根据broker返回结果头所包含的信息,以及结果返回码生成结果的抽象类PullResultExt,PullResultExt继承自PullRequest。
private final PullStatus pullStatus;
private final long nextBeginOffset;
private final long minOffset;
private final long maxOffset;
private List<MessageExt> msgFoundList;
以上是PullRequest的成员组成,可以见得,在从broker中返回的结果中,下一次消费的起始偏移量,最大最小偏移量都存在这里,用以对确定下一次从Broker拉取消息时的开始位置。
private final long suggestWhichBrokerId;
private byte[] messageBinary;
而具体的消息信息,则保存在PullResultExt中。
在生成了PullResultExt之后,最后回到了DefaultMQPullConsumerImpl的pullSyncImpl()上,在获取了DefaultMQClient将从Broker那里得到的返回的结果之后包装生成的PullRequestExt之后,调用processPullRequest()方法对拉回来的结果进行处理。
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);List<MessageExt> msgListFilterAgain = msgList;if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);}}}}if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();filterMessageContext.setUnitMode(unitMode);filterMessageContext.setMsgList(msgListFilterAgain);this.executeHook(filterMessageContext);}for (MessageExt msg : msgListFilterAgain) {MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));}pullResultExt.setMsgFoundList(msgListFilterAgain);}pullResultExt.setMessageBinary(null);return pullResult;
}
在这个方法中,对消息的具体字节流进行了反序列化,并且根据订阅的tag在这里完成对于消息的过滤。
RocketMQ源码解析-PullConsumer取消息(1)相关推荐
- RocketMQ源码解析-PullConsumer取消息(2)
如果在调用DefaultMQPullConsumer的pull方法的时候添加了pullcallback参数,那么就会调用DefaultMQPullConsumerImpl的pullAsyncImpl( ...
- RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处 文章目录 前言 1.延时消息的demo 2.实现的原理 前言 今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场 ...
- RocketMQ源码解析之消息消费者(consume Message)
原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- RocketMQ源码分析之延迟消息
文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...
- RocketMQ源码解析之broker文件清理
原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...
- RocketMQ源码解析:Filtersrv
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- 消息中间件RocketMQ源码解析-- --调试环境搭建
1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...
最新文章
- C/C++各种数据类型转换汇总
- vim 寄存器,剪切板
- java解码base64的png图片_使用PHP对图片进行base64编码和解码(png、jpg,声音、视频)...
- 若依前后端分离版手把手教你本地搭建环境并运行项目
- 【面试招聘】算法岗通关宝典 | 社招一年经验,字节5轮、阿里7轮
- 开放下载!《阿里云存储白皮书》全面解读阿里云存储二十年的技术演进
- matlab rem和mod,Matlab的mod和rem
- 动态规划 —— 背包问题 P02 —— 完全背包
- kafka经典面试题
- java 扫描jar包_java 扫描指定包(包括jar包)
- phpstorm ctrl 鼠标左键 无效_击败无聊的办公室重复操作,用 Python 控制鼠标和键盘...
- Android开发 入门篇(一)
- 如何简单快速的写出幸运转盘抽奖
- 怎样将图片转换成word文字
- Origin画法——简单的分布图画法
- php网络图片拼接,图片处理-PHP图片拼接如何高效的实现
- 小白学 Python 数据分析(17):Matplotlib(二)基础操作
- 分类器评估指标——混淆矩阵 ROC AUC KS AR PSI Lift Gain
- NIUSHOP wap端广告页面设置
- mac怎么查node版本_Mac更新node版本和npm版本