1 面向过程启动多进程

Python 操作进程的类都定义在 multiprocessing 模块,该模块提供了一个 Process 类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。

import time
import multiprocessing  def test1():while True:print("--1--")time.sleep(1)def test2():while True:print("--2--")time.sleep(1)def main():p1 = multiprocessing.Process(target=test1)p2 = multiprocessing.Process(target=test2)p1.start()p2.start()# Windows 操作系统下,创建进程一定要在 main 内创建
if __name__ == '__main__':main()

默认情况下我们需要传入一个 target 参数。traget 接收一个函数,就是我们开启进程时会执行的函数。

在创建进程后,我们需要调用 start 方法开启我们的进程。这个时候进程才真正运行了。

为什么在 Windows 下,一定要写在 __main__ 里面,因为在 Windows 下,子进程是用 import 的方式将主进程的代码拿过来,如果不写在 __main__ 下面,会导致重复的创建进程。如果大家是在 Linux 或者 Mac 下就没有这个问题,Linux 下是通过 fork 的方式来完成的子进程的创建。

fork 调用将生成一个子进程,所以这个函数会在父子进程同时返回。在父进程的返回结果是一个整数值,这个值是子进程的进程号,父进程可以使用该进程号来控制子进程的运行。fork 在子进程的返回结果是零。如果 fork 返回值小于零,一般意味着操作系统资源不足,无法创建进程。

pid = os.fork()
if pid > 0:# in parent process
if pid == 0:# in child process
if pid < 0:# fork error

子进程创建后,父进程拥有的很多操作系统资源,子进程也会持有。比如套接字和文件描述符,它们本质上都是对操作系统内核对象的一个引用。如果子进程不需要某些引用,一定要即时关闭它,避免操作系统资源得不到释放导致资源泄露。

大家如果对 Linux 比较熟悉,可以自己运行一下这个代码。

import os
import timepid = os.fork()  # 子进程会从 fork 之后来运行print("wohu")
if pid == 0:print("子进程:{},父进程:{}".format(os.getpid(),os.getppid()))
else:print("我是父进程:{}".format(os.getpid()))time.sleep(2)

输出结果:

wohu
子进程:16542,父进程:16541
wohu
我是父进程:16541

1.1 带参数的进程

from multiprocessing import Processdef func(x):print(x , "进程运行了")if __name__ == '__main__':p1 = Process(target=func, args=(1, ))p1.start()

我们给 Process 传入了一个 args 参数,参数的内容是一个元组,但是我只有一个元素。这个时候需要记住必须要加一个逗号。

1.2 join() 等待所有子进程执行完

join 方法的作用是等待进程结束,我们在代码中看看效果:

import time
from multiprocessing import Processdef func(x):for i in range(3):time.sleep(0.5)print(x, i)if __name__ == '__main__':p1 = Process(target=func, args=("123", ))p2 = Process(target=func, args=("456", ))p1.start()p1.join()p2.start()

func 中,我们接收一个 x 参数用于区分进程。在函数内部我们循环执行 sleep 并输出内容。下面是运行效果:

123 0
123 1
123 2
456 0
456 1
456 2

可以看到 p2p1 完全执行完后才开始执行。这是因为我们在 p2 执行开始前调用了 p1.join,而 join 后的代码都会等 p1 执行完后才执行。

import time
from multiprocessing import Processdef func(x):for i in range(5):time.sleep(0.5)print(x, i)if __name__ == '__main__':pl = []for i in range(5):p = Process(target=func, args=(str(i)+" wohu", ))pl.append(p)p.start()for p in pl:p.join()print("所有进程都执行完了")

我们使用了一个列表把所有进程都装了进去,然后在所有进程开启后再依次 join,这样我们就可以在实现多进程的同时还能等待所有进程执行完后再执行一些操作。下面是运行效果:

0 wohu 0
0 wohu 1
0 wohu 2
1 wohu 0
1 wohu 1
1 wohu 2
2 wohu 0
2 wohu 1
2 wohu 2
所有进程都执行完了

2. 面向对象启动多进程

除了创建 Process 类外,我们还可以继承 Process 来实现多进程,操作和之前区别不大,我们下定义一个类,进程 Process

from multiprocessing import Process
import timeclass MyProcess(Process):def __init__(self, arg):super().__init__()self.arg = argdef run(self):print("hello")print(self.arg, "执行了进程")time.sleep(1)print("world")if __name__ == '__main__':p = MyProcess()p.start()print("主")

我们创建了一个 MyProcess 类,然后继承了 Process 类,并实现了 run 方法。

实现了 init 方法,然后调用父类的 init 方法初始化 Process 类的参数。然后我们在自己的 init 方法中,传入了一个 arg 参数。当然我们为了更通用,可以再改造一下:

from multiprocessing import Processclass MyProcess(Process):def __init__(self, name, *args, **kwargs):# 初始化 Process 的参数super().__init__(*args, **kwargs)self.name = namedef run(self):print(self.name, "执行了进程")if __name__ == '__main__':p1 = MyProcess("wohu")p1.start()

这样我们可以传入自己添加的参数,同时也能传入 Process 自己的参数。

3. 守护进程

from multiprocessing import Process
import timedef foo():print("foo")time.sleep(1)print("end foo")def bar():print("bar")time.sleep(3)print("end bar")p1 = Process(target=foo)
p2 = Process(target=bar)p1.daemon = True  # 将 p1 设置为守护进程
p1.start()
p2.start()
print("------main-------")    # 打印该行则主进程代码结束,则守护进程 p1 应该被终止

输出结果

------main-------
bar
end bar

守护进程在主进程结束的时候就结束,那守护进程有什么作用呢?
这里给大家说一个守护进程的作用——程序的报活。
假如我们监控很多台服务器的运行状态,我们是让服务器自己告诉监控服务器的状态,还是监控服务器去询问服务器的状态呢?通常我们会主动上报自己的状态,这时候就可以通过守护进程来做。

  • 主进程:完成自己的业务逻辑
  • 守护进程:每隔五分钟就向一台机器汇报自己的状态

4. 进程间通信

线程间通信可以全局变量。那进程间的通信也可以通过全局变量吗?让我们来测试一下。

import multiprocessinga = 1def demo1():global aa += 1print(a)   def demo2():print(a)if __name__ == '__main__':d1 = multiprocessing.Process(target=demo1)d2 = multiprocessing.Process(target=demo2)d1.start()d2.start()

输出结果:

2
1

我们可以发现进程间的通信并不能通过全局变量。那两个进程间互相通信要通过什么呢? 需要通过 Queue

常用的 Queue 方法如下:

from multiprocessing import Queue# 创建对象 队列 最多可接收三条数据  如果不写最大看电脑内存
q = Queue(3)# 存数据
q.put(3)
q.put("1")
q.put([11,22])
q.put("2")              # 此时会发生什么? 程序阻塞# 取数据
print(q.get())          # 3
print(q.get())          # "1"
print(q.get())          # [11,22]
print(q.get())          # 此时会发生什么? 程序阻塞q.get_nowait()          # 通过异常告诉你没有了q.full()                # 判断是否为满
q.empty()               # 判断是否为空

接下来我们用队列完成进程间的通信。

import multiprocessingdef download(q):""" 下载数据 """# 模拟从网上下载的数据lis = [11,22,33,44]for item in lis:q.put(item) print("下载器已经下载完成,并且保存到队列中")def analysis(q):""" 数据处理 """analysis_data = list()while True:data = q.get()analysis_data.append(data)if q.empty():break# 模拟数据处理print(analysis_data)def main():# 创建一个队列 q = multiprocessing.Queue()# 创建多个进程,将队列的引用当做参数传递进去t1 = multiprocessing.Process(target=download,args=(q,))t2 = multiprocessing.Process(target=analysis,args=(q,))t1.start()t2.start()if __name__ == '__main__':main()

5. 进程池

当我们需要创建大量进程的时候,进程池可以节省我们的工作量,创建进程池的方式有两种,

  • 一种是 multiprocessing 模块提供的 Pool 方法,
  • 还有一种是 concurrent.futures 中的 ProcessPoolExecutor,

两者皆可以时间进程池,下面我们来举例看看。

from multiprocessing import Pool
import os,time,randomdef worker(msg):t_start = time.time()print("%s START...PROCESS,%d"%(msg,os.getpid()))time.sleep(random.random()*10)t_stop = time.time()print(msg,"END,time:%0.2f"%(t_stop-t_start))p=Pool(5) #定义一个进程池,最大进程数5
for i in range(0,20):#每次循环将会用空闲出来的子进程去调用目标p.apply_async(worker,(i,))p.close() #关闭进程池,不再接收请求
p.join() #等待p中所有子进程执行完成

multiprocessing.Pool 常用函数解析:

  • apply_async(func[, args[, kwds]]) :非阻塞方式调用 func
  • apply(func[, args[, kwds]]) :阻塞方式调用 func
  • close() :关闭 Process 对象,释放与之关联的所有资源
  • terminate() :立即终止进程
  • join() :阻塞主进程

当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。

初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

