导语
  这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameServer、Broker、Producer、Consumer这种模式。之前是对节点上的内容进行分析,这篇博客就来详细分析一下连接两个节点之间的内容到底是是什么?

文章目录

  • 概念详解
  • 源码分析
    • RemotingService接口
    • RemotingServer 接口
    • NettyRemotingAbstract 抽象类
      • processMessageReceived() 方法
    • 消息的通信方式
    • Reactor多线程设计
  • 总结

  在之前的分析中,我们知道,所有的客户端与服务端的交互最终都会给到一个Remoting模块,对于这个Remoting模块简单的来讲就是实现了消息之间的传输操作。对于数据的传输,需要解决的问题主要有两个IO和协议。下面就结合官方的文档来进行详细分析。

概念详解

  RocketMQ消息队列集群主要包括四个角色:NameServer、Broker(Master/Slave)、Producer和Consumer。
  基本通信过程如下:

  • (1)代理启动后,需要完成一个操作:向NameServer注册自身,然后每隔30秒向NameServer报告主题路由信息。
  • (2) 当消息生产者作为客户端发送消息时,需要根据消息的主题从本地缓存TopicPublishInfoTable获取路由信息。否则,将从NameServer检索路由信息并更新到本地缓存,同时,Producer默认每30秒从NameServer检索一次路由信息。
  • (3) 消息生产者根据2中获得的路由信息选择一个队列来发送消息;代理接收消息并将其作为消息的接收者记录在磁盘中。
  • (4) 在消息使用者根据2)获取路由信息并完成客户端的负载平衡后,选择一个或多个消息队列来拉取消息并使用它们。

  从以上1)~3)可以看出,生产者、代理和名称服务器都是相互通信的(这里只提到了MQ通信的一部分),因此如何设计一个好的网络通信模块在MQ中是非常重要的。它将决定RocketMQ集群的总体消息传递能力和最终性能。

  Rocketmq远程处理模块是rocketmq消息队列中负责网络通信的模块。它依赖并被需要网络通信的几乎所有其他模块(如rocketmq客户机、rocketmq代理、rocketmq namesrv)引用。为了实现客户机和服务器之间高效的数据请求和接收,RocketMQ消息队列定义了通信协议,并在Netty的基础上扩展了通信模块。具体的扩展如下图所示

源码分析

  结合上图在源码中可以看到几乎是与上图的关系是一样的。这里就来从上到下来分析一下下图中这些类都是什么作用,并且在后期使用的时候还会加入详细的使用分析。

RemotingService接口

  首先会看到,最顶层的是一个接口,这个接口定义了三个方法,分别负责启动、停止以及RPCHook的注入操作,对于RPC默认大家都有过了解,对于Java Hook机制也默认有了解,至于这里为什么会有这样的一个操作,先做说明,因为这里只是一个接口类,对于这个方法只是定义了实现这个接口的一个规则,至于怎么样实现这个规则则是后续的抽象类或者是具体实现类中可以看到这个规则的具体实现。

public interface RemotingService {void start();void shutdown();void registerRPCHook(RPCHook rpcHook);
}

RemotingServer 接口

  从上面的类图中可以看到RemotingService主要有两个大的方向服务端和客户端。这里就先来看看客户端的实现。

public interface RemotingServer extends RemotingService {void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);int localListenPort();Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;}

  从上面接口类中可以在Service端这里实现了一些方法,就来看看这些方法具体都是干什么的?

  • void registerProcessor(); 注册处理器,对于这个方法传入了三个参数
    * final int requestCode 请求码
    * final NettyRequestProcessor processor Netty请求处理器
    * final ExecutorService executor 执行服务
  • void registerDefaultProcessor();注册默认处理器,传入了两个参数
    • final NettyRequestProcessor processor 请求处理器
    • ExecutorService executor 执行服务
  • int localListenPort(); 本地端口监听
  • Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); 根据请求码获取处理器对。
  • RemotingCommand invokeSync() 同步执行操作
  • void invokeAsync() 异步执行操作
  • void invokeOneway() 单向执行操作

  到这里可以隐约的感觉到上面这些方法都不是很简单的规则定义,为什么要定义这样一些规则呢?它到底支持了RocketMQ 什么样的操作?带着这些问题就进入到了具体的实现类中

