欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-conclusion/


RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。但是这样是不能究其然,更不能究其所以然。博主这里翻阅了amqp-client的java客户端的源码,通过其来学习下AMQP协议,进而更深刻的了解RabbitMQ.

注:如无特殊说明,本系列的文章采用的amqp-client版本均为3.5.3。

本系列的文章主要是来阐述客户端与broker交互需要经历那些具体步骤,需要涉及那些重要的类以及方法,整体的轮廓又是如何。

本文主要涉及的类有(本系列的blog地址):
[一]RabbitMQ-客户端源码之ConnectionFactory
[二]RabbitMQ-客户端源码之AMQConnection
[三]RabbitMQ-客户端源码之ChannelManager
[四]RabbitMQ-客户端源码之Frame
[五]RabbitMQ-客户端源码之AMQChannel
[六]RabbitMQ-客户端源码之AMQCommand
[七]RabbitMQ-客户端源码之AMQPImpl+Method
[八]RabbitMQ-客户端源码之ChannelN
[九]RabbitMQ-客户端源码之Consumer

以发送消息来看看从源码级的逻辑流转情况。

首先看看发送消息的业务代码(部分主要的代码):

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

从上面来看,主要牵涉ConnectionFactory, Connection, Channel这个几个类(有关Connection和Channel的AMQP流转流程可以参考文中最后部分)。实际情况是怎么样的呢,我们来分析下。

首先流转过程如下:

ConnectionFactory.newConnection()-- AMQConnection.start()-- MainLoop-- Frame frame =  SocketFrameHandler.readFrame()-- AMQChannel.handleFrame(Frame frame)
  • ConnectionFactory主要用来配置一些参数,并初始化AMQConnection, 这个版本的客户端与broker底层通信用的是java的原生Socket, 处理模块为SocketFrameHandler,SocketFrameHandler也在ConnectionFactory调用newConnection()时创建。之后根据参数以及SocketFrameHandler初始化了AMQConnection对象。
  • AMQConnection的核心在于这个start()方法。包括Protocol-Header, Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk,以及启动MainLoop线程
  • MainLoop是AMQConnection的内部私有类,主要用来(循环)读取(SocketFrameHandler.readFrame)并封装Socket中的帧Frame, 并进行进一步的处理,这个由AMQChannel来完成
  • ChannelManager,这个在上面并没有展示出来,但是这里也需要说明下,这个是在MainLoop中处理Frame所使用的,用来管理Channel的,确切的来说是ChannelN.
AMQChannel.handleFrame(Frame frame)-- AMQCommand.handleFrame(Frame frame) -- AMQChannel.handleCompleteInboundCommand(AMQCommand command)-- ChannelN.processAsync(AMQCommand command)

这个接着上面的流程继续:

AMQChannel.handleFrame(Frame frame)首先调用AMQCommand的handleFrame(Frame frame)方法来处理,AMQCommand内部其实是调用了CommandAssember(对Method, Content-Header以及Content-Body做了一下封装,其实忽略这个类也是可以的)的handleFrame, 说白了作用就是处理下Frame帧,方法返回值是boolean, 当一个AMQComand处理完毕后返回true,否则返回false。这里就有疑问了,什么叫做处理完毕?这里就又要说到MainLoop了,MainLoop线程主要循环读取Frame帧,像Connection.Start/.StartOk这种命令(AMQCommand)一般只包括Method类型的帧(Frame),AMQCommand的handleFrame方法直接返回true,但是像Basic.Publish这种命令一般包括Method帧,Content-Header帧,以及若干Content-Body帧,就需要handleFrame多次才能返回true。

AMQP技术术语
Method: 用于在节点之间传递特定类型的AMQP命令帧。
Content: 服务器和应用程序之间传送的数据.这个术语是“message”的同义词。
Content header:描述内容属性特定类型帧。
Content body: 包含原始应用程序数据的特定类型帧.内容体帧完全不透明-服务器不以任何方式检查或修改其body内容。

public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);}
}

