一、前言

remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。

二、Remoting 通信模块结构

remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:

可见通信的类继承自类RemotingService,RemotingService的定义如下:

public interface RemotingService {// 服务启动void start();// 服务停止void shutdown();//注册RPC钩子函数void registerRPCHook(RPCHook rpcHook);
}

RemotingServer:继承自RemotingService,定义了服务端的接口

public interface RemotingServer extends RemotingService {/***  注册处理器* @param requestCode   请求码* @param processor     处理器* @param executor      线程池*    这三者是绑定关系:*       根据请求的code  找到处理对应请求的处理器与线程池 并完成业务处理。*/void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);/***  注册缺省处理器* @param processor  缺省处理器* @param executor   线程池*/void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);// 获取服务端口int localListenPort();/***  根据 请求码 获取 处理器和线程池* @param requestCode  请求码* @return*/Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);/***  同步调用* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis   超时时间* @return  响应结果封装*/RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,RemotingTimeoutException;/***  异步调用* @param channel  通信通道* @param request  业务请求对象* @param timeoutMillis  超时时间* @param invokeCallback  响应结果回调对象*/void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback) throws InterruptedException,RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;/***  单向调用 (不关注返回结果)* @param channel   通信通道* @param request   业务请求对象* @param timeoutMillis  超时时间*/void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,RemotingSendRequestException;
}

从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。

NettyRemotingServer:服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);// Netty服务端启动器private final ServerBootstrap serverBootstrap;// worker组private final EventLoopGroup eventLoopGroupSelector;// boss组 private final EventLoopGroup eventLoopGroupBoss;// Netty服务端配置信息类private final NettyServerConfig nettyServerConfig;// 公共线程池   (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)private final ExecutorService publicExecutor;//  Netty Channel 特殊状态监听器private final ChannelEventListener  channelEventListener;// 定时器  (功能: 扫描 responseTable表,将过期的responseFuture移除)private final Timer timer = new Timer("ServerHouseKeepingService", true);// 用于在pipeline指定handler中 执行任务的线程池private DefaultEventExecutorGroup defaultEventExecutorGroup;// 服务端绑定的端口private int port = 0;private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";// 用于处理 SSL 握手连接的处理器private HandshakeHandler handshakeHandler;// 协议编码 处理器private NettyEncoder encoder;// 连接管理 处理器private NettyConnectManageHandler connectionManageHandler;// 核心业务 处理器private NettyServerHandler serverHandler;// 参数1: nettyServerConfig  Netty服务端配置信息// 参数2: channelEventListener  channel特殊状态监听器public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 调用父类  就是通过 Semaphore 设置请求并发限制// 1. 设置 单行请求的并发限制// 2. 设置 异步请求的并发限制super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;// 创建公共线程池 publicExecutor   线程数量为:4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑if (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}// 加载SSL连接的相关方法 (不在本篇的分析范围内)loadSslContext();}
}

NettyRemotingServer当中重要的参数:

  1. 父类的属性 semaphoreOneway , **semaphoreAsync ** 用来控制请求并发量的
  2. serverBootstrap Netty服务器启动器
  3. nettyServerConfig Netty服务器配置信息
  4. channelEventListener Netty Channel状态监听器
  5. eventLoopGroupSelector worker组
  6. eventLoopGroupBoss boss组

NettyRemotingServer的启动

    // 启动Netty 服务器@Overridepublic void start() {// Netty pipeline中的指定 handler 采用该线程池执行this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 初始化 处理器 handler// 1. handshakeHandler  SSL连接// 2. encoder  编码器// 3. connectionManageHandler 连接管理器处理器// 4. serverHandler 核心业务处理器prepareSharableHandlers();// 下面就是 Netty 创建服务端启动器的固定流程 ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端ServerSocketChannel 类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 设置客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true)// 设置 接收缓冲区 和 发送缓冲区的 大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroupch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {//  服务器 绑定端口ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次// 扫描 responseTable 表, 将过期的 responseFuture 移除this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:

  1. 启动Netty服务器
  2. 开启 channel状态监听线程
  3. 开启 扫描 responseFuture 的定时任务

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据

1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。
     RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。
    2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
    3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
    4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

NettyRemotingAbstract:抽象类NettyRemotingAbstractNettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。

