python3 进程

1.开进程的两种方式:

1. 使用内置的进程

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/17from multiprocessing import Process
import osdef get_id(name):print(name,"Main process:",os.getppid(),"current process;", os.getpid())P1 = Process(target=get_id, args=('andy',))
P2 = Process(target = get_id, args=("Jack", ))if __name__ == "__main__":P2.start()P1.start()print("主进程")

2. 自定义进程类:

from multiprocessing import Process
import osclass Custom_Process(Process):def __init__(self, name):super().__init__()self.name = namedef run(self):print(self.name, "Main process:", os.getppid(), "current process;", os.getpid())if __name__ == "__main__":P1 = Custom_Process('andy')P2 = Custom_Process("jack")P1.start()P2.start()print("主进程")

事实上在调用P1.start时,系统调用了Process类的run方法,在我们直接调用Process类时,

我们需要指定target(即要进行的操作,参数args),那么定制后我们重写了run方法,即重写的

run方法。

在Custom_Process类中我用到了

super().__init__()

这是重写父类的方法之一,另一种方法是:

Parent.__init__(self)

在这里就是:Process.__init__()

关于super().__init__()事实上并不是调用父类,而是寻找继承顺序中的下一个

具体可以参考:Python’s super() considered super!

下面是一个应用进程的例子,之前在写 cs模型    时有:

server.listen(5)# 设置可以接受的连接数量

虽然这里可以接受5个链接,但事实上由于功能上并未实现

所以每次只有一个链接可以正常进行通信,其他的链接都必须

等到之前的链接完成才行。

下面着手改进:

server

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/16import socket,json, struct, subprocess
from multiprocessing import ProcessBUFF_SIZE = 1024
IP_PORT = ("127.0.0.1", 8081)server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)# 重用端口
server.bind(IP_PORT)
server.listen(5)# 设置可以接受的连接数量def communicate(conn, client_addr):while True:# 内层循环为通信循环msg = conn.recv(BUFF_SIZE)if not msg:breakpipes = subprocess.Popen(msg.decode("utf-8"),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)error = pipes.stderr.read()if error:print("Error:",error)response_msg = errorelse:response_msg = pipes.stdout.read()header = {'data_size':len(response_msg)}# 数据长度header_json = json.dumps(header)#序列化header_json_byte = bytes(header_json,encoding="utf-8")conn.send(struct.pack('i',len(header_json_byte)))#先发送报头长度,仅包含数据长度, 这里的i指int类型conn.send(header_json_byte)# 再发送报头conn.sendall(response_msg)# 正式的信息print("Request from:",client_addr, "Command:",msg)conn.close()
if __name__ == "__main__":while True:# 外层循环为链接循环conn, client_addr = server.accept()p = Process(target=communicate, args=(conn, client_addr))p.start()server.close()

client未变:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/16import socket, json, structBUFF_SIZE = 1024
IP_PORT = ("127.0.0.1", 8081)client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(IP_PORT)while True:msg = input(">>:").strip().encode("utf-8")if not msg:breakclient.send(msg)header = client.recv(4)print("Header:",struct.unpack("i", header))header_length = struct.unpack('i', header)[0]print("Header_length:", header_length)header_json = json.loads(client.recv(header_length).decode("utf-8"))data_size = header_json['data_size']print("Data_size:",data_size)recv_size = 0recv_data = b''while recv_size < data_size:recv_data += client.recv(BUFF_SIZE)recv_size += len(recv_data)print(recv_data.decode("gbk"))client.close()

看下运行结果,这里只开了两个客户端,5个同样的道理:

2.LOCK 互斥锁:

import os, time
from multiprocessing import Process, Lockdef work(mutex):mutex.acquire()print("%d is working..." % os.getpid())time.sleep(2)print("%d is done!" % os.getpid())mutex.release()if __name__ == "__main__":mutex = Lock()p1 = Process(target=work, args=(mutex,))p2 = Process(target=work, args=(mutex,))p1.start()p2.start()

模拟 抢票系统:所有人都可以查看到还剩下多票,但是只有部分人能抢到票。

import json, random, time, os
from multiprocessing import Process , Lockdef search():dic = json.load(open('db.txt',))print("%s查询,车票剩余%s" % (os.getpid(),dic['count']))def get_ticket():dic = json.load(open('db.txt',))if dic['count'] > 0:dic['count'] -= 1time.sleep(random.randint(1,4))json.dump(dic,open('db.txt', 'w'))print('%s 购买成功' % os.getpid())print("车票剩下%s" % dic["count"])else:print("%s抢票失败 " % os.getpid())def task(mutex):search()mutex.acquire()get_ticket()mutex.release()if __name__ == "__main__":mutex = Lock()for i in range(10):p = Process(target=task, args=(mutex,))p.start()

3.Join

1.join方法的作用是阻塞主进程(挡住,无法执行join以后的语句),专注执行多线程。

2.多线程多join的情况下,依次执行各线程的join方法,前头一个结束了才能执行后面一个。

3.无参数,则等待到该线程结束,才开始执行下一个线程的join。

4.设置参数后,则等待该线程这么长时间就不管它了(而该线程并没有结束)。不管的意思就是可以执行后面的主进程了。