上面这个是AMQChannel中的handleFrame方法,当内部的AMQCommand的handleFrame方法返回true,即表示处理完毕一条AMQCommand之后再调用handleCompleteInboundCommand方法进行进一步处理。而这个handleCompleteInboundCommand方法的精髓在于processAsync方法,这个processAsync方法在AMQChannel中是一个抽象方法,真正的实现要看ChannelN这个类。
说到这里有一个点我没有提及,但是这个不影响主流程的阐述,这个点就是rpc的概念,具体的可以详细参考对AMQChannel类的介绍——[五]RabbitMQ-客户端源码之AMQChannel。


文中开篇的demo示例,采用wireshark工具抓包可得:

这里用来参考,以便更好的阐述Connection类和Channel类。

Connection类
AMQP是一个连接协议. 连接设计为长期的,且可运载多个通道. 连接生命周期是这样的:

  • client打开与服务器的TCP/IP连接并发送一个协议头(protocol header).这只是client发送的数据,而不是作为方法格式的数据.
  • server使用其协议版本和其它属性,包括它支持安全机制列表(Start方法)进行响应.
  • client选择一种安全机制(Start-Ok).
  • server开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询(Secure).
  • client向server发送一个认证响应(Secure-Ok). 例如,对于使用"plain"机制,响应会包含登录用户名和密码.
    (server 重复质询(Secure) 或转到协商,发送一系列参数,如最大帧大小(Tune).)
  • client接受或降低这些参数(Tune-Ok).
  • client 正式打开连接并选择一个虚拟主机(Open).
  • 服务器确认虚拟主机是一个有效的选择 (Open-Ok).
  • 客户端现在使用希望的连接.
  • 一个节点(client 或 server) 结束连接(Close).
  • 另一个节点对连接结束握手(Close-Ok).
  • server 和 client关闭它们的套接字连接.

没有为不完全打开的连接上的错误进行握手. 根据成功协议头协商(后面有详细定义),在发送或收到Open 或Open-Ok之前,如果一个节点检测到错误,这个节点必须关闭socket,而不需要发送任何进一步的数据。

Channel类
AMQP是一个多通道协议. 通道提供了一种方式来将一个重量级TCP/IP连接分成多个轻量级连接。这使得协议对于防火墙更加友好,因为端口使用是可预测的. 这也意味着传输调整和网络服务质量可以得到更好的利用。通道是独立的,它们可以同时执行不同的功能,可用带宽会在当前活动之间共享。这是令人期待的,我们鼓励多线程客户端应用程序经常使用"每个通道一个线程"编程模型.。然而,从单个client打开一个或多个AMQP servers连接也是完全可以接受的.。
通道生命周期如下:

  • client打开一个新通道(Open).
  • server确认新通道准备就绪(Open-Ok).
  • client和server按预期来使用通道.
  • 一个节点(client或server) 关闭了通道(Close).
  • 另一个节点对通道关闭进行握手(Close-Ok).

附:本系列全集

  1. [Conclusion]RabbitMQ-客户端源码之总结
  2. [一]RabbitMQ-客户端源码之ConnectionFactory
  3. [二]RabbitMQ-客户端源码之AMQConnection
  4. [三]RabbitMQ-客户端源码之ChannelManager
  5. [四]RabbitMQ-客户端源码之Frame
  6. [五]RabbitMQ-客户端源码之AMQChannel
  7. [六]RabbitMQ-客户端源码之AMQCommand
  8. [七]RabbitMQ-客户端源码之AMQPImpl+Method
  9. [八]RabbitMQ-客户端源码之ChannelN
  10. [九]RabbitMQ-客户端源码之Consumer

欢迎跳转到本文的原文链接:https://honeypps.com/mq/rabbitmq-client-source-code-of-conclusion/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


