作为高级编程范畴的重要组成部分,并发编程在编码中有着很大的应用,许多工程师收益于并发编程带来的高效福利。但是并行编程是有一定难度的,本章将和大家一起从入门开始,详细介绍在python中如何进行并行编程。

1 线程概述

几乎所有的操作系统都支持运行多任务,一个任务通常就是一个程序,每一个运行中的程序就是一个进程。当一个程序运行时,内部可能包含多个顺序执行流,每一个顺序执行流就是一个线程。

1.1 线程和进程

当一个程序进入内存运行时,就变成了一个进程。进程是运行过程中的程序,并且具有一定的独立的功能,进程是系统进行资源分配的最小单位。
进程有如下三个特征:

  1. 独立性: 进程是系统中独立存在的实体,它可以拥有自己的独立的资源,每一个进程都拥有自己的私有的地址空间。没有经过本进程的允许,一个用户进程不能访问其他进程的地址空间。
  2. 动态性:进程于程序的区别在于程序只是一个静态的指令集合,而进程是一个正在系统中活动的指令集合。在进程中加入了时间的概念,进程具有自己的生命周期和各种不同的状态,在程序中是没有这些概念的。
  3. 并发性:多个进程可以在单个处理器上并发执行,多个进程之间不会影响。

注意:并发和并行是两个概念,并行指在同一时刻有多条指令在多个处理器上同时执行;并发指同一时刻只能有一条指令执行,单多个进程指令被快速轮换执行,是的在宏观上具有多个进程同时执行的效果。
现代操作系统都支持多进程的并发执行,但在具体的实现上可能因为硬件和操作系统的不同而采用不同的策略。比较常用的策略有:

  1. 共用式的多任务策略,例如windows3.1和mac os 9操作系统采用这种策略。
  2. 抢占式的多任务操作策略,其效率更高,目前操作系统大多采用这种策略。

多线程则扩展了多进程的概念,使得同一个进程可以并发处理多个任务。线程也被称为轻量级进程,线程是进程的执行单元。线程在程序中是独立的、并发的执行流。当进程被初始化后,主线程就被创建了。

线程是进程的组成部分,一个进程可以拥有多个线程,一个线程必须有一个父进程。线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源,它与父进程的其他线程共享该进程拥有的全局资源。

线程可以完成一定的任务,可以与其他线程共享父进程中的共享变量及部分环境,相互之间协同完成进程要完成的任务。

线程是独立运行的,它并不知道进程中是否还有其他线程存在。线程的运行是抢占式的,也就是说,当前运行的线程在任何时候都可能被挂起,以便另外一个线程运行。

一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发运行。

1.2 多线程的优势

线程在程序中是独立的、并发的执行流,它们共享内存、文件句柄和其他进程的状态。因为线程的划分尺度小于进程,使得多线程程序的并发性高,而多个线程共享内存,从而极大的提高了程序运行的效率。
线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性——多个线程共享同一个进程的虚拟空间。线程共享的环境包括进程代码段、进程的共有数据等,利用这些共享数据,线程之间很容易实现通信。
操作系统在创建进程时,必须为该进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单的多。
总结起来,使用多线程编码具有如下几个优点。

  1. 进程之间不能共享内存,但线程之间共享内存非常容易。
  2. 操作系统创建进程时,需要为该进程重新分配系统资源,但创建线程的代价小得多。
  3. python语言内置多线程功能支持,而不是单纯地作为操作系统的调度方式,从而简化了python的多线程编程。

2 线程的创建和启动

python主要通过两种方式创建线程。
4. 使用threading模块的thread类的构造器创建线程。
5. 继承threading模块的thread类创建线程类。

2.1 调用thread类的构造器创建线程

调用thread类的构造器创建线程很简单,直接调用threading.Thread类的如下构造器创建线程。

__init__(self,group=None,target=None,name=None,arg=(),kwargs=None,*,daemon=None)

上面的构造器涉及如下几个参数。

  1. group:指定线程所属的线程组。
  2. target:指定该线程要调度的目标方法。
  3. args:指定一个元组,以位置参数的形式为target指定的函数传入参数。元组的第一个元素传给target函数的第一个参数,元组的第二个元素传给target函数的第二个参数。
  4. kwargs:指定一个字典,以关键字参数的形式target指定的函数传入参数。
  5. daemon:指定所创建的线程是否为后台线程。
import threading
#定义一个普通的action方法,该方法准备为线程执行体
def action(max):for i in range(max):#调用threading模块的current_thread()函数获取当前进程#调用线程对象的getname()方法获取当前进程的名字print(threading.current_thread().getName()+" "+ str(i))
for i in range(100):#调用threading模块的current_thread()函数获取当前进程print(threading.current_thread().getName()+" "+str(i))if i == 20:#创建并启动第一个线程t1 = threading.Thread(target=action,args=(100,))t1.start()t2 = threading.Thread(target=action,args=(100,))t2.start()print('主线程执行完成')

