python线程池阻塞队列_福利又来啦!python多线程进阶篇
使用Python中的线程模块,能够同时运行程序的不同部分,并简化设计。如果你已经入门Python,并且想用线程来提升程序运行速度的话,希望这篇教程会对你有所帮助。
通过阅读本文,你将了解到:什么是死锁?
python线程间通信?
Python的线程池?
python信号量使用?
什么是死锁
死锁:当线程A持有独占锁a,并尝试去获取独占锁b的同时,线程B持有独占锁b,并尝试获取独占锁a的情况下,就会发生AB两个线程由于互相持有对方需要的锁,而发生的阻塞现象,我们称为死锁。 即死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。
死锁的原因竞争系统资源
进程运行推进的顺序不当
资源分配不当
产生死锁的四个必要条件互斥条件:一个资源每次只能被一个进程使用
请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。
解决死锁的办法减少资源占用时间,可以降低死锁放生的概率。
银行家算法。银行家算法的本质是优先满足占用资源较少的任务。
理解了死锁的原因,尤其是产生死锁的四个必要条件,就可以最大可能地避免、预防和解除死锁。
所以,在系统设计、进程调度等方面注意如何不让这四个必要条件成立,如何确定资源的合理分配算法,避免进程永久占据系统资源。
死锁代码
def task1(lock1, lock2):
if lock1.acquire():
print('{}获取到lock1锁。。。。'.format(current_thread().name))
for i in range(5):
print('{}------------->{}'.format(current_thread().name, i))
time.sleep(0.01)
if lock2.acquire(timeout=2):
print('{}获取了lock1,lock2'.format(current_thread().name))
lock2.release()
lock1.release()
def task2(lock1, lock2):
if lock2.acquire():
print('{}获取到lock2锁。。。。'.format(current_thread().name))
for i in range(5):
print('{}----->{}'.format(current_thread().name, i))
time.sleep(0.01)
if lock1.acquire(timeout=2):
print('{}获取了lock1,lock2'.format(current_thread().name))
lock1.release()
lock2.release()
if __name__ == '__main__':
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=task1, args=(lock1, lock2))
t2 = Thread(target=task2, args=(lock1, lock2))
t1.start()
t2.start()
python线程间通信
如果各个线程之间各干各的,确实不需要通信,这样的代码也十分的简单。但这一般是不可能的,至少线程要和主线程进行通信,不然计算结果等内容无法取回。而实际情况中要复杂的多,多个线程间需要交换数据,才能得到正确的执行结果。
Queue消息队列
python中Queue是消息队列,提供线程间通信机制,python3中重名为为queue,queue模块块下提供了几个阻塞队列,这些队列主要用于实现线程通信。
在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。Queue(maxsize=0):创建一个FIFO队列,若给定最大值,队列没有空间时阻塞,否则是无限队列
LifoQueue(maxsize=0):创建一个栈,maxsize含义同上
PriorityQueue(maxsize=0):创建一个优先队列,maxsize含义同上
Queue对象方法:qsize():返回队列大小,是近似值(返回时可能队列大小被修改了)
empty():判断队列是否为空
full():判断队列是否为满
put(item, block=True, timeout=None):将item加入队列,可选阻塞和阻塞时间
put_nowait(item):即put(item, False)
get(block=True, timeout=None):从队列中获取元素,可选阻塞和阻塞时间
get_nowait():即get(False)
task_done():用于表示队列中某个元素已经执行完成,会被join()调用
join():队列中所有元素执行完毕并调用task_done()信号之前,保持阻塞
Queue模块异常:Empty:对空队列调用 get(timeout=n),如果等待n秒钟队列还是空的就会抛出异常
Full:对满队列调用put(item,timeout=n),如果等待n秒钟仍然是满的就会抛出异常
简单代码演示
import random
import time
from queue import Queue
queue = Queue(3)
queue.put('香蕉')
queue.put('榴莲')
queue.put('西瓜')
queue.put('苹果')
print(queue.get())
print(queue.get())
print(queue.get())
此时代码会阻塞,因为queue中内容已满,此时可以在第四个queue.put('苹果')后面添加timeout,则成为 queue.put('苹果',timeout=1)如果等待1秒钟仍然是满的就会抛出异常.可以捕获异常。
import random
import time
from queue import Queue
queue = Queue(3)
try:
queue.put('香蕉')
queue.put('榴莲')
queue.put('西瓜')
queue.put('苹果',timeout=1)
print(queue.get())
print(queue.get())
print(queue.get())
except Exception as e:
print(e)
同理如果队列是空的,无法获取到内容默认也会阻塞,如果不阻塞可以使用queue.get_nowait()。
使用Queue完成线程间通信
在掌握了 Queue 阻塞队列的特性之后,在下面程序中就可以利用 Queue 来实现线程通信了。
下面演示一个生产者和一个消费者,当然都可以多个
import random
import time
from threading import Thread, current_thread
from queue import Queue
def producer(queue):
print('{}开门啦!'.format(current_thread().name))
foods = ['红烧狮子头', '香肠烤饭', '蒜蓉生蚝', '酸辣土豆丝', '肉饼']
for i in range(1, 21):
food = random.choice(foods)
print('{}正在加工中.....'.format(food))
time.sleep(1)
print('加工完成可以上菜了...')
queue.put(food)
queue.put(None)
def consumer(queue):
print('{}来吃饭啦'.format(current_thread().name))
while True:
food = queue.get()
if food:
print('正在享用美食:', food)
time.sleep(0.5)
else:
print('{}把饭店吃光啦,走人...'.format(current_thread().name))
break
if __name__ == '__main__':
queue = Queue(8)
t1 = Thread(target=producer, name='老家肉饼', args=(queue,))
t2 = Thread(target=consumer, name='坤坤', args=(queue,))
t1.start()
t2.start()
使用queue模块,可在线程间进行通信,并保证了线程安全
Python的线程池
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
线程池的使用
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
Exectuor 提供了如下常用方法:submit(fn,*args,**kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,**kwargs 代表以关键字参数的形式为 fn 函数传入参数。
map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
shutdown(wait=True):关闭线程池。程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下: 1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。 2. 定义一个普通函数作为线程任务。 3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。 4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
下面程序示范了如何使用线程池来执行线程任务:
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()
线程池的使用场合:单个任务处理的时间比较短;
需要处理的任务数量大;
python信号量使用
信号量semaphore 是用于控制进入数量的锁。有哪些应用场景呢,比如说在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。又比如在做爬虫的时候,有时候爬取速度太快了,会导致被网站禁止,所以这个时候就需要控制爬虫爬取网站的频率。
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release()时+1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
下面程序演示了信号量的使用:
import threading
import time
def func(s):
# s.acquire() 阻塞
with s:
time.sleep(1)
print(threading.current_thread().name)
print(time.ctime())
# s.release()
if __name__ == '__main__':
s = threading.Semaphore(5)
for x in range(40):
t = threading.Thread(target=func, args=(s,))
t.start()
python线程池阻塞队列_福利又来啦!python多线程进阶篇相关推荐
- 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )
文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...
- android 线程池 阻塞队列,【Android框架进阶〖02〗】ThinkAndroid线程池机制
/************************************************************************************************** ...
- python 线程池与队列简单应用
import random from concurrent.futures import ThreadPoolExecutor from queue import Queue import threa ...
- python中导入模块队列_【每日学习】Python中模块的导入
模块的概念: 每一个以扩展名py结束的Python源代码文件都是一个模块 模块名同样也是一个标识符,需要符合标识符的命名规则 在模块中定义的全局变量.函数.类都是提供给外界直接使用的工具 模块就好比工 ...
- python线程池使用和问题记录
记录一次使用多线程的问题 背景 最近工作有个需求根据文件中的数据请求中台服务,然后解析返回值.文件中每行代表一个参数,使用post方式携带参数请求中台接口. 分析:需要处理的数据量非常大(近200w行 ...
- Python 线程池 ThreadPoolExecutor(二) - Python零基础入门教程
目录 一.Python 线程池前言 二.Python 线程池 ThreadPoolExecutor 常用函数 1.线程池 as_completed 函数使用 2.线程池 map 函数使用 3.线程池 ...
- Python 线程池 ThreadPoolExecutor(一) - Python零基础入门教程
目录 一.Python 线程池前言 二.Python 线程池原理 三.Python 线程池 ThreadPoolExecutor 函数介绍 四.Python 线程池 ThreadPoolExecuto ...
- python3 线程池源码解析_5分钟看懂系列:Python 线程池原理及实现
概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器 ...
- dict实现原理 python_5分钟看懂系列:Python 线程池原理及实现
本文的文字及图片来源于网络,仅供学习.交流使用,不具有任何商业用途,如有问题请及时联系我们以作处理 概述 传统多线程方案会使用"即时创建, 即时销毁"的策略.尽管与创建进程相比,创 ...
最新文章
- 测试和恢复性的争论:面向对象vs.函数式编程
- 学习笔记Spark(七)—— Spark SQL应用(2)—— Spark DataFrame基础操作
- ALE IDoc RFC of SAP
- java 错误声音播放器_java 音频播放器出不了声音,代码里哪有问题啊?
- 随想(二):简化别人的工作,就是简化自己的工作,节省自己的时间
- 谈谈前后端分离实践中如何提升RESTful API开发效率
- 如何挑选家用交换机 交换机选购攻略分享
- [配置中心] --- consul
- linux usermod修改用户所在组方法
- 查看数据库中存在触发器的表
- Apache代理Tomcat实现session共享构建网上商城系统
- 最新Java面试资料整理
- Xilinx FPGA bit 文件加密
- 轻松摆好人像摄影姿势
- java 数字拆分_如何在java中分割数字?
- photoshop中如何在6寸相纸上打印1寸照片12张3X4模式(手动拖动模式)
- WIA的使用及自定义可拖拽大小的picturebox
- 查看linux系统IPV6地址
- 定风波·三月七日沙湖道中遇雨
- Java反射机制之初见端倪
热门文章
- java用for循环查询数据_使用for循环结果创建数据框
- java 两个值对换_java将两个整型变量值进行互换的几种实现方法
- numpy 数组 填充 0、1和各种值
- 数学知识--Methods for Non-Linear Least Squares Problems(第二章)
- 语义分割--Pixel Deconvolutional Networks
- keras和tensorflow使用 keras.callbacks.EarlyStopping 提前结束训练
- Java常见面试题(持续更新)
- jQuery 变量数字相加
- python延时一秒_python如何最快毫秒速度使用requests?
- App自动化元素定位不到?别慌,3大工具帮你解决