目录

文章目录

  • 目录
  • Python Co-routines
  • Asyncio Module
  • Event Loop
  • Async 与 Await
    • async def
    • async for
    • async with
  • Future 与 Task
  • 并发执行
  • 绑定回调
    • add_done_callback
    • call_soon
    • call_later
    • call_at
  • 任务自省
  • 同步机制
    • Semaphore(信号量)
    • Lock(锁)
    • Condition(条件)
    • Event(事件)
    • Queue(队列)
  • 参考文档

Python Co-routines

Python 对协程(Co-routines)的支持经历了多个版本:

  1. Python2.x 对协程的支持比较有限,通过 yield 关键字支持的生成器实现了一部分协程的功能但不完全。
  2. 第三方库 gevent 对协程有更好的支持。
  3. Python3.4 中提供了 asyncio 模块。
  4. Python3.5 中引入了 async/await 关键字。
  5. Python3.6 中 asyncio 模块更加完善和稳定。
  6. Python3.7 中内置了 async/await 关键字。

Asyncio Module

asyncio — Asynchronous I/O, event loop, coroutines and tasks

  • Asynchronous I/O(异步 I/O):只发出 I/O 的执行,并不等待 I/O 的结果,释放 CPU,提高程序运行效率。
  • Event loop(事件循环):事件循环是一种处理多并发的有效手段。通过启动一个无限的事件循环,提供事件监测、事件触发等处理工作。可以将 Coroutines 对象注册到事件循环上,当特定的某个事件发生时,就会调用相应的协程函数。
  • Coroutines(协程):通过 async def 声明一个协程函数,对协程函数的调用不会立即执行函数体,而是返回一个 Coroutines 对象。 Coroutines 对象需要注册到事件循环,由事件循环调用。
  • Tasks(任务):是对 Coroutines 对象的进一步封装,包含了面向任务的各种状态,支持任务创建、任务取消等管理功能。在注册事件循环时,通过 run_until_complete() 方法会将 Coroutines 对象包装成为了一个 Task 对象。
  • Futures(将来执行任务的结果):Future 类是 Task 类的父类,实现了 Task 对象执行结果的保存。

Event Loop

asyncio 编程模型的核心就是一个基于消息的事件循环,程序从 asyncio 模块中获取到一个 Event Loop 的引用,然后把需要执行的协程都扔到 Event Loop 中执行,就实现了异步 I/O。

asyncio 的 Event Loop 拥有多种方式去启动协程,最简单的一种就是使用 run_until_complete() 方法。

import asyncioasync def coroutine():print('in coroutine')return 'result'coro = coroutine()event_loop = asyncio.get_event_loop()
try:    print('entering event loop')result = event_loop.run_until_complete(coro)print(f'it returned: {result}')
finally:print('closing event loop')event_loop.close()

OUTPUT:

entering event loop
in coroutine
it returned: result
closing event loop

Async 与 Await

async 关键字用于创建一个协程(返回一个 Coroutines 对象),该 Coroutines 对象需要注册到事件循环,由事件循环调用。针对不同的场景场景有 async def、async for、async with 等几种使方式。

await 关键字用于针对阻塞的 I/O 操作进行挂起,作用与生成器中的 yield(让出)关键字相同,协程函数将会让出控制权。也就是说,当执行协程函数体的过程中遇到了 await 语句,Event Loop 就会把该协程挂起,继而执行其他的协程,直到其他的协程也挂起或者执行完毕后,再执行下一个协程。

async def

async def 用于声明一个协程函数,对协程函数的调用不会立即执行函数体,而是返回一个 Coroutines 对象。一个简单的例子如前文所述。

值的注意的是,async def 支持 “链式协程(Chain coroutines)”,即:父协程可以创建子协程,以此往复,形成一条由若干个协程组成的调用链,并且彼此之间遵循顺序执行。

