目录

前言

GIL

同步机制

1. Semaphore(信号量)

2. Lock(锁)

3. RLock(可重入锁)

4. Condition(条件)

5. Event

6. Queue

线程池


前言

对我来说,编程的乐趣之一是想办法让程序执行的越来越快,代码越写越优雅。在刚开始学习并发编程时,相信你它会有一些困惑,本文将解释多个并发开发的问题并帮助你快速了解并发编程的不同场景和应该使用的解决方案。

GIL

Python(特指 CPython)的多线程的代码并不能利用多核的优势,而是通过著名的全局解释锁(GIL)来进行处理的。如果是一个计算型的任务,使用多线程 GIL 就会让多线程变慢。我们举个计算斐波那契数列的例子:

# coding: utf-8
# Author: shelley
# 2020/9/21,14:10import time
import threadingdef profile(func):def wrapper(*args, **kwargs):import timestart = time.time()func(*args, **kwargs)end = time.time()print ('COST: {}'.format(end - start))return wrapperdef fib(n):if n<=2:return 1return fib(n-1) + fib(n-2)@profile
def nothread():fib(35)fib(35)@profile
def hasthread():for i in range(2):t = threading.Thread(target=fib, args=(35,))t.start()main_thread = threading.currentThread()for t in threading.enumerate():if t is main_thread:continuet.join()nothread()
hasthread()
"""
COST: 7.413999795913696
COST: 6.875999927520752
"""

运行的结果你猜猜会怎么样:

COST: 7.413999795913696
COST: 6.875999927520752

这种情况还不如不用多线程!

GIL 是必须的,这是 Python 设计的问题:Python 解释器是非线程安全的。这意味着当从线程内尝试安全的访问 Python 对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取 Python 对象或者 C API。每 100 个字节的 Python 指令解释器将重新获取锁,这(潜在的)阻塞了 I/O 操作。因为锁,CPU 密集型的代码使用线程库时,不会获得性能的提高(但是当它使用之后介绍的多进程库时,性能可以获得提高)。

那是不是由于 GIL 的存在,多线程库就是个「鸡肋」呢?当然不是。事实上我们平时会接触非常多的和网络通信或者数据输入 / 输出相关的程序,比如网络爬虫、文本处理等等。这时候由于网络情况和 I/O 的性能的限制,Python 解释器会等待读写数据的函数调用返回,这个时候就可以利用多线程库提高并发效率了。

同步机制

Python 线程包含多种同步机制:

1. Semaphore(信号量)

在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是 1)。信号量同步基于内部计数器,每调用一次 acquire (),计数器减 1;每调用一次 release (),计数器加 1. 当计数器为 0 时,acquire () 调用被阻塞。

# coding: utf-8
# Author: shelley
# 2020/9/22,10:31
import time
from random import random
from threading import Thread, Semaphoredef foo(tid,sema):with sema:print ('{} acquire sema'.format(tid))wt = random() * 2time.sleep(wt)print('{} release sema'.format(tid))print('done')if __name__ == '__main__':import timetic = time.time()sema = Semaphore(10)  # 同时能访问资源的数量为3threads = []for i in range(100):t = Thread(target=foo, args=(i,sema))threads.append(t)t.start()for t in threads:t.join()toc = time.time()print(toc-tic)# 10.71899962425232tic = time.time()sema = Semaphore(2)  # 同时能访问资源的数量为3threads = []for i in range(100):t = Thread(target=foo, args=(i,sema))threads.append(t)t.start()for t in threads:t.join()toc = time.time()print(toc - tic)# 53.01680016517639

这个例子中,我们限制了同时能访问资源的数量为 3。看一下执行的效果:

0 acquire sema
1 acquire sema
2 acquire sema
3 acquire sema
4 acquire sema
5 acquire sema
6 acquire sema
7 acquire sema
8 acquire sema
9 acquire sema
9 release sema
45 acquire sema
done
0 release sema
done
11 acquire sema
2 release sema
done
12 acquire sema

