多线程基础概念

并行与并发

并行:同时处理多个任务,必须在多核环境下

一段时间内同时处理多个任务,单核也可以并发

并发手段

线程:内核空间的调度

进程:内核空间的调度

协程:用户空间的调度

线程可以允许程序在同一进程空间中并发运行多个操作。本次主要介绍Python标准库中的多线程模块threading。

threading模块

线程初始化

使用threading模块的Thread类初始化对象然后调用start方法启动线程。

import threading

import time

def worker(num):

time.sleep(1)

print('worker-{}'.format(num))

# 创建线程对象 target参数是一个函数, 这个函数即线程要执行的逻辑

threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]

for t in threads:

t.start()

# start 方法启动一个线程, 当这个线程的逻辑执行完毕的时候,线程自动退出, Python 没有提供主动退出线程的方法

# 输出以下结果

worker-0worker-1worker-2worker-3

worker-4

初始化的五个线程的执行逻辑中的print方法打印字符串及换行符出现了随机分布,即出现了资源竞争。

给线程传递参数

import threading

import time

def worker(*args, **kwargs):

time.sleep(1)

print(args)

print(kwargs)

threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()

# 输出

(1, 2, 3)

{'a': 'b'}

args传递位置参数,kwargs传递关键字参数。

Thread常用参数和方法

>>> help(threading.Thread)

可以看到Thread函数的初始化方法中的参数如下:

| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

| This constructor should always be called with keyword arguments. Arguments are:

|

| *group* should be None; reserved for future extension when a ThreadGroup

| class is implemented.

|

| *target* is the callable object to be invoked by the run()

| method. Defaults to None, meaning nothing is called.

|

| *name* is the thread name. By default, a unique name is constructed of

| the form "Thread-N" where N is a small decimal number.

|

| *args* is the argument tuple for the target invocation. Defaults to ().

|

| *kwargs* is a dictionary of keyword arguments for the target

| invocation. Defaults to {}.

name

表示线程名称,默认情况下,线程名称是Thread-N,N是一个较小的十进制数。我们可以传递name参数,控制线程名称。

以下会导入logging模块来显示线程的名称等详细信息

import threading

import time

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):

time.sleep(1)

logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]

for t in threads:

t.start()

# 输出

2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0

2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1

2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2

2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3

2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4

其中logging模块的basicConfig函数的format中的%(threadName)s就是用来输出当前线程的名称的。

线程可以重名, 线程名并不是线程的唯一标识,但是通常应该避免线程重名,通常的处理手段是加前缀

daemon

Daemon:守护

和Daemon线程相对应的还有Non-Daemon线程,在此Thread初始化函数中的daemon参数即表示线程是否是Daemon线程。

Daemon线程:会伴随主线程结束而结束(可以理解为主线程结束,守护线程结束)

Non-Daemon线程:不会随着主线程结束而结束,主线程需要等待Non-Daemon结束

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

# 输出

2017-03-20 23:28:06,404 INFO [MainThread] starting

2017-03-20 23:28:06,436 INFO [worker1] starting

2017-03-20 23:28:07,492 INFO [worker2] starting

2017-03-20 23:28:07,492 INFO [MainThread] stopping # 主线程执行完成

2017-03-20 23:28:08,439 INFO [worker1] stopping # 主线程执行完成之后会等Non-Daemon线程执行完成,但是并不会等Daemon线程执行完成,即Daemon线程会随着主线程执行完成而释放

Thread.join()

如果想等Daemon线程执行完成之后主线程再退出,可以使用线程对象的join()方法

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

t1.join()

t2.join()

# 输出

2017-03-20 23:41:07,217 INFO [MainThread] starting

2017-03-20 23:41:07,243 INFO [worker1] starting

2017-03-20 23:41:08,245 INFO [worker2] starting

2017-03-20 23:41:08,246 INFO [MainThread] stopping

2017-03-20 23:41:09,243 INFO [worker1] stopping