import asyncioasync def compute(x, y):print("Compute %s + %s ..." % (x, y))await asyncio.sleep(1.0)return x + yasync def print_sum(x, y):result = await compute(x, y)print("%s + %s = %s" % (x, y, result))loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

OUTPUT:

Compute 1 + 2 ...
1 + 2 = 3

async for

async for 就相当于一个异步的生成器(Generator)。在 Python3.5 之前,实现异步生成器是一件很麻烦的事情,需要定义一个类,并且定义 __iter____next__ 方法,之后才能使用。

现在只需要使用 async for 就能够简单的实现了,而且还支持列表解析等语法。

import asyncioasync def g2():yield 1yield 2async def g1():async for v in g2():print(v)return [v * 2 async for v in g2()]loop = asyncio.get_event_loop()
try:result = loop.run_until_complete(g1())print(f'Result is {result}')
finally:loop.close()

async with

with 是 Python 的 Context Management(上下文管理)语法糖,async with 即是异步的 with。

  • 实现 with 类型
class Sample:def __enter__(self):print "in __enter__"return "Foo"def __exit__(self, exc_type, exc_val, exc_tb):print "in __exit__"def get_sample():return Sample()with get_sample() as sample:print "Sample: ", sample

注意,async with 语法糖同样需要首先实现 async with 类型。目前原生支持 async with 类型的第三方库有比较典型的 aiohttp。

import aiohttpasync def fetch_page(url):async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.json()loop = asyncio.get_event_loop()
result = loop.run_until_complete(fetch_page('http://httpbin.org/get?a=2'))
print(f"Args: {result.get('args')}")
loop.close()

Future 与 Task

正如前文所言,async def 返回的 Coroutines 对象是不能直接运行的,而是在将 Coroutines 对象注册到 Event Loop 之后,由 run_until_complete() 方法将 Coroutines 对象包装成为了一个 Task 对象。

而 Task 类又是 Future 类的子类,Future 类实现了保存 Task 对象运行后的状态,Future 对象可以用于在未来获取协程的执行结果。

当有需要时,我们可以显式地创建了 task 对象。并且,在 task 对象加入 Event Loop 之前的状态为 Pending,执行当完成后的状态为 Finished。

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()task = loop.create_task(coroutine)
print(task)loop.run_until_complete(task)
print(task)print("Time:", now()-start)

OUTPUT:

<Task pending coro=<do_some_work() running at test_asyncio.py:8>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at test_asyncio.py:8> result=None>
Time: 0.0010194778442382812

另外,除了 loop.create_task() 方法之外,也可以使用 asyncio.ensure_future(coroutine) 方法来将一个 Coroutines 对象封装成一个 Task 对象,但两者有着不同的使用场景。后者更多的是用于确保可以得到相应的 Future Result。

  • loop.create_task
AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
  • asyncio.ensure_future
asyncio.ensure_future(coro_or_future, *, loop=None)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.If the argument is a Future, it is returned directly.

并发执行

前面的例子中都没有十分显着的并发特性。asyncio 提供了 wait(tasks) 和 gather(*coroutines) 两个方法来收集(一次性接受)多个 Tasks 对象,并返回多个 Futures 对象。前者接受一个 Tasks List 类型参数,返回无序的 Future List 对象结果;后者接受任意个(形参可变长)Coroutines 类型参数,返回有序的 Done List 和 Pendings List 组成的元组类型结果。

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)start = now()coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))for task in tasks:print("Task ret:", task.result())print("Time:", now()-start)

OUTPUT:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004698991775513

或者可以这样实现:

import asyncioasync def func1(i):print(f"协程函数{i}马上开始执行。")await asyncio.sleep(2)print(f"协程函数{i}执行完毕!")async def main():tasks = []for i in range(1, 5):tasks.append(asyncio.create_task(func1(i)))await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
  • asyncio.wait
Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).The sequence futures must not be empty.timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.return_when indicates when this function should return.
  • asyncio.gather
