Barrier

有人翻译成栅栏,建议使用屏障,可以想象成路障,道闸。

3.2引入python的新功能。

Barrier(parties,action  = None,timeout = None):构建Barrier对象,指定参与方数目,timeout是wait方法未指定超时的默认值。

n_waiting:当前在屏障中等待的线程数。

parties:各方数,就是需要多少个等待。

wait(timeout = None):等待通过屏障,返回0到线程数减1的整数,每个线程返回不同。如果wait方法设置了超时,并超时发送,屏障将处于broken状态。

Barrier实例

importthreadingimportloggingimportrandom

FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier:threading.Barrier):

logging.info("waiting for {} threads.".format(barrier.n_waiting))try:

barrier_id=barrier.wait()

logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:

logging.info("broken Barrier")

barrier= threading.Barrier(3)#3是参与方,partiesfor x in range(3):#改成4,5,6试一试

threading.Thread(target=worker,name="worker-{}".format(x),args=(barrier,)).start()

logging.info("started")

结果为:2019-11-26 17:06:48,333 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:06:48,333 [ worker-1,(thread)8d] waiting for 1threads.2019-11-26 17:06:48,334 [ worker-2,(thread)8d] waiting for 2threads.2019-11-26 17:06:48,334 [ worker-2,(thread)8d] after barrier 2

2019-11-26 17:06:48,334[ MainThread,(thread)8d] started2019-11-26 17:06:48,334 [ worker-1,(thread)8d] after barrier 1

2019-11-26 17:06:48,334 [ worker-0,(thread)8d] after barrier 0

当换成4,5的时候,上面的结果一样,但是程序还没有执行完,一直在等待。当为6的时候,三个一波刚好结束。

从运行结果来看:

所有线程冲到了Barrier前等待,直到到达parties的数目,屏障打开,所有线程停止等待,继续执行。

再有线程wait,屏障就绪等到到达参数方数目。

举例,赛马比赛所有马匹就位,开闸,下一批马匹陆续来到闸门前等待比赛。

broken:如果屏障处于打破的状态,返回True。

abort():将屏障置于broken状态,等待中的线程或者调用等待方法的线程中都会抛出BrokenBarrierError异常,直到reset方法来恢复屏障。

reset():恢复屏障,重新开始拦截。

importthreadingimportloggingimportrandom

FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier: threading.Barrier):

logging.info("waiting for {} threads.".format(barrier.n_waiting))try:

barrier_id=barrier.wait()

logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:

logging.info("broken Barrier .run.")

barrier= threading.Barrier(3)for x in range(0,9):if x ==2:

barrier.abort()elif x==6:

barrier.reset()

threading.Event().wait(1)

threading.Thread(target=worker, name="worker-{}".format(x), args=(barrier,)).start()

logging.info("started")

结果为:2019-11-26 17:21:41,398 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:21:42,410 [ worker-1,(thread)8d] waiting for 1threads.2019-11-26 17:21:42,411 [ worker-1,(thread)8d] broken Barrier .run.2019-11-26 17:21:42,411 [ worker-0,(thread)8d] broken Barrier .run.2019-11-26 17:21:43,423 [ worker-2,(thread)8d] waiting for0 threads.2019-11-26 17:21:43,424 [ worker-2,(thread)8d] broken Barrier .run.2019-11-26 17:21:44,439 [ worker-3,(thread)8d] waiting for0 threads.2019-11-26 17:21:44,439 [ worker-3,(thread)8d] broken Barrier .run.2019-11-26 17:21:45,453 [ worker-4,(thread)8d] waiting for0 threads.2019-11-26 17:21:45,453 [ worker-4,(thread)8d] broken Barrier .run.2019-11-26 17:21:46,465 [ worker-5,(thread)8d] waiting for0 threads.2019-11-26 17:21:46,466 [ worker-5,(thread)8d] broken Barrier .run.2019-11-26 17:21:47,481 [ worker-6,(thread)8d] waiting for0 threads.2019-11-26 17:21:48,495 [ worker-7,(thread)8d] waiting for 1threads.2019-11-26 17:21:49,509 [ worker-8,(thread)8d] waiting for 2threads.2019-11-26 17:21:49,509 [ worker-8,(thread)8d] after barrier 2

2019-11-26 17:21:49,509 [ worker-6,(thread)8d] after barrier 02019-11-26 17:21:49,510[ Main Thread,(thread)8d] started2019-11-26 17:21:49,510 [ worker-7,(thread)8d] after barrier 1

