Python并发编程之进程间通信

  • 一、Python进程间通信
  • 二、使用队列Queue进行进程间通信
    • 2.1 Queue实例中的方法和属性
    • 2.2 使用Queue进行进程间通信的代码示例
  • 三、使用管道Pipe进行进程间通信
    • 3.1 Pipe实例的方法和属性
    • 3.2 使用Pipe进行进程间通信的代码示例

一、Python进程间通信

multiprocessing模块支持进程间通信的两种主要形式:管道和队列。这两种方法都是实用消息传递来实现的,但队列接口有意模仿线程程序中参见的队列用法。

二、使用队列Queue进行进程间通信

Queue([maxsize])

创建共享的进程队列。maxsize是队列中允许的最大项数,如果忽略此参数,则无大小限制。底层队列使用管道和锁定实现。

2.1 Queue实例中的方法和属性

Queue的实例具有以下方法

  • q.cancel_join_thread()
  • q.close()
  • q.empty()
  • q.full()
  • q.get([block [, timeout]])
  • q.get_nowait()
  • q.join_thread()
  • q.put(item [, block [, timeout]])
  • q.put_nowait(item)
  • q.qsize()
JoinableQueue([maxsize])

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与Queue对象相同的方法外,还具有以下方法:

  • q.task_done()
    消费者使用此方法发出信号,表示q.get()返回的项目已经被处理完毕。如果调用此方法的次数大于队列中删除的项目数量,将引发ValueError异常
  • q.join()
    生产者使用此方法进行阻塞,直到队列中的所有项目都被处理。阻塞将持续到位队列中的每个项目均调用q.task_done()方法为止。

2.2 使用Queue进行进程间通信的代码示例

import multiprocessingdef consumer(input_q):while True:item = input_q.get()# 进行处理工作,此处替换为实际有意义的工作print(item)# 发出信号通知任务完成input_q.task_done()def producer(sequence, output_q):for item in sequence:# 将项目放入队列中output_q.put(item)# 建立进程
if __name__ == '__main__':q = multiprocessing.JoinableQueue()# 运行消费者进程consumer_p = multiprocessing.Process(target=consumer, args=(q, ))consumer_p.daemon = Trueconsumer_p.start()# 生产项目,sequence代表要发送给消费者的项目序列# 在实践中,这可能是生成器的输出或通过一些其他方式生产出来sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 等待所有项目被处理q.join()

这个例子中,将消费者进程设置为后台进程,因为它永久运行,而我们希望但主程序结束时它随之终止(如果忘记这么做,程序将挂起)。这里使用了JoinableQueue,以便让生产者实际了解队列中的所有项目何时被处理完毕。join()操作保证了这一点。如果忘记这个步骤,消费者进程将在有时间完成所有工作之前被终止。

如果需要,可以在同一个队列中放置多个进程,也可以从同一个队列中获取多个进程,例如,如果要构造消费者进程池,可以编写下面这样的代码:

if __name__ == "__main__":q = multiprocessing.JoinableQueue()# 启动一些消费者进程consumer_p1 = multiprocessing.Process(target=consumer, args=(q, ))consumer_p1.daemon = Trueconsumer_p1.start()consumer_p2 = multiprocessing.Process(target=consumer, args=(q, ))consumer_p2.daemon = Trueconsumer_p2.start()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 等待所有项目被处理q.join()

编写这类代码时,要把队列中的每个项目都序列化,然后通过管道或套接字连接发送给进程。一般的规则是:发送数据量较少的大对象比发送大量小对象要好。

在某些应用程序中,生产者需要通知消费者,它们不在生产任何项目而应该关闭。为此,编写的代码中应该使用标志(sentinel 指示完成的特殊值)。下面这个例子使用none作为标志说明这个概念:

import multiprocessingdef consumer(input_q):while True:item = input_q.get()if item is None:break# 处理项目print(item)# 关闭print("consumer done")def producer(sequence, output_q):for item in sequence:# 把项目放入队列output_q.put(item)if __name__ == '__main__':q = multiprocessing.JoinableQueue()# 启动消费者进程consumer_p = multiprocessing.Process(target=consumer, args=(q, ))consumer_p.daemon = Trueconsumer_p.start()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, q)# 在队列中放置标志,发出完成信号q.put(None)# 等待所有项目被处理, 然后关闭q.join()

如果像上面这个例子中那样使用标志,一定要在队列上为每个使用者都放上标志。例如,如果有三个消费者进程在使用队列上的项目,那么生产者需要在队列中放置三个标志,才能让所有消费者都关闭。

三、使用管道Pipe进行进程间通信

作为使用队列的另一种形式,还可以使用管道在进程之间执行消息传递。

Pipe([duplex])

在进程之间创建一条管道,并返回元组(conn1, conn2), 其中conn1和conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex设置为false,则conn1只能用于接收,conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。

3.1 Pipe实例的方法和属性

Pipe()方法返回的Connection对象的实例c具有以下方法和属性

  • c.close()
  • c.fileno()
  • c.poll([timeout])
  • c.recv()
  • c.recv_bytes(buffer [, offset [, size]])

3.2 使用Pipe进行进程间通信的代码示例

可以使用与队列类似的方式使用管道。下面这个例子说明如何使用管道实现前面的生产者消费者问题:

import multiprocessing# 使用管道上的项目
def consumer(pipe):output_p, input_p = pipe# 关闭管道的输入端input_p.close()while True:try:item = output_p.recv()except EOFError:# 如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常break# 处理项目print(item)# 关闭print("consumer done")# 生产项目并将其放到队列上
def producer(sequence, input_p):for item in sequence:input_p.send(item)if __name__ == "__main__":(output_p, input_p) = multiprocessing.Pipe()# 启动消费者进程consumer_p = multiprocessing.Process(target=consumer, args=((output_p, input_p), ))consumer_p.start()# 关闭生产者中的输出管道output_p.close()# 生产项目sequence = [1, 2, 3, 4, 5]producer(sequence, input_p)# 关闭输入管道,表示完成input_p.close()# 等待消费者进程关闭consumer_p.join()

