RocketMQ源码学习

文章目录

  • RocketMQ源码学习
    • Producer 是怎么将消息发送至 Broker 的?
      • 同步发送
      • 异步发送
      • 队列选择器
      • 事务消息
        • 原理
    • Broker 是怎么处理客户端发送的消息?
      • NettyRequestProcessor
      • 发送消息
      • 事务消息
        • 处理发送消息请求
        • 处理发送结束事务请求
        • 定时任务回查逻辑
          • Broker端
          • 客户端
    • 客户端是怎样与服务端交互的?
    • 客户端怎么获取 Broker 信息?
      • 客户端
      • NameServer端
    • NameServer 是怎么管理 Broker 信息的?
    • 客户端怎么获取队列信息?
    • Broker 是怎么存储消息的?
    • RocketMQ 的消息存储架构是怎样的?

Producer 是怎么将消息发送至 Broker 的?

同步发送

RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

异步发送

  • DefaultMQProducer#send(Message, SendCallback)
  • DefaultMQProducerImpl#send(Message, SendCallback)
  • DefaultMQProducerImpl#send(Message, SendCallback, long)
  • DefaultMQProducerImpl#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)
  • DefaultMQProducerImpl#sendKernelImpl(Message, MessageQueue, CommuicationMode, SendCallback, TopicPublishInfo, long)
  • MQClientAPIImpl#sendMessage(String, String, Message, SendMessageRequestHeader, long, CommunicationMode, SendCallback, TopicPublishInfo, MQClientInstance, int, SendMessageContext, DefaultMQProducerImpl)
  • MQClientAPIImpl#sendMessageAsync(String, String, Message, long, RemotingCommand, SendCallback, TopicPublishInfo, MQClientInstance, int, AtomicInteger, SendMessageContext, DefaultMQProducerImpl)
  • NettyRemotingClient#invokeAsync(String, RemotingCommand, long, InvokeCallback)
  • NettyRemotingAbstract#invokeAsyncImpl(Channel, RemotingCommand, long, InvokeCallback)
    • this.responseTable.put(opaque, responseFuture);
    • responseFuture放入map中,等待处理(在netty接收中服务端返回的响应)
    • 在该方法中处理:NettyRemotingAbstract#processResponseCommand

队列选择器

  • DefaultMQProducer#send(Message, MessageQueueSelector, Object, SendCallback)
  • DefaultMQProducerImpl#send(Message, MessageQueueSelector, Object, SendCallback)
  • DefaultMQProducerImpl#send(Message, MessageQueueSelector, Object, SendCallback, long)
  • DefaultMQProducerImpl#sendSelectImpl(Message, MessageQueueSelector, Object, CommunicationMode, SendCallback, timeout)
    • mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));

      • ClientConfig#queueWithNamespace(MessageQueue)
      • RocketMQ提供的selector
        • org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
        • org.apache.rocketmq.client.producer.selector.SelectMessageQueueByMachineRoom
        • org.apache.rocketmq.client.producer.selector.SelectMessageQueueByRandom
  • DefaultMQProducerImpl#sendKernelImpl(Message, MessageQueue, CommuicationMode, SendCallback, TopicPublishInfo, long)
  • 接下来的调用见异步发送

事务消息

原理

  • TransactionListenerTransactionMQProducer#setTransactionListener

    • 执行本地事务:TransactionListener#executeLocalTransaction
    • 检查本地事务:TransactionListener#checkLocalTransaction
    • 执行状态:提交消息:COMMIT_MESSAGE;回滚消息:ROLLBACK_MESSAGE;未知状态:UNKNOW
  • TransactionMQProducer#sendMessageInTransaction(Message, Object)
  • DefaultMQProducerImpl#sendMessageInTransaction(Message, LocalTransactionExecuer, Object)
    • DefaultMQProducerImpl#send(Message)

      • DefaultMQProducerImpl#send(Message)
      • 接下来的调用见同步发送
    • TransactionListener#executeLocalTransaction(Message, Object)
    • DefaultMQProducerImpl#endTransaction(SendResult, LocalTransactionState, Throwable)
      • MQClientAPIImpl#endTransactionOneway(String, EndTransactionREquestHeader, String, long)
      • RemotingClient#invokeOneway(String, RemotingCommand, long)
      • 发送消息给Broker: NettyRemotingAbstract#invokeOnewayImpl(Channel, RemotingCommand, long)

Broker 是怎么处理客户端发送的消息?

Broker接收客户端发过来的消息是从NettyRemotingAbstract#processMessageReceived(ChannelHandlerContext, RemotingCommand)开始的。在该方法中,通过RemotingCommand#getType()来判断是进入请求命令处理分支,还是响应命令处理分支。

因为是接收客户端的请求命令,所以这里进入请求命令分支:NettyRemotingAbstract#processRequestCommand(ChannelHandlerContext, RemotingCommand)

