一、数据共享

1.进程间的通信应该尽量避免共享数据的方式

2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的。

虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此。

1 命令就是一个程序,按回车就会执行(这个只是在windows情况下)

2 tasklist 查看进程

3 tasklist | findstr pycharm #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了,

4 管道后面的findstr pycharm就接收了)

3.(IPC)进程之间的通信有两种实现方式:管道和队列

1 from multiprocessing import Manager,Process,Lock

2 def work(dic,mutex):

3 # mutex.acquire()

4 # dic['count']-=1

5 # mutex.release()

6 # 也可以这样加锁

7 with mutex:

8 dic['count'] -= 1

9 if __name__ == '__main__':

10 mutex = Lock()

11 m = Manager() #实现共享,由于字典是共享的字典,所以得加个锁

12 share_dic = m.dict({'count':100})

13 p_l = []

14 for i in range(100):

15 p = Process(target=work,args=(share_dic,mutex))

16 p_l.append(p) #先添加进去

17 p.start()

18 for i in p_l:

19 i.join()

20 print(share_dic)

21 # 共享就意味着会有竞争,

22

23数据共享 View Code

二、进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:很明显需要并发执行的任务通常要远大于核数

一个操作系统不可能无限开启进程,通常有几个核就开几个进程

进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

那么什么是进程池呢?进程池就是控制进程数目

1 ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

进程池的结构:

创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

1.创建进程池

1 Pool([numprocess [,initializer [, initargs]]]):创建进程池

2.参数介绍

1 numprocess:要创建的进程数,如果省略,将默认为cpu_count()的值,可os.cpu_count()查看

2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None

3 initargs:是要传给initializer的参数组

3.方法介绍

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行

2 func(*args,**kwargs),然后返回结果。

3 需要强调的是:此操作并不会在所有池工作进程中并执行func函数。

4 如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()

5 函数或者使用p.apply_async()

6

7

8 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,

9 callback是可调用对象,接收输入参数。当func的结果变为可用时,

10 将理解传递给callback。callback禁止执行任何阻塞操作,

11 否则将接收其他异步操作中的结果。

12

13 p.close():关闭进程池,防止进一步操作。禁止往进程池内在添加任务(需要注意的是一定要写在close()的上方)

14

1 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

应用1:

1 from multiprocessing import Pool

2 import os,time

3 def task(n):

4 print('[%s] is running'%os.getpid())

5 time.sleep(2)

6 print('[%s] is done'%os.getpid())

7 return n**2

8 if __name__ == '__main__':

9 # print(os.cpu_count()) #查看cpu个数

10 p = Pool(4) #最大四个进程

11 for i in range(1,7):#开7个任务

12 res = p.apply(task,args=(i,)) #同步的,等着一个运行完才执行另一个

13 print('本次任务的结束:%s'%res)

14 p.close()#禁止往进程池内在添加任务

15 p.join() #在等进程池

16 print('主')apply同步进程池(阻塞)(串行) View Code

1 # ----------------

2 # 那么我们为什么要用进程池呢?这是因为进程池使用来控制进程数目的,

3 # 我们需要几个就开几个进程。如果不用进程池实现并发的话,会开很多的进程

4 # 如果你开的进程特别多,那么你的机器就会很卡,所以我们把进程控制好,用几个就

5 # 开几个,也不会太占用内存

6 from multiprocessing import Pool

7 import os,time

8 def walk(n):

9 print('task[%s] running...'%os.getpid())

10 time.sleep(3)

11 return n**2

12 if __name__ == '__main__':

13 p = Pool(4)

14 res_obj_l = []

15 for i in range(10):

16 res = p.apply_async(walk,args=(i,))

17 # print(res) #打印出来的是对象

18 res_obj_l.append(res) #那么现在拿到的是一个列表,怎么得到值呢?我们用个.get方法

19 p.close() #禁止往进程池里添加任务

20 p.join()

21 # print(res_obj_l)

22 print([obj.get() for obj in res_obj_l]) #这样就得到了

23apply_async异步进程池(非阻塞)(并行) View Code

那么什么是同步,什么是异步呢?

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去

异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

什么是串行,什么是并行呢?

举例:能并排开几辆车的就可以说是“并行”,只能一辆一辆开的就属于“串行”了。很明显,并行的速度要比串行的快得多。(并行互不影响,串行的等着一个完了才能接着另一个)

