Python并发编程-线程同步(线程安全)

作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

线程同步,线程间协调,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作。

一.Event

1>.Event的常用方法

Event事件,是线程通信机制中最简单的实现,使用一个内部的标记flag,通过flage的True或False的变化来进行操作。

常用方法如下:

set():

标记为True。clear():

标记设置为Flase。

is_set():

查询标记是否为True。wait(timeout=None):

设置等待标记为True的时长,None为无线等待。等到返回True,未等到超时了返回False。

2>.Event使用案例

1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading import Event,Thread8 import logging9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(format=FORMAT,level=logging.INFO)12

13 def boss(event:Event):14 logging.info("I'm boss,waiting for U")15 event.wait() #阻塞等待,直到event被标记为Ture16 logging.info("Good Job.")17

18 def worker(event:Event,count=10):19 logging.info("I'm working for U.")20 cups =[]21 while not event.wait(0.5): #使用wait等待0.5秒(相当于"time.sleep(0.5)"),若规定事件内event依旧标记依旧没有设置为True,则返回False22 logging.info("make 1 cup")23 cups.append(1)24 if len(cups) >=count:25 event.set() #将标记设置为True26 break27 logging.info("I finished my job.cups={}".format(cups))28

29 event =Event()30 print(event.is_set()) #event实例的标记默认为False31

32 b = Thread(target=boss,name="boss",args=(event,))33 w = Thread(target=worker,name="worker",args=(event,))34 b.start()35 w.start()

2019-11-23 14:54:53,177 boss 10916 I'm boss,waiting for U

2019-11-23 14:54:53,178 worker 15672 I'm working for U.

False2019-11-23 14:54:53,678 worker 15672 make 1cup2019-11-23 14:54:54,179 worker 15672 make 1cup2019-11-23 14:54:54,680 worker 15672 make 1cup2019-11-23 14:54:55,180 worker 15672 make 1cup2019-11-23 14:54:55,680 worker 15672 make 1cup2019-11-23 14:54:56,181 worker 15672 make 1cup2019-11-23 14:54:56,681 worker 15672 make 1cup2019-11-23 14:54:57,181 worker 15672 make 1cup2019-11-23 14:54:57,681 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 make 1cup2019-11-23 14:54:58,182 worker 15672 I finished my job.cups=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]2019-11-23 14:54:58,182 boss 10916 Good Job.

以上代码执行结果戳这里

3>.定时器Timer(延迟执行)

1 #!/usr/bin/envpython2 #_*_conding:utf-8_*_3 #@author :yinzhengjie4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 import threading7 import logging8 import time

9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 def worker():14 logging.info("working")15 time.sleep(2)16

17 """18 Timer是线程Thread的子类,Timer实例内部提供了一个finished属性,该属性是Event对象。19 Timer是线程Thread的子类,就是线程类,具有线程的能力和特征。20 它的实例时能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它。21 cacel方法本质使用Event类实现。这并不是说,线程提供了取消的方法。22 """23 t = threading.Timer(4,worker) #当t对象调用start方法后,等待4秒后执行worker函数24 t.setName("timer")25

26 t.start()27 # t.cancel() #本质上是在worker函数执行前对finished属性set方法操作,从而跳过了worker函数执行,达到了取消的效果。28

29 for _ in range(10):30 print(threading.enumerate())31 time.sleep(1)

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>, ]2019-11-23 15:07:40,987 timer 6656working

[<_MainThread(MainThread, started 5332)>, ]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

[<_MainThread(MainThread, started 5332)>]

以上代码执行结果戳这里

二.Lock

1>.Lock的常用方法

锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。

锁:凡是存在资源共享争抢的地方都可以使用锁,从而只有一个使用者可以完全使用这个资源。

锁常用的方法如下:

acquire(blocking=True,timeout=-1):

默认阻塞,阻塞可以设置超时事件。非阻塞时,timeout禁止设置。

成功获取锁,返回True,否则返回False

release():

