• BlockingConnection

BlockingConnection提供了最通用的连接方式

提供两个类: BlockingConnection 和 BlockingChannel

class BlockingConnection(object):def __init__(self, parameters=None, _impl_class=None):...

BlockingConnection是在pika异步的基础上提供的阻塞方法, 调用的是 AMQP协议的 Basic.Deliver and Basic.Return

在使用basic_consume接收消息, 使用basic_publish发送消息的时候仍然可以实现异步

为防止递归调用或者阻塞, blocking连接/channel 在上下文切换中实现 队列的asynchronously-delivered事件(异步通知事件), 比如在等待BlockingConnection.channel或 BlockingChannel.queue_declare时, 一旦实现嵌套的上下文, 将会同步(synchronously)调用它们, 这涉及到所有的回调函数:

1.lockingConnection.add_on_connection_blocked_callback,

2.BlockingConnection.add_on_connection_unblocked_callback, 3.BlockingChannel.basic_consume 等

避免死锁, 一直夯住: 但rabbitmq资源不足的时候, 当去连接rabbitmq的时候, rabbitmq会告诉客户端Connection.Blocked, 然后rabbitmq会暂停处理连接,直到有资源分配进行处理, 这会影响BlockingConnection和BlockingChannel

比如用户在basic_publish 使用非发布确认机制下, 遇上rabbitmq暂停处理连接,将会一直阻塞住,用户回调也不会被执行, 可能引起系统宕机, 解决办法是:

在BlockingConnection初始化时配置blocked_connection_timeout连接参数

类主要的函数方法及说明:

class BlockingConnection(object):def __init__(self, parameters=None, _impl_class=None):"""Create a new instance of the Connection object.:param None | pika.connection.Parameters | sequence parameters:Connection parameters instance or non-empty sequence of them. IfNone, a `pika.connection.Parameters` instance will be created withdefault settings. See `pika.AMQPConnectionWorkflow` for moredetails about multiple parameter configurations and retries.:param _impl_class: for tests/debugging only; implementation class;None=default:raises RuntimeError:"""def add_on_connection_blocked_callback(self, callback):回调以便在连接被阻塞(从RabbitMQ接收到Connection.Blocked)时收到通知,在这种状态下,RabbitMQ暂停处理传入数据,直到连接被解除阻塞,因此接收此通知的发布者暂停发布直到连接被解除阻塞, 可以调用 ConnectionParameters.blocked_connection_timeout 添加超时def add_on_connection_unblocked_callback(self, callback):回调,以便在连接被解除阻塞时收到通知def call_later(self, delay, callback):passdef add_callback_threadsafe(self, callback):"""connection.add_callback_threadsafe(functools.partial(channel.basic_ack, delivery_tag=...))"""回调def remove_timeout(self, timeout_id):移除超时def close(self, reply_code=200, reply_text='Normal shutdown'):reply_code(int) - 关闭的代码reply_text(str) - 关闭的文本原因def process_data_events(self, time_limit=0):passdef sleep(self, duration):延迟def channel(self, channel_number=None):建立channel通道   channel_number 整数 要使用的通道编号,默认为下一个可用通道编号@propertydef is_closed(self):返回bool值@propertydef is_open(self):返回bool值@propertydef basic_nack_supported(self):返回bool值 , 指定服务器是否支持活动连接上的basic.nack@propertydef consumer_cancel_notify_supported(self):返回bool值   服务器是否支持活动连接上的使用者取消通知@propertydef exchange_exchange_bindings_supported(self):返回bool值  活动连接是否支持交换以交换绑定@propertydef publisher_confirms_supported(self):返回bool值  活动连接是否可以使用发布者确认

BlockingChannel 通道

创建示例

import pika# Create our connection object
connection = pika.BlockingConnection()# The returned object will be a synchronous channel
channel = connection.channel()

参数:

class BlockingChannel(object):def __init__(self, channel_impl, connection):pass
    @propertydef channel_number(self):"""Channel number"""频道号码
    @propertydef connection(self):"""The channel's BlockingConnection instance"""
    @propertydef is_closed(self):是否关闭, 返回bool
    @propertydef is_open(self):通道是否开启, 返回bool
    def close(self, reply_code=0, reply_text="Normal shutdown"):关闭
    def flow(self, active):关闭和打开通道流量控制。   active(bool) - 打开流程(True)或关闭(False)
    def add_on_cancel_callback(self, callback):一个回调函数,该函数将在代理发送Basic.Cancel时调用callback -:callback(method_frame)其中method_frame类型是pika.frame.Method类型的方法spec.Basic.Cancel
    def add_on_return_callback(self, callback):回调函数,该函数将在发布的消息被拒绝并由服务器通过Basic.Return返回时调用callback(callable) - 使用callback(channel,method,properties,body),其中channel:pika.Channel方法:pika.spec.Basic.Return属性:pika.spec.BasicProperties body:bytes
    def basic_consume(self,queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None):queue(str) - 要使用的队列on_message_callback(可调用) -用于将消息分派给用户的必需函数,定义:        on_message_callback(channel,  method,properties,body)channel:BlockingChannel方法:spec.Basic.Deliver属性:spec.BasicProperties body:bytesauto_ack(bool) - 如果设置为True,将使用自动确认模式。exclusive(bool) - 不允许队列中的其他消费者consumer_tag(str) - 您可以指定自己的消费者标签; 如果留空,将自动生成消费者标签arguments(dict) - 消费者的自定义键/值对参数
def basic_cancel(self, consumer_tag):取消消费者
    def start_consuming(self):处理I / O事件并调度计时器和basic_consume 回调,直到取消所有使用者
    def stop_consuming(self, consumer_tag=None):取消所有使用者
    def consume(self,queue,auto_ack=False,exclusive=False, arguments=None, inactivity_timeout=None):阻止队列消耗而不是通过回调。此方法是一个生成器,它将每条消息都生成为方法,属性和正文的元组。当客户通过BlockingChannel.cancel()或代理取消使用者时,活动生成器迭代器终止。参数:    queue(str) - 要使用的队列名称auto_ack(bool) - 告诉代理不要期待ack / nack响应exclusive(bool) - 不允许队列中的其他消费者arguments(dict) - 消费者的自定义键/值对参数inactivity_timeout(float) - 如果给出一个数字(以秒为单位),将导致该方法在给定的不活动时间后产生(None,None,None); 这允许用户在等待消息到达时执行伪常规维护活动。如果给出 None(默认),则该方法将阻塞,直到下一个事件到达
    def get_waiting_message_count(self):返回可以通过BlockingChannel.consume从当前队列使用者生成器检索而不阻塞的消息数
    def cancel(self):
    def basic_ack(self, delivery_tag=0, multiple=False):确认一条或多条消息。当客户端发送时,此方法确认通过Deliver或Get-Ok方法传递的一条或多条消息。当由服务器发送时,此方法确认在确认模式下在通道上使用“发布”方法发布的一条或多条消息。确认可以是针对单个消息或一组消息,包括特定消息。参数: delivery-tag(int) - 服务器分配的传递标记multiple(bool) - 如果设置为True,则将传递标记视为“最多并包含”,以便可以使用单个方法确认多个消息。如果设置为False,则传递标记引用单个邮件。如果多个字段为1,并且传递标记为零,则表示确认所有未完成的消息。
    def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):方法允许客户端拒绝一个或多个传入消息。它可用于中断和取消大量传入消息,或将无法处理的消息返回到其原始队列。参数:  delivery-tag(int) - 服务器分配的传递标记multiple(bool) - 如果设置为True,则将传递标记视为“最多并包含”,以便可以使用单个方法确认多个消息。如果设置为False,则传递标记引用单个邮件。如果多个字段为1,并且传递标记为零,则表示确认所有未完成的消息。requeue(bool) - 如果requeue为true,服务器将尝试重新排队该消息。如果requeue为false或重新排队尝试失败,则丢弃或删除消息。
    def basic_get(self, queue, auto_ack=False):从AMQP代理获取单个消息参数:  queue(str) - 从中​​获取消息的队列名称auto_ack(bool) - 告诉经纪人不要期待回复
    def basic_publish(self,exchange, routing_key, body, properties=None, mandatory=False):参数:   exchange(str) - 要发布的交流routing_key(str) - 要绑定的路由键body(字节) - 消息体; 如果没有身体,空字符串properties(pika.spec.BasicProperties) - 消息属性mandatory(bool) - 强制性标志
    def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):指定服务质量参数: prefetch_size(int) - 该字段指定预取窗口大小。如果服务器的大小等于或小于可用的预取大小(并且也属于其他预取限制),则它将提前发送消息。可以设置为零,意味着“没有特定限制”,尽管其他预取限制可能仍然适用。如果在使用者中设置了no-ack选项,则忽略prefetch-size。prefetch_count(int) - 根据整个消息指定预取窗口。该字段可以与预取大小字段结合使用; 如果预取窗口(以及通道和连接级别的窗口)都允许,则只会提前发送消息。如果在使用者中设置了no-ack选项,则忽略prefetch-count。global_qos(bool) - QoS是否适用于频道上的所有消费者
