python 管道队列_关于python:Multiprocessing-管道与队列
Python的多处理程序包中的队列和管道之间的根本区别是什么?
在什么情况下应该选择一种? 什么时候使用Pipe()有优势? 什么时候使用Queue()有优势?
Pipe()只能有两个端点。
Queue()可以有多个生产者和消费者。
何时使用它们
如果需要两个以上的点进行通信,请使用Queue()。
如果您需要绝对性能,则Pipe()会更快,因为Queue()是建立在Pipe()之上的。
绩效基准
假设您要生成两个进程并在它们之间尽快发送消息。这些是使用Pipe()和Queue()进行的类似测试之间的拖动竞赛的计时结果。这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上进行的。
仅供参考,我将JoinableQueue()的结果作为奖励; JoinableQueue()在调用queue.task_done()时负责任务(它甚至不知道特定任务,它只计算队列中未完成的任务),因此queue.join()知道工作已完成。
此答案底部的每个代码...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
总结Pipe()大约是Queue()的三倍。除非您确实必须拥有这些好处,否则甚至不要考虑JoinableQueue()。
奖励材料2
除非您知道一些捷径,否则多处理会在信息流中引入微妙的变化,使调试变得困难。例如,在许多情况下,当您通过字典建立索引时,您的脚本可能运行良好,但是某些输入很少会失败。
通常,当整个python进程崩溃时,我们会获得有关失败的线索;但是,如果多处理功能崩溃,则不会在控制台上打印未经请求的崩溃回溯。很难找到未知的多处理崩溃,而又不知道导致进程崩溃的线索。
我发现跟踪多处理崩溃信息的最简单方法是将整个多处理功能包装在try / except中并使用traceback.print_exc():
import traceback
def reader(args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print"FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
现在,当您发现崩溃时,您会看到类似以下内容的信息:
FATAL: reader([{'crash', 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File"foo.py", line 19, in __init__
self.run(task_q, result_q)
File"foo.py", line 46, in run
raise ValueError
ValueError
源代码:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
@Jonathan"总而言之,Pipe()比Queue()快三倍"
但是Pipe()不能安全地与多个生产者/消费者一起使用。
优秀的!好的答案,很高兴您提供了基准!我只有两个小问题:(1)"快几个数量级"有点夸大其词。差异为x3,约为一个数量级的三分之一。只是说。 ;-); (2)比较公平的比较是正在运行的N个工作程序,每个工作人员都通过点对点管道与主线程进行通信,而运行中的N个工作程序的性能都是从单个点对多点队列中提取的。
对您的"奖金材料" ...是的。如果您是Process的子类,请将大部分run方法放在try块中。这也是记录异常的有用方法。复制普通异常输出:sys.stderr.write(.join(traceback.format_exception(*(sys.exc_info()))))
通过管道将错误消息发送到另一个进程并在另一个进程中处理错误会更好吗?
@ alexpinho98-但是您将需要一些带外数据以及相关的信令模式,以指示您发送的不是常规数据而是错误数据。鉴于发起过程已经处于不可预测的状态,这可能要问的太多了。
@迈克,只是想说你很棒。这个答案对我很有帮助。
@JJC要对自己的测验进行测验,3x大约是一个数量级,而不是三分之一-sqrt(10)=?3。
在multi-pipe.py中,如何知道在调用inp_p.close之前将所有项放入管道。
@ideoutrea,同意显式比隐式好
python 管道队列_关于python:Multiprocessing-管道与队列相关推荐
- python 时间序列预测_使用Python进行动手时间序列预测
python 时间序列预测 Time series analysis is the endeavor of extracting meaningful summary and statistical ...
- python 概率分布模型_使用python的概率模型进行公司估值
python 概率分布模型 Note from Towards Data Science's editors: While we allow independent authors to publis ...
- python线性表和队列_[笔记]python数据结构之线性表:linkedlist链表,stack栈,queue队列...
python数据结构之线性表 python内置了很多高级数据结构,list,dict,tuple,string,set等,在使用的时候十分舒心.但是,如果从一个初学者的角度利用python学习数据结构 ...
- python使用redis队列_【Python】python使用redis做队列服务
系统中引入消息队列机制是对系统一个非常大的改善.例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中.你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确 ...
- python实现队列_用Python实现的数据结构与算法:队列
一.概述 队列(Queue)是一种先进先出(FIFO)的线性数据结构,插入操作在队尾(rear)进行,删除操作在队首(front)进行. 二.ADT 队列ADT(抽象数据类型)一般提供以下接口: Qu ...
- python 网页编程_通过Python编程检索网页
python 网页编程 The internet and the World Wide Web (WWW), is probably the most prominent source of info ...
- python培训班-Python培训机构_高品质Python线下开发培训班推荐-黑马程序员
Python编程基础基础班1 课时:15天技术点:97项测验:2次学习方式:线下面授 学习目标 1.掌握Python开发环境基本配置|2.掌握运算符.表达式.流程控制语句.数组等的使用|3.掌握字符串 ...
- python移动图形工作站_让Python跑得更快
原标题:让Python跑得更快 点击关注 异步图书,置顶公众号 每天与你分享 IT好书 技术干货 职场知识 Tips 参与文末话题讨论,即有机会获得异步图书一本. Python很容易学.你之所以阅读本 ...
- python编程图文_深入Python多进程编程基础——图文版
多进程编程知识是Python程序员进阶高级的必备知识点,我们平时习惯了使用multiprocessing库来操纵多进程,但是并不知道它的具体实现原理.下面我对多进程的常用知识点都简单列了一遍,使用原生 ...
- python queue 调试_学Python不是盲目的,是有做过功课认真去了解的
有多少伙伴是因为一句'人生苦短,我用Python'萌生想法学Python的!我跟大家更新过很多Python学习教程普及过多次的Python相关知识,不过大家还是还得计划一下Python学习路线!Pyt ...
最新文章
- Data Structure - 返回单链表的中间结点
- 字符集问题的初步探讨-乱码的产生
- 基于机器学习的捡球机器人设计与实现(探索)第2篇——7步完成opencv的安装(20190112)
- 在tomcat上部署项目,实现类似添加这样的功能之后,tomcat要运行很久,解决办法
- spring cloud Alibaba 的 Nacos学习笔记
- rabbitmq实现秒杀中订单流量削峰
- linux服务器拓扑图,Linux服务器作为网关的DNS分离解析服务(CentOS 7版本)
- 对一个存储过程语法的解读
- 改动文件后缀的C语言实现
- Zabbix触发器配置指定生效星期监控CPU使用率
- Atitit uke协会产业分类法 艾提拉产业分类法五大类法 目录 1. 配第-克拉克定理概述 产业趋势 有形财物的生产转向无形的服务性生产	1 1.1. 农工商趋势法	1 1.2. 1940年,英
- 心电信号matlab电路仿真实例,基于matlab的心电信号分析系统的设计与仿真.docx
- pandas求协方差、相关系数、显著性检验
- 鸿基酒店应收应付报表生成系统
- 以太网卡支持的工作模式
- 武汉城建学院计算机专业柴曲,采访对话丨在世界一流的柴院学音乐,是怎样一种感受?...
- linux centos single,centos 6中single-request-reopen参数说明
- 获取Android_Studio模拟器root权限的方法
- Java编程笔记9:容器(下)
- TTCN中PTC的运行流程