释放锁。可以从任何线程调用释放。

已上锁的锁,会被重置为unlocked

若在未上锁的锁上调用,则会抛出RuntimeError异常。

2>.Lock锁使用案例

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 from threading importThread,Lock7 importlogging8 importtime9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 cpus =[]14 lock =Lock()15

16 def worker(count=10):17 logging.info("I'm working for U.")18 flag =False19 whileTrue:20 lock.acquire() #获取锁

21 if len(cpus) >=count:22 flag =True23 time.sleep(0.0001) #为了看出线程切换效果

24 if notflag:25 cpus.append(1)26 lock.release()27 ifflag:28 break

29 logging.info("I finished . cups = {}".format(len(cpus)))30

31

32 for _ in range(10):33 Thread(target=worker,args=(1000,)).start()

2019-11-23 16:03:35,225 Thread-1 16204 I'm working for U.

2019-11-23 16:03:35,226 Thread-2 14436 I'm working for U.

2019-11-23 16:03:35,226 Thread-3 1164 I'm working for U.

2019-11-23 16:03:35,226 Thread-4 10460 I'm working for U.

2019-11-23 16:03:35,227 Thread-5 5072 I'm working for U.

2019-11-23 16:03:35,227 Thread-6 12016 I'm working for U.

2019-11-23 16:03:35,227 Thread-7 9732 I'm working for U.

2019-11-23 16:03:35,228 Thread-8 15644 I'm working for U.

2019-11-23 16:03:35,228 Thread-9 104 I'm working for U.

2019-11-23 16:03:35,228 Thread-10 16508 I'm working for U.

2019-11-23 16:03:37,130 Thread-1 16204 I finished . cups = 1000

2019-11-23 16:03:37,132 Thread-3 1164 I finished . cups = 1000

2019-11-23 16:03:37,134 Thread-4 10460 I finished . cups = 1000

2019-11-23 16:03:37,136 Thread-2 14436 I finished . cups = 1000

2019-11-23 16:03:37,138 Thread-5 5072 I finished . cups = 1000

2019-11-23 16:03:37,140 Thread-6 12016 I finished . cups = 1000

2019-11-23 16:03:37,142 Thread-7 9732 I finished . cups = 1000

2019-11-23 16:03:37,144 Thread-8 15644 I finished . cups = 1000

2019-11-23 16:03:37,146 Thread-9 104 I finished . cups = 1000

2019-11-23 16:03:37,148 Thread-10 16508 I finished . cups = 1000

以上代码执行结果戳这里

3>.加锁和解锁(计数器类案例)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 from threading importThread,Lock8 importlogging9 importtime10

11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

12 logging.basicConfig(level=logging.INFO,format=FORMAT)13

14 classCounter:15 def __init__(self):16 self._val =017 self.__lock =Lock()18

19 @property20 defvalue(self):21 with self.__lock:22 returnself._val23

24 definc(self):25 try:26 self.__lock.acquire()27 self._val += 1

28 finally:29 self.__lock.release()30

31 defdec(self):32 with self.__lock:33 self._val -= 1

34

35

36 def worker(c:Counter,count=100):37 for _ inrange(count):38 for i in range(-50,50):39 if i <0:40 c.dec()41 else:42 c.inc()43

44 c =Counter()45 c1 = 10

46 c2 = 10000

47

48 for i inrange(c1):49 Thread(target=worker,args=(c,c2)).start()50

51

52 whileTrue:53 time.sleep(1)54 print(threading.enumerate())55 if threading.active_count() == 1:56 print((c.value))57 break

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , , , , ]

[<_MainThread(MainThread, started 14856)>, , , , , , ]

[<_MainThread(MainThread, started 14856)>]

0

以上代码执行结果戳这里

4>.锁的应用场景

锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。

如果全部都是读取同一个共享资源需要锁吗?

不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁

使用锁的注意事项:

少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行

举例,高速公路上车并行跑,可是到了省界只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。

加锁时间越短越好,不需要就立即释放锁

