Python

Python开发

Python语言

Python3 系列之 并行编程

进程和线程

进程是程序运行的实例。一个进程里面可以包含多个线程,因此同一进程下的多个线程之间可以共享线程内的所有资源,它是操作系统动态运行的基本单元;每一个线程是进程下的一个实例,可以动态调度和独立运行,由于线程和进程有很多类似的特点,因此,线程又被称为轻量级的进程。线程的运行在进程之下,进程的存在依赖于线程;

开胃菜

基于 Python3 创建一个简单的进程示例

from threading import Thread

from time import sleep

class CookBook(Thread):

def __init__(self):

Thread.__init__(self)

self.message = "Hello Parallel Python CookBook!!n"

def print_message(self):

print(self.message)

def run(self):

print("Thread Startingn")

x = 0

while x < 10:

self.print_message()

sleep(2)

x += 1

print("Thread Ended!n")

print("Process Started")

hello_python = CookBook()

hello_python.start()

print("Process Ended")

需要注意的是,永远不要让线程在后台默默执行,当其执行完毕后要及时释放资源。

基于线程的并行

多线程编程一般使用共享内存空间进行线程间的通信,这就使管理内存空间成为多线程编程的关键。Python 通过标准库 threading 模块来管理线程,具有以下的组件:

线程对象

Lock 对象

RLock 对象

信号对象

条件对象

事件对象

定义一个线程

基本语法

示例代码如下所示

import threading

def function(i):

print("function called by thread: {0}".format(i))

return

threads = []

for i in range(5):

t = threading.Thread(target=function, args=(i,))

threads.append(t)

t.start()

lambda t, threads: t.join()

需要注意的是,线程创建后并不会自动运行,需要主动调用 start() 方法来启动线程,join() 会让调用它的线程被阻塞直到执行结束。(PS:可通过调用 t.setDaemon(True) 使其为后台线程避免主线程被阻塞)

线程定位

示例代码如下所示

import threading

import time

def first_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

def second_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

def third_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

if __name__ == "__main__":

t1 = threading.Thread(target=first_function,name="first")

t2 = threading.Thread(target=second_function,name="second")

t3 = threading.Thread(target=third_function,name="third")

t1.start()

t2.start()

t3.start()

t1.join()

t2.join()

t3.join()

通过设置 threading.Thread() 函数的 name 参数来设置线程名称,通过 threading.currentThread().getName() 来获取当前线程名称;线程的默认名称会以 Thread-{i} 格式来定义

自定义一个线程对象

示例代码如下所示

import threading

import time

exitFlag = 0

class myThread(threading.Thread):

def __init__(self, threadID, name, counter):

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.counter = counter

def run(self):

print("Starting:{0}".format(self.name))

print_time(self.name, self.counter, 5)

print("Exiting:{0}".format(self.name))

def print_time(threadName, delay, counter):

while counter:

if exitFlag:

thread.exit()

time.sleep(delay)

print("{0} {1}".format(threadName, time.ctime(time.time())))

counter -= 1

t1 = myThread(1, "Thread-1", 1)

t2 = myThread(2, "Thread-2", 1)

t1.start()

t2.start()

t1.join()

t2.join()

print("Exiting Main Thread.")

如果想自定义一个线程对象,首先就是要定义一个继承 threading.Thread 类的子类,实现构造函数, 并重写 run() 方法即可。

线程同步

Lock

示例代码如下所示

import threading

shared_resource_with_lock = 0

shared_resource_with_no_lock = 0

COUNT = 100000

shared_resource_lock = threading.Lock()

def increment_with_lock():

global shared_resource_with_lock

for i in range(COUNT):

shared_resource_lock.acquire()

shared_resource_with_lock += 1

shared_resource_lock.release()

def decrement_with_lock():

global shared_resource_with_lock

for i in range(COUNT):

shared_resource_lock.acquire()

shared_resource_with_lock -= 1

shared_resource_lock.release()

def increment_without_lock():

global shared_resource_with_no_lock

for i in range(COUNT):

shared_resource_with_no_lock += 1

def decrement_wthout_lock():

global shared_resource_with_no_lock

for i in range(COUNT):

shared_resource_with_no_lock -= 1

if __name__ == "__main__":

t1 = threading.Thread(target=increment_with_lock)

t2 = threading.Thread(target=decrement_with_lock)

t3 = threading.Thread(target=increment_without_lock)

t4 = threading.Thread(target=decrement_wthout_lock)

t1.start()

t2.start()

t3.start()

t4.start()

t1.join()

t2.join()

t3.join()

t4.join()