from multiprocessing import Poolimport os,time,randomdef worker(msg):t_start = time.time()print('%s 开始执行,进程号为%d'%(msg,os.getpid()))time.sleep(random.random()*2)t_stop = time.time()print(msg,"执行完成,耗时%0.2f"%(t_stop-t_start))return msgdef demo():passif __name__ == '__main__':po = Pool(3)        # 定义一个进程池l = []for i in range(0,5):# 每次循环将会用空闲出来的子进程去调用目标res = po.apply_async(worker,(i,))     # 获取返回值,但是变成同步print(res.get())    l.append(res)for i in l:print(i.get())print("--start--")# 关闭进程池,关闭后不再接收新的请求,当进程池 close 的时候并未关闭进程池,# 只是会把状态改为不可再插入元素的状态,完全关闭进程池使用po.close()      # 关闭之后在次添加任务,程序报错# po.apply_async(demo)# 等待 po 中所有子进程执行完成,必须放在 close 语句之后 # 如果注释掉,主进程不会等子进程,程序直接执行结束po.join()       print("--end--")

案例二(ProcessPoolExecutor 方式):

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import timedef get_html(times):time.sleep(times)print("get page {} success".format(times))return timesexecutor = ProcessPoolExecutor(max_workers=3)
task1 = executor.submit(get_html,(3))
task2 = executor.submit(get_html,(5))#done方法用来判断某个人物是否完成
print(task1.done())
time.sleep(5)
print(task2.done())
print(task1.cancel()
#result方法可以获取task返回值
print(task1.result())

这种方式使用方法和多线程的中的 ThreadPoolExecutor 类似在此不再赘述。

6. 进程池之间的通信

from multiprocessing import Pool, Queue, Managerdef worker(msg, q):q.put(msg)def worker1(msg, q):# print(msg)data = q.get()print(data)if __name__ == '__main__':po = Pool(3)  # 定义一个进程池q = Manager().Queue()for i in range(0, 10):# 每次循环将会用空闲出来的子进程去调用目标  异步的po.apply_async(worker, (i, q))  # 每次循环将会用空闲出来的子进程去调用目标po.apply_async(worker1, (i, q))  print("--start--")# 关闭进程池,关闭后不再接收新的请求,po.close()  # 等待 po 中所有子进程执行完成,必须放在 close 语句之后 # 如果注释掉,主进程不会等子进程,程序直接执行结束po.join()  print("--end--")

7. 多进程之间的生产者消费者

from multiprocessing import Process, Queue
import time
import randomdef producer(name, food, q):for i in range(5):data = '%s 生产了%s%s' % (name, food, i)# 模拟延迟time.sleep(random.randint(1, 3))print(data)# 将数据放入 队列中q.put(data)def consumer(name, q):while True:# 没有数据就会卡住food = q.get()  # 判断当前是否有结束的标识if food is None: breaktime.sleep(random.randint(1, 3))print('%s 吃了%s' % (name, food))if __name__ == '__main__':q = Queue()p1 = Process(target=producer, args=('Tom', '包子', q))p2 = Process(target=producer, args=('Jack', '面条', q))c1 = Process(target=consumer, args=('张三', q))c2 = Process(target=consumer, args=('李四', q))p1.start()p2.start()c1.start()c2.start()p1.join()p2.join()# 等待生产者生产完毕之后 往队列中添加特定的结束符号q.put(None)  # 肯定在所有生产者生产的数据的末尾q.put(None)  # 肯定在所有生产者生产的数据的末尾

8. 进程锁

在进程之前是数据隔离的,那为什么我们还要锁呢?进程的数据隔离实际上指的是内存隔离,两个进程之间不能直接进行数据交流,但是我们可以通过文件或者网络来进行通信。而在这个时候,就可能出现数据不安全的问题,所以我们需要学习进程锁。

from multiprocessing import Lock
from multiprocessing import Processdef func(lock):lock.acquire()with open("0.txt") as f:num = int(f.read())num += 1with open("0.txt", "w") as f:f.write(str(num))lock.release()if __name__ == '__main__':lock = Lock()for i in range(100):p = Process(target=func, args=(lock, ))p.start()

我们创建了一个 Lock 类,将 lock 作为参数传入函数,在可能出现数据不安全的地方 lock.acquire(),如何结束时释放锁 lock.release(),但是数据安全效率会有一定的削减,因为我们调用 lock 时会有等待的过程。

9. Process 常用参数和方法

Process 语法结构如下:

Process([group [, target [, name [, args [, kwargs]]]]])
  • target :表示这个进程实例所调用对象
  • args :表示调用对象的位置参数元组
  • kwargs :表示调用对象的关键字参数字典
  • name :进程的名称,该名称是一个字符串,仅用于识别目的
  • group :仅用于兼容 threading.Thread

Process 类常用方法:

  • is_alive():返回进程是否还活着,粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。
    +join([timeout]) :是否等待进程实例执行结束,或等待多少秒。
  • start() :启动进程活动(创建子进程)。
  • run() :表示进程活动的方法,你可以在子类中重载此方法,标准 run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 argskwargs 参数中获取顺序和关键字参数。
  • terminate() :终止进程。

Process 类常用属性:

  • name :进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。初始名称由构造器设定。如果没有为构造器提供显式名称,则会构造一个形式为 ‘Process-N1:N2:…:Nk’ 的名称,其中每个 Nk 是其父亲的第 N 个孩子。
  • pid :当前进程实例的 PID 值。

Python 多进程笔记 — 启动进程的方式、守护进程、进程间通信、进程池、进程池之间通信、多进程生产消费模型相关推荐

  1. Java实现生产消费模型的5种方式

    ** 前言 ** 生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储 ...

  2. Python学习笔记:Day 3编写ORM

    前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...

  3. 【学习笔记】启动Nginx、查看nginx进程、查看nginx服务主进程的方式、Nginx服务可接受的信号、nginx帮助命令、Nginx平滑重启、Nginx服务器的升级

     1.启动nginx的方式: cd /usr/local/nginx ls ./nginx -c nginx.conf 2.查看nginx的进程方式: [root@localhost nginx] ...

  4. Python 学习笔记 多进程 multiprocessing

    Python 解释器有一个全局解释器锁(PIL),导致每个 Python 进程中最多同时运行一个线程,因此 Python 多线程程序并不能改善程序性能,不能发挥多核系统的优势,可以通过这篇文章了解. ...

  5. python学习笔记(十六)-Python多线程多进程

    一.线程&进程 对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程, ...

  6. python 进程间同步_python之路29 -- 多进程与进程同步(进程锁、信号量、事件)与进程间的通讯(队列和管道、生产者与消费者模型)与进程池...

    所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了.至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠 ...

  7. Python学习笔记:进程和线程(起)

    前言 最近在学习深度学习,已经跑出了几个模型,但Pyhton的基础不够扎实,因此,开始补习Python了,大家都推荐廖雪峰的课程,因此,开始了学习,但光学有没有用,还要和大家讨论一下,因此,写下这些帖 ...

  8. Python学习笔记__10.4章 进程VS线程

    # 这是学习廖雪峰老师python教程的学习笔记 1.概览 我们介绍了多进程和多线程,这是实现多任务最常用的两种方式.现在,我们来讨论一下这两种方式的优缺点 要实现多任务,通常我们会设计Master- ...

  9. python学习笔记(二十七)多线程与多进程

    线程是程序里面的最小执行单元. 进程是资源的集合. 线程是包含在一个进程里面,一个进程可以有多个线程,一个进程里面默认有一个主线程.由主线程去启动子线程. 1.多线程 import threading ...

最新文章

  1. PLSQL_解析过程及硬解析和软解析的区别(案例)
  2. Java并发 正确终止与恢复线程
  3. linux 分区_Linux文件系统、逻辑分区、物理分区
  4. 计算机视觉摔倒检测,基于计算机视觉的室内跌倒检测
  5. 计算机组装与维护实训1,计算机组装与维护实训报告[1]
  6. Dapper.Common基于Dapper的开源LINQ超轻量扩展
  7. 记一次Linux磁盘满盘/dev/vda1目录清理记录
  8. C语言 socket 编程学习
  9. 一个程序员的感慨的《虚拟光驱》
  10. Java企业面试算法新得体会之6大数据和空间限制问题6问
  11. 多目标遗传算法与优化的关系
  12. java 递归_两篇文章带你了解java基础算法之递归和折半查找
  13. axure 动态面板实现图片轮播效果(淘宝)
  14. java字节输出流方法,Java-IO:File和字节输入输出流
  15. Python面向对象进阶和socket网络编程
  16. linux环境apache,php的安装目录
  17. nginx开机启动脚本
  18. 【渝粤题库】陕西师范大学210009幼儿园健康教育作业(高起专)
  19. learning - Haskell AND Lisp vs. Haskell OR Lisp - Programmers Stack Exchange
  20. 寻找高匿名破盾代理方案1. tinyproxy

热门文章

  1. 2021-2027年中国手机天线行业竞争格局分析及发展趋势预测报告
  2. 2022-2028年中国应急救援装备行业市场研究及前瞻分析报告
  3. 浅显易懂 Makefile 入门 (09)— include 文件包含、MAKECMDGOALS
  4. 数据库 user schema sqlserver 关系
  5. 前后端分离必备工具:Swagger快速搞定(整合SpringBoot详细教程)
  6. ResNet网络的训练和预测
  7. 什么是视觉Visual SLAM
  8. mysql 分号 存储过程_MySql 存储过程
  9. Android switchCompat. 和 Switch
  10. Rendering failed with a known bug ,Please try a rebuild