Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

Manager的dict,list使用

import multiprocessing

import time

def worker(d, key, value):

d[key] = value

if __name__ == '__main__':

mgr = multiprocessing.Manager()

d = mgr.dict()

jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))

for i in range(10)

]

for j in jobs:

j.start()

for j in jobs:

j.join()

print ('Results:' )

for key, value in enumerate(dict(d)):

print("%s=%s" % (key, value))

# the output is :

# Results:

# 0=0

# 1=1

# 2=2

# 3=3

# 4=4

# 5=5

# 6=6

# 7=7

# 8=8

# 9=9

import multiprocessing

import time

def worker(d, key, value):

d[key] = value

if __name__ == '__main__':

mgr = multiprocessing.Manager()

d = mgr.dict()

jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))

for i in range(10)

]

for j in jobs:

j.start()

for j in jobs:

j.join()

print ('Results:' )

for key, value in enumerate(dict(d)):

print("%s=%s" % (key, value))

# the output is :

# Results:

# 0=0

# 1=1

# 2=2

# 3=3

# 4=4

# 5=5

# 6=6

# 7=7

# 8=8

# 9=9

文章来源:https://www.cnblogs.com/caodneg7/p/9520069.html

Python 多进程默认不能共享全局变量

主进程与子进程是并发执行的,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(range(5)))。进程通信(进程之间传递数据)用进程队列(multiprocessing.Queue(),单向通信),管道( multiprocessing.Pipe() ,双向通信)。

这里要注意多线程之间数据共享、多进程之间数据共享。。。。。。。多线程用全局变量(global)

全局变量,主进程与子进程是并发执行的,他们不能共享全局变量(子进程不能改变主进程中全局变量的值)

由于进程之间不共享内存,所以进程之间的通信不能像线程之间直接引用,因而需要采取一些策略来完成进程之间的数据通信。

本文记录使用 Manager 来完成进程间通信的方式。

首先描述需求:

场景:顶层逻辑负责管理,我们定义为C,由C启动A、B两个进程联合完成功能

需求:A、B联合工作过程中的数据通信

python 多进程

解决思路:

利用顶层C创建一个 Manager,由 Manager 提供数据池分发给A、B使用,从而完成两个进程之间的通信。

#-*-encoding:utf-8-*-

from multiprocessing import Process, Manager

from time import sleep

def thread_a_main(sync_data_pool): # A 进程主函数,存入100+的数

for ix in range(100, 105):

sleep(1)

sync_data_pool.append(ix)

def thread_b_main(sync_data_pool): # B 进程主函数,存入300+的数

for ix in range(300, 309):

sleep(0.6)

sync_data_pool.append(ix)

def _test_case_000(): # 测试用例

manager = Manager() # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例

sync_data_pool = manager.list() # 利用 SyncManager 的实例来创建同步数据池

Process(target=thread_a_main, args=(sync_data_pool, )).start() # 创建并启动 A 进程

Process(target=thread_b_main, args=(sync_data_pool, )).start() # 创建并启动 B 进程

for ix in range(6): # C 进程(主进程)中实时的去查看数据池中的数据

sleep(1)

print(sync_data_pool)

if '__main__' == __name__: # 养成好习惯,将测试用例单独列出

_test_case_000()

#-*-encoding:utf-8-*-

from multiprocessing import Process, Manager

from time import sleep

def thread_a_main(sync_data_pool): # A 进程主函数,存入100+的数

for ix in range(100, 105):

sleep(1)

sync_data_pool.append(ix)

def thread_b_main(sync_data_pool): # B 进程主函数,存入300+的数

for ix in range(300, 309):

sleep(0.6)

sync_data_pool.append(ix)

def _test_case_000(): # 测试用例

manager = Manager() # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例

sync_data_pool = manager.list() # 利用 SyncManager 的实例来创建同步数据池

Process(target=thread_a_main, args=(sync_data_pool, )).start() # 创建并启动 A 进程

Process(target=thread_b_main, args=(sync_data_pool, )).start() # 创建并启动 B 进程

for ix in range(6): # C 进程(主进程)中实时的去查看数据池中的数据

sleep(1)

print(sync_data_pool)

if '__main__' == __name__: # 养成好习惯,将测试用例单独列出

_test_case_000()

输出结果:

[]

[300, 100, 301]

[300, 100, 301, 101, 302, 303]

[300, 100, 301, 101, 302, 303, 102, 304]