2017-03-20 23:41:10,248 INFO [worker2] stopping

使用join函数只有主线程就需要等待Daemon线程执行完成在推出。

join函数的原型:join(self, timeout=None)

join方法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置timeout, 会一直等待线程退出。如果设置了timeout,会在超时之后退出或者线程执行完成退出。

因为join函数总是返回None,因此在超时时间到达之后如果要知道线程是否还是存活的,可以调用is_alive()方法判断线程是否存活。

threading常用方法

enumerate()

列出当前所有的存活的线程

>>> threading.enumerate()

[<_mainthread started>, , ]

local()

import logging

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

ctx = threading.local()

ctx.data = 5

data = 'a'

def worker():

logging.info(data)

logging.info(ctx.data)

worker()

threading.Thread(target=worker).start()

# 输出

2017-03-21 00:02:08,102 INFO [MainThread] a

2017-03-21 00:02:08,113 INFO [MainThread] 5

2017-03-21 00:02:08,119 INFO [Thread-34] a

Exception in thread Thread-34:

Traceback (most recent call last):

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner

self.run()

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run

self._target(*self._args, **self._kwargs)

File "", line 7, in worker

logging.info(ctx.data)

AttributeError: '_thread._local' object has no attribute 'data'

线程共享内存、状态和资源。但是thread模块的local类的对象的属性, 只在当前线程可见。

Thread类的派生

Python中可以通过继承 Thread 类并重写 run 方法来编写多线程的逻辑,此时逻辑函数就是run。

class mythread(threading.Thread):

def run(self):

print('mythread run')

t = mythread()

t.run() # 输出mythread run

t.start() # 输出mythread run

通过继承方式派生而来的子类对象可以同时执行start方法和run方法,结果是一样的,都是执行子类的run方法。但是非继承的方式不能同时使用start方法和run方法,会报错。

派生时逻辑函数的参数传递

class mythread(threading.Thread):

def __init__(self, *args, **kwargs):

super().__init__() # 需要调用父类的初始化方法初始化

self.args = args

self.kwargs = kwargs

def run(self):

print('mythread run', self.args, self.kwargs)

t = mythread(1, 2, 3, a='b')

t.start() # 输出mythread run (1, 2, 3) {'a': 'b'}

Timer类

Timer类:Thread类的派生类,也在threading模块中。意为定时器,用作线程的延迟执行。

>>> help(threading.Timer)

Timer类的初始化方法:__init__(self, interval, function, args=None, kwargs=None)

interval:时间间隔,即几秒之后开始执行function

function:线程执行的逻辑函数

args:位置参数

kwargs:关键字参数

代码

import threading

import time

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('worker running')

t1 = threading.Timer(interval=3, function=worker)

t2 = threading.Timer(interval=3, function=worker)

t1.setName('t1')

t2.setName('t2')

logging.info('start')

t1.start()

t2.start()

time.sleep(2)

logging.info('canceling {}'.format(t1.name))

t1.cancel() # 2s之后仍然可以取消t1

logging.info('end')

# 输出

2017-03-21 19:28:52,801 INFO [MainThread] start

2017-03-21 19:28:54,811 INFO [MainThread] canceling t1

2017-03-21 19:28:54,819 INFO [MainThread] end

2017-03-21 19:28:55,808 INFO [t2] worker running

Timer.cancel():取消仍然存活的定时器,如果定时器已经开始执行function,则无法取消。

Timer.setDaemon(True):设置定时器为守护线程

线程同步

当使用多个线程来访问同一个数据时,会经常出现资源争用等线程安全问题(比如多个线程都在操作同一数据导致数据不一致),这时候我们就可以使用一些同步技术来解决这类问题。比如Event,Lock,Condition,Barrier,Semaphore等等。

Event

>>> help(threading.Event)

Event对象内置一个标志,这个标志可以由set()方法和clear()方法设定。线程可以使用wait()方法进行阻塞等待,知道Event对象内置标志被set。