除此之外,上面程序还用到了如下函数和方法:

  1. threading.current_thread():它是threading模块的函数,该函数总是返回正在执行的线程对象。
  2. getName():它是Thread类的实例方法,该方法返回调用它的线程名字。
  3. setName(name):可以通过getName()方法返回指定线程的名字,这两个方法可通过name属性代替。默认情况下,主线程的名字为MainThread,用户启动的多个线程的名字依次为Thread-1、Thread-2、Thread-3…Thread-n等。

2.2 继承Thread类创建线程类

通过继承Thread类来创建并启动线程的步骤如下:

  1. 定义Thread类的子类,并重写该类的run()方法。run()方法的方法就代表了线程需要完成的任务,因此把run()方法称为线程执行体。
  2. 创建Thread子类的实例,即创建线程对象。
  3. 调用线程对象的start()方法启动线程。
    直接上例子:
import threading#通过继承threading.Thread类创建线程类
class FkThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)self.i = 0#重写run()方法作为线程执行体def run(self):while self.i < 100:print(threading.current_thread().getName()+" "+str(self.i))self.i+=1
for i in range(100):print(threading.current_thread().getName()+" "+str(i))if i == 20:ft1 = FkThread()ft1.start()ft2 = FkThread()ft2.start()
print('主线程完成')

通常来说推荐直接使用Thread类来创建线程,因为这种方式不仅编程简单,而且线程直接包装target函数,具有更加清晰的逻辑结果。

3 线程的生命周期

当线程被创建并启动以后,它既不是一启动就进入执行状态的,也不是一直处于执行状态的,在线程的生命周期中,它要经过新建(new)、就绪(Ready)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。

3.1 新建和就绪状态

当程序创建了一个Thread对象或Thread子类的对象之后,该线程就处于新建状态,和其他的python对象一样,此时的线程对象并没有表现出任何线程的动态特征,程序也不会执行线程执行体。
当线程对象调用start()方法之后,该线程就处于就绪状态,python解释器会为其创建方法调用栈和程序计数器,处于这种状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于python解释器中线程调度器的调度。
notice:启动线程使用start()方法,而不是run()方法。如果直接调用线程对象的run()方法,则系统把线程对象当作一个普通对象,而run()方法也是一个普通方法,而不是线程执行体。

#这种调用方法不会调用线程大家要注意
import threadingdef action(max):for i in range(max):print(threading.current_thread().name + " "+str(i))for i in range(100):print(threading.current_thread().name+" "+str(i))if i == 20:threading.Thread(target=action,args=(100,)).run()threading.Thread(target=action,args=(100,)).run()

3.2 运行和阻塞

如果处于就绪状态的线程获得了CPU,开始执行run()方法的线程执行体,则该线程处于运行状态。如果计算机只有一个CPU,那么在任何时刻只有一个线程处于运行状态。在一个具有多处理器的机器上,将会有多个线程并行执行(pareallel)执行;当线程数大于处理器数时,依然存在多个线程在同一个CPU轮换的情况。
当一个线程开始运行后,它不可能一直处于运行状态(除非它的线程执行体足够短,瞬间就执行结束了),线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调度的细节取决于底层平台的策略。对于抢占式调度策略的系统而言,系统会给每个可执行的线程一个小时间段来处理任务;当该时间段用完后,系统就会剥夺该线程所占用的资源,让其他线程获得执行的机会。在选择下一个线程时,系统会考虑线程的优先级。
所有现代的桌面和服务器操作系统都采用抢占式调度策略,但一些小型设备如手机等则采用协作式调度策略——也就是必须又该线程主动放弃所占用的资源。
当发生如下情况式,线程会进入阻塞状态(中断):

  1. 线程调用sleep()方法主动放弃其所占用的处理器资源。
  2. 线程调用了一个阻塞式i/o方法,在该方法返回之前,该线程会被阻塞。
  3. 线程试图获得一个锁对象,但该锁对象正被其他线程持有。
  4. 线程在等待某个通知。
    被阻塞的线程会在合适的机会重新进入就绪状态,也就是说被阻塞线程的阻塞解除后,必须重新等待线程调度器再次调度它。
    当发生如下特定的情况可以解除阻塞,让该线程重新进入就绪状态。
  5. 调用sleep()方法的线程经过了指定时间
  6. 线程调用的阻塞式i/o方法已经返回。
  7. 线程成功地获得了试图获取的锁对象
  8. 线程正在等待某个通知时,其他线程发出了一个通知。
    下图为线程的状态转换图:

3.3 线程死亡

线程会以如下三种方式结束,结束后就处于死亡状态。

  1. run()方法或代表线程执行体的target函数执行完成后,线程正常结束。
  2. 线程抛出一个异常或error
    当主线程结束时,其他线程不受任何影响,并不会随之结束。一旦子线程启动起来后,他就拥有和主线程相同的地位,它不会受主线程的影响。
    为了测试某个线程是否已经死亡可以调用线程对象的is_alive()方法,当线程处于就绪、运行、阻塞三种状态时,该方法返回true,当线程处于新建和死亡时返回false。
    不要试图对一个已经死亡的线程调用start()方法使它重新启动,死亡就是死亡,该线程不可再次作为线程运行。
import threadingdef action(max):for i in range(100):print(threading.current_thread().name+" "+str(i))
sd = threading.Thread(target=action,args=(100,))
for i in range(100):print(threading.current_thread().name+" "+str(i))if i==20:sd.start()print(sd.is_alive())if i>20 and not (sd.is_alive()):sd.start()