Return a future aggregating results from the given coroutine objects or futures.All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

绑定回调

asyncio 支持 “绑定回调” 机制。除了 Callback 函数绑定之外,Event Loop 还提供了 3 中普通函数的调用方式:

  1. call_soon:立即调用普通函数。
  2. call_later:在 Delay 了一段时间后调用普通函数。
  3. call_at:在 Delay 了一段相对时间(Absolute time,相对于 event loop’s time() 之后)后调用普通函数。

add_done_callback

在 Task 执行完成时,可以立即执行 Callback 函数。Callback 函数的最后一个参数是 Future 对象,在 Callback 函数中,可以通过 Future 对象来获取协程对象的返回值。

通过 add_done_callback() 方法为 Task 对象绑定一个 Callback 函数,当 Task(本质是 Coroutines)对象执行完成时,就会调用 Callback 函数。

import time
import asyncionow = lambda: time.time()async def do_some_work(x):print("waiting:",x)return "Done after {}s".format(x)def callback(future):print("callback:", future.result())start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)
print(task)task.add_done_callback(callback)
print(task)loop.run_until_complete(task)print("Time:", now()-start)

OUTPUT:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875

call_soon

import threading
import asynciodef callback(args):print(f'callback: {args} ({threading.currentThread()})')async def my_coroutine(loop):print(f'I ain\'t got no money ({threading.currentThread()})')await asyncio.sleep(1)loop.call_soon(callback, 'first time')loop.call_soon(callback, 'second time')print(f'Do not go gentle into that good night ({threading.currentThread()})')if __name__ == '__main__':loopEvent = asyncio.get_event_loop()tasks = [my_coroutine(loopEvent), my_coroutine(loopEvent)]loopEvent.run_until_complete(asyncio.wait(tasks))loopEvent.close()

call_later

import threading
import asynciodef callback(args):print(f'callback: {args} ({threading.currentThread()})')async def my_coroutine(loop):print(f'I ain\'t got no money ({threading.currentThread()})')loop.call_later(0.4, callback, 'second time')loop.call_soon(callback, 'first time')await asyncio.sleep(0.5)print(f'Do not go gentle into that good night ({threading.currentThread()})')if __name__ == '__main__':loopEvent = asyncio.get_event_loop()tasks = [my_coroutine(loopEvent), my_coroutine(loopEvent)]loopEvent.run_until_complete(asyncio.wait(tasks))loopEvent.close()

call_at

import threading
import asynciodef callback(args):print(f'callback: {args} ({threading.currentThread()})')async def my_coroutine(loop):print(f'I ain\'t got no money ({threading.currentThread()})')loop.call_at(loop.time() + 0.2, callback, 'second time')loop.call_soon(callback, 'first time')await asyncio.sleep(0.5)print(f'Do not go gentle into that good night ({threading.currentThread()})')if __name__ == '__main__':loopEvent = asyncio.get_event_loop()tasks = [my_coroutine(loopEvent), my_coroutine(loopEvent)]loopEvent.run_until_complete(asyncio.wait(tasks))loopEvent.close()

任务自省

asyncio 提供了任务自省机制,可以将 Event Loop 的 current_task 以及 all_tasks 进行返回,继而程序可以判断 Event Loop 的运行情况。

Future 对象有以下几个状态:

  1. Pending:新建时的状态。
  2. Running:运行时的状态。
  3. Done:完成时的状态。
  4. Cancelled:已取消的状态。

如果需要强制停止 Event Loop,就需要先把已经注册到 Event Loop 中的 Tasks 对象都 Cancel 掉。可以通过 task.cancel() 来强制取消任务的执行,还可以通过 asyncio.Task 来获取已经注册到 Event Loop 中的 Tasks 对象。