clear(self):设置内置标志为False

set(self):设置内置标志为True

wait(self, timeout=None):开始阻塞,直到内置标志被设置为True(即wait会阻塞线程直到set方法被调用或者超时)

is_set(self):当且仅当内置标志为True的时候返回True

代码

以下代码实现的逻辑是:一个boss和五个睡觉工人,只要有一个工人完成了睡觉任务,那么就唤醒boss和其他工人。

import datetime

import threading

import logging

import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(event: threading.Event):

s = random.randint(1, 5)

event.wait(s) # wait方法而不使用sleep方法,可以让其他工人收到通知后不再等待

logging.info('sleep {}'.format(s))

event.set()

def boss(event:threading.Event):

start = datetime.datetime.now()

event.wait()

end = datetime.datetime.now()

logging.info('that boss exit takes {}s'.format(end - start))

def start():

event = threading.Event()

b = threading.Thread(target=boss, args=(event, ), name='boss')

b.start()

for i in range(5):

t = threading.Thread(target=worker, args=(event, ), name='worker-{}'.format(i))

t.start()

执行start()方法,测试结果

>>> start()

2017-03-21 21:20:17,195 INFO [worker-2] sleep 1

2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s

2017-03-21 21:20:17,199 INFO [worker-0] sleep 2

2017-03-21 21:20:17,199 INFO [worker-1] sleep 3

2017-03-21 21:20:17,199 INFO [worker-3] sleep 2

2017-03-21 21:20:17,198 INFO [worker-4] sleep 1

可以看到:worker-2退出之后,boss和另外四个worker也瞬间就退出了。所以event对象的内置状态被set之后,相关线程就不再wait了。

event:在线程之间发送信号,通常用于某个线程需要等待其他线程处理完成某些动作之后才能启动

wait()方法的timeout参数

def worker(event: threading.Event):

while not event.wait(3):

logging.info('run run run')

event = threading.Event()

threading.Thread(target=worker, args=(event, )).start()

# 输出

2017-03-21 21:32:47,275 INFO [Thread-8] run run run

2017-03-21 21:32:50,277 INFO [Thread-8] run run run

2017-03-21 21:32:53,281 INFO [Thread-8] run run run

2017-03-21 21:32:56,284 INFO [Thread-8] run run run

...

程序每隔3s就会输出一次结果,直到执行set()方法才会停止。因此我们可以写一个定时器(类似于Thread类的派生类Timer)。

代码

class Timer:

def __init__(self, interval, function, *args, **kwargs):

self.interval = interval

self.function = function

self.args = args

self.kwargs = kwargs

self.event = threading.Event()

self.thread = threading.Thread(target=self.__target(), args=args, kwargs=kwargs)

def __target(self):

if not self.event.wait(self.interval):

return self.function

def start(self):

self.thread.start()

def cancel(self):

self.event.set()

def worker(act):

logging.info('run-{}'.format(act))

t = Timer(5, worker, 'hahaha')

t.start() # 输出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha

延迟5s之后执行了逻辑函数,也可以使用cancel函数取消。(要注意参数的传递,此处Timer初始化不能使用关键字参数)

Lock

event是用来同步线程之间的操作的,但是如果要控制共享资源的访问那就需要用到锁机制了,在Python标准库中的实现就是内置的lock类。

>>> help(threading.Lock)

threading.Lock()函数会创建一个lock类的对象。

>>> help(threading.Lock())

锁对象是一个同步原语(synchronization primitive),lock对象主要有以下三个方法:

acquire(): acquire(blocking=True, timeout=-1) -> bool 获得锁(即锁定锁)。成功获得锁返回True,没有获得锁则返回False。

release(): release() 释放锁

locked(): locked() -> bool 检查锁是否被锁住

代码

以下代码实现了在多个进程同时对资源进行访问时,进行加锁和解锁的操作,保证加减操作和赋值操作组合之后的原子性。

class Counter: # 计时器有加减方法,都会修改value值,因此都需要加锁处理