上例中,屏障中等待了2个,屏障就被break了,waiting的线程抛了BrokenBarrierError异常,新wait的线程也抛异常,直到屏障恢复,才继续按照parties数目要求继续拦截线程。

wait方法超时实例

如果wait方法超时发生,屏障将处于broken状态,直到reset。

importthreadingimportloggingimportrandom

FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(barrier: threading.Barrier,i:int):

logging.info("waiting for {} threads.".format(barrier.n_waiting))try:

logging.info(barrier.broken)#是否brokrn

if i<3:

barrier_id= barrier.wait(1)#超时后,屏障broken

else:if i==6:

barrier.reset()#恢复屏障

barrier_id =barrier.wait()

logging.info("after barrier {}".format(barrier_id))exceptthreading.BrokenBarrierError:

logging.info("broken Barrier .run.")

barrier= threading.Barrier(3)for x in range(0,9):

threading.Event().wait(2)

threading.Thread(target=worker, name="worker-{}".format(x), args=(barrier,x)).start()

结果为:2019-11-26 17:30:06,811 [ worker-0,(thread)8d] waiting for0 threads.2019-11-26 17:30:06,811 [ worker-0,(thread)8d] False2019-11-26 17:30:07,822 [ worker-0,(thread)8d] broken Barrier .run.2019-11-26 17:30:08,822 [ worker-1,(thread)8d] waiting for0 threads.2019-11-26 17:30:08,822 [ worker-1,(thread)8d] True2019-11-26 17:30:08,823 [ worker-1,(thread)8d] broken Barrier .run.2019-11-26 17:30:10,835 [ worker-2,(thread)8d] waiting for0 threads.2019-11-26 17:30:10,835 [ worker-2,(thread)8d] True2019-11-26 17:30:10,835 [ worker-2,(thread)8d] broken Barrier .run.2019-11-26 17:30:12,849 [ worker-3,(thread)8d] waiting for0 threads.2019-11-26 17:30:12,849 [ worker-3,(thread)8d] True2019-11-26 17:30:12,849 [ worker-3,(thread)8d] broken Barrier .run.2019-11-26 17:30:14,859 [ worker-4,(thread)8d] waiting for0 threads.2019-11-26 17:30:14,859 [ worker-4,(thread)8d] True2019-11-26 17:30:14,859 [ worker-4,(thread)8d] broken Barrier .run.2019-11-26 17:30:16,870 [ worker-5,(thread)8d] waiting for0 threads.2019-11-26 17:30:16,870 [ worker-5,(thread)8d] True2019-11-26 17:30:16,871 [ worker-5,(thread)8d] broken Barrier .run.2019-11-26 17:30:18,885 [ worker-6,(thread)8d] waiting for0 threads.2019-11-26 17:30:18,885 [ worker-6,(thread)8d] True2019-11-26 17:30:20,895 [ worker-7,(thread)8d] waiting for 1threads.2019-11-26 17:30:20,895 [ worker-7,(thread)8d] False2019-11-26 17:30:22,909 [ worker-8,(thread)8d] waiting for 2threads.2019-11-26 17:30:22,909 [ worker-8,(thread)8d] False2019-11-26 17:30:22,909 [ worker-8,(thread)8d] after barrier 2

2019-11-26 17:30:22,910 [ worker-7,(thread)8d] after barrier 1

2019-11-26 17:30:22,911 [ worker-6,(thread)8d] after barrier 0

Barrier应用

并发初始化

所有线程必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。

10个线程做10种工作准备,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作。先完成的要等待后完成的线程。

例如,启动一个程序,需要先加载磁盘文件,缓存预热,初始化连接池等工作,这些工作可以启动并进,不过只有都满足了,程序才能继续向后执行,假设数据库连接失败,则初始化工作失败,就要abort,barrier置为broken,所有线程收到异常退出。

工作量

有10个计算任务,完成6个,就算工作完成。

semaphore信号量

和lock很像, 信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计算为0就阻塞请求的线程,直到其他线程对信号量release后,计数大于0,恢复阻塞的线程。

Semaphore(value = 1):构造方法,value小于0,抛valueerror异常。

acquire(blocking= TRUE,timeout = None):获取信号量,计数器减1,获取成功返回True.

release():释放信号量,计数器加1.

计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。