一定要避免死锁

不使用锁,有了效率,但是结果是错的。

使用了锁,效率低下,但是结果是对的。

所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧。

5>.非阻塞锁使用

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importlogging8 importtime9

10 FORMAT = "%(asctime)s %(threadName)s %(thread)-10d %(message)s"

11 logging.basicConfig(level=logging.INFO,format=FORMAT)12

13 lock =threading.Lock()14

15 defworker(l:threading.Lock):16 whileTrue:17 flag =l.acquire(False)18 ifflag:19 logging.info("do something.") #为了显示效果,没有释放锁

20 else:21 logging.info("try again")22 time.sleep(1)23

24 for i in range(5):25 threading.Thread(target=worker,name="worker={}".format(i),args=(lock,)).start()

2019-11-24 15:58:31,932 worker=0 123145354420224do something.2019-11-24 15:58:31,933 worker=0 123145354420224 tryagain2019-11-24 15:58:31,933 worker=1 123145359675392 tryagain2019-11-24 15:58:31,933 worker=2 123145364930560 tryagain2019-11-24 15:58:31,934 worker=3 123145370185728 tryagain2019-11-24 15:58:31,934 worker=4 123145375440896 tryagain2019-11-24 15:58:32,933 worker=0 123145354420224 tryagain2019-11-24 15:58:32,933 worker=1 123145359675392 tryagain2019-11-24 15:58:32,934 worker=2 123145364930560 tryagain2019-11-24 15:58:32,936 worker=4 123145375440896 tryagain2019-11-24 15:58:32,936 worker=3 123145370185728 tryagain2019-11-24 15:58:33,935 worker=0 123145354420224 tryagain2019-11-24 15:58:33,935 worker=1 123145359675392 tryagain2019-11-24 15:58:33,935 worker=2 123145364930560 tryagain2019-11-24 15:58:33,940 worker=4 123145375440896 tryagain2019-11-24 15:58:33,940 worker=3 123145370185728 tryagain2019-11-24 15:58:34,939 worker=0 123145354420224 tryagain2019-11-24 15:58:34,940 worker=1 123145359675392 tryagain2019-11-24 15:58:34,940 worker=2 123145364930560 tryagain2019-11-24 15:58:34,944 worker=4 123145375440896 tryagain2019-11-24 15:58:34,945 worker=3 123145370185728 tryagain2019-11-24 15:58:35,943 worker=1 123145359675392 tryagain2019-11-24 15:58:35,944 worker=0 123145354420224 tryagain2019-11-24 15:58:35,944 worker=2 123145364930560 tryagain2019-11-24 15:58:35,948 worker=4 123145375440896 tryagain2019-11-24 15:58:35,949 worker=3 123145370185728 tryagain

......

以上代码执行结果戳这里

三.可重入锁RLock

1>.可重入锁不可跨越线程

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importtime8

9 lock =threading.RLock()10 print(lock.acquire()) #仅对当前线程上锁,但是代码并不会阻塞而是可以继续执行

11 print(lock.acquire(blocking=True))12 print(lock.acquire(timeout=3)) #默认"blocking=True",因此可以设置值阻塞的超时时间,但当blocking=False时,timeout无法使用。

13 print(lock.acquire(blocking=False))14

15 print("main thread {}".format(threading.main_thread().ident))16 print("lock in main thread {}".format(lock))17

18 print("{0} 我是分割线 {0}".format("*" * 15))19

20 lock.release()21 lock.release()22 lock.release()23 lock.release()24 #lock.release() #由于上面锁定的lock调用了4此锁定,因此解锁也只能是4次,若解锁次数超过上锁次数则抛出"RuntimeError: cannot release un-acquired lock"异常。

25

26 print("main thread {}".format(threading.main_thread().ident))27 print("lock in main thread {}".format(lock))28

29 print("{0} 我是分割线 {0}".format("*" * 15))30