def __init__(self, start=0):

self.value = start

self.lock = threading.Lock()

def inc(self):

self.lock.acquire()

try:

self.value += 1

finally:

self.lock.release() # 需要用finally语句保证锁一定会被释放,否则资源永远不可访问

def dec(self):

self.lock.acquire()

try:

self.value -= 1

finally:

self.lock.release()

def inc_worker(c: Counter):

pause = random.random()

logging.info('sleeping-{}'.format(pause))

time.sleep(pause)

c.inc()

logging.info('cur_value:{}'.format(c.value))

def dec_worker(c: Counter):

pause = random.random()

logging.info('sleeping-{}'.format(pause))

time.sleep(pause)

c.dec()

logging.info('cur_value:{}'.format(c.value))

c = Counter()

for i in range(2):

threading.Thread(target=inc_worker, args=(c, ), name='inc_worker-{}'.format(i)).start()

for i in range(3):

threading.Thread(target=dec_worker, args=(c, ), name='dec_worker-{}'.format(i)).start()

测试输出

2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327

2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873

2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459

2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681

2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247

2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1

2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0

2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1

2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2

2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1

可见,各项操作之间保持相互原子性,没有出现干扰。

因为lock类实现了__enter__和__exit__两个魔术方法,因此支持上下文管理器,可以修改以上Counter类的实现方法如下:

class Counter:

def __init__(self, start=0):

self.value = start

self.lock = threading.Lock()

def inc(self):

self.lock.acquire()

with self.lock:

self.value += 1

def dec(self):

self.lock.acquire()

with self.lock:

self.value -= 1

即使用上下文管理器来代替try...finally...语句,测试输出应该以以上结果一致。

acquire方法的blocking参数

当blocking=True时,A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法,如果在B线程中再次执行lock.acquire()方法,则B线程阻塞。

正如以上代码实现,当有n个线程需要修改一个共享资源的时候,其他线程在获取锁之前都处于阻塞状态。(python的阻塞都会让出cpu的时间片,因此不是忙等待)

当blocking=Fasle时,A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法,如果在B线程中再次执行lock.acquire()方法,则B线程不会阻塞,并且acquire函数返回False。

acquire方法的timeout参数

当blocking=True并且timeout>0时,acquire会一直阻塞到超时或者锁被释放。

acquire(0)的参数传递

模拟acquire方法的默认参数,编写一下函数进行模拟参数传递的过程:

def print1(blocking=True, timeout=-1):

print(blocking, timeout)

print1(0) # 输出0 -1

print1(10) # 输出10 -1

可见第一个位置参数,替代了blocking。也就是说lock.acquire(0)等效于lock.acquire(blocking=False)

RLock

正常的lock对象是不能多次调用acquire的,但是可重用锁RLock可以多次调用 acquire 而不阻塞,而且 release 时也要执行和 acquire 一样的次数。

Condition

除了Event对象之外,线程同步还可以使用条件同步机制Condition。一类线程等待特定条件,而另一类线程发出特定条件满足的信号。

>>> help(threading.Condition)

在Condition的帮助中有以下几个方法:

初始化方法:init(self, lock=None)。如果给定了lock参数,那么必须是Lock或者Rlock对象,并且被当做底层锁来使用。如果没有指定,那么会创建一个RLock对象的锁,也被当做底层锁来使用。

实现了__enter__和__exit__方法,支持上下文管理器。

notify(self, n=1):唤醒一个或多个在当前Condition上等待的其他线程,如果此方法的调用线程没有获得锁,那么在调用的时候就会报错RuntimeError

notify_all(self):唤醒所有线程

wait(self, timeout=None):一直等待着知道被notifyed或者发生超时

实例代码

以下代码实现的是:有一个生产者线程,会生产若干次,每次生产结束后需要通知所有的消费者线程来消费,因此下面代码使用的是notify_all方法。

import threading

import time

import logging

import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:

def __init__(self):