[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306]

[300, 100, 301, 101, 302, 303, 102, 304, 305, 103, 306, 104, 307, 308]

从结果来看, 我们的300+和100+的数据并行的被插入到同一数据池中了,也就是说,通信的目的达到了

只是从目前的方式来看,接收方必须主动的去查询数据池,类似于信箱的方式了

除了注释中的内容外,还需要注意:

1、manager 要在顶层创建

2、同步数据池是通过Manager自身的工厂方法创建的,这里 manager.list() 调用一次即产生一个新的数据池,而不是返回同一个数据池实例,所以数据池的实例需要做好管理

3、可以用的数据格式不仅list,可以选择如下(参考multiprocessing.managers)

class SyncManager(BaseManager):

def BoundedSemaphore(self, value: Any = ...) -> threading.BoundedSemaphore: ...

def Condition(self, lock: Any = ...) -> threading.Condition: ...

def Event(self) -> threading.Event: ...

def Lock(self) -> threading.Lock: ...

def Namespace(self) -> _Namespace: ...

def Queue(self, maxsize: int = ...) -> queue.Queue: ...

def RLock(self) -> threading.RLock: ...

def Semaphore(self, value: Any = ...) -> threading.Semaphore: ...

def Array(self, typecode: Any, sequence: Sequence[_T]) -> Sequence[_T]: ...

def Value(self, typecode: Any, value: _T) -> _T: ...

def dict(self, sequence: Mapping[_KT, _VT] = ...) -> Dict[_KT, _VT]: ...

def list(self, sequence: Sequence[_T] = ...) -> List[_T]: ...

进程之间共享数据(数值型):

import multiprocessing

def func(num):

num.value=10.78 #子进程改变数值的值,主进程跟着改变

if __name__=="__main__":

num=multiprocessing.Value("d",10.0) # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)

print(num.value)

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num.value)

import multiprocessing

def func(num):

num.value=10.78 #子进程改变数值的值,主进程跟着改变

if __name__=="__main__":

num=multiprocessing.Value("d",10.0) # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)

print(num.value)

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num.value)

进程之间共享数据(数组型):

import multiprocessing

def func(num):

num[2]=9999 #子进程改变数组,主进程跟着改变

if __name__=="__main__":

num=multiprocessing.Array("i",[1,2,3,4,5]) #主进程与子进程共享这个数组

print(num[:])

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num[:])

import multiprocessing

def func(num):

num[2]=9999 #子进程改变数组,主进程跟着改变

if __name__=="__main__":

num=multiprocessing.Array("i",[1,2,3,4,5]) #主进程与子进程共享这个数组

print(num[:])

p=multiprocessing.Process(target=func,args=(num,))

p.start()

p.join()

print(num[:])

进程之间共享数据(dict,list):

import multiprocessing

def func(mydict,mylist):

mydict["index1"]="aaaaaa" #子进程改变dict,主进程跟着改变

mydict["index2"]="bbbbbb"

mylist.append(11) #子进程改变List,主进程跟着改变

mylist.append(22)

mylist.append(33)

if __name__=="__main__":

with multiprocessing.Manager() as MG: #重命名

mydict=multiprocessing.Manager().dict() #主进程与子进程共享这个字典

mylist=multiprocessing.Manager().list(range(5)) #主进程与子进程共享这个List

p=multiprocessing.Process(target=func,args=(mydict,mylist))

p.start()

p.join()

print(mylist)

print(mydict)

import multiprocessing

def func(mydict,mylist):

mydict["index1"]="aaaaaa" #子进程改变dict,主进程跟着改变

mydict["index2"]="bbbbbb"

mylist.append(11) #子进程改变List,主进程跟着改变

mylist.append(22)

mylist.append(33)

if __name__=="__main__":

with multiprocessing.Manager() as MG: #重命名

mydict=multiprocessing.Manager().dict() #主进程与子进程共享这个字典

mylist=multiprocessing.Manager().list(range(5)) #主进程与子进程共享这个List

p=multiprocessing.Process(target=func,args=(mydict,mylist))

p.start()

p.join()

print(mylist)

print(mydict)

-----------------------------------------------------------------

Value、Array是通过共享内存的方式共享数据

Manager是通过共享进程的方式共享数据。

Value\Array

实例代码:

import multiprocessing

#Value/Array

def func1(a,arr):

a.value=3.14

for i in range(len(arr)):

