导语
  在之前的博客中通过对于Producer中SendResult的跟踪找到了在Client模块下的所有的封装以及消费的过程,深入到对接Remoting模块的接口中对消息的封装以及发送回收等。但是对于具体后续操作还是没有跟进,这篇博客就从this.remotingClient.invokeSync(addr, request, timeoutMillis);方法开始进入的Message的发送、消费、持久化等功能的探讨。

文章目录

  • 从invokeSync()方法开始
  • 总结

从invokeSync()方法开始

  在之前的分析中可以看到invokeSync()方法是来自于RemotingClient接口,并且对这个接口也有一定的说明,了解了这个接口只有一个实现类NettyRemotingClient,从类名上就可以看出来使用的是Netty的支持。那么对于invokeSync()方法的支持是什么样呢?方法总体如下图所示,下面就来一步步的分析该方法

1、创建一个Channel

  对于一个Channel的管理与创建,在之前分享RabbitMQ的时候简单的提到过,这里由于涉及到了Netty,而Netty本身就是支持了NIO的操作,既然支持了NIO的操作。这里就需要知道Channel的创建逻辑。


final Channel channel = this.getAndCreateChannel(addr);

  代码本身只创建了一个Channel并且通过调用了一个this.getAndCreateChannel(addr);方法来进行获取,那么按照之前创建Channel的逻辑,首先需要建立一个Connection,然后在利用这个Connection去复用一些Channel。那么猜测this.getAndCreateChannel(addr);这个方法中一定有关于Connection的创建逻辑。进入该方法。

 private Channel getAndCreateChannel(final String addr) throws InterruptedException {if (null == addr) {return getAndCreateNameserverChannel();}ChannelWrapper cw = this.channelTables.get(addr);if (cw != null && cw.isOK()) {return cw.getChannel();}return this.createChannel(addr);
}

  从方法体重可以看到还需要进行新的判断,并且有ChannelWrapper这样一个封装对象。这样从上面的逻辑来看真正实现第一次创建的是通过this.createChannel(addr);方法来进行操作的。果然在这个方法中找到了如下的一个操作,用过Netty的都知道这个操作是什么意思?这个操作就是为NIO操作创建一个Bootstrap。对于这块内容在后面逻辑中整合进行分析。这里首先知道这个方法是提供了故意而Channel的连接。至于其他的操作,都放到后续的对于Netty使用以及对于NIO详细说明来分析。这里主要是分析Message的流转。

 if (createNewConnection) {ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);cw = new ChannelWrapper(channelFuture);this.channelTables.put(addr, cw);
}

2、调用一个前置的RPCHook

  上这个分析就是创建了一个Channel的工作,这个Channel就是需要通信的信道。一般的操作都是由这个Channel来完成对于网络请求的发送和接收操作。


doBeforeRpcHooks(addr, request);protected void doBeforeRpcHooks(String addr, RemotingCommand request) {if (rpcHooks.size() > 0) {for (RPCHook rpcHook: rpcHooks) {rpcHook.doBeforeRequest(addr, request);}}
}

  在很多的框架代码中有一个统一的规则就是将实际的工作交给一个do开头的方法来进行操作。这里在消息请求到Consumer之前先做了一个前置的Hook,这里主要完成的工作就是对这个Request进行一个属性的扩展操作。会看到其实在调用发送方法之前对于Request添加了很多的扩展属性。

3、调用发送方法
  为了实现方法的解耦操作,这里使用了下面这个方法对Request消息进行发送。这个Request就是通过Client端进行了封装的操作。这里调用的同步模式,所以先分析关于同步调用的方法


RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);

  既然怒到这个方法之后最为重要的一个点就是ResponseFuture对象,在之前的博客中有一篇是关于分析Future模式的,对于Future模式,简单的说就是我们想要的结果。那么这里是对这个结果进行了封装,但是实际上通过Netty调用的Future并不是这个。关于这方面的知识可以了解关于Netty的相关知识这里不做过多的说明,有需要的话后续的分析中会提到。

  到这里所有的消息都通过 channel.writeAndFlush(request).addListener(new ChannelFutureListener())操作发送到了Channel中开水进入到Broker进行消息进步操作。后续的操作就是从ResponseFuture对象中,获取到我们预期的RemotingCommand Response。并且对这个Response进行了检查。