应用2:

使用进程池维护固定数目的进程(以前客户端和服务端的改进

1 from socket import *

2 from multiprocessing import Pool

3 s = socket(AF_INET,SOCK_STREAM)

4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用

5 s.bind(('127.0.0.1',8081))

6 s.listen(5)

7 print('start running...')

8 def talk(coon,addr):

9 while True: # 通信循环

10 try:

11 cmd = coon.recv(1024)

12 print(cmd.decode('utf-8'))

13 if not cmd: break

14 coon.send(cmd.upper())

15 print('发送的是%s'%cmd.upper().decode('utf-8'))

16 except Exception:

17 break

18 coon.close()

19 if __name__ == '__main__':

20 p = Pool(4)

21 while True:#链接循环

22 coon,addr = s.accept()

23 print(coon,addr)

24 p.apply_async(talk,args=(coon,addr))

25 s.close()

26 #因为是循环,所以就不用p.join了

27

28服务端 View Code

1 from socket import *

2 c = socket(AF_INET,SOCK_STREAM)

3 c.connect(('127.0.0.1',8081))

4 while True:

5 cmd = input('>>:').strip()

6 if not cmd:continue

7 c.send(cmd.encode('utf-8'))

8 data = c.recv(1024)

9 print('接受的是%s'%data.decode('utf-8'))

10 c.close()

11客户端 View Code

三、回调函数

1 回调函数什么时候用?(回调函数在爬虫中最常用)

2 造数据的非常耗时

3 处理数据的时候不耗时

4

5 你下载的地址如果完成了,就自动提醒让主进程解析

6 谁要是好了就通知解析函数去解析(回调函数的强大之处)

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

1 from multiprocessing import Pool

2 import requests

3 import os

4 import time

5 def get_page(url):

6 print('<%s> is getting [%s]' %(os.getpid(),url))

7 response = requests.get(url) #得到地址

8 time.sleep(2)

9 print('<%s> is done [%s]'%(os.getpid(),url))

10 return {'url':url,'text':response.text}

11 def parse_page(res):

12 '''解析函数'''

13 print('<%s> parse [%s]'%(os.getpid(),res['url']))

14 with open('db.txt','a') as f:

15 parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text']))

16 f.write(parse_res)

17 if __name__ == '__main__':

18 p = Pool(4)

19 urls = [

20 'https://www.baidu.com',

21 'http://www.openstack.org',

22 'https://www.python.org',

23 'https://help.github.com/',

24 'http://www.sina.com.cn/'

25 ]

26 for url in urls:

27 obj = p.apply_async(get_page,args=(url,),callback=parse_page)

28 p.close()

29 p.join()

30 print('主',os.getpid()) #都不用.get()方法了

31

32回调函数(下载网页的小例子) View Code

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

1 from multiprocessing import Pool

2 import requests

3 import os

4 def get_page(url):

5 print('<%os> get [%s]' %(os.getpid(),url))

6 response = requests.get(url) #得到地址 response响应

7 return {'url':url,'text':response.text}

8 if __name__ == '__main__':

9 p = Pool(4)

10 urls = [

11 'https://www.baidu.com',

12 'http://www.openstack.org',

13 'https://www.python.org',

14 'https://help.github.com/',

15 'http://www.sina.com.cn/'

16 ]

17 obj_l= []

18 for url in urls:

19 obj = p.apply_async(get_page,args=(url,))

20 obj_l.append(obj)

21 p.close()

22 p.join()

23 print([obj.get() for obj in obj_l])

24

25下载网页小例子(无需回调函数) View Code

归类:网络编程socket

python并发处理list数据_python并发编程之多进程2--------数据共享及进程池和回调函数...相关推荐

  1. python互斥锁原理_python并发编程之多进程1------互斥锁与进程间的通信

    一.互斥锁 进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理. 注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行 ...

  2. python并发处理同一个文件_python并发编程(并发与并行,同步和异步,阻塞与非阻塞)...

    最近在学python的网络编程,学会了socket通信,并利用socket实现了一个具有用户验证功能,可以上传下载文件.可以实现命令行功能,创建和删除文件夹,可以实现的断点续传等功能的FTP服务器.但 ...

  3. Python之路 34:并发与并行、锁(GIL、同步锁、死锁与递归锁)、信号量、线程队列、生消模型、进程(基础使用、进程通信、进程池、回调函数)、协程

    内容: 同步锁 死锁.递归锁 信号量和同步对象(暂时了解即可) 队列------生产者和消费者模型 进程(基础使用.进程通信.进程池.回调函数) 协程 一.并发并行与同步异步的概念 1.1.并发和并行 ...

  4. python 多进程并发_python并发编程之多进程

    一 multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.P ...

  5. python线程通信 消息传递_Python并发编程之线程消息通信机制/任务协调(四)

    大家好,并发编程进入第四篇. 本文目录 前言 Event事件 Condition Queue队列 总结 .前言 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非 ...

  6. python 多进程并发 阻塞_python并发编程之多进程理论部分

    一 什么是进程 进程:正在进行的一个过程或者说一个任务.而负责执行任务则是cpu. 举例(单核+多道,实现多个进程的并发执行): egon在一个时间段内有很多任务要做:python备课的任务,写书的任 ...

  7. python 多线程 数据库死锁_python并发编程之多线程2死锁与递归锁,信号量等

    一.死锁现象与递归锁 进程也是有死锁的 所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用, 这些永远在互相等待的进程称为死锁进程 如下就是死锁 ...

  8. python 异步io框架_Python并发编程之学习异步IO框架:asyncio 中篇(十)

    大家好,并发编程 进入第十章. 好了,今天的内容其实还挺多的,我准备了三天,到今天才整理完毕.希望大家看完,有所收获的,能给小明一个赞.这就是对小明最大的鼓励了. 为了更好地衔接这一节,我们先来回顾一 ...

  9. Python之进程+线程+协程(进程间通信、进程同步、进程池、回调函数)

    文章目录 进程间通信 进程同步 进程池 回调函数 本篇文章依然是进程.线程方面的内容,主要讲进程间的通信.进程队列.进程同步.进程池.进程同步和回调函数 进程间通信 进程就是两个独立的内存空间在运行, ...

最新文章

  1. 【Python学习系列二十六】networkx库图最短路径求解
  2. python opencv cv2.VideoCapture(),read(),waitKey()的使用 ret,frame参数
  3. 在c++代码中关闭和启动另外一个pid进程号,共享内存数据使用
  4. java中间==、equals和hashCode差额
  5. Appium国内下载地址
  6. JS layer时间组件laydate的回调中重置清除选择无效的问题
  7. 配电室智能监控系统设计及实现分析-Susie 周
  8. gcj编译java_怎样用gcj编译java程序
  9. albedo diffuse specular
  10. ISO8583报文128个域说明
  11. jQuery append( ) 方法
  12. 商品后台管理系统(项目一)
  13. 2020热门编程语言,总有一款适合你【云图智联】
  14. matlab与amesim,amesim与matlab联合仿真步骤(自己总结)(精)
  15. ICPC 2022 西安
  16. 新闻!牛磨王抗磨网发布超燃猪年贺岁词 | “绿多多”绿色资产资讯
  17. 打游戏适合用什么蓝牙耳机?低延迟蓝牙耳机推荐
  18. 全面解析均衡效果器之一:EQ原理
  19. X-Analyser 总线分析软件:CANopen、1939解析、UDS诊断、NMEA2000 协议解析、DBC文件解析、仿真工具、CAN报文分析、仿CANoe曲线显示 CAN仪表模拟器
  20. Nginx-域名跳转到另外一个域名

热门文章

  1. 物料分类账业务配置及操作手册
  2. sap-生产订单的成本理解
  3. ABAP Memory/SAP Memory/Shared Buffer/Database
  4. 盯紧那群养生的年轻人,他们的焦虑值300亿
  5. 根目录_Linux Shell从入门到删除根目录跑路指南
  6. 验证码生成java_JAVA-验证码生成
  7. angularjs http和ajax,AngularJS $ http和$ resource
  8. mysql优化说出九条_技术分享 | MySQL 优化:为什么 SQL 走索引还那么慢?
  9. numpy 图片填充_numpy/python中的洪水填充分割图像
  10. linux多进程网络实例,Linux下一个单进程并发服务器的实例 使用select