def basic_recover(self, requeue=False):此方法要求服务器重新传送指定通道上的所有未确认消息。可以重新传递零个或多个消息。此方法替换异步Recover
    def basic_reject(self, delivery_tag=None, requeue=True):拒绝传入的消息。此方法允许客户端拒绝邮件。它可用于中断和取消大量传入消息,或将无法处理的消息返回到其原始队列。参数:   delivery-tag(int) - 服务器分配的传递标记requeue(bool) - 如果requeue为true,服务器将尝试重新排队该消息。如果requeue为false或重新排队尝试失败,则丢弃或删除消息。
    def confirm_delivery(self):启用RabbitMQ专有的确认模式
    def exchange_declare(self,exchange,exchange_type='direct',passive=False,durable=False,auto_delete=False,internal=False,arguments=None):声明交换机exchange(str) - 交换名称由这些字符的非空序列组成:字母,数字,连字符,下划线,句点或冒号。exchange_type(str) - 要使用的交换类型passive(bool) - 执行声明或只是检查它是否存在durable(bool) - 重启RabbitMQauto_delete(bool) - 当不再绑定队列时删除internal(布尔) - 只能由其他交易所发布arguments(dict) - 交换的自定义键/值对参数
    def exchange_delete(self, exchange=None, if_unused=False):交换机删除
    def exchange_bind(self, destination, source, routing_key='',arguments=None):交换机绑定destination(str) - 要绑定的目标交换source(str) - 要绑定的源交换routing_key(str) - 要绑定的路由键arguments(dict) - 绑定的自定义键/值对参数
 def exchange_unbind(self,destination=None,source=None,routing_key='',arguments=None):取消绑定
    def queue_declare(self,queue, passive=False, durable=False, exclusive=False,auto_delete=False, arguments=None):声明队列,queue(str) - 队列名称; 如果为空字符串,则代理将创建唯一的队列名称passive(bool) - 只检查队列是否存在,如果不存在则引发 ChannelCloseddurable(bool) - 经纪人重新开始exclusive(bool) - 仅允许当前连接访问auto_delete(bool) - 消费者取消或断开连接后删除arguments(dict) - 队列的自定义键/值参数
    def queue_delete(self, queue, if_unused=False, if_empty=False):删除队列
    def queue_purge(self, queue):清除指定队列中的所有消息   queue清除的队列的名称
    def queue_bind(self, queue, exchange, routing_key=None, arguments=None):将队列绑定到指定的交换参数:  queue(str) - 绑定到交换的队列exchange(str) - 要绑定的源交换routing_key(str) - 要绑定的路由键arguments(dict) - 绑定的自定义键/值对参数
    def queue_unbind(self,queue,exchange=None,routing_key=None,arguments=None):从交换中取消绑定队列queue(str) - 从交换中取消绑定的队列exchange(str) - 要绑定的源交换routing_key(str) - 解除绑定的路由键arguments(dict) - 绑定的自定义键/值对参数
    def tx_select(self):选择标准交易模式def tx_commit(self):事务提交def tx_rollback(self):事务回滚