print("the value of shared variable with lock management is :{0}".format(

shared_resource_with_lock))

print("the value of shared variable with race condition is :{0}".format(

shared_resource_with_no_lock))

通过 threading.Lock() 方法我们可以拿到线程锁,一般有两种操作方式:acquire() 和 release() 在两者之间是加锁状态,如果释放失败的话会显示 RuntimError() 的异常。

RLock

RLock 也叫递归锁,和 Lock 的区别在于:谁拿到谁释放,是通过 threading.RLock() 来拿到的;

示例代码如下所示

import threading

import time

class Box(object):

lock = threading.RLock()

def __init__(self):

self.total_items = 0

def execute(self, n):

Box.lock.acquire()

self.total_items += n

Box.lock.release()

def add(self):

Box.lock.acquire()

self.execute(1)

Box.lock.release()

def remove(self):

Box.lock.acquire()

self.execute(-1)

Box.lock.release()

def adder(box, items):

while items > 0:

print("adding 1 item in the box")

box.add()

time.sleep(1)

items -= 1

def remover(box, items):

while items > 0:

print("removing 1 item in the box")

box.remove()

time.sleep(1)

items -= 1

if __name__ == "__main__":

items = 5

print("putting {0} items in the box".format(items))

box = Box()

t1 = threading.Thread(target=adder, args=(box, items))

t2 = threading.Thread(target=remover, args=(box, items))

t1.start()

t2.start()

t1.join()

t2.join()

print("{0} items still remain in the box".format(box.total_items))

信号量

示例代码如下所示

import threading

import time

import random

semaphore = threading.Semaphore(0)

def consumer():

print("Consumer is waiting.")

semaphore.acquire()

print("Consumer notify:consumed item numbers {0}".format(item))

def producer():

global item

time.sleep(10)

item = random.randint(0, 10000)

print("producer notify:produced item number {0}".format(item))

semaphore.release()

if __name__ == "__main__":

for i in range(0, 5):

t1 = threading.Thread(target=producer)

t2 = threading.Thread(target=consumer)

t1.start()

t2.start()

t1.join()

t2.join()

print("program terminated.")

信号量初始化为 0 ,然后在两个并行线程中,通过调用 semaphore.acquire() 函数会阻塞消费者线程,直到 semaphore.release() 在生产者中被调用,这里模拟了生产者-消费者 模式来进行了测试;如果信号量的计数器到了0,就会阻塞 acquire() 方法,直到得到另一个线程的通知。如果信号量的计数器大于0,就会对这个值-1然后分配资源。

使用条件进行线程同步

解释条件机制最好的例子还是生产者-消费者问题。在本例中,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后销毁)。当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。

示例代码如下所示

from threading import Thread, Condition

import time

items = []

condition = Condition()

class consumer(Thread):

def __init__(self):

Thread.__init__(self)

def consume(self):

global condition

global items

condition.acquire()

if len(items) == 0:

condition.wait()

print("Consumer notify:no item to consum")

items.pop()

print("Consumer notify: consumed 1 item")

print("Consumer notify: item to consume are:{0}".format(len(items)))

condition.notify()

condition.release()

def run(self):

for i in range(0, 20):

time.sleep(2)

self.consume()

class producer(Thread):

def __init__(self):

Thread.__init__(self)

def produce(self):

global condition

global items

condition.acquire()

if len(items) == 10:

condition.wait()

print("Producer notify:items producted are:{0}".format(len(items)))

print("Producer notify:stop the production!!")

items.append(1)

print("Producer notify:total items producted:{0}".format(len(items)))

condition.notify()

condition.release()

def run(self):

for i in range(0, 20):

time.sleep(1)

self.produce()

if __name__ == "__main__":

producer = producer()

consumer = consumer()

producer.start()

consumer.start()

producer.join()

consumer.join()

通过 condition.acquire() 来获取锁对象,condition.wait() 会使当前线程进入阻塞状态,直到收到 condition.notify() 信号,同时,调用信号的通知的对象也要及时调用 condition.release() 来释放资源;

使用事件进行线程同步

事件是线程之间用于通信的对。有的线程等待信号,有的线程发出信号。

示例代码如下所示

import time

from threading import Thread, Event

import random

items = []

event = Event()

class consumer(Thread):

def __init__(self, items, event):

Thread.__init__(self)

self.items = items

self.event = event

def run(self):

while True:

time.sleep(2)

self.event.wait()

item = self.items.pop()

print('Consumer notify:{0} popped from list by {1}'.format(

item, self.name))

class producer(Thread):

def __init__(self, integers, event):

Thread.__init__(self)

self.items = items