self.data = None

self.event = threading.Event() # 用来控制消费者退出

self.condition = threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后,开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data = random.randint(0, 100)

logging.info(data)

with self.condition:

self.data = data # 写入成功就表示生产成功,因此需要在此加锁并且能够通知消费者线程去消费,因此选择使用condition来处理

self.condition.notify_all() # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m = Producer_Consumer_Model()

for i in range(3):

threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')

p.start()

测试结果(一个生产者,三个消费者)

2017-03-22 22:07:42,875 INFO [Producer] 16

2017-03-22 22:07:42,883 INFO [Consumer-0] 16

2017-03-22 22:07:42,890 INFO [Consumer-2] 16

2017-03-22 22:07:42,894 INFO [Consumer-1] 16

2017-03-22 22:07:43,884 INFO [Producer] 76

2017-03-22 22:07:43,888 INFO [Consumer-0] 76

2017-03-22 22:07:43,895 INFO [Consumer-2] 76

2017-03-22 22:07:43,898 INFO [Consumer-1] 76

2017-03-22 22:07:44,889 INFO [Producer] 31

2017-03-22 22:07:44,891 INFO [Consumer-0] 31

2017-03-22 22:07:44,911 INFO [Consumer-2] 31

2017-03-22 22:07:44,913 INFO [Consumer-1] 31

2017-03-22 22:07:45,892 INFO [Producer] 17

2017-03-22 22:07:45,894 INFO [Consumer-0] 17

2017-03-22 22:07:45,907 INFO [Consumer-2] 17

2017-03-22 22:07:45,910 INFO [Consumer-1] 17

可见,生产者每生产一次,所有的消费者就会去消费。如果想控制每次生产之后通知几个消费者来消费,那么就可以使用notify方法,指定消费者线程个数。

代码如下

import threading

import time

import logging

import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:

def __init__(self):

self.data = None

self.event = threading.Event() # 用来控制消费者退出

self.condition = threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后,开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data = random.randint(0, 100)

logging.info(data)

with self.condition:

self.data = data # 写入成功就表示生产成功,因此需要在此加锁并且能够通知消费者线程去消费,因此选择使用condition来处理

self.condition.notify(1) # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m = Producer_Consumer_Model()

for i in range(3):

threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')

p.start()

测试结果(一个生产者,三个消费者,每次生产之后只通知一个消费者去消费)

2017-03-22 22:24:52,933 INFO [Producer] 11

2017-03-22 22:24:52,948 INFO [Consumer-0] 11

2017-03-22 22:24:53,949 INFO [Producer] 47

2017-03-22 22:24:53,967 INFO [Consumer-1] 47

2017-03-22 22:24:54,967 INFO [Producer] 14

2017-03-22 22:24:54,983 INFO [Consumer-2] 14

2017-03-22 22:24:55,986 INFO [Producer] 54

2017-03-22 22:24:55,993 INFO [Consumer-0] 54

Condition 通常用于生产者消费者模式, 生产者生产消息之后, 使用notify 或者 notify_all 通知消费者消费。

消费者使用wait方法阻塞等待生产者通知

notify通知指定个wait的线程, notify_all通知所有的wait线程

无论notify/notify_all还是wait 都必须先acqurie, 完成后必须确保release, 通常使用with语法

Barrier

Barrier类存在于threading模块中,中文可以翻译成栅栏

>>> help(threading.Barrier)

可以看到Barrier的主要方法和属性:

__init__(self, parties, action=None, timeout=None):初始化方法,创建一个Barrier

parties:所有参与的线程的数量

action:所有的线程都wait之后并且在线程释放之前就会执行这个action函数,相当于集结之后要做的事情。

timeout:相当于给需要等待的每个线程的wait方法加上timeout参数,超时则barrier不再生效

abort(self):将Barrier设置成broken状态

reset(self):将Barrier重置为最初状态

wait(self, timeout=None):在Barrier前等待,返回在Barrier前等待的下标,从0到parties-1