应当特别注意管道端口的正确管理。如果生产者或消费者中都没有使用管道的某个端点,就应将其关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭了管道的输入端。如果忘记执行这些步骤,程序可能会在消费者中的recv()操作上挂起。

管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:

import multiprocessing# 服务器处理
def adder(pipe):server_p, client_p = pipe# 关闭服务器中的客户端管道client_p.close()while True:try:x, y = server_p.recv()except EOFError:breakresult = x + yserver_p.send(result)# 关闭print("server done")if __name__ == "__main__":(server_p, client_p) = multiprocessing.Pipe()# 启动服务器进程adder_p = multiprocessing.Process(target=adder, args=((server_p, client_p), ))adder_p.start()# 关闭客户端中的服务器管道server_p.close()# 向服务器提出一些请求client_p.send((3, 4))print(client_p.recv())client_p.send(("hello", "world"))print(client_p.recv())# 完成, 关闭管道client_p.close()# 等待消费者进程关闭adder_p.join()

这个例子中,adder()函数以服务器的形式运行,等待消息到达管道的端点。收到之后,它会执行一些处理并将结果返回给管道。send()和recv()方法使用pickle模块对对象进行序列化。但对于使用远程过程调用的高级应用程序而言,应该使用进程池。

Python并发编程之进程间通信相关推荐

  1. Python并发编程理论篇

    Python并发编程理论篇 前言 很多人学习python,不知道从何学起. 很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手. 很多已经做案例的人,却不知道如何去学习更加高深的知识 ...

  2. Python并发编程系列之多进程(multiprocessing)

    1 引言 本篇博文主要对Python中并发编程中的多进程相关内容展开详细介绍,Python进程主要在multiprocessing模块中,本博文以multiprocessing种Process类为中心 ...

  3. Python|并发编程|爬虫|单线程|多线程|异步I/O|360图片|Selenium及JavaScript|Scrapy框架|BOM 和 DOM 操作简介|语言基础50课:学习(12)

    文章目录 系列目录 原项目地址 第37课:并发编程在爬虫中的应用 单线程版本 多线程版本 异步I/O版本 总结 第38课:抓取网页动态内容 Selenium 介绍 使用Selenium 加载页面 查找 ...

  4. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  5. python并发编程之协程

    python并发编程之协程 1.协程: 单线程实现并发 在应用程序里控制多个任务的切换+保存状态 优点: 应用程序级别速度要远远高于操作系统的切换 缺点: 多个任务一旦有一个阻塞没有切,整个线程都阻塞 ...

  6. python并发编程调优_Python并发编程-并发解决方案概述

    Python并发编程-并发解决方案概述 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.并发和并行区别 1>.并行(parallel) 同时做某些事,可以互不干扰的同一个时 ...

  7. 揭秘Python并发编程——协程

    原文链接:https://baijiahao.baidu.com/s?id=1649450510185145678&wfr=spider&for=pc Python并发编程一直是进阶当 ...

  8. python并发编程之semaphore(信号量)_浅谈Python并发编程之进程(守护进程、锁、信号量)...

    前言:本博文是对Python并发编程之进程的知识延伸,主要讲解:守护进程.锁.信号量. 友情链接: 一.守护进程(daemon) 1.1 守护进程概念 首先我们都知道:正常情况下,主进程默认等待子进程 ...

  9. python 并发编程 多线程 目录

    线程理论 python 并发编程 多线程 开启线程的两种方式 python 并发编程 多线程与多进程的区别 python 并发编程 多线程 Thread对象的其他属性或方法 python 并发编程 多 ...

最新文章

  1. 基于WeUI的Angular2开发
  2. 关于资产发现-嗅探法
  3. Rnotebook中用python画图
  4. 天平应什么放置_天平是否应该放干燥剂?
  5. mybatis:延迟加载时不要在get/set方法上面添加final关键字(原创)
  6. mongodb环境安装
  7. Atitit。Cas机制 软件开发 编程语言 无锁机制 java c# php
  8. Starter Kit for ASP.NET 2.0 家族又添新丁!
  9. 语言中的petchar运用_自闭症儿童语言障碍家庭训练,需要融入这些方法
  10. [洛谷P3550][POI2013]TAK-Taxis
  11. 第三章:变量与字符串等基础知识
  12. Spring boot web开发实战
  13. c++接口与实现的分离
  14. 7.性能之巅 洞悉系统、企业与云计算 --- 内存
  15. 长大了还不如小时候那么能够坚持了
  16. Android UI学习之TextView
  17. django pdf转html5,pytho pdfkit 将网页django2.0教程内容打印成pdf文档
  18. win10+乌班图双系统安装(详细)
  19. “普通高中数学课程标准(实验)”解读
  20. HashMap底层实现原理概述

热门文章

  1. 2023年,前端开发就业前景好吗?
  2. 02. 禁止修改 IP 上网 ❀ 飞塔 (Fortinet5.4) 防火墙
  3. java 调用webDriver实现访问网页(谷歌浏览器)
  4. 个性化Ubuntu壁纸如何添加
  5. 咏南ISAPI中间件
  6. 小红书探店流程有哪些?小红书探店博主如何联系?
  7. 全新系列手机 配索尼4800万摄像头
  8. 外国人坐地铁到底玩不玩手机?
  9. 如何使用Movavi Video Editor去除影片中的声音或音乐
  10. 今日简报 每日精选12条新闻简报 每天一分钟 知晓天下事 4月23日