import asyncio
import timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3),
]start = now()
loop = asyncio.get_event_loop()
try:loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt:print(asyncio.Task.all_tasks())for task in asyncio.Task.all_tasks():print(task.cancel())loop.stop()loop.run_forever()
finally:loop.close()print("Time:", now()-start)

OUTPUT:

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task pending coro=<do_some_work() running at test_asyncio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c5a08>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]>, <Task pending coro=<do_some_work() running at test_asyncio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c59d8>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]>, <Task pending coro=<do_some_work() running at test_asyncio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c5a68>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]>, <Task pending coro=<wait() running at /usr/lib64/python3.6/asyncio/tasks.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c5bb8>()]>>}
True
True
True
True
Time: 0.828188419342041

同步机制

为了保证并发安全,asyncio 模块也提供了与多线程 / 多进程同步机制相同的原语。

Semaphore(信号量)

import aiohttp
import asyncioNUMBERS = range(6)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)async def fetch_async(a):async with aiohttp.request('GET', URL.format(a)) as r:data = await r.json()return data['args']['a']async def print_result(a):with (await sema):r = await fetch_async(a)print('fetch({}) = {}'.format(a, r))loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)

Lock(锁)

import asyncio
import functoolsdef unlock(lock):print('callback releasing lock')lock.release()async def test(locker, lock):print('{} waiting for the lock'.format(locker))with await lock:print('{} acquired lock'.format(locker))print('{} released lock'.format(locker))async def main(loop):lock = asyncio.Lock()await lock.acquire()loop.call_later(0.1, functools.partial(unlock, lock))await asyncio.wait([test('l1', lock), test('l2', lock)])loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Condition(条件)

import asyncio
import functoolsasync def consumer(cond, name, second):await asyncio.sleep(second)with await cond:await cond.wait()print('{}: Resource is available to consumer'.format(name))async def producer(cond):await asyncio.sleep(2)for n in range(1, 3):with await cond:print('notifying consumer {}'.format(n))cond.notify(n=n)await asyncio.sleep(0.1)async def producer2(cond):await asyncio.sleep(2)with await cond:print('Making resource available')cond.notify_all()async def main(loop):condition = asyncio.Condition()task = loop.create_task(producer(condition))consumers = [consumer(condition, name, index)for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()task = loop.create_task(producer2(condition))consumers = [consumer(condition, name, index)for index, name in enumerate(('c1', 'c2'))]await asyncio.wait(consumers)task.cancel()loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Event(事件)

import asyncio
import functoolsdef set_event(event):print('setting event in callback')event.set()async def test(name, event):print('{} waiting for event'.format(name))await event.wait()print('{} triggered'.format(name))async def main(loop):event = asyncio.Event()print('event start state: {}'.format(event.is_set()))loop.call_later(0.1, functools.partial(set_event, event))await asyncio.wait([test('e1', event), test('e2', event)])print('event end state: {}'.format(event.is_set()))loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Queue(队列)

import asyncio
import random
import aiohttpNUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)async def fetch_async(a):async with aiohttp.request('GET', URL.format(a)) as r:data = await r.json()return data['args']['a']async def collect_result(a):with (await sema):return await fetch_async(a)async def produce(queue):for num in NUMBERS:print('producing {}'.format(num))item = (num, num)await queue.put(item)async def consume(queue):while 1:item = await queue.get()num = item[0]rs = await collect_result(num)print('consuming {}...'.format(rs))queue.task_done()async def run():queue = asyncio.PriorityQueue()consumer = asyncio.ensure_future(consume(queue))await produce(queue)await queue.join()consumer.cancel()loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

参考文档

https://www.escapelife.site/posts/c8f1143.html
https://www.jb51.net/article/212475.htm
https://docs.python.org/zh-cn/3.6/library/asyncio-task.html