31 print(lock.acquire(blocking=False))32 print(lock.acquire(blocking=False))33 print("main thread {}".format(threading.main_thread().ident))34 print("lock in main thread {}".format(lock))35

36 #threading.Thread(target=lambda l:l.release(),args=(lock,)).start() #可重入锁不可跨越线程,否则会抛出"RuntimeError: cannot release un-acquired lock"异常。

37 lock.release() #可重入锁无论是上锁还是解锁要求在同一个线程中。

38

39 time.sleep(3)40 print("main thread {}".format(threading.main_thread().ident))41 print("lock in main thread {}".format(lock))

True

True

True

True

main thread18096lockin main thread

*************** 我是分割线 ***************main thread18096lockin main thread

*************** 我是分割线 ***************True

True

main thread18096lockin main thread main thread18096lockin main thread

以上代码执行结果戳这里

2>.为另一个线程传入同一个RLock对象

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importthreading7 importtime8

9 lock =threading.RLock()10

11 defsub(l:threading.RLock):12 print("{}:{}".format(threading.current_thread(),l.acquire())) #阻塞

13 print("{}:{}".format(threading.current_thread(),l.acquire()))14 print("lock in sub thread {}".format(lock))15 l.release()16 print("release in sub 1")17 print("lock in sub thread {}".format(lock))18 l.release()19 print("release in sub 2")20 print("lock in sub thread {}".format(lock))21

22

23 print(lock.acquire())24 print("main thread {}".format(threading.main_thread().ident))25 print("lock in main thread {}".format(lock))26

27 print("{0} 我是分割线 {0}".format("*" * 15))28

29 threading.Timer(2,sub,(lock,)).start() #为另一个线程传入同一个lock对象

30

31 print("in main thread, {}".format(lock.acquire()))32 lock.release()33 time.sleep(5)34 print("release lock in main thread =======",end="\n\n")35 lock.release()36 print("lock in main thread {}".format(lock))

True

main thread2456lockin main thread

*************** 我是分割线 ***************

inmain thread, True

release lockin main thread =======lockin main thread

:True:True

lockin sub thread releasein sub 1lockin sub thread releasein sub 2lockin sub thread

以上代码执行结果戳这里

3>.可重入锁相关总结

可重入锁,是线程相关的锁。可在同一个线程中获取锁,并可以继续在同一线程不阻塞多次获取锁。

当锁未释放完,其它线程获取锁就会阻塞,直到当前持有锁的线程释放完锁。

锁都应该使用完后释放。可重入锁也是锁,应该acquire多少次,就release多少次。

四.Condition

1>.Condition常用方法

Condition(lock=None):

可传入一个Lock或者RLock对象,默认是RLock。

acquire(*args):

获取锁。

wait(self,timeout=None):

等待或超时。

notify(n=1):

唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作。

notify_all():

唤醒所有等待的线程。

2>.生产者消费者模型

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importlogging7 from threading importEvent,Thread,Condition8 importtime9 importrandom10

11 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"

12 logging.basicConfig(format=FORMAT,level=logging.INFO)13

14 classDispachter:15 def __init__(self):16 self.data =None17 self.event = Event() #event只是为了使用方便,与逻辑无关

18 self.cond =Condition()19

20 defproduce(self,total):21 for _ inrange(total):22 data = random.randint(1,100)23 with self.cond:24 logging.info(data)25 self.data =data26 #self.cond.notify_all()

27 self.cond.notify(2)28 self.event.wait(1)29

30 defconsume(self):31 while notself.event.is_set():32 with self.cond:33 self.cond.wait()34 data =self.data35 logging.info("recieved {}".format(data))36 self.data =None37 self.event.wait(0.5)38

39 d =Dispachter()40 p = Thread(target=d.produce,name="producer",args=(10,))41

42 #增加消费者

43 for i in range(5):44 c = Thread(target=d.consume,name="consumer")45 c.start()46

47 p.start()

2019-11-25 22:24:45,076 producer 12228 64

2019-11-25 22:24:45,076 consumer 7612 recieved 64