在该方法中,通过this.processorTable.get(cmd.getCode())获取对应的NettyRequestProcessor处理器。这里使用的是状态模式,通过不同的code值来执行不同的逻辑。只不过这种模式比较巧妙,预先定义处理逻辑,将状态和处理逻辑作为键值对存入map中,通过map#get(status)这样的操作来获取状态处理逻辑。使用这种方式,要注意提供默认逻辑,当status找不到对应的处理逻辑时,默认执行该逻辑。当然,在RocketMQ中已经提供了默认逻辑null == matched ? this.defaultRequestProcessor : matched

然后构建一个RequestTask对象,并执行它。

NettyRequestProcessor

package org.apache.rocketmq.remoting.netty;import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;/*** Common remoting command processor*/
public interface NettyRequestProcessor {RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;boolean rejectRequest();
}

服务端注册处理器的地方在BrokerController#registerProcessor()

发送消息

发送消息的处理器为SendMessageProcessor

  • SendMessageProcessor#processRequest(ChannelHandlerContext, RemotingCommand)

    • AbstractSendMessageProcessor#parseRequestHeader(RemotingCommand)
    • SendMessageProcessor#sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
      • MessageStore#putMessage(MessageExtBrokerInner)

        • CommitLog#putMessage(MessageExtBrokerInner)

          • MappedFile#appendMessage(MessageExtBrokerInner, AppendMessageCallback)

            • MappedFile#appendMessagesInner(MessageExt, AppendMessageCallback)

              • 在该回调方法中 append 消息DefaultAppendMessageCallback#doAppend(long, ByteBuffer, int, MessageExtBrokerInner)
      • SendMessageProcessor#handlePutMessageResult(PutMessageResult, RemotingCommand, RomotingCommand, MessageExt, SendMessageResponseHeader, SendMessageContext, ChannelHadnlerContext, int)

事务消息

事务消息在服务端有:处理发送消息请求、处理发送结束事务请求两次请求处理,还有一个定时任务回查逻辑。

处理发送消息请求

发送消息请求与发送消息处理方式是一样的,不过在SendMessageProcessor#sendMessage中会判断是否事务消息并进行处理。