4 控制线程

4.1 join线程

Thread提供了一个让线程等待另外一个线程的方法join(),该方法通常由使用线程的程序调用,以将大问题划分成许多小问题,并为每个小问题分配许多小问题,并为每个小问题分配一个线程。

import threadingdef action(max):for i in range(100):print(threading.current_thread().name+" "+str(i))
sd = threading.Thread(target=action,args=(100,))
for i in range(100):print(threading.current_thread().name+" "+str(i))if i==20:sd.start()print(sd.is_alive())sd.join()print(threading.current_thread().name+" "+str(i))

join(timeout=None)方法可以指定一个timeout参数,该参数指定等待被join的线程的时间最长为timeout秒。如果在timeout秒内被join的线程还没有执行结束则不再等待。

4.2 后台线程

有一种线程,它是在后台运行的,它的任务是为其他线程提供服务,这种线程被称为“后台线程”,又称为“守望线程”或“精灵线程”。后台线程有一个特征:如果所有的前台线程都死亡了,那么后台线程会自动死亡。
调用Thread对象的daemon属性可以将指定线程设置成后台线程。下面给出一个演示例子:

import threadingdef action(max):for i in range(max):print(threading.current_thread().name+" "+str(i))
sd = threading.Thread(target=action,args=(100,))
sd.daemon = True
#启动后台线程
sd.start()
for i in range(10):print(threading.current_thread().name+" "+str(i))

前台线程创建的子线程默认是前台线程,后台线程创建的子线程默认是后台线程。
如果要将某个线程设置为后台线程,则必须在该线程启动之前进行设置。

4.3 线程睡眠sleep

如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,则可以通过调用time模块的sleep(secs)函数来实现。该函数可以指定一个secs参数,用于指定线程阻塞多少秒。
当当前线程调用sleep()函数进入阻塞状态后,在其睡眠时间段内,该线程不会获得执行的机会,即使系统中没有其他可执行的线程,处于sleep()中的线程也不会执行,因此sleep()函数用来暂停程序的运行。

import timefor i in range(10):print("当前时间:%s"%time.ctime())#调用sleep()函数让当前线程暂停1stime.sleep(1)

5 线程同步

多线程编程是一件很有趣的事情,它很容易突然出现错误,这使得由于系统的线程调度具有随机性造成的。

5.1 线程安全问题

关于线程安全问题,有一个经典的问题——银行取钱问题。从银行取钱的基本流程可以分为如下几个步骤

  1. 用户输入账号、密码,系统判断用户的账户,密码是否匹配。
  2. 用户输入取款金额
  3. 系统判断账户余额是否大于取款金额。
  4. 如果余额大于取款金额,则取款成功;如果余额小于取款金额,则取款失败。
    按照上面的流程编写取款程序,并使用两个线程来模拟两个人使用同一个账户并发取钱操作。此处忽略检查账户和密码的操作,仅仅模拟后面三步操作。下面先定义一个账户类,该账户类封装了账和余额两个变量。
class Account:#定义构造器def __init__(self,account_no,balance):self.account_no = account_noself.balance = balanceimport threading
import  time#定义一个函数模拟取钱操作
def draw(account,draw_amount):if account.balance >= draw_amount:#吐出钞票print(threading.current_thread().name+"取钱成功!吐出钞票:"+str(draw_amount))time.sleep(0.001)#修改余额account.balance -= draw_amountprint("\t余额为:"+str(account.balance))else:print(threading.current_thread().name+"取钱失败!余额不足")
acct = Account("1234567",1000)
#使用两个线程从一个账户中取钱
threading.Thread(name='甲',target=draw,args=(acct,800)).start()
threading.Thread(name='乙',target=draw,args=(acct,800)).start()

问题出现了:账户金额只有1000元时取出了1600元,而且账户出现了负值,这不是银行所期望的结果。

5.2 同步锁

之所以会出现上面这个问题,是因为run()方法的方法体不具有线程安全性——程序中有两个并发线程在修改Account:而且系统恰好在sleep处执行线程切换,切换到另外一个修改account的线程,所以就出现了问题。
为了解决这个问题,python的threading模块引入了锁(Lock)。threading模块提供了lock和rlock两个类,它们都提供了如下两个方法来枷锁和释放锁。

  1. acquire(blocking=True,timeout=-1):请求lock和rlock加锁,其中timeout参数指定加锁多少秒。

  2. release():释放锁。
    lock和rlock的区别如下:

  3. threading.lock:它是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。

  4. threading.rlock:它代表可重入锁。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用rlock,那么acquire()和release()方法必须成对出现。
    rlock锁具有可重入性。也就是说,同一个线程可以对已被加锁的rlock锁再次枷锁,rlock会维持一个计数器追踪acquire方法的嵌套调用,线程在每次调用acquire()枷锁后都必须显示调用release()方法释放锁。所以,一段被锁保护的方法可以调用另外一个被相同的锁保护的方法。
    lock是控制多个线程对共享资源进行访问的工具。在实现线程安全的控制中,比较常用的是rlock。
    通过使用lock对象实现线程安全的类,线程安全的类具有如下特征:

  5. 该类的对象可以被多个线程访问。

  6. 每个线程在调用该对象的任意方法之后,都将得到正确的结果

  7. 每个线程调用该对象的任意方法之后,该对象保持正确的状态。