NettyRemotingAbstract 抽象类

  从上面类图中看到在实现具体的NettyRemotingServer类的的时候还继承了一个NettyRemotingAbstract的抽象类,从类名上可以看到这里这个类做的操作是对NettyRemoting相关的操作。那么下面就来分析一下这个类具体做了那些操作。

  在上面类中有以下的一些方法是需要去深入的了解

processMessageReceived() 方法

  在前面的分析中,对于Consumer端怎么收到Message进行了方法追踪,最后找到了一个方法,就是这个方法,由于NettyRemotingAbstract抽象类有两个子类,一个是Server端的子类一个是Client的子类,这里研究的是Server端,所以下来看看这个方法在Server端是怎么进行调用的。

  上面的方法在Server端是通过如下的一段代码进行调用的,通过下面代码可以看到之前也曾经分析过,其中有两个值的注意的地方,一个就是@ChannelHandler.Sharable 注解,在之前的分析中也聊到过。另一个需要注意的地方就是这个Handler所继承的SimpleChannelInboundHandler,这个类是由Netty底层提供。对于这个类的作用在之前的时候提到过,在客户端,当 channelRead0() 方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用。对于这个类的其他高级用法会在后续的Netty详解系列中提到,这里只需要知道它的这个方法以及简单的用法即可。从这里可以知道也就是说processMessageReceived()方法中的内容是RocketMQ中的关键的IO操作。在之前说到的接下来就是找到它是以什么样的协议来实现

  之前的分析中其实已经接触到了有关协议的操作,在分析Producer端发送消息的时候,有一段代码是有关RequestHeader的封装,那么在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

可见传输内容主要可以分为以下4部分:

  • (1) 消息长度:总长度,四个字节存储,占用一个int类型;
  • (2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
  • (3) 消息头数据:经过序列化后的消息头数据;
  • (4) 消息主体数据:消息主体的二进制字节数据内容;

  会看到在channelRead0方法中传入了两个参数ChannelHandlerContext,RemotingCommand,也就是说要实现之前提到的两个点IO和协议。在processMessageReceived()方法中的逻辑就是对这些内容进行Request和Response的拆封,也就是流向的控制。

消息的通信方式

  在之前的分析中我们知道。在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。下面就,主要介绍RocketMQ的异步通信流程。

  如上图所示,提到了两个类,NettyClientHandler,NettyServerHandler,这两个类作为NettyRemotingClient类和NettyRemotingServer类的内部类存在,在上面的类图中也有看到,通过上面的代码结合流程图不难发现,其实在两个Handler类中所调用的就是上面提到的processMessageReceived()方法。下面就来结合源码来分析一下上面 这个流程。

NettyRemotingClient
1、装配远程处理的的命令和配置回调函数
  在之前分析Producer发送消息的时候提到过一个方法SendResult sendKernelImpl(),当时称这个方法为与给核心调用发送消息的方法,在这个方法中对于Message进行了封装将其真正的变成了一个可以作为请求响应的对象而存在,有点类似于OSI模型中对于数据的层层封装。继续追踪改方法,以SendResult的获取为目标,发现实际上进入底层对于SendResult的封装是在processSendResponse()中,但是分析这个方法会看到其实这个方法有一个参数就已经是Response对象了,也就是说在这个方法调用之前就已经有响应了,其实在调用下面这个方法的时候就已经看到它其中调用了一个this.remotingClient.invokeSync()的方法

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response);}

2、给连接创建一个Channel并且绑定一个地址以及端口
  调用完上面这个方法之后就看到它传入的是两个参数一个是addr,一个是Request对象,这里会看到this.remotingClient.invokeSync()方法就是来自于RemotingClient而这个对象并且调用的这个方法就是NettyRemotingClient类中的方法。

  如图所示

  • 第一步,获取到了Channel的对象,也许有人会问这里获取到的是Channel对象,那么什么时候建立的Connection呢?也许看过之前分析的人都还记的,在start()方法分析的时候这个Connection就已经被创建了。
  • 第二步,判断Channel是否是可用的,并且这里调用了其中的一个this.invokeSyncImpl(channel, request, timeoutMillis - costTime); 方法这个方法其实不是该类的方法,而是它的父类也就是之前说过的NettyRemotingAbstract类的方法由于NettyRemotingClient继承了这个抽象类。
  • 第三步,返回Response进行封装