pika详解(二) BlockingConnection相关推荐

  1. 安卓 linux init.rc,[原创]Android init.rc文件解析过程详解(二)

    Android init.rc文件解析过程详解(二) 3.parse_new_section代码如下: void parse_new_section(struct parse_state *state ...

  2. [转]文件IO详解(二)---文件描述符(fd)和inode号的关系

    原文:https://www.cnblogs.com/frank-yxs/p/5925563.html 文件IO详解(二)---文件描述符(fd)和inode号的关系 ---------------- ...

  3. PopUpWindow使用详解(二)——进阶及答疑

    相关文章: 1.<PopUpWindow使用详解(一)--基本使用> 2.<PopUpWindow使用详解(二)--进阶及答疑> 上篇为大家基本讲述了有关PopupWindow ...

  4. Android init.rc文件解析过程详解(二)

    Android init.rc文件解析过程详解(二) 3.parse_new_section代码如下: void parse_new_section(struct parse_state *state ...

  5. linux 进程间通信 dbus-glib【实例】详解二(下) 消息和消息总线(ListActivatableNames和服务器的自动启动)(附代码)

    linux 进程间通信 dbus-glib[实例]详解一(附代码)(d-feet工具使用) linux 进程间通信 dbus-glib[实例]详解二(上) 消息和消息总线(附代码) linux 进程间 ...

  6. linux 进程间通信 dbus-glib【实例】详解二(上) 消息和消息总线(附代码)

    linux 进程间通信 dbus-glib[实例]详解一(附代码)(d-feet工具使用) linux 进程间通信 dbus-glib[实例]详解二(上) 消息和消息总线(附代码) linux 进程间 ...

  7. Android Gradle 自定义Task详解二:进阶

    转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/78523958 本文出自[赵彦军的博客] 系列目录 Android Gradle使用 ...

  8. Android Loader 异步加载详解二:探寻Loader内部机制

    Android Loader 异步加载详解二:探寻Loader内部机制 转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/7025991 ...

  9. EXT核心API详解(二)-Array/Date/Function/Number/String

    EXT核心API详解(二)-Array/Date/Function/Number/String Array类 indexOf( Object o )  Number object是否在数组中,找不到返 ...

  10. OS--进程间通信详解(二)

    OS–进程间通信详解(二) 文章目录 OS--进程间通信详解(二) 一.进程间通信 1.互斥量 Futexes Pthreads中的互斥量 2.管程 3.消息传递 消息传递系统的设计要点 用消息传递解 ...

最新文章

  1. SAP MM初阶之ME12里为啥只能维护少量条件类型的价格?
  2. 【软件期刊01】2017-02-22
  3. MPLS多协议标签交换原理—Vecloud微云
  4. 什么是CDP(连续数据保护)?
  5. linux数据库redis主从配置,redis介绍及主从配置
  6. 51单片机蜂鸣器加数码管
  7. 申请 iOS开发者计划 (IOS Developer Program IDP)
  8. Java项目使用jib打包docker镜像的简单记录
  9. VR全景图在家装行业的应用及发展
  10. 【C++】利用DFS求解水洼数目问题
  11. 北大计算机图灵班,北大2019“图灵班”计划招60人,在北大什么条件才能进图灵班?...
  12. 对话推荐CRS论文精读KBRD:Towards Knowledge-Based Recommender Dialog System
  13. 深入理解Java虚拟机(周志明)——读书笔记1
  14. 成功解决:XXX不在 sudoers 文件中,此事将被报告
  15. 前端学习笔记--注册表单
  16. 二手物品商城java web,java|web|jsp校园二手网站|二手商品交易市场|平台|毕业设计课设|...
  17. Android App开发实战项目之购物车(附源码 超详细必看)
  18. 制作Apple安装U盘
  19. 学习现代 JavaScript 编程的最佳教程
  20. 山东理工大学计算机考研复试分数线,山东理工大学2020考研分数线_山东理工大学2020考研复试分数线 - 考研营...

热门文章

  1. linux系统触摸板双击,在Ubuntu 18.04系统中搞定触摸板多点触控
  2. 什么样的网页适合使用框架
  3. 第一章概述-------第一节--1.2互联网概述
  4. 今天收到一封非常牛B的离职信
  5. Spring框架初学习
  6. Oracle导入dmp文件步骤
  7. iphone 6 设置自定义铃声(未越狱)
  8. MySQL数据库表数据迁移--ibd的使用
  9. Tree Booster 的参数
  10. python什么是高阶函数_对于高阶函数的理解是什么?