[Conclusion]RabbitMQ-客户端源码之总结相关推荐

  1. RabbitMQ 客户端源码系列 - Channel

    前言 续上次分享 RabbitMQ 客户端源码系列 - Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3) 友情提醒:本次分 ...

  2. RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码

    目录 第十章-RabbitMQ之Spring客户端源码 1. 前言 2. 客户端消费代码 2.1 消费的实现方式 2.2 消费中注解解释 2.3 推测Spring实现过程 3.MQ消费源码分析 3.1 ...

  3. Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析

      我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...

  4. boost::asio异步模式的C/S客户端源码实现

    异步模式的服务器源码 //g++ -g async_tcp_server.cpp -o async_tcp_server -lboost_system //#include <iostream& ...

  5. zookeeper 客户端_zookeeper进阶-客户端源码详解

    流程图 先看一下客户端源码的流程图 总体流程 总体流程 开启SendThread线程 开启EventThread 总结 下面根据源码讲解,大家整合源码和流程图一起看最好,本篇内容比较多建议收藏起来看. ...

  6. grpc-go客户端源码分析

    grpc-go客户端源码分析 代码讲解基于v1.37.0版本. 和grpc-go服务端源码分析一样,我们先看一段示例代码, const (address = "localhost:50051 ...

  7. WordPress Blog Android客户端源码分析(一)

    一直想找一个大型的Android开源项目进行分析,由于自身和导师课程需要选择了wordpress的Android客户端源码进行学习和解读.源码github官方下载地址:开源项目地址.分析源码的最佳手段 ...

  8. 使用live555客户端源码遇到的问题及解决方法

    使用live555客户端源码拉rtsp流遇到两个问题,正常测试拉取海康摄像头没问题: 1.拉有些厂商的rtsp流会间隔一段时间断开连接: 2.与大华摄像头建立连接时,发送DESCRIBE命令后很长时间 ...

  9. swift实现饭否应用客户端源码

    swift 版 iOS 饭否客户端 源码下载:http://code.662p.com/view/13318.html 饭否是中国大陆地区第一家提供微博服务的网站,被称为中国版Twitter.用户可通 ...

  10. TeamTalk客户端源码分析七

    TeamTalk客户端源码分析七 一,CBaseSocket类 二,select模型 三,样例分析:登录功能 上篇文章我们分析了network模块中的引用计数,智能锁,异步回调机制以及数据的序列化和反 ...

最新文章

  1. 独角兽之名:解读华云数据背后智慧商业谋略
  2. python代码怎么设置,如何设置PyCharm中的Python代码模版(推荐)
  3. UEFI主板GPT方式安装CentOS 6.4
  4. 三十八、Java集合中的ConcurrentHashMap
  5. 积分和人民币比率_通过比率路由到旧版和现代应用程序–通过Spring Cloud的Netflix Zuul...
  6. JavaScript中带有示例的Math.log10()方法
  7. 配电柜测试软件,低压配电柜测试方法及流程.docx
  8. 【洛谷2986】【USACO10MAR】伟大的奶牛聚集
  9. mysql8.0创建属性_MySQL8.0新特性——资源管理
  10. XCode5.1.1怎样实现代码块自己主动排版
  11. 【英语学习】【Level 08】U03 My Choice L5 The star that shines the brightest
  12. 简记ShuffleNetV1V2
  13. 使用 Java8的 stream对list数据去重,使用filter()过滤列表,list转map
  14. ESP32利用wi-fi获取B站粉丝数
  15. 刚刚,百度宣布造车!
  16. `算法竞赛题解` LCP 03. 机器人大冒险
  17. 林轩田机器学习基石--The Learning Problem
  18. MyBatis批量操作和多参数查询
  19. deny后加to do还是doing_区别的to do和doing用法的小技巧
  20. 【系列文章】面向自动驾驶的三维点云处理与学习(1)

热门文章

  1. 200819C阶段一C++面向对象的编程思想
  2. Perl 教学 Perl5中的引用(指针)
  3. React中的状态管理---Mobx
  4. 为Ubuntu Linux安装Docker CE Edge
  5. MySQL授权用户及密码恢复设置
  6. Juniper ex4200 端口镜像问题
  7. 自动完成--autoComplete插件(2)
  8. nagios nrpe
  9. SqliteHelper整理(转载)
  10. PyTorch-模型可视化工具TorchSummary