3、获取信号量
  在上面的分析中看到调用了NettyRemotingAbstract类中的invokeSyncImpl()方法
4、获取到一个想要得到的Response对象
5、调用NettyChannel的writeAndFlush方法发送Message

  其实在上面图中我们会看到将3、4、5 三个步骤都进行了操作,这从代码中可以看到,这里有一个信号量opaque,这个信号量是被Request参数进行设置的。从它的操作逻辑中可以看到,这个信号量进入之后被设置到了ResponseTable中,在最后返回成功的时候将这个信号量从table中移除,也就是说这个与RocketMQ的重试机制有关,会看到调用了writeAndFlush()方法将request参数进行发送。

NettyServerHandler
1、读取消息通过Netty接收消息并且进行处理
  在客户端逻辑发送成功之后,在之前的分析中从Consumer的接收方法中进行了追踪。最后找到到了下面这个类,也就是是在处理流程中的NettyServerHandler类看到其实它也调用了一个方法,这个方法其实就是上面分析中提到的processMessageReceived(ctx, msg)方法。这个方法也是被NettyRemotingAbstract抽象类所提供

  根据上面的流程分析,其实这里进入之后,从ctx中获取到的命令应该是Request的命令。也就是说执行的是 processRequestCommand(ctx, cmd)方法
NettyRemotingServer
1、调用processMessageReceived(ctx, msg);方法接收数据
  到这里调用的就是 processRequestCommand(ctx, cmd)方法,而这个方法一个最关键的执行逻辑就是如下的代码

2、通过响应码分配到不同的处理器执行处理
3、处理器线程获取到响应的标记
4、数据动态的被执行一种处理方式
5、设置一个opaque 到响应信息中并且发送到客户端
  从上面代码截图中可以看到上面的这些代码执行的就是上面的这些被Thread所执行的流程。在这个流程中最为值得注意的就是对于处理器以及执行器这两个操作在这个类中被封装成了Pair<NettyRequestProcessor, ExecutorService>这个对象。具体的逻辑就不在多说了,代码中都有所体现。
6、提交到执行线程池,会调到客户端

  从上面代码逻辑中可以看到,最终Run对象被一个叫做RequestTask的对象封装了,并且调用了
pair.getObject2().submit(requestTask);这样的一个方法将其进行了提交,在这个操作的时候会看到pair的第二个对象也就是ExecutorService这个执行服务。从流程图中可以看到,在客户端进行处理的时候也就是在SendResult进行封装的时候其实使用到的是多线程中分装的对象。
客户端处理
1、客户端处理线程处理消息
2、通过opaque绑定一个ResponseFuture
3、执行调用的回调线程
  在上面执行发送的时候就已经提到过对于SendResult对象的封装就是来自于ResponseFuture对象,而这个对象在Netty的概念中被称为想要获取到的结果在Netty中是是Future的概念,在RocketMQ中被进行了封装。

  到这里整个的流程就已经分析的很清楚了,接下来就来看看在上面的系统中看到的一个多线程Run的使用

Reactor多线程设计

  首先来介绍一下关于Reactor是什么,首先它是多线程中的一种使用模式组要有如下一些关键点

  • 1、事件驱动(event handling)
  • 2、可以处理一个或者多个输入源(one or more inputs)
  • 3、通过Service Handler 同步的将输入时间Event采用多路复用分发给相应的Request Handler处理


多个Reactor模型

  • 1、mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。

  • 2、subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。

  了解过Netty的人知道在使用Netty的时候,有一个NIOEventLoop的创建,我们创建了两个一个,一个叫做BossGroup一个叫做WorkerGroup。这个使用与上面这个有点类似。RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。如下图所示。


  在上面定义RemotingService接口的时候看到了一个方法是关于默认的处理器设置,而且还提供了扩展的处理器执行器注册。结合上面的内容就可以简单的理解上图中说的意思了

  上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。

  • 1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。
  • 2、拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,
  • 3、根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