2019-11-25 22:24:45,076 consumer 18400recieved None2019-11-25 22:24:46,077 producer 12228 41

2019-11-25 22:24:46,077 consumer 15008 recieved 41

2019-11-25 22:24:46,078 consumer 16440recieved None2019-11-25 22:24:47,077 producer 12228 98

2019-11-25 22:24:47,077 consumer 14832 recieved 98

2019-11-25 22:24:47,077 consumer 7612recieved None2019-11-25 22:24:48,077 producer 12228 39

2019-11-25 22:24:48,077 consumer 18400 recieved 39

2019-11-25 22:24:48,078 consumer 15008recieved None2019-11-25 22:24:49,078 producer 12228 79

2019-11-25 22:24:49,078 consumer 16440 recieved 79

2019-11-25 22:24:49,078 consumer 14832recieved None2019-11-25 22:24:50,079 producer 12228 39

2019-11-25 22:24:50,079 consumer 7612 recieved 39

2019-11-25 22:24:50,080 consumer 15008recieved None

......

以上代码执行结果戳这里

3>.Condition总结

Condition用于生产者消费者模型中,解决生产者消费者速度匹配的问题。

采用了通知机制,非常有效率。

使用方式:

使用Condition,必须先acquire,用完了要release,因为内部使用了锁,默认使用RLock,最好的方式是使用with上下文。

消费者这wait,等待通知。

生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

五.semaphore

1>.semaphore常用方法

和Lock很像,信号量对象内部维护一个倒计数器,每一次acquire都会减1,当acquire方法发现计数为0就阻塞请求的线程,直到其它线程对信号量release后,计数大于0,恢复阻塞的线程。换句话说,计数器永远不会低于0,因为acquire的时候,发现是0,都会被阻塞。

semaphore常用方法如下:

Semaphore(value=1):

构造方法。value小于0,抛ValueError异常

acquire(blocking=True, timeout=None):

获取信号量,计数器减1,获取成功返回True

release():

释放信号量,计数器加1

2>.基本使用案例(存在release方法超界限的问题)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading importThread, Semaphore8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'

12 logging.basicConfig(format=FORMAT, level=logging.INFO)13

14 defworker(s:Semaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18

19

20 #定义信号量的个数为3

21 s = Semaphore(3)22 print(s.__dict__)23

24 logging.info(s.acquire()) #获取一把锁之后,"_value"计数器就会减1。

25

26

27 print(s.acquire(),s._value)28

29 print(s.acquire(),s._value)30

31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35

36 #释放一个

37 logging.info('release one')38 s.release()39 print(s.__dict__) #释放锁后可以被"worker"线程获取

40

41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此处我们可以故意多释放几次锁

46

47 print(s.__dict__) #竟然内置计数器"_value"达到了6(也有可能是5,因为worker线程中需要获取一把锁),这样实际上超出我们的最大值,需要解决这个问题。

2019-11-26 09:54:22,345 MainThread 140735817298880True2019-11-26 09:54:22,345 worker 123145401769984 inworker thread

{'_cond': , 0)>, '_value': 3}

True1True 02019-11-26 09:54:24,346 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880False2019-11-26 09:54:27,349 MainThread 140735817298880release one

{'_cond': , 0)>, '_value': 1}

{'_cond': , 0)>, '_value': 6}2019-11-26 09:54:27,350 worker 123145401769984True2019-11-26 09:54:27,350 worker 123145401769984 worker thread over

以上代码执行结果戳这里

3>.BoundedSemaphore类(有边界的信号量,不允许使用release超出初始化范围,否则,抛出“ValueError: Semaphore released too many times”异常)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6

7 from threading importThread, BoundedSemaphore8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)-12s %(thread)-8s %(message)s'

12 logging.basicConfig(format=FORMAT, level=logging.INFO)13

14 defworker(s:BoundedSemaphore):15 logging.info("in worker thread")16 logging.info(s.acquire())17 logging.info('worker thread over')18

