amqp协议与pika库浅析
AMQP协议
简介
高级消息队列协议使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能。
为了完全实现消息中间件的互操作性,需要充分定义网络协议和消息代理服务的功能语义。
一套确定的消息交换功能,也就是“高级消息交换协议模型”。AMQP模型包括一套用于路由和存储消息的功能模块,以及一套在这些模块之间交换消息的规则。
一个网络线级协议(数据传输格式),客户端应用可以通过这个协议与消息代理和它实现的AMQP模型进行交互通信。
有关AMQP协议的文档可登陆官网,或者参考中文版博文amqp中文翻译文档
文档已经详细解释了
一.AMQP整体概述
下面的图显示了整体AMQ模型:
在AMQP之前的服务器中,它们会通过实现了特定类型路由和缓存的庞大引擎来完成. AMQ模块使用较小的模块结合更多样和稳健的方案来实现. 它把这些任务分成了两个不同角色:
交换器, 它接受来自生产者的消息并将它们路由到消息队列.
消息队列, 它存储消息消息并把它们转发给消费者应用程序.
AMQP提供了运行时程序语义,主要有两方面:
1. 运行时通过该协议可创建任意的交换器和消息队列类型的能力(有些是在标准中定义的,但可以添加其他作为服务器扩展)。
2. 运行时通过协议包装交换器和消息队列来创建任何需要的消息处理系统的能力.
1.消息队列(Message Queue)
消息队列用于在内存或磁盘上存储消息, 并将它们依次投递给一个或多个消费者应用程序.消息队列是消息存储和分发的实体. 每个消息队列是完全独立的,且是一个相当聪明的对象。
消息队列有多个属性:私有的或共享的, 持久的或临时的,客户端命名的或服务器端命名的等等.
通过选择希望的属性,我们可以使用消息队列来实现传统的中间件实体,如:
共享存储转发队列:它可以持有消息,并以round-robin方式在消费者之间分发消息.存储转发队列通常是在多个消费者之间是持久化的.
私有回复队列:它可以持有消息,并把消息转发给单个消费者. 回复队列通常是临时的,服务端命名的,且对于某个消费者来说是私有的.
私有订阅队列:它可持有来自不同订阅源的消息,并将它们转发给单个消费者.
订阅队列通常是临时的,服务器端命名的,并对于某个消费者来说是私有的.
2.交换器接收来自生产者应用程序的消息,并将它们按照事先约定的规则路由到消息队列中.
这些预先约定的规则或条件称为绑定. 交换器会与路由引擎匹配.
也就是说,他们会检查消息,并使用他们的绑定表来决定如何将这些消息转发到消息队列或其他交换器中。交换器永远不会存储信息。“交换器”一词是指一类算法或算法实例。
更确切的说,我们谈到了交换器类型和交换器实例.
AMQP定义了许多交换器类型,它们覆盖了常见消息路由分发的基础类型. AMQP服务器提供了这些交换器的默认实例.使用AMQP的应用程序可额外创建它们自己的交换器实例.交换器类型是命名的,这样创建交换器的应用程序就可以告知服务器他们使用的交换器类型.
交换器实现也可以是命名的,这样应用程序可指定如何绑定队列来发布消息.
交换器还可以做更多的消息路由.它们可作为服务器内的智能路由代理,按需接受消息和生产消息. 交换器概念的目的是定义一套模型或标准,使得可以合理地扩展AMQP服务器,因为可扩展性会对互操作产生影响.
下面的图展示了通过AMQ模块服务器的消息流:
客户端与服务端的消息流如图所示。
1.消息队列属性
当客户端程序创建了消息队列时,它可以选择一些重要的属性:
name - 如果没有指定,服务器会选择一个名称,并将其提供给客户端.一般来说,当应用程序共享消息队列时,它们已经对消息队列名称作了事先的约定,当一个应用程序需要出于其自身目的来要求队列时,它可让服务器提供一个名称.
exclusive - 如果设置了,队列将只属于当前连接,且在连接关闭时删除.
durable - 如果设置了, 消息会进行存储,并在服务器重启时激活. 当服务器重启时,它可能会丢失瞬时消息.
2. 队列生命周期
这里主要有两种消息队列生命周期:
持久化消息队列:它们可被多个消费者共享,并可独立地存在- 即.不管是否有消费者接收它们,它都可以继续存在收集消息.
临时消息队列:对某个消费者是私有的,只能绑定到此消费者.当消费者断开连接时,消息队列将被删除.
也存在一些变化,如共享消息队列会在最后一个消费才断开连接时删除消息队列.下面的图展示了临时消息队列创建和删除的过程:
1.协议命令 (类&方法)
中间件是复杂的,我们在设计协议结构的挑战是要驯服其复杂性。
我们的方法是基于类来建立传统API模型,这个类中包含方法,并定义了方法明确应该做什么.
这会导致大量的命令集合,但一个命令应该相对容易理解.
AMQP命令组合在类中.每个类都覆盖了一个特定功能领域.有此类是可选的 -每个节点都实现了需要支持的类.
有两种不同方法对话:
同步请求-响应,在其中一个节点发送请求,另一个节点发送回复.
同步请求和响应适用于性能不是关键的地方.
异步通知, 在其中,一个节点发送消息但不希望得到回复.异步方法适用于性能是至关重要的地方.
为使处理方法简单,我们为每个异步请求定义了不同的回复. 也就是说,没有哪个方法可作为两个不同请求的回复.这意味着一个节点,发送一个同步请求后,可以接受和处理传入的方法,直到得到一个有效的同步答复. 这使得AMQP与更加传统的RPC协议是有区别的.
二.AMQP协议主要类及流程
1).Connection类
AMQP是一个连接协议. 连接设计为长期的,且可运载多个通道. 连接生命周期是这样的:
client打开与服务器的TCP/IP连接并发送一个协议头(protocol header).这只是client发送的数据,而不是作为方法格式的数据.
server使用其协议版本和其它属性,包括它支持安全机制列表(Start方法)进行响应.
client选择一种安全机制(Start-Ok).
server开始认证过程, 它使用SASL的质询-响应模型(challenge-response model). 它向客户端发送一个质询(Secure).
client向server发送一个认证响应(Secure-Ok). 例如,对于使用”plain”机制,响应会包含登录用户名和密码.
server 重复质询(Secure) 或转到协商,发送一系列参数,如最大帧大小(Tune).
client接受或降低这些参数(Tune-Ok).
client 正式打开连接并选择一个虚拟主机(Open).
服务器确认虚拟主机是一个有效的选择 (Open-Ok).
客户端现在使用希望的连接.
一个节点(client 或 server) 结束连接(Close).
另一个节点对连接结束握手(Close-Ok).
server 和 client关闭它们的套接字连接.
没有为不完全打开的连接上的错误进行握手. 根据成功协议头协商(后面有详细定义),在发送或收到Open 或Open-Ok之前,如果一个节点检测到错误,这个节点必须关闭socket,而不需要发送任何进一步的数据。
2).Channel 类
AMQP是一个多通道协议. 通道提供了一种方式来将一个重量级TCP/IP连接分成多个轻量级连接.
这使得协议对于防火墙更加友好,因为端口使用是可预测的. 这也意味着传输调整和网络服务质量可以得到更好的利用.
通道是独立的,它们可以同时执行不同的功能,可用带宽会在当前活动之间共享.
这是令人期待的,我们鼓励多线程客户端应用程序经常使用”每个通道一个线程”编程模型.
然而,从单个client打开一个或多个AMQP servers连接也是完全可以接受的.
通道生命周期如下:
1. client打开一个新通道(Open).
2. server确认新通道准备就绪(Open-Ok).
3. client和server按预期来使用通道.
4. 一个节点(client或server) 关闭了通道(Close).
5. 另一个节点对通道关闭进行握手(Close-Ok).
3).Exchange 类
交换器类让应用程序来管理服务器上的交换器。这个类可以让应用程序脚本自己布线(而不是依赖于一些配置接口)。注:大多数应用程序不需要这个级别的复杂度,传统的中间件是不太可能能够支持这种语义。
交换器生命周期如下:
1. client 请求server确保交换器是否存在(Declare). client可细化到,”如果交换器不存在则进行创建”,或 "如果交换器不存在,警告我,不需要创建”.
2. client发布消息到交换器.
3. client可选择删除交换器(Delete).
4).Queue 类
queue类可让应用程序来管理服务器上的消息队列. 在几乎所有消费消息的应用程序中,这是基本步骤,至少要验证期望的消息队列是否实际存在.
持久化消息队列的生命周期相当简单:
1. client断言消息队列存在(Declare, 使用”passive”参数).
2. server确认消息队列存在(Declare-Ok).
3. client从消息队列中读取消息。
临时消息队列的生命周期更加有趣:
1. client创建消息队列(Declare,不提供队列名称,服务器会分配一个名称). server 确认(Declare-Ok).
2. client 在消息队列上启动一个消费者. 消费者的精确功能是由Basic类定义的。
3. client 取消消费者, 要么是显示取消,要么是通过关闭通道/连接隐式取消的
4. 当最后一个消费者从消息队列中消失的时候,在过了礼貌性超时后,server会删除消息队列.
AMQP 像消息队列一样为主题订阅实现了分发机制. 这使结构更多有趣,订阅可以在合作订阅应用程序池中进行负载均衡.
订阅生命周期涉及到额外的绑定阶段:
1. client 创建消息队列(Declare),server进行确认(Declare-Ok).
2. client 绑定消息队列到一个topic交换器 (Bind),server进行确认(Bind-Ok).
3. client像前面的例子来使用消息队列.
5).Basic 类
Basic 类实现本规范中描述的消息功能.它支持如下主要语义:
从client发送消息给server, 异步发生(Publish)
启动和停止消费者(Consume, Cancel)
从server发送消息给client, 异步发生(Deliver, Return)
应答消息(Ack, Reject)
同步从消息队列中取消息 (Get).
6).Transaction 类
AMQP 支持两种类型的事务:
1. 自动事务: 每个发布的消息和应答都处理为独立事务.
2. Server 本地事务, 服务器会缓存发布的消息和应答,并会根据需要由client来提交它们.
Transaction 类(“tx”) 使应用程序可访问第二种类型,即服务器事务。这个类的语义是:
1. 应用程序要求在每个通道中都有事务(Select).
2. 应用程序做一些工作(Publish, Ack).
3. 应用程序提交或回滚工作(Commit, Roll-back).
4. 应用程序做一些工作,循环往复。
三.AMQP 传输架构
这个章节解释了命令是如何映射到线路协议的.
1). 一般描述
AMQP是二进制协议. 信息被组织成各种类型的帧(frames). Frames可以携带协议方法和其它信息.所有 帧(frames)都有同样的格式: 帧头(frame header),帧负载(frame payload)和帧尾(frame end).帧负载( frame payload)的格式依赖于帧类型(frame type).
我们假设有一个可靠的面向流的网络传输层(TCP/IP或相当的).
在单个套接字连接中,可以存在多个独立控制线程,它们被称为通道.
每个帧都使用通道编号来编号.通过交织它们的帧,不同的通道共享连接。对于任何给定的通道,帧运行在一个严格的序列,这样可以用来驱动一个协议解析器(通常是一个状态机).
我们使用一组小的数据类型,如位,整数,字符串和字段表来构造帧。帧字段是紧密包装的,不会使得它们缓慢或解析复杂。从协议规范中生成框架层是相对简单的。
线路级格式的设计是可扩展性,一般可以用于任意的高层协议(不只是AMQP)。我们假设AMQP将来会扩展、改进,随时间的推移线路级格式仍然会得到支持。
2).数据类型
AMQP数据类型用于方法帧中,它们是:
Integers ( 1到8个字节),用来表示大小,数量,范围等等. Integers通常是无符号的,在帧中可能是未对齐的.
Bits,用来表示开/关值.位被包装成字节。
短字符串(short string),用来保存短的文本属性.短字符串限制为255个字节,可以在无缓冲区溢出的情况下进行解析.
长字符串(long string),用来保存二进制数据块.
字段表(Field tables),用来保存名称-值对(name-value pairs). 字段值类型可以是字符串,整数等等.
3).协议协商
AMQP client 和server 可对协议进行协商.这就是说当client连接时,server可处理client能接受或修改的操作.当两个点对结果达成一致时, 连接会继续前行.协商是一种有用的技术,因为它让我们可以断言假设和前提条件。在AMQP,我们协商协议的下面方面:
实现协议和版本. server 可在同一个端口上保存多个协议.
加密参数和两者之间的认证.这是功能层的一部分,以前解释过。
最大帧大小,通道数量,以及其它操作限制.
达成一致的限制可能会使两者重新分配关键缓存区以避免死锁.每个传入的帧要么服从达成的限制(这是安全的),或者超过它们(在这种情况下,另一方必须断开连接).这非常符合"它要么工作,要么就完全不工作”的AMQP哲学.
两个节点达成一致的最低限度为:
服务器必须告诉客户端它提出了什么限制。
客户端进行响应,并可能减少其连接的限制。
4). 限制帧
TCP/IP是一个流协议,即没有限制帧的内建机制. 现有协议可以几种不同的方式解决这个问题:
每个连接中只发送单个帧.这很简单,但很慢.
在流中添加帧定界符.这很简单,但解析较慢.
计算帧的大小, 并在每个帧的前面发送大小。这是简单和快速,和我们的选择.
5). 帧细节
所有的帧都由一个头(header,7个字节),任意大小的负载(payload),和一个检测错误的帧结束(frame-end)字节组成:
要读取一个帧,我们必须:
1. 读取header,检查帧类型(frame type)和通道(channel).
2. 根据帧类型,我们读取负载并进行处理.
3. 读取帧结束字节.
在实际实现中,如果性能很关键的话,我们应该使用读前缓冲(read-ahead buffering)”或“收集读取(gathering reads)”,以避免为了读一个帧而做三次独立的系统调用。
5.1 方法帧
方法帧可以携带高级协议命令(我们称之为方法(methods)).一个方法帧携带一个命令. 方法帧负载有下面的格式:
要处理一个方法帧,我们必须:
1. 读取方法帧负载.
2. 将其拆包成结构. 方法通常有相同的结构,因此我们可以快速对方法进行拆包.
3. 检查在当前上下文中是否允许出现方法.
4. 检查方法参数是否有效.
5.执行方法.
方法主体(bodies) 由AMQP数据字段(位,整数, 字符串和字符串表组成)构成. 编组代码直接从协议规范中生成,因此是非常快速地.
5.2 内容帧
内容是我们通常AMQP服务器在客户端与客户端之间传送和应用数据. 粗略地说,内容是由一组属性加上一个二进制数据部分组成的。它所允许的属性集合由Basic类定义,而这些属性的形式为内容头帧(content header frame)。其数据可以是任何大小,也有可能被分解成几个(或多个)块,每一个都有内容体帧(content body frame)。
看一个特定通道的帧,当它们在线路上传输时,我们可能会看到下面这样的东西:
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情况下定义为传输内容.
当一个节点发送像这样的方法帧时,它总是会遵循一个内容头帧(conent header frame)和零个或多个内容体帧(content body frame)的形式.
一个内容头帧有下面的格式:
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情况下定义为传输内容.
当一个节点发送像这样的方法帧时,它总是会遵循一个内容头帧(conent header frame)和零个或多个内容体帧(content body frame)的形式.
一个内容头帧有下面的格式:
我们将内容体放置在不同的帧中(并不包含在方法中),因此AMQP可支持零拷贝技术,这样其内容就不需要编组或编码. 我们将内容属性安放在它们自己的帧中,以便收件人可以有选择地丢弃他们不想处理的内容。
5.3 心跳帧
心跳是一种设计用来撤销(undo)TCP/IP功能的技术,也就是说在长时间超时后,它有能力通过关闭broker物理连接来进行恢复.在某些情景下,我们需要快速知道节点连接是否断开了,或者是由于什么原因不能响应了.因为心跳可以在较低水平上进行,我们在传输层次上按节点交换的特定帧类型来处理,而不是按类方法.
6). 错误处理
AMQP使用异常来处理错误.任何操作错误(未找到消息队列,访问权限不足)都会导致一个通道异常. 任何结构化的错误(无效参数,坏序列的方法.)都会导致一个连接异常.异常会关闭通道或连接,同时也会向客户端应用返回响应码和响应文本.我们使用了类似于HTTP等协议和其它大多数协议中的三位回复代码和文字回复文本方案.
7). 关闭通道和连接
连接或通道,对于客户端来说,当其发送Open时则被认为是“打开”的,对于服务器端来说,当其发送Open-Ok时则被认为是打开的。基于这一点,一个希望关闭通道或连接的对等体也必须使用握手协议来这样做。
可出于任何原因,可能会正常地或异常地关闭一个通道或连接-因此必须仔细小心。
对于突然或意外关闭,并不能得到快速探测,因此当发生异常时,我们可能会丢失错误回复代码。
正确的设计是对于所有关闭必须进行握手,使我们关闭后对方知道相应的情况。
当一个节点决定关闭一个通道或连接时,它发送一个Close方法。接收节点必须使用Close-Ok来响应Close,然后双方可以关闭他们的通道或连接。请注意,如果节点忽略了关闭,当两个节点同时发送Close时,可能会发生死锁。
此上只作为概述,详细文档说明请查阅官方文档。
pika浅析
Pika Python AMQP Client Library,pika是一个实现了AMQP协议的Python客户端。
本次分析的是pika 0.5.0版本,本次启动RabbitMQ作为服务端。
在上文的介绍中,客户端的架构和实现思路。
Connection类介绍
1.参考demo_get.py
import sys
import pika
import asyncore
import timeconn = pika.AsyncoreConnection(pika.ConnectionParameters((len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',credentials = pika.PlainCredentials('guest', 'guest'))) # 选用异步io处理,并输入连接地址和认证用户名与密码print 'Connected to %r' % (conn.server_properties,)qname = (len(sys.argv) > 2) and sys.argv[2] or 'test' # 设置队列名称ch = conn.channel() # 创建频道
ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False) # 申请创建一个队列名称while conn.is_alive(): # 如果连接没有断开就一直查询服务端队列中是否有数据result = ch.basic_get(queue = qname) print result if isinstance(result, pika.spec.Basic.GetEmpty): # 如果返回的结果为空passelif isinstance(result, pika.spec.Basic.GetOk): # 如果申请队列成功ch.basic_ack(delivery_tag = result.delivery_tag) # 则回复服务端该通道的值else:raise Exception("Hmm, that's unexpected. basic_get should have returned either ""Basic.GetOk or Basic.GetEmpty",result)time.sleep(1)
主要是想服务端申请一个新的消息队列,然后一直获取该队列里是否有数据
2.参考demo_send.py
import sys
import pika
import asyncoreconn = pika.AsyncoreConnection(pika.ConnectionParameters((len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',credentials=pika.PlainCredentials('guest', 'guest'))) # 选用异步io处理,并输入连接地址和认证用户名与密码ch = conn.channel() # 创建频道
ch.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False) # 申请创建一个队列名称,如果该队列已经存在就返回该队列值ch.basic_publish(exchange='',routing_key="test",body="Hello World!",properties=pika.BasicProperties(content_type = "text/plain",delivery_mode = 2, # persistent),block_on_flow_control = True) # 在频道上使用默认的exchange,路由为test,内容为bodyconn.close() # 关闭连接,先将数据送出
pika.asyncore_loop() # 将发布的数据发出
3.参考demo_receive.py
import sys
import pika
import asyncoreconn = pika.AsyncoreConnection(pika.ConnectionParameters((len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',credentials = pika.PlainCredentials('guest', 'guest'),heartbeat = 10)) # 选用异步io处理,并输入连接地址和认证用户名与密码,并设置心跳时间print 'Connected to %r' % (conn.server_properties,)qname = (len(sys.argv) > 2) and sys.argv[2] or 'test'ch = conn.channel() # 创建频道
ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False) # 申请创建一个队列名称,如果该队列已经存在就返回该队列值def handle_delivery(ch, method, header, body):print "method=%r" % (method,)print "header=%r" % (header,)print " body=%r" % (body,)ch.basic_ack(delivery_tag = method.delivery_tag) # 消费完成后向服务器发送消费完成标志ch.basic_consume(handle_delivery, queue = qname) # 消费指定队列上的数据,
pika.asyncore_loop() # 开启循环
print 'Close reason:', conn.connection_close
通过这几个例子我们先分析connection类,在pika中提供了两个链接类型异步链接和阻塞链接,现在分析异步链接
class AsyncoreConnection(pika.connection.Connection):def delayed_call(self, delay_sec, callback):add_oneshot_timer_rel(delay_sec, callback) # 添加时间和回调函数到时间堆timer_heap中def connect(self, host, port):self.dispatcher = RabbitDispatcher(self) # 初始化一个实例self.dispatcher.create_socket(socket.AF_INET, socket.SOCK_STREAM) # 新建一个连接实例self.dispatcher.connect((host, port or spec.PORT)) # 连接server端def disconnect_transport(self):if self.dispatcher: # 如果该实例有则关闭该连接实例self.dispatcher.close()def drain_events(self):loop(count = 1) # 处理网络io事件
基础类 connection
class Connection:def __init__(self, parameters, wait_for_open = True, reconnection_strategy = None):self.parameters = parameters # 参数self.reconnection_strategy = reconnection_strategy or NullReconnectionStrategy() # 重连参数处理方法self.connection_state_change_event = event.Event() # handler添加删除调用等处理类self._reset_per_connection_state() # 重置连接状态self.reconnect() # 重新连接if wait_for_open:self.wait_for_open() # 等待server返回opendef _reset_per_connection_state(self):self.state = codec.ConnectionState() # 连接状态self.server_properties = None # 服务器连接属性self.outbound_buffer = simplebuffer.SimpleBuffer() # 读写缓冲区self.frame_handler = self._login1 # 帧处理方法self.channels = {} # 通道self.next_channel = 0 # 下一个频道self.connection_open = False # 连接是否打开self.connection_close = None # 连接是否关闭self.bytes_sent = 0 # 已经发送的数据长度self.bytes_received = 0 # 已经接受的数据长度self.heartbeat_checker = None # 心跳包监测def delayed_call(self, delay_sec, callback): # 延迟调用函数子类必须实现"""Subclasses should override to call the callback after thespecified number of seconds have elapsed, using a timer, or athread, or similar."""raise NotImplementedError('Subclass Responsibility')def reconnect(self): # 重新连接self.ensure_closed() # 确保该连接已经关闭self.reconnection_strategy.on_connect_attempt(self) # 调用重连策略实例的尝试连接函数self._reset_per_connection_state() # 重置连接状态try:self.connect(self.parameters.host, self.parameters.port or spec.PORT) # 重新连接self.send_frame(self._local_protocol_header()) # 发送协议头信息except:self.reconnection_strategy.on_connect_attempt_failure(self) # 如果连接失败调用重连策略实例的连接失败函数raisedef connect(self, host, port): # 连接"""Subclasses should override to set up the outboundsocket."""raise NotImplementedError('Subclass Responsibility')def _local_protocol_header(self):return codec.FrameProtocolHeader(1,1,spec.PROTOCOL_VERSION[0],spec.PROTOCOL_VERSION[1]) # 将协议消息封装好def on_connected(self):self.reconnection_strategy.on_transport_connected(self)def handle_connection_open(self):self.reconnection_strategy.on_connection_open(self)self.connection_state_change_event.fire(self, True) # def handle_connection_close(self):self.reconnection_strategy.on_connection_closed(self)self.connection_state_change_event.fire(self, False)def addStateChangeHandler(self, handler, key = None):self.connection_state_change_event.addHandler(handler, key) # 添加处理handler和Keyif self.connection_open:handler(self, True)elif self.connection_close:handler(self, False)def delStateChangeHandler(self, key):self.connection_state_change_event.delHandler(key) # 删除handlerdef _set_connection_close(self, c):if not self.connection_close: # 如果连接没有关闭self.connection_close = c for chan in self.channels.values(): # 遍历频道列表chan._set_channel_close(c) # 关闭通道连接self.connection_open = False # 连接打开标志位设置falseself.handle_connection_close() # 关闭连接def close(self, code = 200, text = 'Normal shutdown'):if self.connection_open: # 如果打开连接标志位打开self.connection_open = False # 设置标志位为falsec = spec.Connection.Close(reply_code = code, # reply_text = text,class_id = 0,method_id = 0)self._rpc(0, c, [spec.Connection.CloseOk]) # 调用远端rpc方法self._set_connection_close(c) # 设置连接关闭self._disconnect_transport() # 关闭传输通道 def ensure_closed(self):if self.is_alive():self.close()def _disconnect_transport(self, reason = None):self.disconnect_transport() # 关闭连接实例self.on_disconnected(reason) # 关闭连接def disconnect_transport(self):"""Subclasses should override this to cause the underlyingtransport (socket) to close."""raise NotImplementedError('Subclass Responsibility')def on_disconnected(self, reason = 'Socket closed'):self._set_connection_close(spec.Connection.Close(reply_code = 0,reply_text = reason,class_id = 0,method_id = 0)) # 发送连接关闭的方法self.reconnection_strategy.on_transport_disconnected(self)def suggested_buffer_size(self):b = self.state.frame_max # 如果帧数据没有设置 则设置131072if not b: b = 131072return bdef on_data_available(self, buf): # 处理接受到的数据while buf:(consumed_count, frame) = self.state.handle_input(buf) # 将二进制数据解析成amqp帧数据self.bytes_received = self.bytes_received + consumed_countbuf = buf[consumed_count:]if frame:self.frame_handler(frame) # 当进行验证登录连接后,最后改方法调用_generic_frame_handler,如果为心跳# 则不处理,如果是频道的方法则调用channel的方法处理def _next_channel_number(self):tries = 0limit = self.state.channel_max or 32767while self.next_channel in self.channels:self.next_channel = (self.next_channel + 1) % limittries = tries + 1if self.next_channel == 0:self.next_channel = 1if tries > limit:raise NoFreeChannels()return self.next_channeldef _set_channel(self, channel_number, channel):self.channels[channel_number] = channeldef _ensure_channel(self, channel_number):if self.connection_close: # 检查连接没有关闭raise ConnectionClosed(self.connection_close)return self.channels[channel_number]._ensure() # 确保频道没有关闭def reset_channel(self, channel_number): # 删除频道号对应的handler和频道号if channel_number in self.channels:del self.channels[channel_number]def send_frame(self, frame):marshalled_frame = frame.marshal() # 将通道和发送的方法序列化self.bytes_sent = self.bytes_sent + len(marshalled_frame) # 记录总的发送数据量self.outbound_buffer.write(marshalled_frame) # 将数据写入到写缓冲区中#print 'Wrote %r' % (frame, )def send_method(self, channel_number, method, content = None): self.send_frame(codec.FrameMethod(channel_number, method)) # 发送帧,props = Nonebody = Noneif isinstance(content, tuple):props = content[0] # 如果是元组,则第一个数据为头部属性帧body = content[1] # 第二个数据为内容帧else:body = content # 只有内容帧if props: length = 0if body: length = len(body)self.send_frame(codec.FrameHeader(channel_number, length, props)) # 发送头部帧数据if body:maxpiece = (self.state.frame_max - \codec.ConnectionState.HEADER_SIZE - \codec.ConnectionState.FOOTER_SIZE) # 获取一个帧的最大长度body_buf = simplebuffer.SimpleBuffer( body ) # 写入到缓冲区while body_buf:piecelen = min(len(body_buf), maxpiece) # 比较如果要发出的数据大小与最大传输帧数据大小比较piece = body_buf.read_and_consume( piecelen ) # 从缓冲区中读出数据self.send_frame(codec.FrameBody(channel_number, piece)) # 发送内容帧直到缓冲区为空def _rpc(self, channel_number, method, acceptable_replies):channel = self._ensure_channel(channel_number) # 检查连接没有关闭self.send_method(channel_number, method) # 发送相应频道的方法return channel.wait_for_reply(acceptable_replies) # 注册回调函数def _login1(self, frame):if isinstance(frame, codec.FrameProtocolHeader): # 第一次链接时,返回的帧头一定是协议头raise ProtocolVersionMismatch(self._local_protocol_header(),frame)if (frame.method.version_major, frame.method.version_minor) != spec.PROTOCOL_VERSION: # 检查客户端与服务端支持的协议是否是一个版本raise ProtocolVersionMismatch(self._local_protocol_header(),frame)self.server_properties = frame.method.server_properties # 保存服务端的属性credentials = self.parameters.credentials or default_credentials # 认证信息response = credentials.response_for(frame.method) # 封装用户名与密码信息if not response:raise LoginError("No acceptable SASL mechanism for the given credentials",credentials)self.send_method(0, spec.Connection.StartOk(client_properties = \{"product": "Pika Python AMQP Client Library"},mechanism = response[0],response = response[1])) # 通过0号频道返回服务端打开ok函数并发送用户名与密码验证self.erase_credentials()self.frame_handler = self._login2 # 更换帧处理函数def erase_credentials(self):"""Override if in some context you need the object to forgetits login credentials after successfully opening a connection."""passdef _login2(self, frame):channel_max = combine_tuning(self.parameters.channel_max, frame.method.channel_max)frame_max = combine_tuning(self.parameters.frame_max, frame.method.frame_max)heartbeat = combine_tuning(self.parameters.heartbeat, frame.method.heartbeat)if heartbeat:self.heartbeat_checker = HeartbeatChecker(self, heartbeat) # 设置心跳包的参数self.state.tune(channel_max, frame_max) # 设置好协议中的频道号与频道最大长度self.send_method(0, spec.Connection.TuneOk(channel_max = channel_max,frame_max = frame_max,heartbeat = heartbeat)) # 给服务端返回TuneOk方法self.frame_handler = self._generic_frame_handler # 更新帧处理函数self._install_channel0() # 注册0频道self.known_hosts = \self._rpc(0, spec.Connection.Open(virtual_host = \self.parameters.virtual_host,insist = True),[spec.Connection.OpenOk]).known_hostsself.connection_open = Trueself.handle_connection_open()def is_alive(self):return self.connection_open and not self.connection_closedef _install_channel0(self):c = channel.ChannelHandler(self, 0)c.async_map[spec.Connection.Close] = self._async_connection_closedef channel(self):return channel.Channel(channel.ChannelHandler(self)) # 新建一个频道def wait_for_open(self):while (not self.connection_open) and \(self.reconnection_strategy.can_reconnect() or (not self.connection_close)):self.drain_events() # 确定链接已经打开def drain_events(self):"""Subclasses should override as required to wait for a fewevents -- perhaps running the dispatch loop once, or a smallnumber of times -- and dispatch them, and then to returncontrol to this method's caller, which will be waiting forsomething to have been set by one of the event handlers."""raise NotImplementedError('Subclass Responsibility')def _async_connection_close(self, method_frame, header_frame, body):self.send_method(0, spec.Connection.CloseOk())self._set_connection_close(method_frame.method)def _generic_frame_handler(self, frame):#print "GENERIC_FRAME_HANDLER", frameif isinstance(frame, codec.FrameHeartbeat): # 如果是心跳帧则不处理pass # we already counted the received bytes for our heartbeat checkerelse: self.channels[frame.channel_number].frame_handler(frame) # 如果是其他类型的帧调用对应频道的帧处理函数
connection主要就是接受服务端发来的数据,并根据协议解析成对应的帧,将要发送出去的数据进行解析成amqp协议的二进制数据。
其中connection对接受的数据解析主要是由ConnectionState类来实现,并将解析出来的数据返回给handler继续处理。
class ConnectionState:HEADER_SIZE = 7 # 头部数据大小FOOTER_SIZE = 1 # 尾部数据大小def __init__(self):self.channel_max = Noneself.frame_max = Noneself._return_to_idle() # 将数据重置def tune(self, channel_max, frame_max):self.channel_max = channel_max # 设置服务端返回的最大频道数self.frame_max = frame_max # 服务端的最大帧长度def _return_to_idle(self):self.inbound_buffer = []self.inbound_available = 0self.target_size = ConnectionState.HEADER_SIZE # 先解析头部数据大小self.state = self._waiting_for_header # 等待解析头部数据def _inbound(self):return ''.join(self.inbound_buffer) # 将解析出的数据拼接def handle_input(self, received_data):total_bytes_consumed = 0 # 处理数据的统计while True:if not received_data: # 如果处理完毕或者传入数据为空,则返回处理的数据大小return (total_bytes_consumed, None)bytes_consumed = self.target_size - self.inbound_available # target_size会根据接受的帧类型不同而变化,第一次接受头帧,长度就为七if len(received_data) < bytes_consumed: bytes_consumed = len(received_data)self.inbound_buffer.append(received_data[:bytes_consumed]) # 处理头部的数据self.inbound_available = self.inbound_available + bytes_consumed # 记录已经消费的数据received_data = received_data[bytes_consumed:] # 获取未被消费的数据total_bytes_consumed = total_bytes_consumed + bytes_consumed # 记录总的消费大小if self.inbound_available < self.target_size: return (total_bytes_consumed, None)maybe_result = self.state(self._inbound()) # 解析头部数据if maybe_result:return (total_bytes_consumed, maybe_result)def _waiting_for_header(self, inbound):# Here we switch state without resetting the inbound_buffer,# because we want to keep the frame header.if inbound[:3] == 'AMQ':# Protocol header.self.target_size = 8self.state = self._waiting_for_protocol_header # 如果是AMQ开头就是消息头解析消息头数据else:self.target_size = struct.unpack_from('>I', inbound, 3)[0] + \ConnectionState.HEADER_SIZE + \ConnectionState.FOOTER_SIZE # 如果不是头部消息数据,则获取帧的实际长度self.state = self._waiting_for_body # 解析帧body数据def _waiting_for_body(self, inbound):if ord(inbound[-1]) != spec.FRAME_END: # 如果数据结尾不是固定帧结尾则报错raise InvalidFrameError("Invalid frame end byte", inbound[-1])self._return_to_idle() # 重置数据(frame_type, channel_number) = struct.unpack_from('>BH', inbound, 0) # 解析帧类型, 解析出频道号if frame_type == spec.FRAME_METHOD: # 如果为方法帧method_id = struct.unpack_from('>I', inbound, ConnectionState.HEADER_SIZE)[0] # 解析出方法的id method = spec.methods[method_id]() # 查找出该方法并实例化改类method.decode(inbound, ConnectionState.HEADER_SIZE + 4) # 调用该实例解析数据return FrameMethod(channel_number, method) # 返回帧方法对象elif frame_type == spec.FRAME_HEADER: # 如果该帧为头部帧(class_id, body_size) = struct.unpack_from('>HxxQ', inbound,ConnectionState.HEADER_SIZE) # 解析出类的id,解析出body大小props = spec.props[class_id]() # 解析出属性props.decode(inbound, ConnectionState.HEADER_SIZE + 12) # 解析出属性的数据return FrameHeader(channel_number, body_size, props) # 返回头部帧格式elif frame_type == spec.FRAME_BODY: # 如果帧类型为bodyreturn FrameBody(channel_number,inbound[ConnectionState.HEADER_SIZE : -ConnectionState.FOOTER_SIZE]) # 解析body的数据elif frame_type == spec.FRAME_HEARTBEAT: # 如果为心跳帧return FrameHeartbeat() # 返回心跳帧else:# Ignore the frame.return Nonedef _waiting_for_protocol_header(self, inbound):if inbound[3] != 'P':raise InvalidProtocolHeader(inbound)self._return_to_idle() # 重置数据(th, tl, vh, vl) = struct.unpack_from('BBBB', inbound, 4) # 解析协议头部数据return FrameProtocolHeader(th, tl, vh, vl)
ch = conn.channel()
def channel(self):return channel.Channel(channel.ChannelHandler(self)) # 新建一个频道
由于amqp协议是多通道协议,那就意味着一个连接可以处理多个频道,在新建频道的时候,会实例化一个ChannelHandler当做参数实例化一个Channel实例
class ChannelHandler:def __init__(self, connection, channel_number = None):self.connection = connection # 连接实例self.frame_handler = self._handle_method # 帧处理方法self.channel_close = None # 频道关闭标志位self.async_map = {} self.reply_map = Noneself.flow_active = True ## we are permitted to transmit, so True.self.channel_state_change_event = event.Event() # 频道事件更改实例self.flow_active_change_event = event.Event()if channel_number is None:self.channel_number = connection._next_channel_number() # 获取一个频道号else:self.channel_number = channel_number connection._set_channel(self.channel_number, self) # 在连接实例中保存频道号对应的handlerdef _async_channel_close(self, method_frame, header_frame, body): # 关闭频道self._set_channel_close(method_frame.method) self.connection.send_method(self.channel_number, spec.Channel.CloseOk()) # 发送关闭连接def _async_channel_flow(self, method_frame, header_frame, body):self.flow_active = method_frame.method.activeself.flow_active_change_event.fire(self, self.flow_active)self.connection.send_method(self.channel_number,spec.Channel.FlowOk(active = self.flow_active))def _ensure(self): # 检查该频道是否关闭if self.channel_close:raise ChannelClosed(self.channel_close)return selfdef _set_channel_close(self, c): # 关闭该频道if not self.channel_close:self.channel_close = cself.connection.reset_channel(self.channel_number)self.channel_state_change_event.fire(self, False)def addStateChangeHandler(self, handler, key = None):self.channel_state_change_event.addHandler(handler, key)handler(self, not self.channel_close)def addFlowChangeHandler(self, handler, key = None):self.flow_active_change_event.addHandler(handler, key)handler(self, self.flow_active)def wait_for_reply(self, acceptable_replies):if not acceptable_replies: # 如果没有回复的注册帧就返回# One-way.returnif self.reply_map is not None:raise RecursiveOperationDetected([p.NAME for p in acceptable_replies])reply = [None]def set_reply(r):reply[0] = rself.reply_map = {}for possibility in acceptable_replies:self.reply_map[possibility] = set_reply # 将传入的回复值保持while True: # 循环直到收到回复self._ensure() self.connection.drain_events() # 调用事件循环检查if reply[0]: return reply[0] # 当注册事件回复后,则调用方式此时结束循环def _handle_async(self, method_frame, header_frame, body):method = method_frame.methodmethodClass = method.__class__if self.reply_map is not None and methodClass in self.reply_map: # 如果该方法在等待回复的处理方法中if header_frame is not None:method._set_content(header_frame.properties, body)handler = self.reply_map[methodClass] # 找到注册的方法self.reply_map = Nonehandler(method) # 调用注册等待处理的handler处理elif methodClass in self.async_map: self.async_map[methodClass](method_frame, header_frame, body) # 如果是channel注册的方法,则调用该方法处理 Channel.Close spec.Channel.Flowelse: # spec.Basic.Deliver spec.Basic.Returnself.connection.close(spec.NOT_IMPLEMENTED,'Pika: method not implemented: ' + methodClass.NAME) # 如果都不是则关闭连接def _handle_method(self, frame):if not isinstance(frame, codec.FrameMethod): # 如果该帧不是方法帧raise UnexpectedFrameError(frame)if spec.has_content(frame.method.INDEX): # 如果是注册中的方法self.frame_handler = self._make_header_handler(frame) # 替换注册方法中的帧处理方法else:self._handle_async(frame, None, None) # 调用频道中注册的方法处理def _make_header_handler(self, method_frame):def handler(header_frame):if not isinstance(header_frame, codec.FrameHeader): # 如果传入的帧不是头帧则报错raise UnexpectedFrameError(header_frame)self._install_body_handler(method_frame, header_frame) # 处理保存的方法帧和头帧return handlerdef _install_body_handler(self, method_frame, header_frame): seen_so_far = [0]body_fragments = []def handler(body_frame):if not isinstance(body_frame, codec.FrameBody): # 处理body帧,如果不是body帧则报错raise UnexpectedFrameError(body_frame)fragment = body_frame.fragment # 内容帧的内容数据seen_so_far[0] = seen_so_far[0] + len(fragment) # 保存已经处理的帧数据长度body_fragments.append(fragment) # 保存到数组中if seen_so_far[0] == header_frame.body_size: # 如果记录的处理的内容帧长度和头帧的数据长度一致则调用finishfinish()elif seen_so_far[0] > header_frame.body_size: # 如果大于长度则报错raise BodyTooLongError()else:passdef finish():self.frame_handler = self._handle_method # 接受完内容指针后重置帧处理方法self._handle_async(method_frame, header_frame, ''.join(body_fragments)) # 处理方法帧,头帧,和内容数据if header_frame.body_size == 0: # 如果头帧的数据大小为9 则调用finishfinish()else:self.frame_handler = handler # 重置帧处理方法def _rpc(self, method, acceptable_replies):self._ensure() # 检查通道是否关闭return self.connection._rpc(self.channel_number, method, acceptable_replies) # 调用连接的_rpc方法def content_transmission_forbidden(self):return not self.flow_active
handler主要就是解析频道的帧数据,并处理,管理频道的连接关闭等。
Channel类主要就是封装了handler的一些方法
class Channel(spec.DriverMixin):def __init__(self, handler):self.handler = handler # channel实例self.callbacks = {} # 回调函数self.pending = {} self.next_consumer_tag = 0 handler.async_map[spec.Channel.Close] = handler._async_channel_close # 注册handler中的async_map处理函数handler.async_map[spec.Channel.Flow] = handler._async_channel_flowhandler.async_map[spec.Basic.Deliver] = self._async_basic_deliverhandler.async_map[spec.Basic.Return] = self._async_basic_returnself.handler._rpc(spec.Channel.Open(), [spec.Channel.OpenOk]) # 注册打开openok函数def addStateChangeHandler(self, handler, key = None):self.handler.addStateChangeHandler(handler, key)def addFlowChangeHandler(self, handler, key = None):self.handler.addFlowChangeHandler(handler, key)def _async_basic_deliver(self, method_frame, header_frame, body):"""Cope with reentrancy. If a particular consumer is still active when anotherdelivery appears for it, queue the deliveries up until it finally exits."""consumer_tag = method_frame.method.consumer_tagif consumer_tag not in self.pending:q = []self.pending[consumer_tag] = qconsumer = self.callbacks[consumer_tag]consumer(self, method_frame.method, header_frame.properties, body)while q:(m, p, b) = q.pop(0)consumer(self, m, p, b)del self.pending[consumer_tag]else:self.pending[consumer_tag].append((method_frame.method, header_frame.properties, body))def _async_basic_return(self, method_frame, header_frame, body):raise NotImplementedError("Basic.Return")def close(self, code = 0, text = 'Normal close'):try:c = spec.Channel.Close(reply_code = code,reply_text = text,class_id = 0,method_id = 0) # 关闭频道连接self.handler._rpc(c, [spec.Channel.CloseOk]) # 调用handler的rpc方法except ChannelClosed:passself.handler._set_channel_close(c) # 设置通道关闭return self.handler.channel_close def basic_publish(self, exchange, routing_key, body, properties = None, mandatory = False, immediate = False, block_on_flow_control = False):if self.handler.content_transmission_forbidden():if block_on_flow_control:while self.handler.content_transmission_forbidden():self.handler.connection.drain_events()else:raise ContentTransmissionForbidden(self)properties = properties or spec.BasicProperties()self.handler.connection.send_method(self.handler.channel_number,spec.Basic.Publish(exchange = exchange,routing_key = routing_key,mandatory = mandatory,immediate = immediate),(properties, body))def basic_consume(self, consumer, queue = '', no_ack = False, exclusive = False, consumer_tag = None):tag = consumer_tagif not tag:tag = 'ctag' + str(self.next_consumer_tag)self.next_consumer_tag += 1if tag in self.callbacks:raise DuplicateConsumerTag(tag)self.callbacks[tag] = consumerreturn self.handler._rpc(spec.Basic.Consume(queue = queue,consumer_tag = tag,no_ack = no_ack,exclusive = exclusive),[spec.Basic.ConsumeOk]).consumer_tagdef basic_cancel(self, consumer_tag):if not consumer_tag in self.callbacks:raise UnknownConsumerTag(consumer_tag)self.handler._rpc(spec.Basic.Cancel(consumer_tag = consumer_tag),[spec.Basic.CancelOk])del self.callbacks[consumer_tag]
由于_rpc调用的是由amqp协议定好的数据,所以只需要将相应方法的编码和解码方式规定好后,就可以通过方法帧传递达到rpc的效果
class Connection(pika.specbase.Class):INDEX = 0x000A ## 10NAME = 'Connection'class Start(pika.specbase.Method):INDEX = 0x000A000A ## 10, 10; 655370NAME = 'Connection.Start'def __init__(self, version_major = 0, version_minor = 8, server_properties = None, mechanisms = 'PLAIN', locales = 'en_US'):self.version_major = version_majorself.version_minor = version_minorself.server_properties = server_propertiesself.mechanisms = mechanismsself.locales = localesdef decode(self, encoded, offset = 0):self.version_major = struct.unpack_from('B', encoded, offset)[0] # 协议解析版本offset = offset + 1self.version_minor = struct.unpack_from('B', encoded, offset)[0]offset = offset + 1(self.server_properties, offset) = pika.table.decode_table(encoded, offset) # 解析服务器属性length = struct.unpack_from('>I', encoded, offset)[0] # 解析长度offset = offset + 4 self.mechanisms = encoded[offset : offset + length] # 解析文本信息offset = offset + lengthlength = struct.unpack_from('>I', encoded, offset)[0] offset = offset + 4self.locales = encoded[offset : offset + length]offset = offset + lengthreturn selfdef encode(self):pieces = []pieces.append(struct.pack('B', self.version_major)) # 将客户端的协议转换成二进制pieces.append(struct.pack('B', self.version_minor))pika.table.encode_table(pieces, self.server_properties)pieces.append(struct.pack('>I', len(self.mechanisms))) pieces.append(self.mechanisms)pieces.append(struct.pack('>I', len(self.locales)))pieces.append(self.locales)return pieces # 返回转换后的二进制协议
其他的方法都是如此详细情况可看spec.py文件
分析demp_get.py中的basic_get的执行流程
我们展开分析demp_get.py
我们之间调到所有连接信息已经初始化完成后。
此时.
result = ch.basic_get(queue = qname)
此时调用Channel中的
def basic_get(self, ticket = 1, queue = None, no_ack = False):return self.handler._rpc(Basic.Get(ticket = ticket, queue = queue, no_ack = no_ack),[Basic.GetOk, Basic.GetEmpty])
此时调用了handler的_rpc方法:
self._ensure() # 检查通道是否关闭return self.connection._rpc(self.channel_number, method, acceptable_replies) # 调用连接的_rpc方法
这里写代码片
此时调用了connections的_rpc方法:
def _rpc(self, channel_number, method, acceptable_replies):channel = self._ensure_channel(channel_number) # 检查连接没有关闭self.send_method(channel_number, method) # 发送相应频道的方法return channel.wait_for_reply(acceptable_replies) # 注册回调函数
调用了send_method方法:
def send_method(self, channel_number, method, content = None): self.send_frame(codec.FrameMethod(channel_number, method)) # 发送帧,props = Nonebody = Noneif isinstance(content, tuple):props = content[0] # 如果是元组,则第一个数据为头部属性帧body = content[1] # 第二个数据为内容帧else:body = content # 只有内容帧if props: length = 0if body: length = len(body)self.send_frame(codec.FrameHeader(channel_number, length, props)) # 发送头部帧数据if body:maxpiece = (self.state.frame_max - \codec.ConnectionState.HEADER_SIZE - \codec.ConnectionState.FOOTER_SIZE) # 获取一个帧的最大长度body_buf = simplebuffer.SimpleBuffer( body ) # 写入到缓冲区while body_buf:piecelen = min(len(body_buf), maxpiece) # 比较如果要发出的数据大小与最大传输帧数据大小比较piece = body_buf.read_and_consume( piecelen ) # 从缓冲区中读出数据self.send_frame(codec.FrameBody(channel_number, piece)) # 发送内容帧直到缓冲区为空
如果是basic_get就直接调用send_frame方法:
def send_frame(self, frame):marshalled_frame = frame.marshal() # 将通道和发送的方法序列化self.bytes_sent = self.bytes_sent + len(marshalled_frame) # 记录总的发送数据量self.outbound_buffer.write(marshalled_frame) # 将数据写入到写缓冲区中
此时就将对应方法帧的数据进行二进制协议编码后直接就写到写缓冲区中,此时异步io的写事件就会注册,此时就会将数据发送出去。
当send_method执行完成后,就执行:
return channel.wait_for_reply(acceptable_replies) # 注册回调函数
调用了channel的wait_for_reply方法
def wait_for_reply(self, acceptable_replies):if not acceptable_replies: # 如果没有回复的注册帧就返回# One-way.returnif self.reply_map is not None:raise RecursiveOperationDetected([p.NAME for p in acceptable_replies])reply = [None]def set_reply(r):reply[0] = rself.reply_map = {}for possibility in acceptable_replies:self.reply_map[possibility] = set_reply # 将传入的回复值保持while True: # 循环直到收到回复self._ensure() self.connection.drain_events() # 调用事件循环检查if reply[0]: return reply[0] # 当注册事件回复后,则调用方式此时结束循环
由于目前选择的是异步io,所以self.connection.drain_events()
调用
def drain_events(self):loop(count = 1) # 处理网络io事件
此时,调用loop函数
def loop(map = None, count = None):if map is None:map = asyncore.socket_map # 调用asyncore的连接mapif count is None:while (map or timer_heap):loop1(map)else:while (map or timer_heap) and count > 0: # 执行循环时间loop1(map)count = count - 1run_timers_internal() # 检查并执行到期的回调函数
调用loop1函数
def loop1(map):if map:asyncore.loop(timeout = next_event_timeout(), map = map, count = 1) # 调用asyncore包中的事件循环函数else:time.sleep(next_event_timeout())
此时连接的注册处理类为RabbitDispatcher:
class RabbitDispatcher(asyncore.dispatcher):def __init__(self, connection):asyncore.dispatcher.__init__(self) # 调用asyncore的构造方法,添加监听连接对象self.connection = connection # 连接的实例def handle_connect(self):self.connection.on_connected() # 当连接成功的时候的处理方法def handle_close(self):self.connection.on_disconnected() # 连接关闭的时候调用的方法self.connection.dispatcher = Noneself.close() # 关闭连接def handle_read(self):try:buf = self.recv(self.connection.suggested_buffer_size()) #读连接中有数据读时,避免一个帧的数据解析出记录读出的buf (设计不是很好)except socket.error, exn:if hasattr(exn, 'errno') and (exn.errno == EAGAIN):# Weird, but happens very occasionally.returnelse:self.handle_close() # 如果报错就关闭连接returnif not buf: # 如果没有数据则关闭连接self.close()returnself.connection.on_data_available(buf) # 处理接受的数据def writable(self):return bool(self.connection.outbound_buffer) # 如果连接的写缓冲区中有数据则该连接可写def handle_write(self):fragment = self.connection.outbound_buffer.read() # 从缓冲区中读出数据r = self.send(fragment) # 发送出数据 self.connection.outbound_buffer.consume(r) # 发送多少数据就从缓冲区中清除多少数据
当服务端将处理数据返回时,会触发handle_read方法,当数据接收后,就会调用
self.connection.on_data_available(buf) # 处理接受的数据
该方法如下
def on_data_available(self, buf): # 处理接受到的数据while buf:(consumed_count, frame) = self.state.handle_input(buf) # 将二进制数据解析成amqp帧数据self.bytes_received = self.bytes_received + consumed_countbuf = buf[consumed_count:]if frame:self.frame_handler(frame) # 当进行验证登录连接后,最后改方法调用_generic_frame_handler,如果为心跳# 则不处理,如果是频道的方法则调用channel的方法处理
当客户端与服务端连接完成后frame_handler就会被设置成_generic_frame_handler,该方法会调用对应通道frame_handler
def _generic_frame_handler(self, frame):#print "GENERIC_FRAME_HANDLER", frameif isinstance(frame, codec.FrameHeartbeat): # 如果是心跳帧则不处理pass # we already counted the received bytes for our heartbeat checkerelse: self.channels[frame.channel_number].frame_handler(frame) # 如果是其他类型的帧调用对应频道的帧处理函数
在channel中的frame_handler,该方法会是_handle_method:
def _handle_method(self, frame):if not isinstance(frame, codec.FrameMethod): # 如果该帧不是方法帧raise UnexpectedFrameError(frame)if spec.has_content(frame.method.INDEX): # 如果是注册中的方法self.frame_handler = self._make_header_handler(frame) # 替换注册方法中的帧处理方法else:self._handle_async(frame, None, None) # 调用频道中注册的方法处理
此时返回的方法不会在注册列表中找到,会执行_handle_async
def _handle_async(self, method_frame, header_frame, body):method = method_frame.methodmethodClass = method.__class__if self.reply_map is not None and methodClass in self.reply_map: # 如果该方法在等待回复的处理方法中if header_frame is not None:method._set_content(header_frame.properties, body)handler = self.reply_map[methodClass] # 找到注册的方法self.reply_map = Nonehandler(method) # 调用注册等待处理的handler处理elif methodClass in self.async_map: self.async_map[methodClass](method_frame, header_frame, body) # 如果是channel注册的方法,则调用该方法处理 Channel.Close spec.Channel.Flowelse: # spec.Basic.Deliver spec.Basic.Returnself.connection.close(spec.NOT_IMPLEMENTED,'Pika: method not implemented: ' + methodClass.NAME) # 如果都不是则关闭连接
此时找到注册的回调函数后,就会执行handler(method) :
此时
reply = [None]def set_reply(r):reply[0] = rself.reply_map = {}for possibility in acceptable_replies:self.reply_map[possibility] = set_reply # 将传入的回复值保持while True: # 循环直到收到回复self._ensure() self.connection.drain_events() # 调用事件循环检查if reply[0]: return reply[0] # 当注册事件回复后,则调用方式此时结束循环
此时reply[0]值不为空,此时循环结束,从而完成结果的返回得到result
至此,一个basic_get的请求完成。
amqp协议与pika库浅析相关推荐
- RabbitMQ学习笔记和AMQP协议浅析
目录 RabbitMQ MQ的相关概念 消息队列协议 消息持久化 消息的分发策略 docker安装RabbitMQ AMQP协议 RabbitMQ的几种模式 简单simple模式 发布/订阅fanou ...
- RabbitMQ MQTT协议和AMQP协议
RabbitMQ MQTT协议和AMQP协议 1 序言... 1 1.1 RabbitMq结构... 1 1.2 RabbitMq消息接收... 4 1.3 Ex ...
- 物联网常见协议之Amqp协议及使用场景解析
摘要:本文围绕AMQP协议,为大家详细解析AMQP协议.核心技术亮点.多协议之间的对比以及使用实践. 本文分享自华为云社区<物联网常见协议之Amqp协议及使用场景解析>,作者:张俭. 引言 ...
- RabbitMQ简介以及AMQP协议
RabbitMQ能为你做些什么? 消息系统允许软件.应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步 ...
- RabbitMQ与AMQP协议详解
消息队列的历史 了解一件事情的来龙去脉,将不会对它感到神秘.让我们来看看消息队列(Message Queue)这项技术的发展历史. Message Queue的需求由来已久,80年代最早在金融交易中, ...
- RabbitMq详解之AMQP协议
AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信.通过了解Rabb ...
- JMS规范和AMQP协议
1.JMS经典模式详解 JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middle ...
- amqp协议_AMQP协议、模型及RabbitMQ常用组件
大家好,我是小T 今天咱们来介绍RabbitMQ的消息发送的原理 ^-^ RabbitMQ作为一款消息中间件,它的核心功能主要是消息的收发.消息收发的媒介是通过网络传输来实现的. RabbitMQ最底 ...
- 2-RabbitMQ核心概念及AMQP协议
RabbitMQ核心概念及AMQP协议 [root@Centos ~]# rabbitmq-plugins list Configured: E = explicitly enabled; e = i ...
最新文章
- 二维码QR Code不是一个产品,是一个功能
- 使用 github 和 jitpack 构建 android 依赖
- Android OpenGL ES 2.0绘制简单三角形
- STM32 基础系列教程 21 - NVIC
- python爬虫之喜马拉雅非vip音频下载
- 计算机u盘启动进不去怎么办,U盘启动盘怎么进不了PE系统 该如何解决
- 外星人m15键盘灯光设置_机·教学贴:ALIENWARE m15的音效设置还能怎样玩?
- ceph-deploy的calamari命令
- 润乾打印控制解决方案
- 小白易学-ps印章制作图文教程+百余个视频教程,见者有份
- AI脚本插件开发-批量加边框-图层边框-可视边界-黑色描边-插件制作源码-illustrator插件开发
- Linux定时任务与开机自启动脚本
- JavaWeb(4)JavaScript高级
- windows与ipad互联传文件
- 腾讯造「国产」机器狗,花式走梅花桩
- 聊聊gorm的OnConflict
- 玩转CVM:Web服务搭建
- 混频对两路同频同源射频信号相位差的影响
- 招商头条:商务部2019年将推进自贸区自贸港建设;深圳去年减免税额2411亿元;西安3D打印特色小镇项目签约
- 难产视频软件测试,让i3、i5无路可走 AMD Ryzen 3 3300X/ 3100首发评测
热门文章
- 全国大学生数学建模竞赛中,哈工大被禁用MATLAB
- 程序员会懂的冷笑话:各大编程语言的内心独白
- 讯飞智能语音先锋者:等到人机交互与人类交流一样自然时,真正的智能时代就来了...
- “数学不行,还能干点啥?”面试官+CTO:干啥都费劲!
- AI研究过于集中狭隘,我们是不是该反思了?
- 免费开源!新学期必收藏的AI学习资源,从课件、工具到源码都齐了
- 售价1万7的华为Mate X很贵吗?
- 手机芯片谁是AI之王?高通、联发科均超华为
- 冠军揭晓!京东Alpha开发者大赛Pick谁上了C位
- 一行代码,解决空指针问题.