2. Lock(锁)

Lock 也可以叫做互斥锁,其实相当于信号量为 1。我们先看一个不加锁的例子:

# coding: utf-8
# Author: shelley
# 2020/9/22,11:47
import time
from threading import Thread# value为全局变量,同时运行多个getlock函数,
# 内存中保存的value在不同时刻是不同的,
# 所以,不同的getlock获得value是不同的,
# 最后value的值也是不定的。
value = 0def getlock():global valuenew = value + 1time.sleep(0.001)  # 使用sleep让线程有机会切换value = newthreads = []
# 同时运行多个getlock函数
for i in range(100):t = Thread(target=getlock)t.start()threads.append(t)for t in threads:t.join()print(value)

执行一下:

25

大写的黑人问号。不加锁的情况下,结果会远远的小于 100。那我们加上互斥锁看看:

# coding: utf-8
# Author: shelley
# 2020/9/22,11:45
import time
from threading import Thread, Lockvalue = 0def getlock():# 不同的getlock函数读取value时,由于有lock,只能# 由一个getlock读取,其他不能读取,等该getlock释放# lock锁后,其他getlock函数才可以读取。global valuewith lock:new = value + 1time.sleep(0.001)value = newif __name__ == '__main__':threads = []lock = Lock()# 运行多个getlock函数,for i in range(100):t = Thread(target=getlock)t.start()threads.append(t)for t in threads:t.join()print(value)

我们对 value 的自增加了锁,就可以保证了结果了:

100

3. RLock(可重入锁)

acquire () 能够不被阻塞的被同一个线程调用多次。但是要注意的是 release () 需要调用与 acquire () 相同的次数才能释放锁。

4. Condition(条件)

一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者 / 消费者」模型:

# coding: utf-8
# Author: shelley
# 2020/9/22,11:48
import time
import threading"""
代码只运行一次,不像event一直运行
"""product = 19def consumer(cond):global productt = threading.currentThread()  # 当前thread的名字with cond:cond.wait()  # wait()方法创建了一个名为waiter的锁,并且设置锁的状态为locked。# 这个waiter锁用于线程间的通讯if product>0:product-=1print('{}: Resource is available to consumer'.format(t.name))else:print('Resource exhausted')def producer(cond):global productt = threading.currentThread()with cond:print ('{}: Making resource available'.format(t.name))product+=1cond.notifyAll()  # 释放waiter锁,唤醒消费者# 把对应的线程联系在一起,线程之间的关系
condition = threading.Condition()c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p1 = threading.Thread(name='p1', target=producer, args=(condition,))
p2 = threading.Thread(name='p2', target=producer, args=(condition,))c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p1.start()
p2.start()"""
help(threading.Condition)
返回的是一个类,该类实现了一个条件变量,该变量允许许多等待的线程可被另一个线程唤醒。
lock参数如果不空,必须是Lock或是RLock对象,而且用为潜在锁。实现的方法:
notify(self, n=1)
唤醒一个或多个正在等待的线程。
如果调用这个方法的线程没有获得lock,则会出现RuntimeError
此方法最多唤醒n个等待条件变量的线程; 如果没有线程正在等待,则为空操作。notifyAll = notify_all(self)
notify_all(self)
唤醒这个条件下所有等待中的线程
如果调用这个方法的线程没有获得lock,则会出现RuntimeErrorwait(self, timeout=None)
一直等待直到被唤醒,或是时间到。
如果调用这个方法的线程没有获得lock,则会出现RuntimeErrorwait_for(self, predicate, timeout=None)
一直等待直到条件为真。"""

执行一下:

p1: Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer
p2: Making resource available

可以看到生产者发送通知之后,消费者都收到了。

5. Event

一个线程发送 / 传递事件,另外的线程等待事件的触发。我们同样的用「生产者 / 消费者」模型的例子:

# coding: utf-8
# Author: shelley
# 2020/9/22,15:46
import time
import threading
from random import randint"""
一个线程发送 / 传递事件,另外的线程等待事件的触发。
不出现异常,程序一直运行!!!,原因:while 1 的问题
任何时刻都只有一个线程在运行
"""
TIMEOUT = 2def consumer(event, l):t = threading.currentThread()  # 当前运行的线程while 1:event_is_set = event.wait(TIMEOUT)  # 设置等待时间,如果超过抛出异常# 如果flag为true,即event_is_set为true,则唤醒进程if event_is_set:try:integer = l.pop()print ('{} popped from list by {}'.format(integer, t.name))event.clear()  # 重置事件状态except IndexError:  # 为了让刚启动时容错passdef producer(event, l):t = threading.currentThread()while 1:integer = randint(10, 100)l.append(integer)print ('{} appended to list by {}'.format(integer, t.name))event.set()  # 把flag设置为true,激活线程time.sleep(1)# flag的初始值为false
# 把所有线程联系在一起,保存线程之间的唤醒,阻塞等。
event = threading.Event()l = []
threads = []for name in ('consumer1', 'consumer2'):t = threading.Thread(name=name, target=consumer, args=(event, l))t.start()threads.append(t)p = threading.Thread(name='producer', target=producer, args=(event, l))
p.start()
threads.append(p)# for t in threads:
#     t.join()"""
t = threading.Thread()class Thread(builtins.object)
这是一个类,有两种方法调用对象,1. 给构造函数传递一个可调用函数。2.重写run()方法
__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)*group*:必须是None,用于未来的拓展
*target*:可调用函数,由run()调用,默认是None,表示没有方法可调用
*name*:线程的名称,默认是一个唯一的名字,形式为"Thread-N",N是一个小整数
*args*:target函数的参数,默认为空列表
*kwargs*:是target函数的字典形式的关键字参数,默认为{}is_alive(self)
返回线程在运行中。|      |      This method returns True just before the run() method starts until just|      after the run() method terminates. The module function enumerate()|      returns a list of all alive threads.|  |  join(self, timeout=None)|      Wait until the thread terminates.|      |      This blocks the calling thread until the thread whose join() method is|      called terminates -- either normally or through an unhandled exception|      or until the optional timeout occurs.|      |      When the timeout argument is present and not None, it should be a|      floating point number specifying a timeout for the operation in seconds|      (or fractions thereof). As join() always returns None, you must call|      isAlive() after join() to decide whether a timeout happened -- if the|      thread is still alive, the join() call timed out.|      |      When the timeout argument is not present or None, the operation will|      block until the thread terminates.|      |      A thread can be join()ed many times.|      |      join() raises a RuntimeError if an attempt is made to join the current|      thread as that would cause a deadlock. It is also an error to join() a|      thread before it has been started and attempts to do so raises the same|      exception.|  |  run(self)|      Method representing the thread's activity.|      |      You may override this method in a subclass. The standard run() method|      invokes the callable object passed to the object's constructor as the|      target argument, if any, with sequential and keyword arguments taken|      from the args and kwargs arguments, respectively.|  |  setDaemon(self, daemonic)|  |  setName(self, name)|  |  start(self)|      Start the thread's activity.|      |      It must be called at most once per thread object. It arranges for the|      object's run() method to be invoked in a separate thread of control.|      |      This method will raise a RuntimeError if called more than once on the|      same thread object.|  |  ----------------------------------------------------------------------|  Data descriptors defined here:|  |  __dict__|      dictionary for instance variables (if defined)|  |  __weakref__|      list of weak references to the object (if defined)|  |  daemon|      A boolean value indicating whether this thread is a daemon thread.|      |      This must be set before start() is called, otherwise RuntimeError is|      raised. Its initial value is inherited from the creating thread; the|      main thread is not a daemon thread and therefore all threads created in|      the main thread default to daemon = False.|      |      The entire Python program exits when no alive non-daemon threads are|      left.|  |  ident|      Thread identifier of this thread or None if it has not been started.|      |      This is a nonzero integer. See the thread.get_ident() function. Thread|      identifiers may be recycled when a thread exits and another thread is|      created. The identifier is available even after the thread has exited.|  |  name|      A string used for identification purposes only.|      |      It has no semantics. Multiple threads may be given the same name. The|      initial name is set by the constructor.
""""""
t.join()
join(timeout=None) 是threading实例里的一个方法。
线程实例在结束前一直等待。这个堵塞运行中的线程,直到线程的join()方法被调用,才会激活线程。如果提供timeout参数,必须是float型的,声明在几秒内的操作时间。
join()返回None,所以在join()后必须调用isAlive(),以表示timeout是否发生。
如果该线程还在运行中,join调用timeout。如果timeout参数没有提供,那么操作会被阻塞直到线程中断。一个线程可以调用join多次。"""
"""
threading.Event()Events管理flag变量,可以使用set()设为true或是使用reset设为false
wait()方法一直blocks直到flag是true,flag的初始值为false。clear():重新给flag赋值为false,接下来,线程调用wait()将会被阻塞,直到set()被调用,
并给flag赋值为true。is_set(self):返回true如果flag为trueset(self):把flag设置为true
所有的线程都等待flag设置为true,这样线程才会被唤醒。
一旦flag为true,线程将会调用wait,而且不会再被block。wait(self, timeout=None):block直到flag设置为true。
如果内部标志在输入时为true,则立即返回true。否则,阻塞直到另一个线程
调用set()将标志flag设置为true,或者直到发生可选的超时为止。"""

执行的效果是这样的:

p1: Making resource 20 available
c2: Resource 20 is available to consumer
c1: Resource 19 is available to consumer
p2: Making resource 19 available
p2: Making resource 20 available
p1: Making resource 21 available
c1: Resource 21 is available to consumer
p1: Making resource 21 available
c2: Resource 21 is available to consumer

可以看到事件被 2 个消费者比较平均的接收并处理了。如果使用了 wait 方法,线程就会等待我们设置事件,这也有助于保证任务的完成。

6. Queue

队列在并发开发中最常用的。我们借助「生产者 / 消费者」模式来理解:生产者把生产的「消息」放入队列,消费者从这个队列中对去对应的消息执行。

大家主要关心如下 4 个方法就好了:

  1. put: 向队列中添加一个项。
  2. get: 从队列中删除并返回一个项。
  3. task_done: 当某一项任务完成时调用。
  4. join: 阻塞直到所有的项目都被处理完。
# coding: utf-8
# Author: shelley
# 2020/9/23,14:01
import time
import threading
from random import random
from queue import Queue"""
代码一直运行,没有结束的标识???
原因:while: 1 的问题"""q = Queue()
"""
Queue 模块还自带了 PriorityQueue(带有优先级)和 LifoQueue(后进先出)
"""def double(n):return n * 2def producer():while 1:wt = random()time.sleep(wt)q.put((double, wt))def consumer():while 1:task, arg = q.get()print (arg, task(arg))q.task_done()for target in(producer, consumer):t = threading.Thread(target=target)t.start()

这就是最简化的队列架构。

Queue 模块还自带了 PriorityQueue(带有优先级)和 LifoQueue(后进先出)2 种特殊队列。我们这里展示下线程安全的优先级队列的用法, PriorityQueue 要求我们 put 的数据的格式是(priority_number, data),我们看看下面的例子:

# coding: utf-8
# Author: shelley
# 2020/9/23,14:13
import time
import threading
from random import randint
from queue import PriorityQueue"""
由于任何时刻只能运行一个线程,当producer运行完,
consumer才会运行,
"""
q = PriorityQueue()def double(n):return n * 2def producer():count = 0while 1:if count > 5:breakpri = randint(0, 100)print('put :{}'.format(pri))q.put((pri, double, pri))  # (priority, func, args)count += 1def consumer():while 1:if q.empty():breakpri, task, arg = q.get()print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))q.task_done()time.sleep(0.1)t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

其中消费者是故意让它执行的比生产者慢很多,为了节省篇幅,只随机产生 5 次随机结果。我们看下执行的效果:

put :95
put :67
put :68
put :49
put :33
put :73
[PRI:33] 33 * 2 = 66
[PRI:49] 49 * 2 = 98
[PRI:67] 67 * 2 = 134
[PRI:68] 68 * 2 = 136
[PRI:73] 73 * 2 = 146
[PRI:95] 95 * 2 = 190

可以看到 put 时的数字是随机的,但是 get 的时候先从优先级更高(数字小表示优先级高)开始获取的。

线程池

面向对象开发中,大家知道创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。无节制的创建和销毁线程是一种极大的浪费。那我们可不可以把执行完任务的线程不销毁而重复利用呢?仿佛就是把这些线程放进一个池子,一方面我们可以控制同时工作的线程数量,一方面也避免了创建和销毁产生的开销。

线程池在标准库中其实是有体现的,只是在官方文章中基本没有被提及:

# coding: utf-8
# Author: shelley
# 2020/9/23,14:31
from multiprocessing.pool import ThreadPoolpool = ThreadPool(5)
res = pool.map(lambda x:x**2, range(100))
print(res)"""
class ThreadPool(Pool)
__init__(self, processes=None, initializer=None, initargs=())apply(self, func, args=(), kwds={}):
和func(*args, **kwds)相等apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)
apply()的Asynchronous版本imap(self, func, iterable, chunksize=1)
和map()相同,比Pool.map()慢一些imap_unordered(self, func, iterable, chunksize=1)
和imap()相同,但结果是的顺序是随意的map(self, func, iterable, chunksize=None)
iterable里的每个元素都传递到func中进行处理,
所有结果都保存在list中。map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None)
map()的Asynchronous版本starmap(self, func, iterable, chunksize=None)
和map()相似,但是iterable的元素要求也是可以迭代的,并且可以分解为参数,
即`func` and (a, b)变为func(a, b)starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None)
starmap()的Asynchronous版"""

当然我们也可以自己实现一个:

# coding: utf-8
# Author: shelley
# 2020/9/23,14:46
import time
import threading
from random import random
from queue import Queuedef double(n):return n * 2class Worker(threading.Thread):def __init__(self, queue):super(Worker, self).__init__()self._q = queueself.daemon = Trueself.start()def run(self):while 1:f, args, kwargs = self._q.get()try:print('USE: {}'.format(self.name))   # 线程 名字print(f(*args, **kwargs))except Exception as e:print(e)self._q.task_done()class ThreadPool(object):def __init__(self, num_t=5):self._q = Queue(num_t)# Create Worker Threadfor _ in range(num_t):Worker(self._q)def add_task(self, f, *args, **kwargs):self._q.put((f, args, kwargs))def wait_complete(self):self._q.join()pool = ThreadPool()
for _ in range(8):wt = random()pool.add_task(double, wt)time.sleep(wt)
pool.wait_complete()

执行一下:

USE: Thread-1
1.3049898070261405
USE: Thread-2
1.7351442214984278
USE: Thread-3
1.738625933301346
USE: Thread-4
0.25336362356113806
USE: Thread-5
1.0201166583040362
USE: Thread-1
1.1103793578596903
USE: Thread-2
1.3763251363959381
USE: Thread-3
0.28749344108468944

线程池会保证同时提供 5 个线程工作,但是我们有 8 个待完成的任务,可以看到线程按顺序被循环利用了。

参考:

https://www.dongwm.com/post/76/

理解Python并发编程一篇就够了 - 线程篇相关推荐

  1. 理解python并发编程_Python并发编程很简单

    上次已经和大家探讨了关于进程和线程的区别和联系相关的东东,今天呢,咱们再次回到 好啦,废话少说,咱们就开始吧! 首先说一下哦,_thread和threading哦,到这可能有朋友会问了,这两个有什么区 ...

  2. Python并发编程理论篇

    Python并发编程理论篇 前言 很多人学习python,不知道从何学起. 很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手. 很多已经做案例的人,却不知道如何去学习更加高深的知识 ...

  3. python并发编程:协程asyncio、多线程threading、多进程multiprocessing

    python并发编程:协程.多线程.多进程 CPU密集型计算与IO密集型计算 多线程.多进程与协程的对比 多线程 创建多线程的方法 多线程实现的生产者-消费者爬虫 Lock解决线程安全问题 使用线程池 ...

  4. 深入浅出讲解Python并发编程

    微信公众号:运维开发故事,作者:素心 Python并发编程 本文比较长,绕的也比较快,需要慢慢跟着敲代码并亲自运行一遍,并发编程本身来说就是编程里面最为抽象的概念,单纯的理论确实很枯燥,但这是基础,基 ...

  5. 深入理解Python异步编程

    声明:本文为转载内容 前言 很多朋友对异步编程都处于"听说很强大"的认知状态.鲜有在生产项目中使用它.而使用它的同学,则大多数都停留在知道如何使用 Tornado.Twisted. ...

  6. 深入理解 Python 异步编程

    原文地址:点击打开链接 来源:阿驹(微信公号:驹说码事) 如有好文章投稿,请点击 → 这里了解详情 前言 很多朋友对异步编程都处于"听说很强大"的认知状态.鲜有在生产项目中使用它. ...

  7. 一文搞明白Python并发编程和并行编程

    目录 前言 一.基础知识 1.并行和并发 (1)定义 (2)联系 2.进程.线程和协程 (1)定义 (2)联系 3.生成器 (1)yield (2)send, next (3)yield from 4 ...

  8. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  9. Python并发编程系列之多进程(multiprocessing)

    1 引言 本篇博文主要对Python中并发编程中的多进程相关内容展开详细介绍,Python进程主要在multiprocessing模块中,本博文以multiprocessing种Process类为中心 ...

最新文章

  1. 几行代码实现神奇移动的过渡动画
  2. java 打包目录_Java打包文件目录问 zip文件
  3. 人们对社会与金钱奖赏的预期共享神经环路
  4. Cardinality 对执行计划的重要性
  5. 影子系统、沙箱、虚拟机之间的区别
  6. 数学篇(一) 矩阵运算
  7. php网站xml链接,xml图像超链接的制作代码
  8. 又做了3个极品菜[图]
  9. 20180813-20180817
  10. redis核心技术与实战(二)缓存应用篇
  11. 解除Linux最大进程数和最大文件句柄打开数限制
  12. 电影院售票系统,电影院订票系统,电影院购票管理系统计算机毕业设计
  13. Win10PE启动维护工具 | U盘WinPE下载
  14. 长期趋势的测定方法-时距扩大法和移动平均法
  15. 用GoldWave制作合唱的四重奏回音效果
  16. Charles cannot configure your proxy settings while it is on a read-only volume
  17. VS2008 简体中文正式版序列号(到期解决办法)
  18. DELPHI盒子FTP登录地址及用户密码
  19. igh+preempt_rt主战搭建
  20. 【前端技术】一篇文章搞掂:JS

热门文章

  1. 神舟linux装win7教程,神舟笔记本win10系统改win7系统详细图文步骤
  2. STM32F工程移植注意事项
  3. ocr图片识别文字工具笔记(包括汉王)
  4. scikit-learn工具包中分类模型predict_proba、predict、decision_function用法详解
  5. 软件体系结构期末考试复习题(题中页码 与软件体系结构原理、方法与实践第2版 张友生编著 匹配)
  6. linux pscp 上传_windows与linux传输工具pscp详解
  7. RTMP、HTTP-FLV、HLS,你了解常见的三大直播协议吗
  8. 【开发教程3】开源蓝牙心率防水运动手环-开发环境搭建
  9. 车载音频总线A2B编解码系统
  10. java随机点名器_随机点名器(Java实现、读取txt文件)