arr[i]=-arr[i]

if __name__ == '__main__':

num=multiprocessing.Value('d',1.0)#num=0

arr=multiprocessing.Array('i',range(10))#arr=range(10)

p=multiprocessing.Process(target=func1,args=(num,arr))

p.start()

p.join()

print num.value

print arr[:]

import multiprocessing

#Value/Array

def func1(a,arr):

a.value=3.14

for i in range(len(arr)):

arr[i]=-arr[i]

if __name__ == '__main__':

num=multiprocessing.Value('d',1.0)#num=0

arr=multiprocessing.Array('i',range(10))#arr=range(10)

p=multiprocessing.Process(target=func1,args=(num,arr))

p.start()

p.join()

print num.value

print arr[:]

执行结果:

3.14

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

1

2

3

3.14

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

1

2

3

Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还可以共享类的实例对象。

实例代码:

from multiprocessing import Process,Manager

def func1(shareList,shareValue,shareDict,lock):

with lock:

shareValue.value+=1

shareDict[1]='1'

shareDict[2]='2'

for i in xrange(len(shareList)):

shareList[i]+=1

if __name__ == '__main__':

manager=Manager()

list1=manager.list([1,2,3,4,5])

dict1=manager.dict()

array1=manager.Array('i',range(10))

value1=manager.Value('i',1)

lock=manager.Lock()

proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]

for p in proc:

p.start()

for p in proc:

p.join()

print list1

print dict1

print array1

print value1

from multiprocessing import Process,Manager

def func1(shareList,shareValue,shareDict,lock):

with lock:

shareValue.value+=1

shareDict[1]='1'

shareDict[2]='2'

for i in xrange(len(shareList)):

shareList[i]+=1

if __name__ == '__main__':

manager=Manager()

list1=manager.list([1,2,3,4,5])

dict1=manager.dict()

array1=manager.Array('i',range(10))

value1=manager.Value('i',1)

lock=manager.Lock()

proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]

for p in proc:

p.start()

for p in proc:

p.join()

print list1

print dict1

print array1

print value1

执行结果:

[21, 22, 23, 24, 25]

{1: '1', 2: '2'}

array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Value('i', 21)

[21, 22, 23, 24, 25]

{1: '1', 2: '2'}

array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Value('i', 21)

通过Manager进程间共享实例对象:

from multiprocessing import Process,Value,Lock

from multiprocessing.managers import BaseManager

class Employee(object):

def __init__(self,name,salary):

self.name=name

self.salary=Value('i',salary)

def increase(self):

self.salary.value+=100

def getPay(self):

return self.name+':'+str(self.salary.value)

class MyManager(BaseManager):

pass

def Manager2():

m=MyManager()

m.start()

return m

MyManager.register('Employee',Employee)

def func1(em,lock):

with lock:

em.increase()

if __name__ == '__main__':

manager=Manager2()

em=manager.Employee('zhangsan',1000)

lock=Lock()

proces=[Process(target=func1,args=(em,lock))for i in xrange(10)]

for p in proces:

p.start()

for p in proces:

p.join()

print em.getPay()

from multiprocessing import Process,Value,Lock

from multiprocessing.managers import BaseManager

class Employee(object):

def __init__(self,name,salary):

self.name=name

self.salary=Value('i',salary)

def increase(self):

self.salary.value+=100

def getPay(self):

return self.name+':'+str(self.salary.value)

class MyManager(BaseManager):

pass

def Manager2():

m=MyManager()

m.start()

return m

MyManager.register('Employee',Employee)

def func1(em,lock):

with lock:

em.increase()

if __name__ == '__main__':

manager=Manager2()

em=manager.Employee('zhangsan',1000)

lock=Lock()

proces=[Process(target=func1,args=(em,lock))for i in xrange(10)]

for p in proces:

p.start()

for p in proces:

p.join()

print em.getPay()

资料来源:https://blog.csdn.net/houyanhua1/article/details/78236514

https://blog.csdn.net/houyanhua1/article/details/78244288

https://blog.csdn.net/lechunluo3/article/details/79005910