self.event = event

def run(self):

global item

for i in range(100):

time.sleep(2)

item = random.randint(0, 256)

self.items.append(item)

print('Producer notify: item N° %d appended to list by %s' %

(item, self.name))

print('Producer notify: event set by %s' % self.name)

self.event.set()

print('Produce notify: event cleared by %s ' % self.name)

self.event.clear()

if __name__ == "__main__":

t1 = producer(items, event)

t2 = consumer(items, event)

t1.start()

t2.start()

t1.join()

t2.join()

使用 with 语法简化代码

import threading

import logging

logging.basicConfig(level=logging.DEBUG,

format='(%(threadName)-10s) %(message)s')

def threading_with(statement):

with statement:

logging.debug("%s acquired via with" % statement)

def Threading_not_with(statement):

statement.acquire()

try:

logging.debug("%s acquired directly " % statement)

finally:

statement.release()

if __name__ == "__main__":

lock = threading.Lock()

rlock = threading.RLock()

condition = threading.Condition()

mutex = threading.Semaphore(1)

threading_synchronization_list = [lock, rlock, condition, mutex]

for statement in threading_synchronization_list:

t1 = threading.Thread(target=threading_with, args=(statement,))

t2 = threading.Thread(target=Threading_not_with, args=(statement,))

t1.start()

t2.start()

t1.join()

t2.join()

使用 queue 进行线程通信

Queue 常用的方法有以下四个:

put():往 queue 中添加一个元素

get():从 queue 中删除一个元素,并返回该元素

task_done():每次元素被处理的时候都需要调用这个方法

join():所有元素都被处理之前一直阻塞from threading import Thread, Event

from queue import Queue

import time

import random

class producer(Thread):

def __init__(self, queue):

Thread.__init__(self)

self.queue = queue

def run(self):

for i in range(10):

item = random.randint(0, 256)

self.queue.put(item)

print("Producer notify: item item N° %d appended to queue by %s" %

(item, self.name))

time.sleep(1)

class consumer(Thread):

def __init__(self, queue):

Thread.__init__(self)

self.queue = queue

def run(self):

while True:

item = self.queue.get()

print('Consumer notify : %d popped from queue by %s' %

(item, self.name))

self.queue.task_done()

if __name__ == "__main__":

queue = Queue()

t1 = producer(queue)

t2 = consumer(queue)

t3 = consumer(queue)

t4 = consumer(queue)

t1.start()

t2.start()

t3.start()

t4.start()

t1.join()

t2.join()

t3.join()

t4.join()

基于进程的并行

multiprocessing 是 Python 标准库中的模块,实现了共享内存机制。

异步编程

使用 concurrent.futures 模块

该模块具有线程池和进程池,管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能;此模块由以下部分组成

concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。

submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。

map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。

shutdown(Wait=True): 发出让执行者释放所有资源的信号。

concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

示例代码如下所示

import concurrent.futures

import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):

result_item = count(x)

return result_item

def count(number):

for i in range(0, 1000000):

i = i + 1

return i * number

if __name__ == "__main__":

# 顺序执行

start_time = time.time()

for item in number_list:

print(evaluate_item(item))

print("Sequential execution in " + str(time.time() - start_time), "seconds")

# 线程池执行

start_time_1 = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

futures = [executor.submit(evaluate_item, item)

for item in number_list]

for future in concurrent.futures.as_completed(futures):

print(future.result())

print("Thread pool execution in " +

str(time.time() - start_time_1), "seconds")

# 线程池执行

start_time_2 = time.time()

with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:

futures = [executor.submit(evaluate_item, item)

for item in number_list]

for future in concurrent.futures.as_completed(futures):

print(future.result())

print("Process pool execution in " +

str(time.time() - start_time_2), "seconds")

使用 Asyncio 管理事件循环

Python 的 Asyncio 模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:

事件循环: 在Asyncio模块中,每一个进程都有一个事件循环。

协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如IO)完成之后,从之前暂停的地方恢复执行。

Futures: 定义了 Future 对象,和 concurrent.futures 模块一样,表示尚未完成的计算。

Tasks: 这是Asyncio的子类,用于封装和管理并行模式下的协程。

Asyncio 提供了以下方法来管理事件循环:

loop = get_event_loop(): 得到当前上下文的事件循环。

loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再执行 callback 方法。

loop.call_soon(callback, argument): 尽可能快调用 callback, call_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback 。

loop.time(): 以float类型返回当前时间循环的内部时间。

asyncio.set_event_loop(): 为当前上下文设置事件循环。