Python Module — asyncio 协程并发相关推荐

  1. python并发之协程_python并发编程之协程

    一 引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去 ...

  2. python协程 并发数量_Python-并发编程(协程)

    今天说说协程 一.引子 本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态 cpu正在运行一个任务,会在两 ...

  3. python 协程并发

    # 协程并发 import gevent from gevent import monkey monkey.patch_all() from socket import * from time imp ...

  4. Python中的协程

    Python中的协程 文章目录 Python中的协程 一.什么是协程 1.概念 2.协程的好处 3.缺点 二.了解协程的过程 1.yield工作原理 2.协程在运行过程中有四个状态: 3.预激协程的装 ...

  5. python中的协程(二)

    协程 1.协程: 单线程实现并发 在应用程序里控制多个任务的切换+保存状态 优点: 应用程序级别速度要远远高于操作系统的切换 缺点: 多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他 ...

  6. Python与Golang协程异同

    背景知识 这里先给出一些常用的知识点简要说明,以便理解后面的文章内容. 进程的定义: 进程,是计算机中已运行程序的实体.程序本身只是指令.数据及其组织形式的描述,进程才是程序的真正运行实例. 线程的定 ...

  7. windows下多进程加协程并发模式

    好久没更新博客了.正好最近要整理一下最近这段时间做过的项目以及学习python的一些心得.如标题所示,今天就来说说windows下多进程加协程并发模式.其实网上还是蛮多在linux下的多进程加协程并发 ...

  8. 二十五、深入Python中的协程

    @Author: Runsen 一说并发,你肯定想到了多线程+进程模型,确实,多线程+进程,正是解决并发问题的经典模型之一.但对于多核CPU,利用多进程+协程的方式,能充分利用CPU,获得极高的性能. ...

  9. Python并发之协程gevent基础(5)

    1,gevent介绍 gevent是第三方库,通过 greenlet 实现 coroutine,创建.调度的开销比 线程(thread) 还小,因此程序内部的 执行流 效率高. gevent 实现了 ...

最新文章

  1. InnoDB解决幻读的方案--LBCCMVCC
  2. 求13-23+33……-1003(3.6)(Java)
  3. 大轴纸怎么上机器人_岛国首发和尚机器人,地位直逼观世音
  4. C++类和对象学习总结
  5. 大数据查询——HBase读写设计与实践--转
  6. 【机器学习入门笔记4:OpenCV图片的写入和不同图片质量保存】20190203
  7. MapxTreme2008的打包过程
  8. Angry Birds Rio 攻略 1-1
  9. 在linux运行math_neon库,linux - 仅使用带交叉编译器的本地库 - 堆栈内存溢出
  10. delphi socket 流的使用_Socket
  11. dll加载问题的解决方法
  12. 更新MYSQL生成日历表,支持跨年份 存储过程
  13. bash脚本创建变量_创建一个Bash脚本模板
  14. Spring Boot如何以优雅的姿势校验参数
  15. 【NOIP2013模拟】七夕祭
  16. 什么是Web应用程序防火墙?
  17. NetBean中添加tomcat时候出现错误
  18. 航空货运物流的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告
  19. 未来一年西藏旅行时间表,此生必去一次。
  20. element-ui 带序号表格如何让序号递增

热门文章

  1. Unity网络多玩家游戏开发教程(上册)
  2. ms-sql是mysql吗_mssql和mysql有哪些区别?
  3. oracle中or的替函数,Oracle常用内置Or自定义函数-SQL宝典
  4. oracle sql 导入mysql数据库备份_Oracle 备份、导入数据库命令
  5. pytorch reshape_pytorch常用总结 之 tensor维度变换
  6. dataframe常用操作_【Data Mining】机器学习三剑客之Pandas常用算法总结上
  7. 澳大利亚科学家开发出可用于脑机接口的新型碳基生物传感器
  8. Cell子刊:MRI有助于揭示睡眠之谜
  9. seaborn系列 (16) | 变量关系组图pairplot()
  10. Xbox“天蝎计划”中国区负责人:“今年的E3展会将超乎你想像”