python 多进程共享变量manager_python 进程间共享数据 multiprocessing 通信问题 — Manager...相关推荐

  1. python进程间共享数据_python 进程间共享数据 (二)

    Python中进程间共享数据,除了基本的queue,pipe和value+array外,还提供了更高层次的封装.使用multiprocessing.Manager可以简单地使用这些高级接口. Mana ...

  2. windows核心编程之进程间共享数据

    有时候我们会遇到window进程间共享数据的需求,例如说我想知道系统当前有多少某个进程的实例. 我们能够在程序中定义一个全局变量.初始化为0.每当程序启动后就加1.当然我们我们能够借助第三方介质来储存 ...

  3. VC 利用DLL共享区间在进程间共享数据及进程间广播消息

    在进程间共享数据有很多种方法,剪贴板,映射文件等都可以实现,这里介绍用 DLL 的共享区间在进程间共享数据,及共享数据有变化时及时的反馈给各相关进程. 一.在DLL中设置共享区间 在DLL中是用数据段 ...

  4. DLL入门浅析(5)——使用DLL在进程间共享数据

    在Win16环境中,DLL的全局数据对每个载入它的进程来说都是相同的,因为所有的进程用的都收同一块地址空间:而在Win32环境中,情况却发生了变化,每个进程都有了它自己的地址空间,DLL函数中的代码所 ...

  5. 如何在进程间共享数据

    1.引言 在Windows程序中,各个进程之间常常需要交换数据,进行数据通讯.WIN32 API提供了许多函数使我们能够方便高效的进行进程间的通讯,通过这些函数我们可以控制不同进程间的数据交换,就如同 ...

  6. c++ 内存映射文件进程间共享数据

    int main(int argc, char *argv[])   {       //RecursiveDelete("C:\\20_128\\");       //Self ...

  7. 进程锁、事件、进程队列、进程间共享数据、生产者消费者模型

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 进程锁(Lock) 锁的基本概念 锁的基本用法 模拟12306抢票软件 信号量:Semaphone 概念 Semaphore ...

  8. android 共享数据,android进程间共享简单数据

    我们知道,在android中,保存简单的数据最方便的就是使用SharedPreferences,然而,SharedPreferences虽然说也可以设置成进程间共享数据,但是并不可靠(更致命的是,不同 ...

  9. python多进程间通信_Python 多进程编程之 进程间的通信(Queue)

    Python 多进程编程之 进程间的通信(Queue) 1,进程间通信 Process有时是需要通信的,操作系统提供了很多机制来实现进程之间的通信,而Queue就是其中的一个方法 ----这是操作系统 ...

最新文章

  1. 磁盘显示RAW要如何办啊
  2. 递归与非递归法实现链表相加 CC150 V5 2.5题 java版
  3. 自定义地图怎么做成html,自定义html为谷歌地图制作标记
  4. N76E003---看门狗
  5. 【阿里内推001期】听说你要做中台,阿里中台部门招Java开发
  6. react-navigation createBottomTabNavigator 刷新问题
  7. 转 markdown编写规则、语法
  8. 斐波那契数列基本性质
  9. vue---EleElement UI 表格导出功能
  10. Hadoop--基础知识点--4--hadoop集群-docker搭建
  11. 波形发生器电路的设计(实现正弦波、方波和三角波的输出)
  12. 安徽师范大学计算机学院在哪个校区,2021年安徽师范大学有几个校区,大一新生在哪个校区...
  13. 输入文字时自动带空格解决办法
  14. ArcGIS10.8下载及安装教程(附安装步骤)
  15. 服务器ftp日志文件在哪里,ftp服务器的日志在哪
  16. 机器视觉工程师前景如何,计算机视觉工程师薪资
  17. html中视屏音量大小,调整视频音量 编辑视频怎么调节视频中的音量大小/批量调节视频音量...
  18. 屏幕复制 android,一键扫出截图中文字!屏幕文字复制App
  19. 懒人必备 自动识别语音给视频添加字幕
  20. 如何设置网络投票制作投票链接售价多少钱平台投票

热门文章

  1. 对PostgreSQL中 index only scan 的初步理解
  2. const的用法,特别是用在函数前面与后面的区别
  3. linux设备模型之Class
  4. linux下怎么编译贪吃蛇,Linux 环境下C语言编译实现贪吃蛇游戏(转载)
  5. 设计模式--命令(Command)模式
  6. 剑指 Offer 52. 两个链表的第一个公共节点(C语言)
  7. 剑指 Offer 06. 从尾到头打印链表(C语言)
  8. 13-Introduction to security
  9. 布隆过滤器Redis缓存穿透雪崩击穿热点key
  10. IDA分析shellcode导入windows结构体