broken:如果Barrier处于broken状态则返回True

n_waiting:当前已经在Barrier处等待的线程的数量

parties:需要在Barrier处等待的线程的数量

示例代码

import threading

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

barrier = threading.Barrier(parties=3)

def worker(barrier: threading.Barrier):

logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))

worker_id = barrier.wait()

logging.info('after barrier {}'.format(worker_id))

for i in range(3):

threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()

测试结果

2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2

2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0

2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1

可见,所有的线程都会一直等待,知道所有的线程都到期了,然后就通过barrier,继续执行后续操作。

Barrier会建立一个控制点,所有参与的线程都会阻塞,直到所有参与的“各方”达到这一点。 它让线程分开启动,然后暂停,直到它们都准备好再继续。因此,这一点可以理解为各个线程的一个集结点。

abort函数的使用

将Barrier设置成broken状态。所有线程在参与集结过程中,只要执行了barrier.abort方法,那么正在等待的线程都会抛出threading.BrokenBarrierError异常。可以理解为,只要有一个线程确定已经到不了Barrier并且通知了Barrier,那么Barrier就会执行abort方法,通知所有正在wait的线程放弃集结。

实例代码

import threading

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(barrier: threading.Barrier):

logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))

try:

worker_id = barrier.wait()

except threading.BrokenBarrierError:

logging.info('aborting')

else:

logging.info('after barrier {}'.format(worker_id))

barrier = threading.Barrier(4) # 需要等待4个线程

for i in range(3):

threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start() # 3个线程都开始wait

barrier.abort() # 还有一个线程没有到wait,此时执行abort方法,则所有正在wait的线程都抛出异常

测试结果

2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:47:43,211 INFO [worker-2] aborting

2017-03-22 23:47:43,207 INFO [worker-1] aborting

2017-03-22 23:47:43,207 INFO [worker-0] aborting

Semaphore

Semaphore类存在于threading模块中

help(threading.Semaphore)

信号量内部管理者一个计数器,这个计数器的值等于release()方法调用的次数减去acquire()方法调用的次数然后再加上初始值value,value默认为1。

可以看到Semaphore的主要方法:

__init__(self, value=1):初始化一个信号量,value为内部计数器赋初值,默认为1

acquire(self, blocking=True, timeout=None):获取信号量,内部计数器减一

release(self):释放信号量,内部计数器加一

示例代码

import threading

import time

import logging

import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Pool:

def __init__(self, num):

self.num = num

self.conns = [self._make_connect(i) for i in range(num)] # 存放连接

self.sem = threading.Semaphore(num) # 信号量内部计数器初始为连接数

def _make_connect(self, name): # 根据连接名称创建连接

conn = 'connect-{}'.format(name)

return conn

def get_connect(self): # 从连接池获取连接

self.sem.acquire()

return self.conns.pop()

def return_connect(self, conn): # 将连接conn返还到连接池中

self.conns.insert(0, conn)

self.sem.release()

def worker(pool: Pool):

logging.info('starting')

conn = pool.get_connect() # 如果获取不到则会阻塞在acquire处

logging.info('get connect {}'.format(conn))

t = random.randint(1, 3)

time.sleep(t)

logging.info('takes {}s'.format(t))

pool.return_connect(conn)

logging.info('return connect {}'.format(conn))

pool = Pool(2) # 连接池中有两个连接可以使用

for i in range(3): # 三个线程使用两个连接开始任务

threading.Thread(target=worker, args=(pool, ), name='worker-{}'.format(i)).start()

测试结果

2017-03-23 00:54:36,056 INFO [worker-0] starting

2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1

2017-03-23 00:54:36,074 INFO [worker-1] starting

2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0

2017-03-23 00:54:36,089 INFO [worker-2] starting

2017-03-23 00:54:39,074 INFO [worker-0] takes 3s

2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1

2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1

2017-03-23 00:54:39,093 INFO [worker-1] takes 3s

