python进程通信方式总结(一):管道与信号量
进程介绍
一个独立进程不受其他进程执行的影响,而一个协作进程可能会受到其他执行进程的影响,尽管可以认为那些独立运行的进程将非常高效地执行,但实际上,在许多情况下,可以利用合作性质来提高计算速度,便利性和模块化。进程间通信(IPC)是一种机制,允许进程彼此通信并同步其动作。这些过程之间的通信可以看作是它们之间进行合作的一种方法。
进程主要通过以下两者相互通信:
- 共享内存
- 讯息传递
而在实际使用情况中,我们又可以将其分为7种,如下图所示:
下面就对上面列举的方式在python中进行逐个说明,可能我理解的内容与理论有些出入,因为我是从实际使用上总结,欢迎私信或者评论。
python进程方式
进程通信方式说明
- 管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
- 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
- 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
- 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。
- 信号量Semaphore:信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
- 套接字Socket:套解口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同及其间的进程通信。
- 信号 ( sinal ) : 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
上述七种方式对应于python中有三个主要的包能完成操作,分别是:
- subprocess:可以在当前程序中执行其他程序或命令;
- mmap:提供一种基于内存的进程间通信机制;
- multiprocessing:提供支持多处理器技术的多进程编程接口,并且接口的设计最大程度地保持了和threading模块的一致,便于理解和使用。
下面针对七种方式与python的三种模块进行分别讲解:
python利用管道通信
multiprocessing.pipe
关于管道,我用的倒不是很多,通常在python中,使用管道一般都是subprocess模块中的popen,并且是在调用外部shell程序,对应的还有stdin,stdout的状态,而我也是在写本篇博文,找资料的时候才知道multiprocessing竟然也有一个pipe,但这个通信方式给我的感觉更像是双向队列,并且拆分成了两个,具体的demo参考:
# coding:utf-8
from multiprocessing import Process, Pipedef func(conn2):conn2.send("I am a child process.")print("Message from the parent process:", conn2.recv())conn2.close()if __name__ == '__main__':conn1, conn2 = Pipe() # 建立管道,拿到管道的两端,双工通信方式,两端都可以收发消息p = Process(target=func, args=(conn2,)) # 将管道的一端给子进程p.start() # 开启子进程print("Message from the child process:", conn1.recv()) # 主进程接受来自子进程的消息conn1.send("I am the main process.") # 主进程给子进程发送消息conn1.close()
demo中数据从conn1流向conn2,而conn2的消息发送给了conn1,这种叫全双工模式,因为有个默认值duplex参数为True,为False就只能1进2出。
上述是建立在一种比较理想的测试环境下进行的,我没有具体看过multiprocessing的源码,因为是调用的C语言包,听说里面的弯弯绕绕还是挺多的,但从一些issue里得知,multi的pipe有线程不安全问题,还有数据接收端会在没数据的时候卡住,关于前面这个问题,也能用一个例子来解释:
from threading import Thread, Locknumber = 0def target():global numberfor _ in range(1000000):number += 1thread_01 = Thread(target=target)
thread_02 = Thread(target=target)
thread_01.start()
thread_02.start()
thread_01.join()
thread_02.join()
print(number)
多跑例子几次,我们会发现每次输出的number都不相同,原因就是如果没有锁的机制,多个线程先后更改数据造成所得到的数据是脏数据,这就叫线程不安全。而解决的方法就是加锁,就是上面代码注释的那部分替换。
但一般都是用pipe都会使用进程去开,那么就避免了探讨安不安全的问题。关于第二个问题,我在实验过后发现确实如此,如果发送和接收数据不对等,程序会卡住,且没有任何报错,所以,可能因为pipe的种种限制,以及支持场景较少,而直接采用queue来进行了二次封装,线程threading模块同样做了相关改进,根据某些资料说是queue自身实现了锁原语,因此它才能实现人工原子操作。
subprocess.popen
popen的通用格式为:
subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
具体参数为:
参数名 | 参数说明 |
---|---|
args | 要执行的命令或可执行文件的路径 |
bufsize | 控制 stdin, stdout, stderr 等参数指定的文件的缓冲,和打开文件的 open()函数中的参数 bufsize 含义相同 |
executable | 如果这个参数不是 None,将替代参数 args 作为可执行程序 |
stdin | 指定程序的标准输入 |
stdout | 指定程序的标准输出 |
stderr | 指定程序的标准错误输出 |
preexec_fn | 默认是None,否则必须是一个函数或者可调用对象,在子进程中首先执行这个函数,然后再去执行为子进程指定的程序或Shell。 |
close_fds | 布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件; |
shell | 布尔型变量,明确要求使用shell运行程序,与参数 executable 一同指定子进程运行 |
cwd | 代表路径的字符串,指定子进程运行的工作目录,要求这个目录必须存在; |
env | 字典,键和值都是为子进程定义环境变量的字符串; |
universal_newline | 布尔型变量,为 True 时,stdout 和 stderr 以通用换行(universal newline)模式打开 |
creationfalgs | 最后这两个参数是Windows中才有的参数,传递给Win32的CreateProcess API调用。 |
然后关于这个的应用场景,一般都是通过它调用一个进程去处理shell语句,我是根据它做ffmpeg的调用,用来生成视频,以及一些其它的流媒体。下面是调用shell的一个demo:
import os,time
from subprocess import *
from multiprocessing import *def run_shell_cmd(cmd_str, index):print('run shell cmd index %d'%(index,))proc = Popen(['/bin/zsh', '-c', cmd_str],stdout=PIPE)time.sleep(1)outs = proc.stdout.readlines()proc.stdout.close()proc.terminate()return def multi_process_exc():pool = cmd_str = 'ps -ef | grep chromium'for x in range(10): p = Process(target=run_shell_cmd, args=(cmd_str,x))p.start()pool.append(p)for p in pool:p.join()
if __name__ == "__main__":multi_process_exc()
subprocess模块能说的不多,因为我也用得不多,当然,除了这个,还有很多管道的例子,比如opencv官网下的一个issue,就有人用win32pipe来做信息传输:
#!/usr/bin/env python
import cv2
import win32pipe, win32file
from threading import Threaddef runPipe(): p = win32pipe.CreateNamedPipe(r'\\.\pipe\myNamedPipe',win32pipe.PIPE_ACCESS_DUPLEX,win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT,1, 1024, 1024, 0, None) win32pipe.ConnectNamedPipe(p, None) with open("D:\\Streams\\mystream.ts", 'rb') as input:while True:data = input.read(1024)if not data:breakwin32file.WriteFile(p, data) def extract():cap = cv2.VideoCapture(r'\\.\pipe\myNamedPipe') fnum = 0while(True):# Capture frame-by-frameret, frame = cap.read() print fnum, "pts:", cap.get(cv2.cv.CV_CAP_PROP_POS_MSEC)fnum = fnum + 1 # When everything done, release the capturecap.release() if __name__ == "__main__": thr = Thread(target=extract)thr.start()runPipe()print "bye"
但是到今年,目前也应该被淘汰了,目前主流都是基于queue,这将留在下一篇讲解。
python利用信号量通信
这个东西基本就没有用到过了,但仔细想想,其实很多底层都有用到,任何一个web框架,它都和上下文有些关系,像Django里的signal,flask中也是内置信号,它与设计模式中的观察者基本一致,后续我会再说明生产者消费者模型在queue里,所以这里简单提一下,关于在多进程中直接使用,multiprocessing与threading中都是叫Semaphore,Semaphore和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过,demo为:
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphoredef home(name, se):se.acquire() # 拿到一把钥匙print('%s进入了房间' % name)time.sleep(random.randint(1, 5))print('******************%s走出来房间' % name)se.release() # 还回一把钥匙if __name__ == '__main__':se = Semaphore(2) # 创建信号量的对象,有两把钥匙for i in range(7):p = Process(target=home, args=('tom{}'.format(i), se))p.start()
"""
tom1进入了房间
tom0进入了房间
******************tom1走出来房间
tom2进入了房间
******************tom0走出来房间
tom3进入了房间
******************tom3走出来房间
tom4进入了房间
******************tom2走出来房间
tom5进入了房间
******************tom5走出来房间
tom6进入了房间
******************tom4走出来房间
******************tom6走出来房间
"""
关于实际应用,可以看一道lc。
我们提供一个类:"""
class FooBar {public void foo() {for (int i = 0; i < n; i++) {print("foo");}}public void bar() {for (int i = 0; i < n; i++) {print("bar");}}
}
"""两个不同的线程将会共用一个 FooBar 实例。其中一个线程将会调用 foo() 方法,另一个线程将会调用 bar() 方法。请设计修改程序,以确保 "foobar" 被输出 n 次。
import threading
class FooBar:def __init__(self, n):self.n = nself.foo_lock = threading.Semaphore()self.foo_lock.acquire()self.bar_lock = threading.Semaphore()self.bar_lock.acquire()def foo(self, printFoo: 'Callable[[], None]') -> None:for i in range(self.n):# printFoo() outputs "foo". Do not change or remove this line.printFoo()self.bar_lock.release()self.foo_lock.acquire()def bar(self, printBar: 'Callable[[], None]') -> None:for i in range(self.n):# printBar() outputs "bar". Do not change or remove this line.self.bar_lock.acquire()printBar()self.foo_lock.release()
python进程通信方式总结(一):管道与信号量相关推荐
- python进程通信方式有几种_python全栈开发基础【第二十一篇】互斥锁以及进程之间的三种通信方式(IPC)以及生产者个消费者模型...
一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行 ...
- Linux —进程间的五种通信方式—(半双工管道、命名管道、消息队列、信号、共享内存),外加信号量。直接上代码:
无名管道pipe(半双工):(仅限同一个程序运行) 创建无名管道会生成特殊文件,只存在于内存中 #include <stdio.h> #include <stdlib.h> # ...
- python 进程间同步_python之路29 -- 多进程与进程同步(进程锁、信号量、事件)与进程间的通讯(队列和管道、生产者与消费者模型)与进程池...
所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了.至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠 ...
- 网络编程7_ multiprocessing类-管道.数据共享, 信号量,事件,进程池
一. multiprocessing类 6. 管道 进程间通信(ipc)方式二: 管道会导致数据不安全的情况, 后面我们会说到为什么会带来数据不安全的问题 创建管道的类: ...
- Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存
Linux进程间通信--进程,信号,管道,消息队列,信号量,共享内存 参考:<linux编程从入门到精通>,<Linux C程序设计大全>,<unix环境高级编程> ...
- python管道通信_Python进程通信之匿名管道实例讲解
匿名管道 管道是一个单向通道,有点类似共享内存缓存.管道有两端,包括输入端和输出端.对于一个进程的而言,它只能看到管道一端,即要么是输入端要么是输出端. os.pipe()返回2个文件描述符(r, w ...
- 进程通信:匿名管道和命名管道
一.进程间通信方式 管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用.进程的亲缘关系通常是指父子进程关系. 有名管道 (named pipe) : ...
- python进程池win出错_解决windows下python3使用multiprocessing.Pool出现的问题
例如: from multiprocessing import Pool def f(x): return x*x pool = Pool(processes=4) r=pool.map(f, ran ...
- linux进程通信1:进程通信概述,管道通信原理(无名管道,有名管道),管道编程实战
进程通信概述,管道通信原理(无名管道,有名管道),管道编程实战 1.进程间通信概述: 举例1: 你手机微信和别人手机微信通信 举例2: 如:父子进程wait 和 exit之间的通信 进程间通信(IPC ...
最新文章
- java 集合 接口_Java集合之Collection接口
- idea里maven设置本地仓库报错原因
- c# 两个数的加减乘除
- linux应用程序安装PPT免费序,linux下应用程序安装的总结
- 利用jsonp实现跨域请求
- jquery学习笔记之选择器
- Oracle 11g Release 1 (11.1)——自动存储管理(Automatic Storage Management,ASM)
- c语言中isupper用法,isupper - [ C语言中文开发手册 ] - 在线原生手册 - php中文网
- java不想被gc,Java GC 必知必会
- KEIL编译时Warning: C3008W: splitting LDM/STM has no benefit的消除
- baocms7.0版本一元云购报错Call-time pass-by-reference has been removed处理办法
- 大型施工程机械设备资产管理融资租赁方案
- iphone输入法换行_iphone打字怎么换行?iphone输入法换行教程
- Java 解析复杂表格excel
- linux裁剪图片的软件,【美图秀秀Linux版】美图秀秀Linux版下载 v1.0.0.0 免费最新版-趣致软件园...
- CADENAS为BELFUSE创建新的电子元件3DCAD产品目录
- 离散信源信息量、平均信息量的计算
- 看雪pwn入门--基础篇
- android工具类怎么写,用kotlin写了一些android常用的一些工具类
- mysql+check+男或女_关于MYSQL 检察check约束
热门文章
- 基于ATTCK框架解析勒索病毒攻击
- C++OpenCV系统学习(17)——图像分割与抠图(4)Grabcut
- 计算机管理 服务无响应,电脑任务栏假死点击没反应的解决方法(win7与xp)
- Ring3无敌进程让你的进程变得和smss.exe一样支持64
- 基于layui 的数据表格复杂表头导出到excel文件中
- 静态链接之与静态库的链接
- Win10玩游戏提示由于找不到xinput1 3.dll无法继续执行代码
- js从字符串中提取身份证号,连续18位数字
- 高可用架构之高可用的应用和服务
- ARPA x 京东数科:隐私计算如何赋能未来金融数据共享