19

20 #定义有边界信号量的个数为3

21 s = BoundedSemaphore(3)22 print(s.__dict__)23

24 logging.info(s.acquire()) #获取一把锁之后,"_value"计数器就会减1。

25

26

27 print(s.acquire(),s._value)28

29 print(s.acquire(),s._value)30

31 Thread(target=worker, name="worker",args=(s,)).start()32 time.sleep(2)33 logging.info(s.acquire(False))34 logging.info(s.acquire(timeout=3))35

36 #释放一个

37 logging.info('release one')38 s.release()39 print(s.__dict__) #释放锁后可以被"worker"线程获取

40

41 s.release()42 s.release()43 s.release()44 s.release()45 s.release() #此处我们可以故意多释放几次锁,一旦release超出初始值的范围就抛出异常!

46

47 print(s.__dict__)

{'_cond': , 0)>, '_value': 3, '_initial_value': 3}

True1True 02019-11-26 10:09:17,632 MainThread 140735817298880True2019-11-26 10:09:17,635 worker 123145507561472 inworker thread2019-11-26 10:09:19,638 MainThread 140735817298880False

{'_cond': , 0)>, '_value': 1, '_initial_value': 3}2019-11-26 10:09:22,643 MainThread 140735817298880False2019-11-26 10:09:22,644 MainThread 140735817298880release one

Traceback (most recent call last):

File"/yinzhengjie/python/devops/python基础/09.线程/04.信号量.py", line 43, in

2019-11-26 10:09:22,644 worker 123145507561472True2019-11-26 10:09:22,644 worker 123145507561472worker thread over

s.release()

File"/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 483, inreleaseraise ValueError("Semaphore released too many times")

ValueError: Semaphore released too many times

以上代码执行结果戳这里

4>.应用举例(一个简单的连接池)

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importrandom7 importthreading8 importlogging9 importtime10

11 FORMAT = '%(asctime)s %(threadName)s %(thread)-8d %(message)s'

12 logging.basicConfig(level=logging.INFO, format=FORMAT)13

14 classConn:15 def __init__(self, name):16 self.name =name17

18 """

19 连接池20 因为资源有限,且开启一个连接成本高,所以,使用连接池。21

22 一个简单的连接池23 连接池应该有容量(总数),有一个工厂方法可以获取连接,能够把不用的连接返回,供其他调用者使用。24

25 使用信号量解决资源有限的问题。26 如果池中有资源,请求者获取资源时信号量减1,拿走资源。当请求超过资源数,请求者只能等待。当使用者用完归还资源后信号量加1,等待线程就可以被唤醒拿走资源。27 注意:这个连接池的例子不能用到生成环境,只是为了说明信号量使用的例子,连接池还有很多未完成功能。28 """

29 classPool:30 def __init__(self, count:int):31 self.count =count32 #池中提前放着连接备用

33 self.pool = [self._connect('conn-{}'.format(i)) for i inrange(self.count)]34 self.semaphore =threading.Semaphore(self.count)35

36 def_connect(self, conn_name):37 #创建连接的方法,返回一个连接对象

38 returnConn(conn_name)39

40 defget_conn(self):41 #从池中拿走一个连接

42 logging.info('get~~~~~~~~~~~~~')43 self.semaphore.acquire()44 logging.info('-------------------------')45 returnself.pool.pop()46

47 defreturn_conn(self, conn: Conn):48 #向池中返回一个连接对象

49 logging.info('return~~~~~~~~~~~~~')50 self.pool.append(conn)51 self.semaphore.release()52

53 defworker(pool:Pool):54 conn =pool.get_conn()55 logging.info(conn)56 #模拟使用了一段时间

57 time.sleep(random.randint(1, 5))58 pool.return_conn(conn)59

60 #初始化连接池

61 pool = Pool(3)62

63 for i in range(6):64 threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()

2019-11-26 11:27:58,148 worker-0 123145324670976 get~~~~~~~~~~~~~

