python 线程同步_python线程同步(2)
Barrier
有人翻译成栅栏,建议使用屏障,可以想象成路障,道闸。
3.2引入python的新功能。
Barrier(parties,action = None,timeout = None):构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值。
n_waiting:当前在屏障中等待的线程数。
parties:各方数,就是需要多少个等待。
wait(timeout = None):等待通过屏障,返回0到线程数减1的整数,每个线程返回不同。如果wait方法设置了超时,并超时发送,屏障将处于broken状态。
Barrier实例
importthreadingimportloggingimportrandom
FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier:threading.Barrier):
logging.info("waiting for {} threads.".format(barrier.n_waiting))try:
barrier_id=barrier.wait()
logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:
logging.info("broken Barrier")
barrier= threading.Barrier(3)#3是参与方,partiesfor x in range(3):#改成4,5,6试一试
threading.Thread(target=worker,name="worker-{}".format(x),args=(barrier,)).start()
logging.info("started")
结果为:2019-11-26 17:06:48,333 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:06:48,333 [ worker-1,(thread)8d] waiting for 1threads.2019-11-26 17:06:48,334 [ worker-2,(thread)8d] waiting for 2threads.2019-11-26 17:06:48,334 [ worker-2,(thread)8d] after barrier 2
2019-11-26 17:06:48,334[ MainThread,(thread)8d] started2019-11-26 17:06:48,334 [ worker-1,(thread)8d] after barrier 1
2019-11-26 17:06:48,334 [ worker-0,(thread)8d] after barrier 0
当换成4,5的时候,上面的结果一样,但是程序还没有执行完,一直在等待。当为6的时候,三个一波刚好结束。
从运行结果来看:
所有线程冲到了Barrier前等待,直到到达parties的数目,屏障打开,所有线程停止等待,继续执行。
再有线程wait,屏障就绪等到到达参数方数目。
举例,赛马比赛所有马匹就位,开闸,下一批马匹陆续来到闸门前等待比赛。
broken:如果屏障处于打破的状态,返回True。
abort():将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError异常,直到reset方法来恢复屏障。
reset():恢复屏障,重新开始拦截。
importthreadingimportloggingimportrandom
FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier: threading.Barrier):
logging.info("waiting for {} threads.".format(barrier.n_waiting))try:
barrier_id=barrier.wait()
logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:
logging.info("broken Barrier .run.")
barrier= threading.Barrier(3)for x in range(0,9):if x ==2:
barrier.abort()elif x==6:
barrier.reset()
threading.Event().wait(1)
threading.Thread(target=worker, name="worker-{}".format(x), args=(barrier,)).start()
logging.info("started")
结果为:2019-11-26 17:21:41,398 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:21:42,410 [ worker-1,(thread)8d] waiting for 1threads.2019-11-26 17:21:42,411 [ worker-1,(thread)8d] broken Barrier .run.2019-11-26 17:21:42,411 [ worker-0,(thread)8d] broken Barrier .run.2019-11-26 17:21:43,423 [ worker-2,(thread)8d] waiting for0 threads.2019-11-26 17:21:43,424 [ worker-2,(thread)8d] broken Barrier .run.2019-11-26 17:21:44,439 [ worker-3,(thread)8d] waiting for0 threads.2019-11-26 17:21:44,439 [ worker-3,(thread)8d] broken Barrier .run.2019-11-26 17:21:45,453 [ worker-4,(thread)8d] waiting for0 threads.2019-11-26 17:21:45,453 [ worker-4,(thread)8d] broken Barrier .run.2019-11-26 17:21:46,465 [ worker-5,(thread)8d] waiting for0 threads.2019-11-26 17:21:46,466 [ worker-5,(thread)8d] broken Barrier .run.2019-11-26 17:21:47,481 [ worker-6,(thread)8d] waiting for0 threads.2019-11-26 17:21:48,495 [ worker-7,(thread)8d] waiting for 1threads.2019-11-26 17:21:49,509 [ worker-8,(thread)8d] waiting for 2threads.2019-11-26 17:21:49,509 [ worker-8,(thread)8d] after barrier 2
2019-11-26 17:21:49,509 [ worker-6,(thread)8d] after barrier 02019-11-26 17:21:49,510[ Main Thread,(thread)8d] started2019-11-26 17:21:49,510 [ worker-7,(thread)8d] after barrier 1
上例中,屏障中等待了2个,屏障就被break了,waiting的线程抛了BrokenBarrierError异常,新wait的线程也抛异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程。
wait方法超时实例
如果wait方法超时发生,屏障将处于broken状态,直到reset。
importthreadingimportloggingimportrandom
FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier: threading.Barrier,i:int):
logging.info("waiting for {} threads.".format(barrier.n_waiting))try:
logging.info(barrier.broken)#是否brokrn
if i<3:
barrier_id= barrier.wait(1)#超时后,屏障broken
else:if i==6:
barrier.reset()#恢复屏障
barrier_id =barrier.wait()
logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:
logging.info("broken Barrier .run.")
barrier= threading.Barrier(3)for x in range(0,9):
threading.Event().wait(2)
threading.Thread(target=worker, name="worker-{}".format(x), args=(barrier,x)).start()
结果为:2019-11-26 17:30:06,811 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:30:06,811 [ worker-0,(thread)8d] False2019-11-26 17:30:07,822 [ worker-0,(thread)8d] broken Barrier .run.2019-11-26 17:30:08,822 [ worker-1,(thread)8d] waiting for0 threads.2019-11-26 17:30:08,822 [ worker-1,(thread)8d] True2019-11-26 17:30:08,823 [ worker-1,(thread)8d] broken Barrier .run.2019-11-26 17:30:10,835 [ worker-2,(thread)8d] waiting for0 threads.2019-11-26 17:30:10,835 [ worker-2,(thread)8d] True2019-11-26 17:30:10,835 [ worker-2,(thread)8d] broken Barrier .run.2019-11-26 17:30:12,849 [ worker-3,(thread)8d] waiting for0 threads.2019-11-26 17:30:12,849 [ worker-3,(thread)8d] True2019-11-26 17:30:12,849 [ worker-3,(thread)8d] broken Barrier .run.2019-11-26 17:30:14,859 [ worker-4,(thread)8d] waiting for0 threads.2019-11-26 17:30:14,859 [ worker-4,(thread)8d] True2019-11-26 17:30:14,859 [ worker-4,(thread)8d] broken Barrier .run.2019-11-26 17:30:16,870 [ worker-5,(thread)8d] waiting for0 threads.2019-11-26 17:30:16,870 [ worker-5,(thread)8d] True2019-11-26 17:30:16,871 [ worker-5,(thread)8d] broken Barrier .run.2019-11-26 17:30:18,885 [ worker-6,(thread)8d] waiting for0 threads.2019-11-26 17:30:18,885 [ worker-6,(thread)8d] True2019-11-26 17:30:20,895 [ worker-7,(thread)8d] waiting for 1threads.2019-11-26 17:30:20,895 [ worker-7,(thread)8d] False2019-11-26 17:30:22,909 [ worker-8,(thread)8d] waiting for 2threads.2019-11-26 17:30:22,909 [ worker-8,(thread)8d] False2019-11-26 17:30:22,909 [ worker-8,(thread)8d] after barrier 2
2019-11-26 17:30:22,910 [ worker-7,(thread)8d] after barrier 1
2019-11-26 17:30:22,911 [ worker-6,(thread)8d] after barrier 0
Barrier应用
并发初始化
所有线程必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。
10个线程做10种工作准备,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作。先完成的要等待后完成的线程。
例如,启动一个程序,需要先加载磁盘文件,缓存预热,初始化连接池等工作,这些工作可以启动并进,不过只有都满足了,程序才能继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出。
工作量
有10个计算任务,完成6个,就算工作完成。
semaphore信号量
和lock很像, 信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计算为0就阻塞请求的线程,直到其他线程对信号量release后,计数大于0,恢复阻塞的线程。
Semaphore(value = 1):构造方法,value小于0,抛valueerror异常。
acquire(blocking= TRUE,timeout = None):获取信号量,计数器减1,获取成功返回True.
release():释放信号量,计数器加1.
计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。
importthreadingimportloggingimporttime
FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(s:threading.Semaphore):
logging.info("in sub thread")
logging.info(s.acquire())#阻塞
logging.info("sub thread over")#信号量
s = threading.Semaphore(3)
logging.info(s.acquire())print(s._value)
logging.info(s.acquire())print(s._value)
logging.info(s.acquire())print(s._value)
threading.Thread(target=worker,args=(s,)).start()
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout= 3))#阻塞3秒
#释放
logging.info("released")
s.release()
结果为:2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2
2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2019-11-26 17:45:49,188 [ Thread-1,(thread)8d] insub thread102019-11-26 17:45:51,188[ MainThread,(thread)8d] False2019-11-26 17:45:54,189[ MainThread,(thread)8d] False2019-11-26 17:45:54,189[ MainThread,(thread)8d] released2019-11-26 17:45:54,190 [ Thread-1,(thread)8d] True2019-11-26 17:45:54,190 [ Thread-1,(thread)8d] sub thread over
应用举例
连接池
因为资源有限,且开启一个连接成本高,所以,使用连接池。
一个简单的连接池
连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。
classConn:def __init__(self,name):
self.name=nameclassPool:def __init__(self,count:int):
self.count=count#池中是连接对象的列表
self.pool = [self._count("conn-{}".format(x)) for x inrange(self.count)]def_connect(self,conn_name):#创建连接的方法,返回一个名称
returnConn(conn_name)defget_conn(self):#从池中拿走一个连接
if len(self.pool)>0:returnself.pool.pop()defreturn_conn(self,conn:Conn):#向池中添加一个连接
self.pool.append(conn)
真正的连接池的实现比上面的例子要复杂得多,这里只是一个简单的功能的实现。
本例中,get_conn(0方法在多线程的时候有线程安全问题。
假设池中正好有一个连接,有可能多个线程判断池的长度是大于0的,当一个线程拿走了连接对象,其他线程再来pop就会抛异常的,如何解决?
加锁,在读写的地方加锁
使用信号量Semaphore
使用信号量对上例进行修改
importthreadingimportloggingimportrandom
FORMAT= "%(asctime)s %(thread)d %(threadName)s %(message)s"logging.basicConfig(level= logging.INFO,format =FORMAT)classConn:def __init__(self,name):
self.name=namedef __repr__(self):returnself.nameclassPool:def __init__(self,count:int):
self.count=count#池中是连接对象的列表
self.pool = [self._count("conn-{}".format(x)) for x inrange(self.count)]
self.semaphore= threading.Semaphore(count)#threading.Semaphore()
def_connect(self,conn_name):#创建连接的方法,返回一个名称
returnConn(conn_name)defget_conn(self):#从池中拿走一个连接
print("-----------------")
self.semaphore.acquire()print("=====================")
conn=self.pool.pop()returnconndefreturn_conn(self,conn:Conn):#向池中添加一个连接
self.pool.append(conn)
self.semaphore.release()#连接池初始化
pool = Pool(3)defworker(pool:Pool):
conn=pool.get_conn()
logging.info(conn)#模拟使用了一段时间
threading.Event().wait(random.randint(1,4))
pool.return_conn(conn)for i in range(6):
threading.THREAD(target= worker,name = "worker - {}".format(i),args =(pool,)).start()
上例中,使用信号量解决资源有限的问题。
如果池中有资源,请求者获取资源时信号量减1,拿走资源,当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。
注意:这个例子不能用到生产环境,只是为了说明信号量使用的例子,还有很多未完成功能。
问题
self.conn.append(conn)这一句要不要加锁
1 从程序逻辑上分析
1.1 假设如果还没有使用信号量,就release,会怎么样?
importloggingimportthreading
sema= threading.Semaphore(3)
logging.warning(sema.__dict__)for i in range(3):
sema.acquire()
logging.warning("```````````````````")
logging.warning(sema.__dict__)for i in range(4):
sema.release()
logging.warning(sema.__dict__)for i in range(3):
sema.acquire()
logging.warn("~~~~~~~~~~~~~~")
logging.warning(sema.__dict__)
sema.acquire()
logging.warning("~~~~~~~~~~~~~~~~~")
logging.warning(sema.__dict__)
从上例输出结果可以看出,竟然内置计数器达到了4,这样实际上超出我们的最大值,需要解决这个问题。
BoundedSemaphore类
有界的信号量,不允许使用release超过初始值的范围,否则,抛出valueerror异常。
这样用有界信号量修改源代码,保证如果多return_conn就会抛一长保证了多归还连接抛出异常。如果归还了同一个连接多次怎么办?去重很容易判断出来。
1.2如果使用了信号量,但是还没有用完
self.pool.append(conn)
self.semaphore.release()
假设一种极端情况,计数器还差1就满了,有三个线程A,B,C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。
因此信号量,可以保证,一定不能多归还。
1.3很多线程用完了信号量
没有获得信号量的线程都阻塞,没有线程和归还的线程争抢,当append后才release,这时候才能等待的线程被唤醒,才能pop,也就是没有获取信号量就不能pop,这是安全的。
经过上面的分析,信号量比计算列表长度好,线程安全。
信号量和锁
锁,只允许同一个时间一个线程独占资源,它是特殊的信号量,即信号量计数器初值为1。信号量,可以多个线程访问共享资源,但这个共享资源数量有限。
锁,可以看做特殊的信号量。
数据结构和GIL
Queue(线程安全的类)
标准库queue模块,提供FIFO的Queue、LIFO的队列,优先队列。
Queue类是线程安全的,适用于多个线程间安全的交换数据,内部使用了Lock和Condition.
为什么讲魔术方法时,说实现容器的大小,不准确?
如果不加锁,是不可能获得准确大小的,因为你刚读取到了一个大小,还没有取走,就有可能被其他线程给改了。
Queue类的size虽然加了锁,但是,依然不能保证立即get,put就能成功,因为读取大小和get.put方法是分开的。
importqueue
q= queue.Queue(8)if q.qsize() ==7:
q.put()#上下两句可能被打断
if q.qsize() ==1:
q.get()#未必会成功
GIL全局解释器锁
CPython在解释器进程界别有一把锁,叫做GIL全局解释器锁。
GIL保证Cpython进程中,只有一个线程执行字节码,甚至在多核cpu的情况下,也是如此。
cpython中
io密集型,由于线程阻塞,就会调度其他线程;
cpu密集型,当前线程可能会连续的获得GIL,导致其他线程几乎无法使用cpu。
在cpython中,由于有GIL的存在,IO密集型,使用多线程,cpu密集型,使用多进程,绕开GIL.
新版cpython正在努力优化GIL的问题,但不是移除。
如果非要使用多线程的效率问题,请绕行,选择其他语言erlang、go等。
Python中绝大多数内置数据结构的读写都是原子操作,由于GIL的存在,Python的内置数据结构类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型的。
保留GIL的原因:
Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全的,简单的使用Python。而且移除GIL,会降低cpython的单线程的执行效率。
测试下面两个程序
从两段程序测试的结果来看,cpython中多线程根本没有任何优势,和一个线程执行时间相当,因为GIL的存在,尤其像上面的计算密集型程序。
importloggingimportdatetime
logging.basicConfing(level= logging.INFO,format = "%(thread)s %(message)s")
start=datetme.datetime.now()#计算
defcale():
sum=0for _in range(10000000000):
sum+=1cale()
cale()
cale()
cale()
cale()
delta= (datetime.datetime.now -start).total_seconds()
logging.info(delta)
importloggingimportdatetimeimportthreading
logging.basicConfing(level= logging.INFO,format = "%(thread)s %(message)s")
start=datetme.datetime.now()#计算
defcale():
sum=0for _in range(10000000000):
sum+=1t1= threading.Thread(target =cale)
t2= threading.Thread(target =cale)
t3= threading.Thread(target =cale)
t4= threading.Thread(target =cale)
t5= threading.Thread(target =cale)
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
delta= (datetime.datetime.now -start).total_seconds()
logging.info(delta)
python 线程同步_python线程同步(2)相关推荐
- python 线程同步_Python 线程同步
zhoushixiong Python 线程同步 以下代码可以直观展示加锁和不加锁时,对数据修改情况. 加锁时 # -*-* encoding:UTF-8 -*- # author : shoushi ...
- python 判断线程状态_Python线程指南
Python线程指南 本文介绍了Python对于线程的支持,包括"学会"多线程编程需要掌握的基础以及Python两个线程标准库的完整介绍及使用示例. 注意:本文基于Python2. ...
- python线程状态_Python线程
1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...
- python 线程退出_python线程退出
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 如果某线程并未使用很多 io 操作, 它会在自己的时间片内一直占用处理器(和 g ...
- python线程状态_python 线程的五个状态
当程序中包含多个线程时,CPU 不是一直被特定的线程霸占,而是轮流执行各个线程. 那么,CPU 在轮换执行线程的过程中,即从创建到消亡的整个过程,可能会历经 5 种状态,分别是新建.就绪.运行.阻塞和 ...
- python 判断线程状态_Python 线程和进程
前言 学编程,谁没有为线程折腾过啊. 目录 线程与进程 线程与进程是操作系统里面的术语,简单来讲,每一个应用程序都有一个自己的进程. 操作系统会为这些进程分配一些执行资源,例如内存空间等. 在进程中, ...
- python结束线程类_Python线程指南(转)
1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...
- python 获取子线程状态_python线程状态
python怎么判断线程的状态 python中如何在父线程中检测其子线程是否处于运行状态 子线程有一个方法 is_alive() 运行时会返回Bool值True python 在线程函数中如何实现线程 ...
- python中gil锁和线程锁_Python线程——GIL锁、线程锁(互斥锁)、递归锁(RLock)...
GIL锁 计算机有4核,代表着同一时间,可以干4个任务.如果单核cpu的话,我启动10个线程,我看上去也是并发的,因为是执行了上下文的切换,让看上去是并发的.但是单核永远肯定时串行的,它肯定是串行 ...
- python 线程池_Python线程池及其原理和使用(超级详细)
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 线程池在系统启动时即 ...
最新文章
- java内存模型和线程安全
- 技术06期:测试系统软件需要重视哪几点?
- HDOJ 4699-Editor[栈]
- boot入门思想 spring_SpringBoot快速入门
- Python3文件操作详解 Python3文件操作大全
- ML《集成学习(一)Bagging 和 Random Forest》
- observable java_Observable基本用法(RxJava)
- volatile关键字的用法
- 窥探Swift之需要注意的基本运算符和高级运算符
- 快手上市首日涨近161% 两大创始人身家破千亿
- 快速向表中插入大量数据Oracle中append与Nologgin的作用
- python读写文件函数_Python开发【第三篇】:函数读写文件
- 学画画软件app推荐_5岁宝宝画画自学app推荐 快给宝宝找个合适的画画启蒙软件吧...
- shadowmap的原理与实现
- Rational License Key Error的永久解决办法
- fash 3D 游戏
- 《惢客创业日记》2021.08.28-31(周六)一错即否、一善俱荣(三)
- Go语言学习 二十三 错误处理和运行时恐慌(Panic)
- 5000立方米球罐设计
- SAP 资产会计过账-总账科目的获取
热门文章
- Ubuntu18.04报错:make[1]: *** No rule to make target armv4-mont.o, needed by build-msm8916/lk. Stop.
- Mac OS X上使用Wireshark(可用)
- ffmpeg (二):ffmpeg结合SDL2.0解码视频流
- Android Message和obtainMessage的区别
- java 方法_Java 方法 | 菜鸟教程
- ValueError: optimizer got an empty parameter list
- CentOS7安装了nginx后启动本机访问不到
- 如何安装Python3.7,小白必看!
- c语言中数组的变量j是什么,c语言中数组,一般数组
- c语言编写的贪吃蛇代码,刚学C语言,想写一个贪吃蛇的代码