public abstract class NettyRemotingAbstract {// 控制 单向请求的 并发量protected final Semaphore semaphoreOneway;// 控制 异步请求的 并发量protected final Semaphore semaphoreAsync;// 响应对象映射表  (key: opaque  value:responseFuture)protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =new ConcurrentHashMap<Integer, ResponseFuture>(256);// 请求处理器映射表 (key: requestCode  value:(processor,executor)  )protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);// Netty事件监听线程池protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();// 默认的请求处理器对  包含(processor,executor) protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;// SSL相关protected volatile SslContext sslContext;// 扩展钩子protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}

RocketMQ的底层通信模块remoting 源码解析相关推荐

  1. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  2. solr调用lucene底层实现倒排索引源码解析

    1.什么是Lucene? 作为一个开放源代码项目,Lucene从问世之后,引发了开放源代码社群的巨大反响,程序员们不仅使用它构建具体的全文检索应用,而且将之集成到各种系统软件中去,以及构建Web应用, ...

  3. IO多路复用底层原理及源码解析

    基本概念 1. 关于linux文件描述符 在Linux中,一切都是文件,除了文本文件.源文件.二进制文件等,一个硬件设备也可以被映射为一个虚拟的文件,称为设备文件.例如,stdin 称为标准输入文件, ...

  4. RocketMQ:消息ACK机制源码解析

    消息消费进度 概述 消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可. 消息模式分为两种: 集群 ...

  5. [源码解析] 机器学习参数服务器 Paracel (1)-----总体架构

    [源码解析] 机器学习参数服务器 Paracel (1)-----总体架构 文章目录 [源码解析] 机器学习参数服务器 Paracel (1)-----总体架构 0x00 摘要 0x01使用 1.1 ...

  6. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  7. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  8. java treeset原理_Java集合 --- TreeSet底层实现和原理(源码解析)

    概述 文章的内容基于JDK1.7进行分析,之所以选用这个版本,是因为1.8的有些类做了改动,增加了阅读的难度,虽然是1.7,但是对于1.8做了重大改动的内容,文章也会进行说明. TreeSet实现了S ...

  9. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  10. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

最新文章

  1. 大话数据结构书籍及配套源码
  2. 使用深度V8.1 系统后打开部分文件夹缓慢
  3. java set path_Java Path.setEffect方法代码示例
  4. [html] 举例说明锚点定位有什么作用?
  5. SprinBoot2.X 集成 Flowable6.6 工作流引擎
  6. php 两个符号怎么打,怎么打出圈2符号
  7. [转载] python存数据库、c++读数据库_如何从C中读取python pickle数据库/文件?
  8. linux停止tomcat为什么要kill其掉进程 而不是直接shutdown.sh
  9. android 模拟器 403,Android403R2模拟器安装.doc
  10. 用xshell7和xftp7连接虚拟机CentOS7.6的步骤
  11. 【Java基础知识 17】聊一聊同步代码块
  12. 串口485接法图_RS232转换为RS485的接线方法最好有图
  13. 维基百科怎么做_维基百科创建修改技巧分享!
  14. 假定我们要建立一个学术论文数据库,存储如下信息: •学术期刊有期刊编号、期刊名、发行单位; •作者有作者编号、作者姓名、电子邮件; •论文有论文编号、论文标题、摘要、正文; •每篇论文只被一个
  15. 工业设计算计算机类专业吗,北大工学院工业设计工程数一计算机方向经验贴
  16. delphi调试需要管理员权限程序报错“Unable to create process:请求的操作需要提升”
  17. 卡1有信号 卡2无服务器,为什么卡1无服务卡2有
  18. 穆利堂推荐机会来了你做好准备了吗?怎么让机会找到你?
  19. 腾讯开发者登录不上去
  20. 图解ReentrantLock底层公平锁和非公平锁实现原理

热门文章

  1. 正确打开db文件的方式,避免乱码和无意义内容
  2. uboot移植——uboot的硬件驱动部分
  3. 斯皮尔曼相关系数范围_Spearman Rank(斯皮尔曼等级)相关系数
  4. 在线定时任务表达式生成连接
  5. 十四五规划下建筑企业智慧建造数字化转型规划战略
  6. 2021年武汉理工大学计算机考研复试详解 计算机学硕
  7. 小程序 发送模板消息的功能实现
  8. AForge 拍照 GetCurrentVideoFrame().GetHbitmap 内存不足
  9. 初次编译cximage遇到的一些错误
  10. ETL调度工具 taskctl-> Designer 设计IDE环境