2019-11-26 11:27:58,148 worker-0 123145324670976 -------------------------

2019-11-26 11:27:58,148 worker-0 123145324670976 <__main__.Conn object at 0x102db0438>

2019-11-26 11:27:58,149 worker-1 123145329926144 get~~~~~~~~~~~~~

2019-11-26 11:27:58,149 worker-1 123145329926144 -------------------------

2019-11-26 11:27:58,149 worker-1 123145329926144 <__main__.Conn object at 0x102db03c8>

2019-11-26 11:27:58,149 worker-2 123145335181312 get~~~~~~~~~~~~~

2019-11-26 11:27:58,149 worker-2 123145335181312 -------------------------

2019-11-26 11:27:58,150 worker-2 123145335181312 <__main__.Conn object at 0x102db0240>

2019-11-26 11:27:58,150 worker-3 123145340436480 get~~~~~~~~~~~~~

2019-11-26 11:27:58,150 worker-4 123145345691648 get~~~~~~~~~~~~~

2019-11-26 11:27:58,151 worker-5 123145350946816 get~~~~~~~~~~~~~

2019-11-26 11:28:02,153 worker-0 123145324670976 return~~~~~~~~~~~~~

2019-11-26 11:28:02,153 worker-3 123145340436480 -------------------------

2019-11-26 11:28:02,154 worker-3 123145340436480 <__main__.Conn object at 0x102db0438>

2019-11-26 11:28:02,154 worker-1 123145329926144 return~~~~~~~~~~~~~

2019-11-26 11:28:02,154 worker-4 123145345691648 -------------------------

2019-11-26 11:28:02,154 worker-4 123145345691648 <__main__.Conn object at 0x102db03c8>

2019-11-26 11:28:03,154 worker-2 123145335181312 return~~~~~~~~~~~~~

2019-11-26 11:28:03,155 worker-5 123145350946816 -------------------------

2019-11-26 11:28:03,155 worker-5 123145350946816 <__main__.Conn object at 0x102db0240>

2019-11-26 11:28:05,155 worker-4 123145345691648 return~~~~~~~~~~~~~

2019-11-26 11:28:07,154 worker-3 123145340436480 return~~~~~~~~~~~~~

2019-11-26 11:28:08,159 worker-5 123145350946816 return~~~~~~~~~~~~~

以上代码执行结果戳这里

5>.信号量和锁

信号量:

可以多个线程访问共享资源,但这个共享资源数量有限。

锁:

可以看做特殊的信号量,即信号量计数器初值为1。只允许同一个时间一个线程独占资源。

六.Queue的线程安全

1 #!/usr/bin/env python

2 #_*_conding:utf-8_*_

3 #@author :yinzhengjie

4 #blog:http://www.cnblogs.com/yinzhengjie

5

6 importqueue7

8

9 """

10 标准库queue模块,提供FIFO的Queue、LIFO的队列、优先队列。11

12 Queue类是线程安全的,适用于多线程间安全的交换数据。内部使用了Lock和Condition。13

14 为什么讲魔术方法时,说实现容器的大小,不准确?15 1>.如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改了。16 2>.Queue类的size虽然加了锁,但是,依然不能保证立即get、put就能成功,因为读取大小和get、put方法是分开的。17 """

18

19 q = queue.Queue(8)20

21 if q.qsize() == 7:22 q.put("abc") #上下两句可能被打断

23

24 if q.qsize() == 1:25 q.get() #未必会成功