线程说明

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 主线程
N NettyServerEPOLLSelector_%d_%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务processor处理线程池

总结

  到这里整个的关于发送信息各个节点以及节点之间怎么进行通信的操作就分析完了,虽然看上去很繁琐,但是从分析源码的过程中也体会到了一些设计思想,从这些设计思想中,收获了不少的经验。为以后的工作和生活积累经验。通过源码分析可以知道怎么将所学的知识应用到实际的工作中,让知识变成真正的价值。后续还会继续分析关于RocketMQ中的其他操作敬请期待。

从源码分析RocketMQ系列-Remoting通信架构源码详解相关推荐

  1. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  2. 从源码分析RocketMQ系列-MQClientInstance类详解

    导语   在之前的分析中,看到有一个类MQClientInstance,这个无论是在Producer端还是在Consumer端都是很重要的一个类,很多的功能都是从这个类发起的,这边分享中就来详细的看看 ...

  3. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  4. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  5. 从源码分析RocketMQ系列-Consumer消息接收逻辑

    导语   在前面的分析中分析了关于Producer发送消息的逻辑,并且追踪到了在DefaultMQPushConsumerImpl 类中的有对应的消息监听方法,这个消息监听的方法是从Consumer调 ...

  6. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

  7. 从源码分析RocketMQ系列-Producer的SendResult的封装

    导语   通过之前博客的Producer的SendResult来自哪里分析到发送的核心机制,了解了在发送之前被使用的几个Hook,以及发送消息的RequestHeader的封装,但是这些封装都被一个t ...

  8. 从源码分析RocketMQ系列-start()方法详解

    导语   在之前的分析中主要介绍的是关于Producer 发送消息的逻辑,但是在实例代码中有一个操作是producer.start()方法,在Consumer中看到的方法是consumer.start ...

  9. 从源码分析RocketMQ系列-Producer的invokeSync()方法

    导语   在之前的博客中通过对于Producer中SendResult的跟踪找到了在Client模块下的所有的封装以及消费的过程,深入到对接Remoting模块的接口中对消息的封装以及发送回收等.但是 ...

最新文章

  1. Java异常体系结构
  2. RHEL在VM虚拟机下仅主机模式不能联网的解决方法
  3. python读取遥感 dat_基于python批量处理dat文件及科学计算方法详解
  4. 【教程】写CSDN博客时 调整图片大小,图片居中
  5. HDU1016(DFS)
  6. tfw文件如何导入cad_如何将CAD的线稿导入PS并和底色分离
  7. Linux学习——echo和read命令用法
  8. 内置函数补充 之 反射
  9. https提供安全的web通讯
  10. Java设计模式与实践
  11. 【VSLAM学习记录2】初识cmake
  12. java hacker code_我陷入了Java的第一个hackerrank挑战
  13. 基于用户的协同过滤Movielens电影推荐系统简单实例
  14. 微信小程序tabBar不显示问题
  15. 如何给图片去底色?不用ps即可轻松搞定
  16. 上班要了解的一些法律条例
  17. html网上日记本设计,个人博客的设计_网上日记本的开发ASP334
  18. 成长之路——发现问题、提出问题和解决问题
  19. Dubbo 实现原理与源码解析系列 —— 精品合集
  20. SQL server添加表并添加备注,使用sql创建

热门文章

  1. Vue.js 2.x笔记:指令(4)
  2. 90国央行齐聚华盛顿研讨区块链:“这一切意味着什么”
  3. spring-102-spring全注解快速实现事务
  4. mysql5.5和5.6版本间的坑
  5. 中文乱码问题解决方法总结
  6. 2013年工作中遇到的20个问题:241-260
  7. 2012年回忆录及2013年目标设立
  8. 2012年十大项目月度得分榜
  9. Fedora Linux中配置JDK5或JDK6环境变量
  10. Redis配置文件(3)常见的配置修改