// 事务标识
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {  // 判断事务标识if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return response;}// 保存事务半消息putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
  • SendMessageProcessor#sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
  • TransactionalMessageServiceImpl#prepareMessage(MessageExtBrokerInner)
  • TransactionalMessageBridge#putHalfMessage(MessageExtBrokerInner)
    • 保存半消息:store.putMessage(parseHalfMessageInner(messageInner))

处理发送结束事务请求

发送结束事务请求的处理器是EndTransactionProcessor

  • EndTransactionProcessor#processRequest(ChannelHandlerContext, RemotingCommand)

定时任务回查逻辑

定时任务回查分为服务端与客户端两块逻辑

Broker端
  • 初始化定时任务检测对象:BrokerController#initialTransaction()
  • 启动定时任务:BrokerController#start()
  • TransactionalMessageCheckService#onWaitEnd()
  • TransactionalMessageServiceImpl#check(long, int, AbstractTransactionalMessageCheckListener)
  • AbstractTransactionalMessageCheckListener#resolveHalfMsg(MessageExt)
  • AbstractTransactionalMessageCheckListener#sendCheckMessage(MessageExt)
  • 发送检测消息:Broker2Client#checkProducerTransactionState(String, Channel, CheckTransactionStateRequestHeader, MessageExt)
客户端
  • 客户端通用处理请求方法:ClientRemotingProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
  • ClientRemotingProcessor#checkTransactionState(ChannelHandlerContext, RemotingCommand)
  • DefaultMQProducerImpl#checkTransactionState(String, MessageExt, CheckTransactionStateRequestHeader)
  • MQClientAPIImpl#endTransactionOneway(String, EndTransactionRequestHeader, String, long)
  • 接下来的调用见客户端事务消息

客户端是怎样与服务端交互的?

以从NameServer获取路由信息为例:

  • MQClientAPIImpl#getTopicRouteInfoFromNameServer(String, long, boolean)
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {// 创建一个实现了 CommandCustomHeader 接口的对象GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);// 通过请求码和 header 创建一个 RemotingCommand 对象// RequestCode 中有不同的请求码,通过请求码来确定请求类型,并进行相应处理// 该创建方式为通用创建请求对象方式RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);// 同步调用获取结果// 内部调用处理方式和 Broker 是怎么处理客户端发送的消息一节原理相似RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);}break;}case ResponseCode.SUCCESS: {byte[] body = response.getBody();if (body != null) {// 返回解码反序列化后的响应体return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}

由上面一例,可知:

  1. 请求是基于RemotingClient对象的,RemotingClient是一个接口,所以可以通过选择不同的实现类,选择服务器支持的协议进行交互。
  2. 默认使用NettyRemotingClient对象进行交互。基于netty通讯框架。
  3. 请求对象与响应对象同为RemotingCommand,简化了通讯框架序列化与反序列化的代码。
  4. RequestCode中有不同的请求码,通过请求码来确定请求类型,并进行相应处理
  5. ResponseCode中有不同的响应码,通过响应码来确定响应类型,并进行相应处理
  6. 通过实现CommandCustomHeader接口,来实现header的通用化处理。
    • 使用此方式需要注意序列化与反序列化时,对象类型及对象中字段的值是否正确处理

客户端怎么获取 Broker 信息?

以获取Broker集群信息为例

客户端

MQClientAPIImpl是客户端API实现类,通过该类可以了解客户端提供了哪些API接口供上层调用。

  • MQClientAPIImpl#getBrokerClusterInfo(long)

    • RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
    • RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);

NameServer端

  • DefaultRequestProcessor#processRequest(ChannelHandlerContext, RemotingCommand)
  • DefaultRequestProcessor#getBrokerClusterInfo(ChannelHandlerContext, RemotingCommand)
  • RouteInfoManager#getAllClusterInfo()
public byte[] getAllClusterInfo() {// 创建集群信息类ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();// 设置 broker 名与 broker 地址的映射对象clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);// 设置 集群名与 broker 集合的映射对象clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);// 返回编码后的对象return clusterInfoSerializeWrapper.encode();
}

NameServer 是怎么管理 Broker 信息的?

客户端怎么获取队列信息?

Broker 是怎么存储消息的?

RocketMQ 的消息存储架构是怎样的?

RocketMQ源码学习相关推荐

  1. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  2. RocketMQ源码学习(六)-Name Server

    问题列表: Name Server 的作用是什么? Name Server 存储了Broker的什么信息? Name Server 为Producer的提供些什么信息? Name Server 为Co ...

  3. 结合RocketMQ 源码,带你了解并发编程的三大神器

    摘要:本文结合 RocketMQ 源码,分享并发编程三大神器的相关知识点. 本文分享自华为云社区<读 RocketMQ 源码,学习并发编程三大神器>,作者:勇哥java实战分享. 这篇文章 ...

  4. RocketMQ 源码分析 —— 集成 Spring Boot

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~ 中文详细注释的开源项目 RP ...

  5. mq消费者组_「架构师MQ进阶」RocketMQ源码分析(四)- 源代码包结构分析

    在前面第一篇中已经将源代码下载到本地了,本篇主要是介绍代码中相关模块到作用.036.Rocket-MQ-Source-code-cover.png 一.源码结构 RocketMQ源码组织方式基于Mav ...

  6. RocketMQ源码(十)—Broker 消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析

    深入的介绍了broker的消息刷盘服务源码解析,以及高性能的刷盘机制. 学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者是异步: 1. 同步刷盘:如上图所示,只有消息真正 ...

  7. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

  8. RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码. 此前我们学习了RocketMQ源码(18)-Defau ...

  9. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

最新文章

  1. 批处理,%~d0 cd %~dp0 代表什么意思
  2. Float浮点数的使用和条件
  3. Spring boot切换Servlet容器
  4. 薪资超大厂,校招天花板!Google大神云集,美团等参投,无人驾驶TOP独角兽!轻舟智航100+offer等你来!...
  5. 访问修饰符(C# 编程指南)
  6. 860. 柠檬水找零 golang
  7. wireshark抓包工具的使用及分析
  8. jQuery 倒计时
  9. Android.mk宏定义demo【转】
  10. web测试和app测试相关
  11. 一文看懂卷积神经网络
  12. oracle @id@,修改oracle用户id
  13. Un 进行攻击计时效果
  14. #数组元素相乘_C++ 矩阵相乘
  15. 【Docker】05 容器数据卷
  16. 长链剖分算法完整总结
  17. linux vim -b详解,linux vim
  18. (1)pytorch 实现 minist手写数据集(cpu/gpu)版本
  19. Telltale:简化了Netflix应用程序监视
  20. red5简介及基础知识

热门文章

  1. 虚拟机网络连接三种方式(桥接、NAT、主机)
  2. STP生成树的选举详细步骤、四个案列详解(附图,建议电脑观看)
  3. C语言,计算数据类型及所对应的字节数。
  4. Spring Boot整合Servlet,Filter,Listener,访问静态资源
  5. C#LeetCode刷题之#859-亲密字符串​​​​​​​​​​​​​​(Buddy Strings)
  6. 中国企业2017年数据_根据数据,2017年最好的免费在线课程
  7. di-tech2016_2016年Tech最佳愚人节笑话
  8. go语言web开发 排坑指南
  9. SpringBoot-视图解析与模板引擎
  10. windows文本转语音 通过java 调用python 生成exe可执行文件一条龙