python 线程同步_Python并发编程-线程同步(线程安全)相关推荐

  1. python线程唤醒_Python 并发编程(一)之线程

    常用用法 t.is_alive() Python中线程会在一个单独的系统级别线程中执行(比如一个POSIX线程或者一个Windows线程) 这些线程将由操作系统来全权管理.线程一旦启动,将独立执行直到 ...

  2. python锁机制_Python并发编程之谈谈线程中的“锁机制”(三)

    大家好,并发编程 进入第三篇. 今天我们来讲讲,线程里的锁机制. 本文目录 何为Lock( 锁 )?如何使用Lock( 锁 )?为何要使用锁?可重入锁(RLock)防止死锁的加锁机制饱受争议的GIL( ...

  3. python多线程调度_python并发编程之进程、线程、协程的调度原理(六)

    进程.线程和协程的调度和运行原理总结. 系列文章 进程.线程的调度策略介绍 linux中的进程主要有三种调度策略: 优先级调度:将进程分为普通进程和实时进程: 先进先出(队列)调度:实时进程先创建的先 ...

  4. python 消息机制_Python并发编程之线程消息通信机制任务协调(四)

    . 前言 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举 ...

  5. python3 线程隔离_Python并发编程之线程中的信息隔离(五)

    大家好,并发编程 进入第三篇. 上班第一天,大家应该比较忙吧.小明也是呢,所以今天的内容也很少.只要几分钟就能学完. 昨天我们说,线程与线程之间要通过消息通信来控制程序的执行. 讲完了消息通信,今天就 ...

  6. python并发处理机制_Python并发编程—同步互斥

    同步互斥 线程间通信方法 1.通信方法:线程间使用全局变量进行通信 2.共享资源争夺 共享资源:多个进程或者线程都可以操作的资源称为共享资源.对共享资源的操作代码段称为临界区. 影响 : 对共享资源的 ...

  7. java 5 线程 睡眠,Java并发编程实例--5.线程睡眠

    有时候我们需要让线程在一段时间内不做任何事.例如某线程每个一小时检测一下传感器,剩余的时间不做任何事. 我们可以使用sleep()方法使线程睡眠,此期间不占用计算机资源. 这个方法接受一个整数表示睡眠 ...

  8. 学习笔记(33):Python网络编程并发编程-进程池线程池

    立即学习:https://edu.csdn.net/course/play/24458/296451?utm_source=blogtoedu 进程池与线程池: 一般应用在网站上,进程池或线程池最大的 ...

  9. Java并发编程 synchronized保证线程安全的原理

    文章转载致博客 blog.csdn.net/javazejian/- 自己稍加完善. 线程安全是并发编程中的重要关注点,应该注意到的是,造成线程安全问题的主要诱因有两点,一是存在共享数据(也称临界资源 ...

最新文章

  1. $(function(){})、$(document).ready(function(){})....../ ready和onload的区别
  2. Python符号计算库sympy使用笔记
  3. python里unexpected eof while parsing_使用Python编程时的10个注意事项
  4. 全双工和半双工的区别
  5. 罗森伯格成功布线中国海关博物馆
  6. 菜鸟学Java(七)——Ajax+Servlet实现无刷新下拉联动
  7. linux驱动之I2C
  8. 跟我学Java(配光盘)(跟我学)
  9. start()和run()的区别
  10. python需要什么包装_python学习之包装与授权
  11. 为php-fpm安装pdo pgsql驱动支持
  12. javascript基础知识(3) 基本语法
  13. 达梦数据库的表空间及用户管理
  14. 终端应用变身文件 MD5/SHA1 校验工具
  15. 【飞飞CMS二次开发实录】开篇:安装与运行
  16. vue提示Named Route ‘News‘ has a default child route. When navigating to this named route...问题
  17. 推荐几个不错的美术游戏资源
  18. 11.23 夯实的django基础
  19. CSS基础知识点大全
  20. 关于AUI框架自学心得

热门文章

  1. 达摩java_JAVA面向对象
  2. 【杂项】原来有两种单引号(单引号和反引号)
  3. 【网址收藏】PowerShell因为在此系统中禁止执行脚本的解决方法
  4. helm部署hadoop报错解决方法
  5. golang管道channel的遍历和关闭:应该使用for...range来遍历
  6. nginx https配置
  7. Docker镜像分层和临时容器
  8. Scala中的四种访问权限
  9. Solr的安装步骤及增删改查代码示例
  10. springboot使用Map接收请求参数