2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0

2017-03-23 00:54:40,093 INFO [worker-2] takes 1s

2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1

这个测试结果显示:三个线程获取连接池中的两个连接,结果出现了一个线程等待其他线程执行完成之后再获取连接的过程。

Queue

Condition线程同步部分用来传递数据的是一个封装在生产者消费者模型中的元素data(正常使用情况下一般封装的都是一个列表,类似与Barrier部分的连接池中的conns列表)。

Python的queue模块中提供了同步的、线程安全的队列类,包括三种队列:

FIFO(先入先出)队列Queue

LIFO(后入先出)队列LifoQueue

优先级队列PriorityQueue

这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。因此我们可以使用queue模块来替换掉生产者消费者中的全局元素,代码如下:

import random

import queue

import threading

class Producer_Consumer_Model:

def __init__(self):

self.q = queue.Queue()

self.event = threading.Event()

def Consumer(self):

while not self.event.is_set():

logging.info(self.q.get())

def Producer(self):

while not self.event.wait(3):

data = random.randint(1, 100)

logging.info(data)

self.q.put(data)

m = Producer_Consumer_Model()

threading.Thread(target=m.Consumer, name='Consumer').start()

threading.Thread(target=m.Producer, name='Producer').start()

测试结果

2017-03-23 10:11:22,990 INFO [Producer] 26

2017-03-23 10:11:22,993 INFO [Consumer] 26

2017-03-23 10:11:25,993 INFO [Producer] 89

2017-03-23 10:11:26,003 INFO [Consumer] 89

2017-03-23 10:11:29,004 INFO [Producer] 14

2017-03-23 10:11:29,006 INFO [Consumer] 14

2017-03-23 10:11:32,007 INFO [Producer] 17

2017-03-23 10:11:32,009 INFO [Consumer] 17

每生产一次,消费者就会消费一次。当消费者线程,读取Queue则调用Queue.get()方法,若Queue为空时消费者线程获取不到内容,就会阻塞在这里,直到成功获取内容。

线程同步总结

Event:主要用于线程之间的事件通知

Lock,Rlock:主要用于保护共享资源

Condition:主要用于生产者消费者模型,可以理解为Event和Lock的结合体

Barrier:同步指定个等待的线程

Semaphore:主要用于保护资源,和Lock的区别在于可以多个线程访问共享资源,而锁一次只能一个线程访问到共享资源,即锁是value=1的信号量

Queue:使用FIFO队列进行同步,适用于生产者消费者模型

GIL

GIL(Global Interpreter Lock):全局解释器锁

Python代码的执行由Python 主循环来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 主循环的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

因此Python多线程程序的执行顺序如下:

设置GIL

切换到一个线程去运行

运行

结束线程

解锁GIL

重复以上步骤

因此,Python的多线程并没有实现并行,只是实现了并发而已。如果要实现真正的并行,那就需要使用Python的多进程模块multiprocessing(multiprocessing模块的宗旨是像管理线程一样来管理进程)。

参考资料

记得帮我点赞哦!

精心整理了计算机各个方向的从入门、进阶、实战的视频课程和电子书,按照目录合理分类,总能找到你需要的学习资料,还在等什么?快去关注下载吧!!!

念念不忘,必有回响,小伙伴们帮我点个赞吧,非常感谢。

我是职场亮哥,YY高级软件工程师、四年工作经验,拒绝咸鱼争当龙头的斜杠程序员。

听我说,进步多,程序人生一把梭

如果有幸能帮到你,请帮我点个【赞】,给个关注,如果能顺带评论给个鼓励,将不胜感激。

职场亮哥文章列表:更多文章

本人所有文章、回答都与版权保护平台有合作,著作权归职场亮哥所有,未经授权,转载必究!