asyncio.new_event_loop(): 根据此策略创建一个新的时间循环并返回。

loop.run_forever(): 在调用 stop() 之前将一直运行。

示例代码如下所示

import asyncio

import datetime

import time

def fuction_1(end_time, loop):

print("function_1 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, fuction_2, end_time, loop)

else:

loop.stop()

def fuction_2(end_time, loop):

print("function_2 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, function_3, end_time, loop)

else:

loop.stop()

def function_3(end_time, loop):

print("function_3 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, fuction_1, end_time, loop)

else:

loop.stop()

def function_4(end_time, loop):

print("function_4 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, function_4, end_time, loop)

else:

loop.stop()

loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0

loop.call_soon(fuction_1, end_loop, loop)

loop.run_forever()

loop.close()

使用 Asyncio 管理协程

示例代码如下所示

import asyncio

import time

from random import randint

@asyncio.coroutine

def StartState():

print("Start State called n")

input_val = randint(0, 1)

time.sleep(1)

if input_val == 0:

result = yield from State2(input_val)

else:

result = yield from State1(input_val)

print("Resume of the Transition:nStart State calling" + result)

@asyncio.coroutine

def State1(transition_value):

outputVal = str("State 1 with transition value=%s n" % (transition_value))

input_val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if input_val == 0:

result = yield from State3(input_val)

else:

result = yield from State2(input_val)

@asyncio.coroutine

def State2(transition_value):

outputVal = str("State 2 with transition value= %s n" %

(transition_value))

input_Val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if (input_Val == 0):

result = yield from State1(input_Val)

else:

result = yield from State3(input_Val)

result = "State 2 calling " + result

return outputVal + str(result)

@asyncio.coroutine

def State3(transition_value):

outputVal = str("State 3 with transition value = %s n" %

(transition_value))

input_val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if(input_val == 0):

result = yield from State1(input_val)

else:

result = yield from State2(input_val)

result = "State 3 calling " + result

return outputVal + str(result)

@asyncio.coroutine

def EndState(transition_value):

outputVal = str("End State With transition value = %s n" %

(transition_value))

print("...Stop Computation...")

return outputVal

if __name__ == "__main__":

print("Finites State Machine simulation with Asyncio Coroutine")

loop = asyncio.get_event_loop()

loop.run_until_complete(StartState())

使用 Asyncio 控制任务

示例代码如下所示

import asyncio

@asyncio.coroutine

def factorial(number):

f = 1

for i in range(2, number + 1):

print("Asyncio.Task:Compute factorial(%s)" % (i))

yield from asyncio.sleep(1)

f *= i

print("Asyncio.Task - factorial(%s) = %s" % (number, f))

@asyncio.coroutine

def fibonacci(number):

a, b = 0, 1

for i in range(number):

print("Asyncio.Task:Complete fibonacci (%s)" % (i))

yield from asyncio.sleep(1)

a, b = b, a+b

print("Asyncio.Task - fibonaci (%s)= %s" % (number, a))

@asyncio.coroutine

def binomialCoeff(n, k):

result = 1

for i in range(1, k+1):

result = result * (n-i+1) / i

print("Asyncio.Task:Compute binomialCoeff (%s)" % (i))

yield from asyncio.sleep(1)

print("Asyncio.Task - binomialCoeff (%s,%s) = %s" % (n, k, result))

if __name__ == "__main__":

tasks = [asyncio.Task(factorial(10)), asyncio.Task(

fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

使用Asyncio和Futures

示例代码如下所示

import asyncio

import sys

@asyncio.coroutine

def first_coroutine(future, N):

count = 0

for i in range(1, N + 1):

count = count + i

yield from asyncio.sleep(4)

future.set_result(

"first coroutine (sum of N integers) result = " + str(count))

@asyncio.coroutine

def second_coroutine(future, N):

count = 1

for i in range(2, N + 1):

count *= i

yield from asyncio.sleep(3)

future.set_result("second coroutine (factorial) result = " + str(count))

def got_result(future):

print(future.result())

if __name__ == "__main__":

N1 = 1

N2 = 1

loop = asyncio.get_event_loop()

future1 = asyncio.Future()

future2 = asyncio.Future()

tasks = [

first_coroutine(future1, N1),

second_coroutine(future2, N2)

]

future1.add_done_callback(got_result)

future2.add_done_callback(got_result)

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

分布式编程

GPU 编程

相关参考

内容来源于网络,如有侵权请联系客服删除

python并行编程语言_Python3 系列之 并行编程相关推荐

  1. THRUST:一个开源的、面向异构系统的并行编程语言:编程模型主要包括:数据并行性、任务并行性、内存管理、内存访问控制、原子操作、同步机制、错误处理机制、混合编程模型、运行时系统等

    作者:禅与计算机程序设计艺术 1.简介 https://github.com/NVIDIA/thrust 2021年8月,当代科技巨头Facebook宣布其开发了名为THRUST的高性能计算语言,可用 ...

  2. Python猫荐书系列之五:Python高性能编程

    2019独角兽企业重金招聘Python工程师标准>>> 稍微关心编程语言的使用趋势的人都知道,最近几年,国内最火的两种语言非 Python 与 Go 莫属,于是,隔三差五就会有人问: ...

  3. python语言入门编程猫-少儿编程语言Python入门课程,尽在厦门编程猫

    计算机有各种层面的编程语言,Python相对来说语法简单,是不错的入门级编程语言,适合学员作为编程启蒙课程进行学习,六年级到初一的孩子学习能力强,厦门编程猫注重这一时期孩子的个人专业技能发展,为学员以 ...

  4. 大牛推荐的30本经典编程书籍,从Python到前端全系列。

    注:为了方便阅读与收藏,我们也制作了30本书籍完整清单的Markdown.PDF版以及思维导图版,大家可以在实验楼公众号后台回复关键字"书籍推荐"获取. Python 系列(10本 ...

  5. python视频网站项目_价值2400元的python全栈开发系列Flask Python Web 网站编程视频

    2 e/ b4 F1 c' H$ D! X 价值2400元的python全栈开发系列Flask Python Web 网站编程视频-优品课堂' z3 _1 Y7 ]6 j4 z # p# r# g* ...

  6. SIGIA_4P python学习 列表 字典 集合 面对对象编程 闭包 装饰器 函数式编程 作用域 异常处理

    SIGIA_4P python学习 列表 字典 集合 面对对象编程 闭包 装饰器 函数式编程 作用域 异常处理 本文连接 简介 SIGIA_4P 网址 a. 课程OKR Objectives and ...

  7. Python 的Tkinter包系列之一:窗口初步

    Python 的Tkinter包系列之一:窗口初步 图形用户界面(GUI.Graphical User Interface)是基于图形的界面,windows就是一个图形用户界面的操作系统,而DOS是基 ...

  8. Python 的Tkinter包系列之六:好例子

    Python 的Tkinter包系列之六:好例子 用Tkinter写一个桌面应用程序,只需要三步: 1)创建一个窗体 2)把需要的控件放到窗体上(控件布局:设置控件在窗体内的位置以及填充.间隔等属性, ...

  9. Python 在编程语言中是什么地位?为什么很多大学不教 Python?

    随着这两年人工智能大热,作为AI届的"网红",Python的地位也有一定变化,所以今天再把这个问题扒出来研究一下. Python是当下非常热门的一种编程语言.热门到什么程度?我们首 ...

最新文章

  1. 屏幕滚动控件Scrollview
  2. 感性理解Berlekamp-Massey算法
  3. Tomcat log文件
  4. WORD给文档设置密码保护?
  5. c++删除数组中重复元素_leetcode 数组中重复的数字
  6. Linux学习笔记---常用shell命令
  7. Eclipse 快捷键 查看方法在那里被调用~
  8. 6.关于 MySQL
  9. li:hover与a:hover的区别
  10. Mac远程连接Linux桌面教程
  11. Mac双开微信(2种方法)、Win多开微信
  12. 【NISP一级】1.4 信息安全管理
  13. 易基因|靶基因DNA甲基化测序(Target-BS)
  14. 小蜜蜂java小游戏_小游戏-打小蜜蜂
  15. 867 · 四键键盘
  16. 避免c++程序在windows7或vista下关闭后出现程序兼容性助手
  17. python绘制对数函数_python中如何画对数函数图?
  18. win10禁用驱动程序强制签名_只需一个简单命令,在Win10上启用Windows恢复环境(WinRE)...
  19. Solr DIH上传索引操作实例
  20. 语音识别ASR - HTK(HResults)计算字错率WER、句错率SER

热门文章

  1. VS和matlab混合编程的推荐书籍!
  2. 谷歌OKR指导手册 (译)
  3. 在vue中let var 和const 区别
  4. Log4j的isdebugEnabled的作用
  5. 浅谈Spring框架注解的用法分析
  6. 从零开始学习jQuery (七) jQuery动画-让页面动起来!
  7. 极大似然法估计与极大验后法估计
  8. 云计算的学习路线是什么?云计算的应用场景分析
  9. 乱入Linux界的我是如何学习的
  10. Openstack平台搭建(先电版)