import threading
import  timeclass Account:#定义构造器def __init__(self,account_no,balance):self.account_no = account_noself._balance = balanceself.lock = threading.RLock()#因为账户余额不允许随便修改,所以只为self._balance提供getter方法def getBalance(self):return  self._balance#提供一个线程安全的draw方法完成取钱操作def draw(self,draw_amount):try:self.lock.acquire()if self._balance >= draw_amount:# 吐出钞票print(threading.current_thread().name + "取钱成功!吐出钞票:" + str(draw_amount))time.sleep(0.001)# 修改余额self._balance -= draw_amountprint("\t余额为:" + str(self._balance))else:print(threading.current_thread().name + "取钱失败!余额不足")finally:self.lock.release()#定义一个函数模拟取钱操作
def draw(account,draw_amount):account.draw(draw_amount)
acct = Account("1234567",1000)
#使用两个线程从一个账户中取钱
threading.Thread(name='甲',target=draw,args=(acct,800)).start()
threading.Thread(name='乙',target=draw,args=(acct,800)).start()

可变类的线程安全是以降低程序的运行效率作为代价的,为了减少线程安全带来的负面影响,程序可以采取如下策略。
1、不要对线程安全类的所有方法都进行同步,只对那些会改变竞争资源的方法进行同步。
2、如果可变类有两种运行环境:单线程环境和多线程环境则应该为该可变类提供两个版本。

5.3 死锁

当两个线程相互等待对方释放同步监视器时就会发生死锁。一旦发生死锁,整个程序既不会发生任何异常,也不会给出任何异常提示,只是所有线程处于阻塞状态,无法继续。

#这是一个典型的死锁的例子
class A:def __init__(self):self.lock = threading.RLock()def foo(self,b):try:self.lock.acquire()print("当前线程名:" + threading.current_thread().name + "进入A实例的foo()方法")time.sleep(0.2)print("当前线程名:" + threading.current_thread().name + "企图调用B实例的last()方法")b.last()finally:self.lock.release()def last(self):try:self.lock.acquire()print("进入A类的last()方法")finally:self.lock.release()
class B:def __init__(self):self.lock = threading.RLock()def bar(self,a):try:self.lock.acquire()print("当前线程名:"+threading.current_thread().name+"进入B实例的bar()方法")time.sleep(0.2)print("当前线程名:"+threading.current_thread().name+"企图调用A实例的last()方法")a.last()finally:self.lock.release()def last(self):try:self.lock.acquire()print("进入了B类的last()方法内部")finally:self.lock.release()
a = A()
b = B()
def init():threading.current_thread().name="主线程"a.foo(b)print("进入了主线程之后")
def action():threading.current_thread().name="副线程"b.bar(a)print("进入了副线程之后")
threading.Thread(target=action).start()
init()

死锁是不应该在程序中出现的,在编写程序时应该尽量避免出现死锁。下面有几种常见的方式解决死锁问题。

  1. 避免多次锁定:尽量避免同一个线程对多个lock进行锁定。
  2. 如果多线程需要对多个lock进行锁定,应该保证它们以相同的顺序请求加锁。
  3. 使用定时锁:程序调用acquire()方法加锁时可指定timeout参数,该参数指定超过timeout秒后会自动释放对lock的锁定。
  4. 死锁检测,以靠算法机制实现死锁预防,针对不可以实现按序加锁,也不能使用定时锁的场景。

6 线程通信

当线程在系统中运行时,线程的调度具有一定的透明性,通常程序无法准确控制线程的轮换执行,如果有需要,python可通过线程通信来保证线程协调运行。

6.1 使用condition实现线程通信

假设系统中有两个线程,这两个线程分别代表存款者和取钱者——现在假设系统有一种特殊的要求,即要求存款者和取钱者不断重复存款、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱。不允许存款者连续两次存钱,也不允许取钱者两次取钱。
使用condition可以让那些已经得到lock对象却无法继续执行的线程释放lock对象,condition对象也可以唤醒其他处于等待状态的线程。
将conditon对象与lock对象组合使用,可以为每个对象提供多个等待集(wait-set)。因此,condition对象总是需要有对应的Lock对象。在构造condition的时,要传入Lock对象将器绑定。
Condition类提供了如下几个方法:

  1. acquire([timeout])/release():调用condition关联的lock的acquire()或release()方法。
  2. wait([timeout]):导致当前进程进入condition的等待池等待通知并释放锁,直到其他线程调用该condition的notify()或notify_all()方法来唤醒其他线程,在调用该wait()方法时可传入timeout参数,指定该线程最多等待多少秒。
  3. notify():唤醒在该conditon等待池中的单个线程并通知它,收到通知的线程将会自动调用acquire方法尝试加锁,如果所有线程都在condition等待池中等待,则会选择其中一个线程,选择是任意的。
  4. notify_all():唤醒在该condition等待池中的所有线程并通知它们。