4、调用后置的RPCHook
  这个调用的深入会发现这个方法并没有被实现,是个空方法没有方法体。


doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);

总结

  从invokeSync()方法开始,描述了整个的消息核心发送过程,调用很清晰,实现的功能也比较简单。理解这个里需要对Netty有关的知识和NIO有关的知识做一定的了解。后续的分享中也会涉及到有关知识的讨论。

从源码分析RocketMQ系列-Producer的invokeSync()方法相关推荐

  1. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

  2. 从源码分析RocketMQ系列-Producer的SendResult的封装

    导语   通过之前博客的Producer的SendResult来自哪里分析到发送的核心机制,了解了在发送之前被使用的几个Hook,以及发送消息的RequestHeader的封装,但是这些封装都被一个t ...

  3. 从源码分析RocketMQ系列-Remoting通信架构源码详解

    导语   这篇博客要从官方给出的一张图开始说起,之前的分析我们都是简单的分析了一下消息传递的流程,以及消息传递流程过程中出现的一些类的封装,并且提出,所有的封装操作都是为了更加高效的服务于NameSe ...

  4. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  5. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  6. 从源码分析RocketMQ系列-MQClientInstance类详解

    导语   在之前的分析中,看到有一个类MQClientInstance,这个无论是在Producer端还是在Consumer端都是很重要的一个类,很多的功能都是从这个类发起的,这边分享中就来详细的看看 ...

  7. 从源码分析RocketMQ系列-RocketMQ消息设计详解

    1 消息存储   消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构.PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘方式三 ...

  8. 从源码分析RocketMQ系列-Consumer消息接收逻辑

    导语   在前面的分析中分析了关于Producer发送消息的逻辑,并且追踪到了在DefaultMQPushConsumerImpl 类中的有对应的消息监听方法,这个消息监听的方法是从Consumer调 ...

  9. 从源码分析RocketMQ系列-start()方法详解

    导语   在之前的分析中主要介绍的是关于Producer 发送消息的逻辑,但是在实例代码中有一个操作是producer.start()方法,在Consumer中看到的方法是consumer.start ...

最新文章

  1. vivo 亿级优惠券系统架构设计与实践
  2. 聊聊Elasticsearch的ExponentiallyWeightedMovingAverage
  3. [洛谷P3387]【模板】缩点
  4. libQtCore.so.4相关错误
  5. c语言 如何创建adt_C语言探索之旅 | 第二部分第六课:创建你自己的变量类型
  6. Tkinter Learning:tkinter上实现视频流的播放(附源码)
  7. @Autowired和@Resource注解的区别?
  8. 【APICloud系列|38】 微信登录分享、QQ登录分享实现方法
  9. 一步步编写操作系统 51 加载内核4
  10. C#中获取当前时间:System.DateTime.Now.ToString()用法
  11. 前端工具 git笔记
  12. 小甲鱼python课后题简书_Python 练习题汇总
  13. 一种提升语音识别准确率的方法与流程
  14. 疯狂Android讲义
  15. BZOJ 4552 [Tjoi2016Heoi2016]排序 ——线段树 二分答案
  16. 微信小程序教程笔记4
  17. Win10安装Nek5000
  18. java运行 .class文件_运行java的class文件方法详解
  19. smb协议讲解_SMB协议操作共享文件
  20. 黑客教父龚蔚:扫码应用要警惕 公共WiFi攻不破支付软件

热门文章

  1. linux 解压安卓kernel,android kernel | 环境搭建 + 第一次尝试
  2. @Repository详解
  3. 越努力越幸运--动态数组vector
  4. DiscuzX系列命令执行分析公开(三连弹)
  5. 数字图像处理,图像锐化算法的C++实现
  6. 实验8 SQLite数据库操作
  7. IIS OCIEnvCreate failed with return code -1
  8. C# 代码重启windows服务
  9. C#窗体间的数据传值(转)
  10. 一步步教你实现富文本编辑器(第四部分)