看例子:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/19import os, time
from multiprocessing import Process, Lockdef work(mutex, t):mutex.acquire()print("%s Running at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))time.sleep(t)mutex.release()print("%s Stop at %s\n" % (os.getpid(),time.strftime("%H:%M:%S")))if __name__ == "__main__":print("Main Process Running at %s\n" % time.strftime("%H:%M:%S"))mutex = Lock()p1 = Process(target=work, args=(mutex,5))p2 = Process(target=work, args=(mutex,3))p1.start()p2.start()p1.join()print("Join1 finish at %s!\n" % time.strftime("%H:%M:%S"))p2.join()print("Join2 finish at %s!\n" % time.strftime("%H:%M:%S"))print("Main Process Stop at %s\n" % time.strftime("%H:%M:%S"))

此时没有指定join的时长,所以,第一个进程执行完了,第一个join也相应的结束了,

然后第二个进程执行完了,第二个join也结束了。

当指定时间后分两种情况,当join的时间比进程需要执行的时间短时,它就不再等待该进行,直接执行

将join()修改为p2.join(2)

将p2.join()修改为p2.join(2)

可以看到,进程4768还未执行完时,join1等待2秒后直接不管它了,执行了后面的打印语句

接着执行了join2,等待2秒后,主进程自己结束了自己(这里应该是打印语句的原因,事实上并未直接的结束)

此时4768仍在运行,直到自己结束。然后才是进程11108

如果我将时间设置得比它需要的时间还长呢,那么它应该在进程运行完时也结束

将P1.join()修改为p1.join(6)

将p2.join()修改为p2.join(4)

可以看到Join1,join2都是在两个进行结束后自己结束了,并没有等待设定的时间长度。

4.Daemon 守护进程

守护进程的作用:

一:守护进程会在主进程代码执行结束后就终止

二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

下面看例子:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/19from multiprocessing import Process
import timedef work():print("Running...")time.sleep(3)print("Finish!")if __name__ == "__main__":p = Process(target=work,)#p.daemon = Truep.start()print("Main finish!")

运行结果:

将#p.daemon=True注释掉,再运行:

可以看到,主进程结束了,子进程也结束了, 并不会等待它运行完。

守护进程为什么在主进程结束后就结束了呢?

首先,我们要明白守护进程的作用:守护主进程的一些功能,当主进程执行完了,

也就是说它的功能已经全部执行完了,那么,守护进程也就没有继续守护下去的

必要了,所以一旦主进程结束了,守护进程也就结束了。

5.Semaphore 信号量

Semaphore制对共享资源的访问数量,比如可以同时运行的子进程数量:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/19import multiprocessing
import timedef worker(s):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(2)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s,))p.start()

如上, 只有释放一个进程才有新的进程进来

将信号量改成大于等于进程数:

s = multiprocessing.Semaphore(5)

可以看到,所有进程一下全部启动了。

进程间通信有一个人种方式,一种是队列,一种是管道

6.队列

下面演示在一个进程中往队列中传入数据,用另一个进程取出来:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/19import random,os
from multiprocessing import Queue,Processdef put_q(q):print("Put...")for i in range(5):n = random.randint(1,5)print(n)q.put(n)def get_q(q):print("\nGet...")while True:if not q.empty():print("%s" % os.getpid(),q.get())else:breakif __name__ == "__main__":q = Queue(8)p1 = Process(target=put_q,args=(q,))p2 = Process(target=get_q,args=(q,))p1.start()p1.join() # 防止进程2先启动,队列为空p2.start()

这样就实现了进程间的通信

7.管道

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,
如果duplex参数为True(默认值),那么这个管道是全双工模式,
也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,
可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,
recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
事实上,管道的应用与上面的队列基本一致,对上面的代码稍作修改:
#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/19import random,os
from multiprocessing import Pipe,Processdef send_p(p):print("send...")for i in range(5):n = random.randint(1,5)print(n)p.send(n)def receive_p(p):print("\nReceive...")while True:print("%s" % os.getpid(),p.recv())if __name__ == "__main__":p = Pipe()p1 = Process(target=send_p,args=(p[0],))p2 = Process(target=receive_p,args=(p[1],))p1.start()p1.join() p2.start()

运行:

8.Pool 进程池

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,

如果池还没有满,那么就会创建一个新的进程用来执行该请求;

但如果池中的进程数已经达到规定最大值,那么该请求就会等待,

直到池中有进程结束,才会创建新的进程来它。

看例子:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/20import time
from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,))pool.close()pool.join()# 阻塞主进程,等待子进程执行完

运行:

指定进程池只有3个进程,所以第四个进程只有前面结束一个进程时才能开始。

需要说明的是 pool.apply_async()是非阻塞的,pool.apply()则是阻塞的。看区别:

修改:

pool.apply(work,(msg,))

再次运行:

可以看到,子进程只能结束一个后都会运行下一个进程

回调函数:

回调函数指:进程池中任何一个任务一旦处理完了,就立即告知主进程:

我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果。

对上面的例子进行修改:

#!/usr/bin/env python
#coding:utf-8
#Created by Andy @ 2017/9/20import time, os
from multiprocessing import Pool, Processdef work(msg):print(msg, 'is working\n')time.sleep(2)print(msg,'finish!\n')return msgdef plus(msg):if msg:msg = msg + '*plus*'print(msg)if __name__ == "__main__":pro = Process()pool = Pool(processes=3)for i in range(1,6):msg = "process %s" % ipool.apply_async(work,(msg,), callback=plus)# 回调函数pool.close()pool.join()

运行:

可以看到一个进程结果后,在开启一个新的进程到进程池后,

主进程又调用一个回调函数对该进程的结果进行了二次处理。

补充:

对于计算机来说,也并不能无限开启进程,通常比较好的情况是

进程数等于计算机核数是比较好的,否则开多了可能会起到反作用

那么要怎么查看自己的计算机是几核的呢?

posted on 2017-09-17 13:49 Andy_963 阅读(...) 评论(...) 编辑 收藏

转载于:https://www.cnblogs.com/Andy963/p/7535378.html

python3 进程相关推荐

  1. python3 进程池中使用队列Queue

    主要是这两行代码: m = multiprocessing.Manager() self.queue = m.Queue() 描述如下: Manager()返回的管理器对象控制一个服务器进程,该进程保 ...

  2. python3,进程间的通信

    本文来源于python 3.5版本的官方文档 multiprocessing模块为进程间通信提供了两种方法: 1.进程队列queue The Queue class is a near clone o ...

  3. python3 进程池Pool 详解

    进程池Pool 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到mult ...

  4. python3 进程锁

    多进程抢占资源 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题. import osimport timeimport randomfrom multiprocessing imp ...

  5. python进程间通信时间开销_python 进程间的通信

    python3,进程间的通信 本文来源于python 3.5版本的官方文档 multiprocessing模块为进程间通信提供了两种方法: 1.进程队列queue The Queue class is ...

  6. python的进程池map函数_python进程池map

    python进程池怎么实现 当进程池中任务队列非空时,才会触发worker进程去工作,那么如何向进程池中的任务队列中添加任务呢,进程池类有两组关键方法来创建任务,分别是apply/apply_asyn ...

  7. python 进程池不足,解决Python 进程池Pool中一些坑

    1 from multiprocessing import Pool,Queue. 其中Queue在Pool中不起作用,具体原因未明. 解决方案: 如果要用Pool创建进程,就需要使用multipro ...

  8. python 命名管道_Python:检查命名管道是否有数据

    我的Unix系统上的 Python3进程一直在运行,我希望能够通过偶尔运行的其他进程的命名管道随机发送数据.如果命名管道没有数据,我希望我的进程继续做其他事情,所以我需要检查它是否有没有阻塞的数据. ...

  9. pytorch指定用多张显卡训练_Pytorch多GPU训练

    Pytorch多GPU训练 临近放假, 服务器上的GPU好多空闲, 博主顺便研究了一下如何用多卡同时训练 原理 多卡训练的基本过程 首先把模型加载到一个主设备 把模型只读复制到多个设备 把大的batc ...

最新文章

  1. 【C 语言】数组 ( 数组取值操作 | array[i][j] 用法 等价于 *( *(array = i) + j ) 用法 | 下标操作到指针操作演化过程 )
  2. RxJava 参考文档
  3. Laravel 错误处理
  4. winform分页案例简单实现方式~
  5. LeetCode 02.两数相加
  6. Python批量Excel文件数据导入SQLite数据库的优化方案
  7. Eigen3卸载与重装
  8. 辨异 —— 概率与统计
  9. 将您的Apple ID更改为其他电子邮件地址的方法
  10. Atitit it软件领域职称评级规定,广博方向。 目录 1. 软件工程师资格证 1 1.1. 法规规范 十大标准,三级五晋制。 1 1.2. 组织架构 域职称评级委员会 2 1.3. 人员职责流程表
  11. 数据库系统概论(第5版)
  12. 结构梁配筋最牛插件_Revit结构出图案例分享
  13. Excel-快捷键的使用方法
  14. 区块链开发主流编程语言居然是Go语言
  15. html5读取加速度传感器,一文读懂加速度传感器
  16. 国家新标准上线,妈妈再也不用担心我吃辣条啦!
  17. 醉袖迎风受落花——好代码的10条认知
  18. count在python中是什么意思_python count返回什么
  19. Foundry 中文文档发布啦
  20. 新出炉彩色游戏——炸弹战争1.0版

热门文章

  1. ASP.NET Core重写个人博客站点小结
  2. Sublime Text开发Quick-Cocos2d-x环境搭建(Mac)
  3. vimrc for windows
  4. Got error: 1449: The user specified as a definer ('root'@'%') does not exist when using LOCK TAB
  5. 第一篇 著名函数之单值函数
  6. DPDK 应用层对网卡进行配置(二十四)
  7. leetcode算法题--删除链表的倒数第N个节点
  8. leetcode算法题--比特位计数
  9. linux系统读取第二个盘的数据,磁盘及文件系统管理—第二篇
  10. 对接钉钉审批_低代码对接钉钉创建外部联系人