import threading
import  timeclass Account:#定义构造器def __init__(self,account_no,balance):self.account_no = account_noself._balance = balanceself.cond = threading.Condition()self._flag = False#因为账户余额不允许随便修改,所以只为self._balance提供getter方法def getBalance(self):return  self._balance#提供一个线程安全的draw方法完成取钱操作def draw(self,draw_amount):try:self.cond.acquire()if not self._flag:self.cond.wait()else:print(threading.current_thread().name+"取钱:"+str(draw_amount))self._balance -= draw_amountprint("账户余额为:"+str(self._balance))self._flag=Falseself.cond.notify_all()finally:self.cond.release()def deposit(self,deposit_amount):self.cond.acquire()try:if self._flag:self.cond.wait()else:#执行存款操作print(threading.current_thread().name+"存款:"+str(deposit_amount))self._balance += deposit_amountprint("账户余额为:"+str(self._balance))self._flag=Trueself.cond.notify_all()finally:self.cond.release()
def draw_many(account,draw_amount,max):for i in range(max):account.draw(draw_amount)
def deposit_many(account,deposit_amount,max):for i in range(max):account.deposit(deposit_amount)
acct = Account("1234567",0)
threading.Thread(name="取钱者",target=draw_many,args=(acct,800,100)).start()
threading.Thread(name="存款者甲",target=deposit_many,args=(acct,800,100)).start()
threading.Thread(name="存款者乙",target=deposit_many,args=(acct,800,100)).start()
threading.Thread(name="存款者丙",target=deposit_many,args=(acct,800,100)).start()

6.2 使用队列控制线程通信

在queue模块下提供了几个阻塞队列,这些队列用于实现线程通信。

  1. queue.Queue(maxsieze=0):代表FIFO(先进先出)的常规队列,maxsize可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将maxsize设置为0或者负数,则该队列的大小就是无限制的。
  2. queue.lifoQueue(maxsize=0):代表LIFO(后进先出)的队列,与Queue的区别就是出队列的顺序不同。
  3. priorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。
  4. Queue.qsize():返回队列的实际大小。
  5. Queue.empty()/full():返回队列是否为空/是否已满。
  6. Queue.put(item,block=True,timeout=None):向队列中加入元素。如果队列已满,且block参数True(阻塞),当前线程被阻塞,timeout指定阻塞时间,如果将timeout设置为None,则代表一直阻塞,直到该队列的元素被消费;如果队列已满,且block参数为False(不阻塞),则直接引发queue.FULL异常。
  7. Queue.put_nowait(item):向队列中放入元素,不阻塞。
  8. Queue.get(item,block=True,timeout=None):从队列中取出来元素(消费元素)。
  9. Queue.get_nowait(item):从队列中取出元素,不阻塞。
import  queue
bq = queue.Queue(2)
bq.put("python")
bq.put("python")
print("11111111111")
bq.put("Python")
print("22222222222")

下面给出通过Queue来实现线程通信的例子:

import threading
import  time
import  queue
def product(bq):str_tuple = ("python","kotlin","swift")for i in range(99999):print(threading.current_thread().name+"生产者准备生产元组元素!")time.sleep(0.2)bq.put(str_tuple[i%3])print(threading.current_thread().name+"生产者生产元组元素完成")
def consumer(bq):while True:print(threading.current_thread().name+"消费者准备消费元素元素")time.sleep(0.2)t = bq.get()print(threading.current_thread().name+"消费者消费[%s]元素完成!" % t)
bq = queue.Queue(maxsize=1)
#启动三个生产者线程
threading.Thread(target=product,args=(bq,)).start()
threading.Thread(target=product,args=(bq,)).start()
threading.Thread(target=product,args=(bq,)).start()
#启动一个消费者线程
threading.Thread(target=consumer,args=(bq,)).start()

6.3 使用Event控制线程通信

Event是一种非常简单的线程通信机制:一个线程发出一个Event,另一个线程可通过该Event被触发。
Event本身管理一个内部旗标,程序可以通过Event()的set方法将该旗标设置为True,也可以调用clear()方法将该旗标设置为False,程序可以调用wait()方法阻塞当前进程,直达Event的内部旗标设置为True。
Event提供了如下方法:

  1. is_set():返回Event的内部旗标是否为True。
  2. set():该方法将会把Event的内部旗标置为True,并唤醒所有处于等待状态的线程。
  3. clear():将Event的内部旗标设置为False,通常接下来会调用wait()方法来阻塞当前进程。
  4. wait(timeout=None):该方法会阻塞当前线程。
    下面给出应用event的例子:
import threading
import time
event = threading.Event()def cal(name):#等待事件,进入等待阻塞状态print("%s启动"%threading.current_thread().getName())print("%s准备开始计算状态"%name)event.wait()#收到事件后进入运行状态print('%s 收到通知了。'%threading.current_thread().getName())print("%s正式开始计算"%name)
#创建并启动两个线程它们都会在wait后阻塞
threading.Thread(target=cal,args=('甲',)).start()
threading.Thread(target=cal,args=('乙',)).start()
time.sleep(2)
print('--------------------------------')
print('主线程发出事件')
event.set()

