php rabbitmq非阻塞,Python-RabbitMQ-RPC(非阻塞版)
服务器端:rpc_server.py
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n -2)
def on_request(ch, method, props, body):
n = int(body)
print("[.] fib(%s)" % n)
response = fib(n)#斐波那契的执行结果赋值给reponse
#再把得到的消息发回给客户端
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties = pika.BasicProperties(
correlation_id= \
props.correlation_id
),
body =str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)#确保消息被消费,代表任务完成
#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
queue='rpc_queue')
print(" [x] Awaiting RPC request")
channel.start_consuming()
客户端:rpc_client.py
import pika,sys,uuid
import time
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue#获取queue名字
self.channel.basic_consume(self.on_response,#只要收到就调用on_response()
no_ack=True,
queue=self.callback_queue
)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:#判断服务器端corr_id和本地corr_id相等,才往下走
self.response = body#response收到body的消息表示response不为空
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,#指定返回到那个queue
correlation_id=self.corr_id,
),
body=str(n))#传字符串,把30传进来
while self.response is None:
#收到消息,就会触发on_response(),没消息就继续往下走循环
self.connection.process_data_events()#非阻塞版的start_consuming
print("no msg...")#只要走到这,就相当于没消息
time.sleep(0.5)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(8)
print(" [.] Got %r" % response)
php rabbitmq非阻塞,Python-RabbitMQ-RPC(非阻塞版)相关推荐
- 【Python进阶学习】gRPC在Python的异步非阻塞实现方式
gRPC在Python的异步非阻塞实现方式 前言 问题&分析 问题阐述 原因分析 解决方案 服务端 原服务端实现方式 aio的服务端实现方式: 客户端 异步非阻塞方式 同步阻塞方式 性能优化效 ...
- Python|线程和进程|阻塞|非阻塞|同步|异步|生成器和协程|资源竞争|进程间通信|aiohttp库|daemon属性值详解|语言基础50课:学习(11)
文章目录 系列目录 原项目地址 第34课:Python中的并发编程-1 线程和进程 多线程编程 使用 Thread 类创建线程对象 继承 Thread 类自定义线程 使用线程池 守护线程 资源竞争 G ...
- python socket recv非阻塞_socket非阻塞recv大坑
Python中,socket用来实现网络通信,它默认的recv是一个阻塞的函数,也就是说,当运行到recv时,会在这个位置一直等待直到有数据传输过来,我在网上一篇文章看到: Sunmmary Pyth ...
- RabbitMQ+PHP 教程六(RPC)
(using php-amqplib) 前提必读 本教程假设RabbitMQ是安装在标准端口上运行(5672).如果您使用不同的主机.端口或凭据,则连接设置需要调整. 如果您在本教程中遇到困难,可以通 ...
- Python函数的非固定参数
一.概述 在上一篇博客中我已经写了,位置参数和关键字参数,下面我们来谈谈默认参数和参数组 二.默认参数 默认参数指的是,我们在传参之前,先给参数制定一个默认的值.当我们调用函数时,默认参数是非必须传递 ...
- java 多线程阻塞队列 与 阻塞方法与和非阻塞方法
Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移 ...
- AIO,BIO,NIO:同步阻塞式IO,同步非阻塞IO,异步非阻塞IO
BIO,同步阻塞式IO,简单理解:一个连接一个线程 NIO,同步非阻塞IO,简单理解:一个请求一个线程 AIO,异步非阻塞IO,简单理解:一个有效请求一个线程 IO:阻塞IO BIO:同步阻塞IO.服 ...
- Python中的非可变型的数据类型(immutable type)
1 致谢 感谢 Ned Batchelder博士的讲授, 视频链接如下: https://www.youtube.com/watch?v=_AEJHKGk9ns 2 Python中的非可变型的数据类型 ...
- 深入理解非阻塞同步IO和非阻塞异步IO
这两篇文章分析了Linux下的5种IO模型 http://blog.csdn.net/historyasamirror/article/details/5778378 http://blog.csdn ...
- 阻塞式IO和非阻塞式IO
什么是阻塞式IO,什么是非阻塞式IO?区分他们有何用? 阻塞式IO:IO即input/output,阻塞式IO指的是"一旦输入/输出工作没有完成,则程序阻塞,直到输入/输出工作完成" ...
最新文章
- 验证url 地址是否是图片
- java mail urlname_javamail收发信件时,服务器,收发方的名称应该怎样设置才有效呢
- php服务docker化,docker化你的PHP应用环境Nginx PHP-FPM
- nodejs+webpack+vue以及npm安装对应的库
- 蓝桥杯2015初赛-三羊献瑞-枚举
- java io流_浅谈IO流(一)-流的基本概念以及java的常见流
- QEMU CVE-2020-14364 漏洞分析(含 PoC 演示)
- Java 8日期– LocalDate,LocalDateTime,即时
- 使用嵌套类/临时类保存数据
- 注册OCX控件并在VS2013的mfc程序中使用及常见问题总结
- CSS day_14(6.29) Sass基本规则和语法、云服务器的购买和使用
- nf_regester
- 局域网查看工具V1.60.exe与局域网助手(LanHelper)的试用
- Python-3 EXCEL 操作-1
- python-collections
- 浅谈www.baidu.com和baidu.com
- 新手零基础21天Python打卡计划开始啦
- 智能录音笔的工作原理
- 显示卡影片播放硬件加速,作法原理完全解说
- 最新JustNews资讯博客类模板源码+WordPress内核