RocketMQ源码学习
RocketMQ源码学习
文章目录
- RocketMQ源码学习
- Producer 是怎么将消息发送至 Broker 的?
- 同步发送
- 异步发送
- 队列选择器
- 事务消息
- 原理
- Broker 是怎么处理客户端发送的消息?
- NettyRequestProcessor
- 发送消息
- 事务消息
- 处理发送消息请求
- 处理发送结束事务请求
- 定时任务回查逻辑
- Broker端
- 客户端
- 客户端是怎样与服务端交互的?
- 客户端怎么获取 Broker 信息?
- 客户端
- NameServer端
- NameServer 是怎么管理 Broker 信息的?
- 客户端怎么获取队列信息?
- Broker 是怎么存储消息的?
- RocketMQ 的消息存储架构是怎样的?
Producer 是怎么将消息发送至 Broker 的?
同步发送
RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?
异步发送
DefaultMQProducer#send(Message, SendCallback)
DefaultMQProducerImpl#send(Message, SendCallback)
DefaultMQProducerImpl#send(Message, SendCallback, long)
DefaultMQProducerImpl#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)
DefaultMQProducerImpl#sendKernelImpl(Message, MessageQueue, CommuicationMode, SendCallback, TopicPublishInfo, long)
MQClientAPIImpl#sendMessage(String, String, Message, SendMessageRequestHeader, long, CommunicationMode, SendCallback, TopicPublishInfo, MQClientInstance, int, SendMessageContext, DefaultMQProducerImpl)
MQClientAPIImpl#sendMessageAsync(String, String, Message, long, RemotingCommand, SendCallback, TopicPublishInfo, MQClientInstance, int, AtomicInteger, SendMessageContext, DefaultMQProducerImpl)
NettyRemotingClient#invokeAsync(String, RemotingCommand, long, InvokeCallback)
NettyRemotingAbstract#invokeAsyncImpl(Channel, RemotingCommand, long, InvokeCallback)
this.responseTable.put(opaque, responseFuture);
- 将
responseFuture
放入map
中,等待处理(在netty
接收中服务端返回的响应) - 在该方法中处理:
NettyRemotingAbstract#processResponseCommand
队列选择器
DefaultMQProducer#send(Message, MessageQueueSelector, Object, SendCallback)
DefaultMQProducerImpl#send(Message, MessageQueueSelector, Object, SendCallback)
DefaultMQProducerImpl#send(Message, MessageQueueSelector, Object, SendCallback, long)
DefaultMQProducerImpl#sendSelectImpl(Message, MessageQueueSelector, Object, CommunicationMode, SendCallback, timeout)
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
ClientConfig#queueWithNamespace(MessageQueue)
RocketMQ
提供的selector
org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
org.apache.rocketmq.client.producer.selector.SelectMessageQueueByMachineRoom
org.apache.rocketmq.client.producer.selector.SelectMessageQueueByRandom
DefaultMQProducerImpl#sendKernelImpl(Message, MessageQueue, CommuicationMode, SendCallback, TopicPublishInfo, long)
- 接下来的调用见异步发送
事务消息
原理
TransactionListener
:TransactionMQProducer#setTransactionListener
- 执行本地事务:
TransactionListener#executeLocalTransaction
- 检查本地事务:
TransactionListener#checkLocalTransaction
- 执行状态:提交消息:
COMMIT_MESSAGE
;回滚消息:ROLLBACK_MESSAGE
;未知状态:UNKNOW
- 执行本地事务:
TransactionMQProducer#sendMessageInTransaction(Message, Object)
DefaultMQProducerImpl#sendMessageInTransaction(Message, LocalTransactionExecuer, Object)
DefaultMQProducerImpl#send(Message)
DefaultMQProducerImpl#send(Message)
- 接下来的调用见同步发送
TransactionListener#executeLocalTransaction(Message, Object)
DefaultMQProducerImpl#endTransaction(SendResult, LocalTransactionState, Throwable)
MQClientAPIImpl#endTransactionOneway(String, EndTransactionREquestHeader, String, long)
RemotingClient#invokeOneway(String, RemotingCommand, long)
- 发送消息给Broker:
NettyRemotingAbstract#invokeOnewayImpl(Channel, RemotingCommand, long)
Broker 是怎么处理客户端发送的消息?
Broker
接收客户端发过来的消息是从NettyRemotingAbstract#processMessageReceived(ChannelHandlerContext, RemotingCommand)
开始的。在该方法中,通过RemotingCommand#getType()
来判断是进入请求命令处理分支,还是响应命令处理分支。
因为是接收客户端的请求命令,所以这里进入请求命令分支:NettyRemotingAbstract#processRequestCommand(ChannelHandlerContext, RemotingCommand)
在该方法中,通过this.processorTable.get(cmd.getCode())
获取对应的NettyRequestProcessor
处理器。这里使用的是状态模式,通过不同的code
值来执行不同的逻辑。只不过这种模式比较巧妙,预先定义处理逻辑,将状态和处理逻辑作为键值对存入map
中,通过map#get(status)
这样的操作来获取状态处理逻辑。使用这种方式,要注意提供默认逻辑,当status
找不到对应的处理逻辑时,默认执行该逻辑。当然,在RocketMQ
中已经提供了默认逻辑null == matched ? this.defaultRequestProcessor : matched
。
然后构建一个RequestTask
对象,并执行它。
NettyRequestProcessor
package org.apache.rocketmq.remoting.netty;import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;/*** Common remoting command processor*/
public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;boolean rejectRequest();
}
服务端注册处理器的地方在BrokerController#registerProcessor()
发送消息
发送消息的处理器为SendMessageProcessor
SendMessageProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
AbstractSendMessageProcessor#parseRequestHeader(RemotingCommand)
SendMessageProcessor#sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
MessageStore#putMessage(MessageExtBrokerInner)
CommitLog#putMessage(MessageExtBrokerInner)
MappedFile#appendMessage(MessageExtBrokerInner, AppendMessageCallback)
MappedFile#appendMessagesInner(MessageExt, AppendMessageCallback)
- 在该回调方法中 append 消息
DefaultAppendMessageCallback#doAppend(long, ByteBuffer, int, MessageExtBrokerInner)
- 在该回调方法中 append 消息
SendMessageProcessor#handlePutMessageResult(PutMessageResult, RemotingCommand, RomotingCommand, MessageExt, SendMessageResponseHeader, SendMessageContext, ChannelHadnlerContext, int)
事务消息
事务消息在服务端有:处理发送消息请求、处理发送结束事务请求两次请求处理,还有一个定时任务回查逻辑。
处理发送消息请求
发送消息请求与发送消息处理方式是一样的,不过在SendMessageProcessor#sendMessage
中会判断是否事务消息并进行处理。
// 事务标识
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) { // 判断事务标识if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}// 保存事务半消息putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
SendMessageProcessor#sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
TransactionalMessageServiceImpl#prepareMessage(MessageExtBrokerInner)
TransactionalMessageBridge#putHalfMessage(MessageExtBrokerInner)
- 保存半消息:
store.putMessage(parseHalfMessageInner(messageInner))
- 保存半消息:
处理发送结束事务请求
发送结束事务请求的处理器是EndTransactionProcessor
EndTransactionProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
定时任务回查逻辑
定时任务回查分为服务端与客户端两块逻辑
Broker端
- 初始化定时任务检测对象:
BrokerController#initialTransaction()
- 启动定时任务:
BrokerController#start()
TransactionalMessageCheckService#onWaitEnd()
TransactionalMessageServiceImpl#check(long, int, AbstractTransactionalMessageCheckListener)
AbstractTransactionalMessageCheckListener#resolveHalfMsg(MessageExt)
AbstractTransactionalMessageCheckListener#sendCheckMessage(MessageExt)
- 发送检测消息:
Broker2Client#checkProducerTransactionState(String, Channel, CheckTransactionStateRequestHeader, MessageExt)
客户端
- 客户端通用处理请求方法:
ClientRemotingProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
ClientRemotingProcessor#checkTransactionState(ChannelHandlerContext, RemotingCommand)
DefaultMQProducerImpl#checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)
MQClientAPIImpl#endTransactionOneway(String, EndTransactionRequestHeader, String, long)
- 接下来的调用见客户端事务消息
客户端是怎样与服务端交互的?
以从NameServer
获取路由信息为例:
MQClientAPIImpl#getTopicRouteInfoFromNameServer(String, long, boolean)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {// 创建一个实现了 CommandCustomHeader 接口的对象GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);// 通过请求码和 header 创建一个 RemotingCommand 对象// RequestCode 中有不同的请求码,通过请求码来确定请求类型,并进行相应处理// 该创建方式为通用创建请求对象方式RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);// 同步调用获取结果// 内部调用处理方式和 Broker 是怎么处理客户端发送的消息一节原理相似RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {// 返回解码反序列化后的响应体return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}
由上面一例,可知:
- 请求是基于
RemotingClient
对象的,RemotingClient
是一个接口,所以可以通过选择不同的实现类,选择服务器支持的协议进行交互。 - 默认使用
NettyRemotingClient
对象进行交互。基于netty
通讯框架。 - 请求对象与响应对象同为
RemotingCommand
,简化了通讯框架序列化与反序列化的代码。 RequestCode
中有不同的请求码,通过请求码来确定请求类型,并进行相应处理ResponseCode
中有不同的响应码,通过响应码来确定响应类型,并进行相应处理- 通过实现
CommandCustomHeader
接口,来实现header
的通用化处理。- 使用此方式需要注意序列化与反序列化时,对象类型及对象中字段的值是否正确处理
客户端怎么获取 Broker 信息?
以获取Broker
集群信息为例
客户端
MQClientAPIImpl
是客户端API实现类,通过该类可以了解客户端提供了哪些API接口供上层调用。
MQClientAPIImpl#getBrokerClusterInfo(long)
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
NameServer端
DefaultRequestProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
DefaultRequestProcessor#getBrokerClusterInfo(ChannelHandlerContext, RemotingCommand)
RouteInfoManager#getAllClusterInfo()
public byte[] getAllClusterInfo() {// 创建集群信息类ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();// 设置 broker 名与 broker 地址的映射对象clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);// 设置 集群名与 broker 集合的映射对象clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);// 返回编码后的对象return clusterInfoSerializeWrapper.encode();
}
NameServer 是怎么管理 Broker 信息的?
客户端怎么获取队列信息?
Broker 是怎么存储消息的?
RocketMQ 的消息存储架构是怎样的?
RocketMQ源码学习相关推荐
- RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?
RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...
- RocketMQ源码学习(六)-Name Server
问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co ...
- 结合RocketMQ 源码,带你了解并发编程的三大神器
摘要:本文结合 RocketMQ 源码,分享并发编程三大神器的相关知识点. 本文分享自华为云社区<读 RocketMQ 源码,学习并发编程三大神器>,作者:勇哥java实战分享. 这篇文章 ...
- RocketMQ 源码分析 —— 集成 Spring Boot
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...
- mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析
在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...
- RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析
深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...
- RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码
转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...
- RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】
基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码. 此前我们学习了RocketMQ源码(18)-Defau ...
- RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic
此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...
最新文章
- 批处理,%~d0 cd %~dp0 代表什么意思
- Float浮点数的使用和条件
- Spring boot切换Servlet容器
- 薪资超大厂,校招天花板!Google大神云集,美团等参投,无人驾驶TOP独角兽!轻舟智航100+offer等你来!...
- 访问修饰符(C# 编程指南)
- 860. 柠檬水找零 golang
- wireshark抓包工具的使用及分析
- jQuery 倒计时
- Android.mk宏定义demo【转】
- web测试和app测试相关
- 一文看懂卷积神经网络
- oracle @id@,修改oracle用户id
- Un 进行攻击计时效果
- #数组元素相乘_C++ 矩阵相乘
- 【Docker】05 容器数据卷
- 长链剖分算法完整总结
- linux vim -b详解,linux vim
- (1)pytorch 实现 minist手写数据集(cpu/gpu)版本
- Telltale:简化了Netflix应用程序监视
- red5简介及基础知识
热门文章
- 虚拟机网络连接三种方式(桥接、NAT、主机)
- STP生成树的选举详细步骤、四个案列详解(附图,建议电脑观看)
- C语言,计算数据类型及所对应的字节数。
- Spring Boot整合Servlet,Filter,Listener,访问静态资源
- C#LeetCode刷题之#859-亲密字符串​​​​​​​​​​​​​​(Buddy Strings)
- 中国企业2017年数据_根据数据,2017年最好的免费在线课程
- di-tech2016_2016年Tech最佳愚人节笑话
- go语言web开发 排坑指南
- SpringBoot-视图解析与模板引擎
- windows文本转语音 通过java 调用python 生成exe可执行文件一条龙