event有点类似于condition和旗标的结合体,但event不带lock对象,如果要实现线程同步需要额外的lock对象。
下面通过event对取钱过程进行控制

#可以将该实现与condition进行对比
import threading
import  time
import  queueclass Account:#定义构造器def __init__(self,account_no,balance):self.account_no = account_noself._balance = balanceself.lock = threading.Lock()self.event = threading.Event()#因为账户余额不允许随便修改,所以只为self._balance提供getter方法def getBalance(self):return  self._balance#提供一个线程安全的draw方法完成取钱操作def draw(self,draw_amount):self.lock.acquire()if self.event.is_set():print(threading.current_thread().name+"取钱:"+str(draw_amount))self._balance -= draw_amountprint("账户余额为:"+str(self._balance))self.event.clear()self.lock.release()self.event.wait()else:self.lock.release()self.event.wait()def deposit(self,deposit_amount):self.lock.acquire()if not self.event.is_set():#执行存款操作print(threading.current_thread().name+"存款:"+str(deposit_amount))self._balance += deposit_amountprint("账户余额为:"+str(self._balance))self.event.set()self.lock.release()self.event.wait()else:self.lock.release()self.event.wait()
def draw_many(account,draw_amount,max):for i in range(max):account.draw(draw_amount)
def deposit_many(account,deposit_amount,max):for i in range(max):account.deposit(deposit_amount)
acct = Account("1234567",0)
threading.Thread(name="取钱者",target=draw_many,args=(acct,800,100)).start()
threading.Thread(name="存款者甲",target=deposit_many,args=(acct,800,100)).start()
threading.Thread(name="存款者乙",target=deposit_many,args=(acct,800,100)).start()
threading.Thread(name="存款者丙",target=deposit_many,args=(acct,800,100)).start()

7 线程池

线程池在系统启动时创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡的,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
使用线程池可以有效地控制系统中并发线程的数量。当系统中有大量的并发线程时,会导致系统性能急剧下降,甚至导致python解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过次数量。

7.1 使用线程池

线程池的基类是concurrent.future模块中的Executor,Executor提供了两个子类,即ThreadPoolExecutor和ProcessPoolExecutor,其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的task函数提交给线程池/进程池,剩下的事由线程池或进程池搞定。
Executor提供了如下常用方法:

  1. submit(fn,*arg,**kwargs):将fn函数提交给线程池。*args代表传给fn函数的参数,*kargs代表以关键字参数的形式为fn传入参数。

  2. map(func,*iterables,timeout=None,chunksize=1):该函数类似于全局函数map(func,*iterables),只是该函数会启动多个线程,以异步方式立即对iterable执行map处理。

  3. shutdown(wait=True):关闭线程池。
    程序将task函数提交(submit)给线程池后,submit方法会返回一个Future对象,Future类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行相当于一个“将来完成”的任务,所以python用Future来代表。
    Future提供了如下方法:

  4. cancel():取消该Future代表的线程任务。

  5. cancelled():返回future代表的线程任务是否被成功取消。

  6. running():如果该Future代表的线程正在执行,不可取消,该方法返回True。

  7. done():如果该Future代表的线程任务被成功取消或完成执行,返回为True。

  8. result(timeout=None):获取该线程任务最后返回的结果。

  9. exception(timeout=None):获取该Future代表的线程任务引发的异常。
    10.add_done_callback(fn):为该Future代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动出发该fn函数。
    在用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列。调用shutdown()方法后的线程池不再接收新任务,但会将以前所有的已提交的任务完成。当线程池中所有任务都执行完成后,该线程池中所有线程都会死亡。
    使用线程池来执行线程任务的步骤如下:

  10. 调用ThreadPoolExecutor类的构造器创建一个线程池。

  11. 定义一个普通函数作为线程任务。

  12. 调用ThreadPoolExcutor的submit方法提交线程任务。

  13. 当不想提交任何任务时,调用ThreadPoolExcutor对象的shutdown()方法关闭线程池。
    下面上一个例子:

from concurrent.futures import ThreadPoolExecutor
import threading
import timedef action(max):my_sum = 0for i in range(max):print(threading.current_thread().name+" "+str(i))my_sum+=ireturn  my_sum
pool = ThreadPoolExecutor(2)
future1 = pool.submit(action,50)
future2 = pool.submit(action,100)print(future1.done())
time.sleep(3)print(future2.done())
print(future1.result())
print(future2.result())pool.shutdown()

7.2 获取执行结果

前面程序调用了Future的result()方法获取线程任务的返回值,但该方法会阻塞当前主线程,只有该线程完成后,result()的阻塞才会解除。
如果程序不希望直接调用result()方法阻塞线程,可以通过future的add_done_callback()方法来添加回调函数,该回调函数行如fn(future)。

from concurrent.futures import ThreadPoolExecutor
import threading
import timedef action(max):my_sum = 0for i in range(max):print(threading.current_thread().name+" "+str(i))my_sum+=ireturn  my_sum
with ThreadPoolExecutor(2) as pool:future1 = pool.submit(action, 50)future2 = pool.submit(action, 100)def get_result(future):print(future.result())future1.add_done_callback(get_result)future2.add_done_callback(get_result)