importthreadingimportloggingimporttime

FORMAT= "%(asctime)-15s\t [ %(threadName)s,(thread)8d] %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)defworker(s:threading.Semaphore):

logging.info("in sub thread")

logging.info(s.acquire())#阻塞

logging.info("sub thread over")#信号量

s = threading.Semaphore(3)

logging.info(s.acquire())print(s._value)

logging.info(s.acquire())print(s._value)

logging.info(s.acquire())print(s._value)

threading.Thread(target=worker,args=(s,)).start()

time.sleep(2)

logging.info(s.acquire(False))

logging.info(s.acquire(timeout= 3))#阻塞3秒

#释放

logging.info("released")

s.release()

结果为:2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2

2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2019-11-26 17:45:49,188[ MainThread,(thread)8d] True2019-11-26 17:45:49,188 [ Thread-1,(thread)8d] insub thread102019-11-26 17:45:51,188[ MainThread,(thread)8d] False2019-11-26 17:45:54,189[ MainThread,(thread)8d] False2019-11-26 17:45:54,189[ MainThread,(thread)8d] released2019-11-26 17:45:54,190 [ Thread-1,(thread)8d] True2019-11-26 17:45:54,190 [ Thread-1,(thread)8d] sub thread over

应用举例

连接池

因为资源有限,且开启一个连接成本高,所以,使用连接池。

一个简单的连接池

连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。

classConn:def __init__(self,name):

self.name=nameclassPool:def __init__(self,count:int):

self.count=count#池中是连接对象的列表

self.pool = [self._count("conn-{}".format(x)) for x inrange(self.count)]def_connect(self,conn_name):#创建连接的方法,返回一个名称

returnConn(conn_name)defget_conn(self):#从池中拿走一个连接

if len(self.pool)>0:returnself.pool.pop()defreturn_conn(self,conn:Conn):#向池中添加一个连接

self.pool.append(conn)

真正的连接池的实现比上面的例子要复杂得多,这里只是一个简单的功能的实现。