python多线程输出_Python多线程相关推荐

  1. python多线程库_python多线程库

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! python 多线程 多线程类似于同时执行多个不同程序,多线程运行有如下优点:使 ...

  2. python线程唤醒_python 多线程

    python 多线程 真正的多线程吗? 对于多核处理器,在同一时间确实可以多个线程独立运行,但在Python中确不是这样的了.原因在于,python虚拟机中引入了GIL这一概念.GIL(Global ...

  3. python数据输出_python数据输出

    下面的是我的code, 但是最后的结果全部都是others,有哪位大神能帮忙看产生输出的最简单方法是使用print语句,可以通过用逗号分隔零个或多个表达式.这个函数传递表达式转换为一个字符串,如下结果 ...

  4. python怎么输出_python输出语句怎么用

    Python中输出语句是print语句 1.逗号分隔符 在Python中print语句可根据sys.stdout中包含的文件生成输出,它接受一个逗号分隔的对象表,如下实例:x = 1 y = 2 z ...

  5. python怎么输出_Python如何输出整数

    Python输出整数的方法: 先使用str()函数将数字转换成字符串赋值给变量i,再用"if i.count('.') == 0"语句判断字符串中是否没有小数点,如果是则输出这个字 ...

  6. python多线程挂了_python多线程输入的问题 python高效编程技巧13(如何在线程之间实现...

    python3 创建线程时不用args传参,执行线程时为什如果创建线程时在target里就传入了参数,为什么在启动线程时,线程不是在Python多线程下,每个线程的执行方式: 有什么了不起,大不了继续 ...

  7. python多线程模块_python 多线程模块参考

    threading.active_count() 返回当前处于 active 状态的线程的数目 threading.current_thread() 返回调用者当前的 Thread 对象 thread ...

  8. python技术简介_Python多线程技术简介,简单,阐述,python

    python多线程 python中创建多线程方法有两种,这里只介绍简单的一种: from threading import Thread #导入模块 import time def test(thre ...

  9. python 多线程 模块_Python多线程threading和multiprocessing模块实例解析

    本文研究的主要是Python多线程threading和multiprocessing模块的相关内容,具体介绍如下. 线程是一个进程的实体,是由表示程序运行状态的寄存器(如程序计数器.栈指针)以及堆栈组 ...

最新文章

  1. 像淘宝、京东这样的大型电商网站服务器构架完全部署解决方案
  2. Java 8 的 JVM 有多快?Fork-Join 性能基准测试
  3. C宏定义中的##,#,#@用法介绍
  4. ubuntu部署git
  5. destoon 自定义session丢失
  6. Anime Studio Pro v6 汉化版
  7. MySQL的主动优化和被动优化_MySQL“被动”性能优化汇总
  8. vmware开机自动进入BIOS vmware 进入BIOS方法
  9. jfinal 模板引擎
  10. 前端项目架构模板-(三)交互式打包及自动化部署前端项目
  11. php使用blob加密视频,javascript实现blob加密视频源地址的方法
  12. scratch作品导出html5,抓取scratch源码,使用二进制方式保存。
  13. matlab 自激振荡,基于Simulink的非线性系统自激振荡的仿真
  14. 90.pygame游戏-玩个球(play the ball)最终版
  15. 灰关联分析与语音/音乐信号识别
  16. 计算机三级 网络技术 大题第一题 答题技巧分享
  17. WLAN驱动分析文档_gzc126_新浪博客
  18. java 四则运算_四则运算————javaweb版
  19. Android四大天王
  20. MATLAB: Table example

热门文章

  1. 流利的验证组件:FluentValidation
  2. 关于不能远程连接Linux中Mysql数据库的问题
  3. module_init和init_module的区别
  4. jsp,OGNL调用后台Action的某方法
  5. 新一代平板电脑 三星Galaxy Note 10.1将于本月末发布 - TECH2IPO创见
  6. 使用common-fileUpload制作文件上传【转】
  7. Silverlight 控件开发记录之 extern alias” 关键字
  8. 用投资的观点学习编程
  9. python flask 返回值 状态码 设置
  10. 数字货币 区块链 双花攻击 Double Spend Attack 简介