此外,Executor还提供了一个map(func,*iterable,timeout=None,chunsize=1)方法,该方法的功能类似于全局函数map()。

from concurrent.futures import ThreadPoolExecutor
import threading
import timedef action(max):my_sum = 0for i in range(max):print(threading.current_thread().name+" "+str(i))my_sum+=ireturn  my_sum
with ThreadPoolExecutor(2) as pool:results = pool.map(action,(50,100,150))print('---------------------------')for r in results:print(r)

8 线程相关类

8.1 线程局部变量

python再threading模块下提供了一个Local(),该函数可以返回一个线程局部变量,通过使用线程局部变量可以很简捷地隔离多线程访问的竞争资源,从而简化多线程并发的编程处理。
线程局部变量的功能其实非常简单,就是为每一个使用该变量的线程提供一个变量的副本,使每一个线程都可以独立改变自己的副本,而不会和其他线程的副本冲突。

import  threading
from concurrent.futures import ThreadPoolExecutor#定义线程局部变量
mydata = threading.local()
def action(max):for i in range(max):try:mydata.x += iexcept:mydata.x = i#访问mydata的x值print('%s mydata.x的值为:%d ----- %d'%(threading.current_thread().name,i,mydata.x))
with ThreadPoolExecutor(max_workers=2) as pool:pool.submit(action,10)pool.submit(action,10)

线程局部变量和其他同步机制一样,都是为了解决多线程中对共享资源的访问冲突。在普通的同步机制中,使通过为对象加锁来实现多个线程对共享资源的安全访问的。
线程局部变量从另外一个角度解决多线程的并发访问问题,线程局部变量将需要并发访问的资源复制多份,每个线程都拥有自己的资源副本,从而也就没有对该资源进行同步的必要了。
线程局部变量并不能替代同步机制,两者面向的问题领域不同。同步机制是为了同步多个线程对共享资源的并发访问,是多个线程之间进行有效通信的方式(从效果上看共同维护一份数据);线程局部变量是为了隔离多个线程对共享资源的并发访问(从效果上看维护两套数据)

8.2 定时器

Thread类有一个timer子类,该子类可用于控制指定函数在特定之间内执行一次。

from  threading import Timerdef hello():print('hello world')
t = Timer(10.0,hello)
t.start()

8.3 任务调度

如果需要执行更复杂的任务调度,则可使用python提供的shed模块。该模块提供了sched.sheduler类,该类代表一个任务调度器。

import sched,timeimport  threadings = sched.scheduler()
def print_time(name='default'):print("%s 的时间: %s" %(name,time.ctime()))
print('主线程:',time.ctime())
s.enter(10,1,print_time)
s.enter(5,2,print_time,argument=('位置参数',))
s.enter(5,1,print_time,kwargs={'name':'关键字参数'})s.run()
print('主线程:',time.ctime())

9 多进程

9.1 使用fork创建新进程

python的os模块提供了一个fork()方法,该方法可以fork()出来一个子进程。

import  os
print('父进程(%s)开始执行'% os.getpid())
pid = os.fork()
print('进程进入:%s'% os.getpgid())if pid == 0:print('子进程,其ID为(%s),父进程ID为(%s)'%(os.getpgid(),os.getppid()))
else:print('我(%s)创建的子进程ID为(%s)' % (os.getppid(),pid))

9.2 使用multiprocessing.Process 创建新进程

Python在multiprocessing模块下提供了process来创建新进程。


1、以指定函数作为target创建新进程

import  os
import  multiprocessing.process
def action(max):for i in range(max):print("%s子进程,父进程(%s):%d"%(os.getpid(),os.getppid(),i))
if __name__ == '__main__':for i in range(100):print("%s主进程:%d"%(os.getpid(),i))if i == 20:mp1 = multiprocessing.process(target=action,args=(100,))mp1.start()mp2 = multiprocessing.process(target=action, args=(100,))mp2.start()mp2.join()print("主进程执行完成")

2、继承Process类创建子进程

import  os
import  multiprocessingclass MyProcess(multiprocessing.Process):def __init__(self,max):self.max = maxsuper().__init__()def run(self):for i in range(self.max):print("(%s)子进程(父进程:(%s)):%d"%(os.getpid(),os.getppid(),i))
if __name__ == '__main__':for i in range(100):print("(%s)主进程:%d"%(os.getpid(),i))mp1 = MyProcess(100)mp1.start()mp1.join()
print("主进程执行结束")

9.3 Context和启动进程的方式

python支持3中启动进程的方式:

9.4 使用进程池管理进程

9.5 进程通信

python为进程通信提供了两种机制。
1、Queue:一个进程想queue中放入数据,另外一个进程从中读取数据。
2、Pipe:Pipe代表连接两个进程的管道。
使用queue实现进程通信:

import  os
import  multiprocessingdef f(q):print('(%s)进程开始放入数据...'%multiprocessing.current_process().pid)q.put('Python')
if __name__ == '__main__':q = multiprocessing.Queue()p = multiprocessing.Process(target=f,args=(q,))p.start()print('(%s)进程开始取出数据...'% multiprocessing.current_process().pid)print(q.get())p.join()