本例中,get_conn(0方法在多线程的时候有线程安全问题。

假设池中正好有一个连接,有可能多个线程判断池的长度是大于0的,当一个线程拿走了连接对象,其他线程再来pop就会抛异常的,如何解决?

加锁,在读写的地方加锁

使用信号量Semaphore

使用信号量对上例进行修改

importthreadingimportloggingimportrandom

FORMAT= "%(asctime)s %(thread)d %(threadName)s %(message)s"logging.basicConfig(level= logging.INFO,format =FORMAT)classConn:def __init__(self,name):

self.name=namedef __repr__(self):returnself.nameclassPool:def __init__(self,count:int):

self.count=count#池中是连接对象的列表

self.pool = [self._count("conn-{}".format(x)) for x inrange(self.count)]

self.semaphore= threading.Semaphore(count)#threading.Semaphore()

def_connect(self,conn_name):#创建连接的方法,返回一个名称

returnConn(conn_name)defget_conn(self):#从池中拿走一个连接

print("-----------------")

self.semaphore.acquire()print("=====================")

conn=self.pool.pop()returnconndefreturn_conn(self,conn:Conn):#向池中添加一个连接

self.pool.append(conn)

self.semaphore.release()#连接池初始化

pool = Pool(3)defworker(pool:Pool):

conn=pool.get_conn()

logging.info(conn)#模拟使用了一段时间

threading.Event().wait(random.randint(1,4))

pool.return_conn(conn)for i in range(6):

threading.THREAD(target= worker,name = "worker - {}".format(i),args =(pool,)).start()

上例中,使用信号量解决资源有限的问题。

如果池中有资源,请求者获取资源时信号量减1,拿走资源,当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。

注意:这个例子不能用到生产环境,只是为了说明信号量使用的例子,还有很多未完成功能。

问题

self.conn.append(conn)这一句要不要加锁

1 从程序逻辑上分析

1.1 假设如果还没有使用信号量,就release,会怎么样?

importloggingimportthreading

sema= threading.Semaphore(3)

logging.warning(sema.__dict__)for i in range(3):

sema.acquire()

logging.warning("```````````````````")

logging.warning(sema.__dict__)for i in range(4):

sema.release()

logging.warning(sema.__dict__)for i in range(3):

sema.acquire()

logging.warn("~~~~~~~~~~~~~~")

logging.warning(sema.__dict__)

sema.acquire()

logging.warning("~~~~~~~~~~~~~~~~~")

logging.warning(sema.__dict__)

从上例输出结果可以看出,竟然内置计数器达到了4,这样实际上超出我们的最大值,需要解决这个问题。

BoundedSemaphore类

有界的信号量,不允许使用release超过初始值的范围,否则,抛出valueerror异常。

这样用有界信号量修改源代码,保证如果多return_conn就会抛一长保证了多归还连接抛出异常。如果归还了同一个连接多次怎么办?去重很容易判断出来。

1.2如果使用了信号量,但是还没有用完

self.pool.append(conn)

self.semaphore.release()

假设一种极端情况,计数器还差1就满了,有三个线程A,B,C都执行了第一句,都没有来得及release,这时候轮到线程A release,正常的release,然后轮到线程C先release,一定出问题,超界了,直接抛异常。

因此信号量,可以保证,一定不能多归还。

1.3很多线程用完了信号量

没有获得信号量的线程都阻塞,没有线程和归还的线程争抢,当append后才release,这时候才能等待的线程被唤醒,才能pop,也就是没有获取信号量就不能pop,这是安全的。

经过上面的分析,信号量比计算列表长度好,线程安全。

信号量和锁

锁,只允许同一个时间一个线程独占资源,它是特殊的信号量,即信号量计数器初值为1。信号量,可以多个线程访问共享资源,但这个共享资源数量有限。

锁,可以看做特殊的信号量。

数据结构和GIL

Queue(线程安全的类)

标准库queue模块,提供FIFO的Queue、LIFO的队列,优先队列。

Queue类是线程安全的,适用于多个线程间安全的交换数据,内部使用了Lock和Condition.

为什么讲魔术方法时,说实现容器的大小,不准确?

如果不加锁,是不可能获得准确大小的,因为你刚读取到了一个大小,还没有取走,就有可能被其他线程给改了。

Queue类的size虽然加了锁,但是,依然不能保证立即get,put就能成功,因为读取大小和get.put方法是分开的。

importqueue

q= queue.Queue(8)if q.qsize() ==7:

q.put()#上下两句可能被打断

if q.qsize() ==1:

q.get()#未必会成功

GIL全局解释器锁

CPython在解释器进程界别有一把锁,叫做GIL全局解释器锁。

GIL保证Cpython进程中,只有一个线程执行字节码,甚至在多核cpu的情况下,也是如此。

cpython中

io密集型,由于线程阻塞,就会调度其他线程;

cpu密集型,当前线程可能会连续的获得GIL,导致其他线程几乎无法使用cpu。

在cpython中,由于有GIL的存在,IO密集型,使用多线程,cpu密集型,使用多进程,绕开GIL.

新版cpython正在努力优化GIL的问题,但不是移除。

如果非要使用多线程的效率问题,请绕行,选择其他语言erlang、go等。

Python中绝大多数内置数据结构的读写都是原子操作,由于GIL的存在,Python的内置数据结构类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是线程安全类型的。

保留GIL的原因:

Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全的,简单的使用Python。而且移除GIL,会降低cpython的单线程的执行效率。

测试下面两个程序

从两段程序测试的结果来看,cpython中多线程根本没有任何优势,和一个线程执行时间相当,因为GIL的存在,尤其像上面的计算密集型程序。

importloggingimportdatetime

logging.basicConfing(level= logging.INFO,format = "%(thread)s %(message)s")

start=datetme.datetime.now()#计算

defcale():

sum=0for _in range(10000000000):

sum+=1cale()

cale()

cale()

cale()

cale()

delta= (datetime.datetime.now -start).total_seconds()

logging.info(delta)

importloggingimportdatetimeimportthreading

logging.basicConfing(level= logging.INFO,format = "%(thread)s %(message)s")

start=datetme.datetime.now()#计算

defcale():

sum=0for _in range(10000000000):

sum+=1t1= threading.Thread(target =cale)

t2= threading.Thread(target =cale)

t3= threading.Thread(target =cale)

t4= threading.Thread(target =cale)

t5= threading.Thread(target =cale)

t1.start()

t2.start()

t3.start()

t4.start()

t5.start()

t1.start()

t2.start()

t3.start()

t4.start()

t5.start()

delta= (datetime.datetime.now -start).total_seconds()

logging.info(delta)

python 线程同步_python线程同步(2)相关推荐

  1. python 线程同步_Python 线程同步

    zhoushixiong Python 线程同步 以下代码可以直观展示加锁和不加锁时,对数据修改情况. 加锁时 # -*-* encoding:UTF-8 -*- # author : shoushi ...

  2. python 判断线程状态_Python线程指南

    Python线程指南 本文介绍了Python对于线程的支持,包括"学会"多线程编程需要掌握的基础以及Python两个线程标准库的完整介绍及使用示例. 注意:本文基于Python2. ...

  3. python线程状态_Python线程

    1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...

  4. python 线程退出_python线程退出

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 如果某线程并未使用很多 io 操作, 它会在自己的时间片内一直占用处理器(和 g ...

  5. python线程状态_python 线程的五个状态

    当程序中包含多个线程时,CPU 不是一直被特定的线程霸占,而是轮流执行各个线程. 那么,CPU 在轮换执行线程的过程中,即从创建到消亡的整个过程,可能会历经 5 种状态,分别是新建.就绪.运行.阻塞和 ...

  6. python 判断线程状态_Python 线程和进程

    前言 学编程,谁没有为线程折腾过啊. 目录 线程与进程 线程与进程是操作系统里面的术语,简单来讲,每一个应用程序都有一个自己的进程. 操作系统会为这些进程分配一些执行资源,例如内存空间等. 在进程中, ...

  7. python结束线程类_Python线程指南(转)

    1. 线程基础 1.1. 线程状态 线程有5种状态,状态转换的过程如下图所示: 1.2. 线程同步(锁) 多线程的优势在于可以同时运行多个任务(至少感觉起来是这样).但是当线程需要共享数据时,可能存在 ...

  8. python 获取子线程状态_python线程状态

    python怎么判断线程的状态 python中如何在父线程中检测其子线程是否处于运行状态 子线程有一个方法 is_alive() 运行时会返回Bool值True python 在线程函数中如何实现线程 ...

  9. python中gil锁和线程锁_Python线程——GIL锁、线程锁(互斥锁)、递归锁(RLock)...

    GIL锁 ​ 计算机有4核,代表着同一时间,可以干4个任务.如果单核cpu的话,我启动10个线程,我看上去也是并发的,因为是执行了上下文的切换,让看上去是并发的.但是单核永远肯定时串行的,它肯定是串行 ...

  10. python 线程池_Python线程池及其原理和使用(超级详细)

    系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互.在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 线程池在系统启动时即 ...

最新文章

  1. java内存模型和线程安全
  2. 技术06期:测试系统软件需要重视哪几点?
  3. HDOJ 4699-Editor[栈]
  4. boot入门思想 spring_SpringBoot快速入门
  5. Python3文件操作详解 Python3文件操作大全
  6. ML《集成学习(一)Bagging 和 Random Forest》
  7. observable java_Observable基本用法(RxJava)
  8. volatile关键字的用法
  9. 窥探Swift之需要注意的基本运算符和高级运算符
  10. 快手上市首日涨近161% 两大创始人身家破千亿
  11. 快速向表中插入大量数据Oracle中append与Nologgin的作用
  12. python读写文件函数_Python开发【第三篇】:函数读写文件
  13. 学画画软件app推荐_5岁宝宝画画自学app推荐 快给宝宝找个合适的画画启蒙软件吧...
  14. shadowmap的原理与实现
  15. Rational License Key Error的永久解决办法
  16. fash 3D 游戏
  17. 《惢客创业日记》2021.08.28-31(周六)一错即否、一善俱荣(三)
  18. Go语言学习 二十三 错误处理和运行时恐慌(Panic)
  19. 5000立方米球罐设计
  20. SAP 资产会计过账-总账科目的获取

热门文章

  1. Ubuntu18.04报错:make[1]: *** No rule to make target armv4-mont.o, needed by build-msm8916/lk. Stop.
  2. Mac OS X上使用Wireshark(可用)
  3. ffmpeg (二):ffmpeg结合SDL2.0解码视频流
  4. Android Message和obtainMessage的区别
  5. java 方法_Java 方法 | 菜鸟教程
  6. ValueError: optimizer got an empty parameter list
  7. CentOS7安装了nginx后启动本机访问不到
  8. 如何安装Python3.7,小白必看!
  9. c语言中数组的变量j是什么,c语言中数组,一般数组
  10. c语言编写的贪吃蛇代码,刚学C语言,想写一个贪吃蛇的代码