2、使用pipe实现进程通信

import  os
import  multiprocessingdef f(conn):print('(%s)进程开始放入数据...'%multiprocessing.current_process().pid)conn.send('Python')
if __name__ == '__main__':parent_conn,child_conn = multiprocessing.Pipe()p = multiprocessing.Process(target=f,args=(child_conn,))p.start()print('(%s)进程开始取出数据...'% multiprocessing.current_process().pid)print(parent_conn.recv())p.join()

疯狂python讲义学习日志11——并发编程相关推荐

  1. 疯狂Python讲义学习日志01——变量和简单类型

    1.变量和简单类型 1.1python脚本注释 python提供了两种脚本注释的方式: 当行脚本的注释: 对于单行的python的脚本而言,可以在该行脚本所在的左侧添加"#"字符的 ...

  2. 疯狂python讲义学习笔记——中十章完结

    #第十一章 thinker import tkinter as tk print(help(tk.Button.__init__))#以按扭为例查看有什么属性 class myApplication( ...

  3. 疯狂Python讲义学习笔记(含习题)之 类和对象

    Python支持面向对象的三大特征:封装.继承和多态. 一.类和对象 可以把类当成一种自定义类型,可以使用类来定义变量,也可以使用类来创建对象. (一)定义类 类是某一批对象的抽象,可以把类理解成某种 ...

  4. 疯狂Python讲义学习笔记(含习题)之 常见模块

    一.sys模块 sys模块代表了Python解释器,主要用于获取和Python解释器相关的信息. >>> import sys >>> [e for e in di ...

  5. 疯狂Python讲义学习笔记(含习题)之 流程控制

    Python支持两种基本流程控制结构:分支结构和循环结构.分支结构用于实现根据条件来选择性地执行某段代码:循环结构用户实现根据循环条件重复执行某段代码. Python使用if语句提供分支支持,使用wh ...

  6. 疯狂python讲义学习笔记——后十章完结

    ''' numpy pandas torch keras tensorflow tushare sklearn opencv time kivy '''#第二十一章:numpy import nump ...

  7. 疯狂python讲义学习笔记——前十章完结

    #第一章:绪论 #单行注释 ''' 多行注释 ''' """ 多行注释 """#dir列出指定类或模块的属性与方法,help查看某个函数或方 ...

  8. 面试分析《疯狂Python讲义》PDF代码+《Python核心编程第3版》PDF代码问题

    python语言现在很流行了,除了用在学校,也用在很多行业.python学起来较为简单,语法容易理解,也可用于数据分析. 国内的教材推荐看<疯狂python讲义>,对比国外也有很多好的参考 ...

  9. 疯狂python讲义pdf_如何自学成Python大神?这份学习宝典火爆 IT 圈!

    都说人生苦短,我用 Python.为什么? 简单明了的理由当然是开发效率高.但是学习 Python 的初学者往往会面临以下残酷的现状: 网上充斥着大量的学习资源.书籍.视频教程和博客,但是大部分都是讲 ...

最新文章

  1. 别人总结的批处理技巧
  2. 机器学习常用激活函数
  3. java环境变量设置
  4. 解决 Dynamics AX 2009 部署报表时错误
  5. cs-HtmlHelpers
  6. chrome上很棒的爬虫插件,至少爬取博客够用了
  7. [TCP/IP] TCP流和UDP数据报之间的区别
  8. redis客户端连接数量_实战解析无所不知的Redis拓展应用——Info,进阶学习,无所不能...
  9. 小白自学前端,轻松月入过万哦!
  10. 节流函数的实现,一次面试题遇到的编程题
  11. MySQL 添加where 1= 1 是否会引起索引失效
  12. wordpress模板-Blocksy主题模板V1.8.3.4
  13. C语言 int y=10 do,删除学生信息算法执行完case10输入y 之后do whil
  14. 安装 samba 记录
  15. Android ViewFlipper翻转视图的基本使用
  16. 多出多个虚拟显示器的解决方法
  17. 浙江大学计算机专业选考要求,浙大等招办主任解读2020年选考科目要求!各专业有调整!...
  18. 快速乘 O(lgn) and O(1)
  19. 国家开放大学2021春1062文学英语赏析题目
  20. 移除联想M5210阵列卡(3650M5)的缓存模块以开启JBOD模式

热门文章

  1. linux发邮件到126,使用126邮箱发送邮件的python脚本
  2. VB取得硬盘物理序列号
  3. LeetCode-题目详解(十一):回溯算法【递归回溯、迭代回溯】【DFS是一个劲往某一个方向搜索;回溯算法建立在DFS基础之上,在搜索过程中,达到结束/裁剪条件后,恢复状态,回溯上一层,再次搜索】
  4. Python编程:实现凯撒密码加密解密
  5. 说一下数据库有哪些索引类型,有什么优缺点?
  6. js通过IP地址获取所在城市
  7. 目标导向的交互设计:About face 3 -- The essentials of interaction design 读书分享
  8. 服部周作《麦肯锡晋升法则》读书笔记 I
  9. springboot同时接受文件和多个参数
  10. 《勋伯格和声学》读书笔记(十):减七和弦