python 中的协程

从我个人的理解来说一说 python中的协程,我们知道 linux 中的线程比进程轻量级,切换成本低,协程比线程更轻量级。所以切换成本耕地,是基于生成器来实现的也就是 yield 语句,后来又有 yeild from 子协程的语法出现,生成器是迭代器,迭代器不是生成器,生成器能够输出值,也可以接收值,可以 hang/resume。当然在 python3.5 使用了新的语法 async/await, 本质没啥变化,仅仅是防止在语法上的混淆。可以进行隐式切换或者显式切换,在一个线程中实现多协程切换,asyncio 就是显式的来切换协程。

Asyncio 异步框架

asyncio 框架是建立在 epoll、poll、seledct等功能基之上的,下文统一用 epoll 代替,当然使用那种事件机制取决于操作系统,在使用asyncio时,大部分操作是用asyncio运行任务,运行任务时 asyncio 并没有使用epoll 机制,因为我们知道 epoll 是需要注册文件描述符的,是在使用协程,至于协程和 epoll 怎么结合运行的,下文会细说。epoll 是用来实现异步 web 框架用的。协程使用来运行用户的 task。

Asyncio 的运行流程

简单写一个异步任务,这个任务简单点,因为本篇文章主要讲的是 asyncio 的运行机制而不是 asyncio 的使用

import asyncio

async def print_hello_after3second():

await asyncio.sleep(3)

print("hello")

asyncio.run(print_hello_after3second)

这里使用的 run 这个接口,使用 asyncio 运行异步任务有很多种方式,run 我觉得更像是一个命令行,从外面看接口简单,其实内部帮忙做了很多事情.为了节省篇幅以及使得文章看起来清晰每个代码片段只截取重要部分,其余的省略。

asyncio/runners.py

#run 第一个参数要是个协程

def run(main, *, debug=False):

# loop 理解成 epoll 就好

events.set_event_loop(loop)

#重点在这里

loop.run_until_complete(loop.shutdown_asyncgens())

....

asyncio/base_events.py

def run_until_complete(self, future):

....

# asyncio 会把我们传进来的任务封装成 task,也可以说是 future,task 是 future 的子类

future = tasks.ensure_future(future, loop=self

# 里面有 _run_once 是用来调度事件循环的

self.run_forever()

....

asyncio/task.py

# ensure_future 也是一个传递任务的接口

def ensure_future(coro_or_future, *, loop=None):

....

# 在调用 Task 类中的__init__方法进行初始化,同时将 Task 类中的 _step方法作为回掉函数注册到了事件循环中

task = loop.create_task(coro_or_future)

....

asyncio/base_events.py

#这个方法很重要所以在这里全部列出,里面包含了asyncio的调用思想,调度 task 和 epoll

def _run_once(self):

"""Run one full iteration of the event loop.

This calls all currently ready callbacks, polls for I/O,

schedules the resulting callbacks, and finally schedules

'call_later' callbacks.

"""

print("run once")

sched_count = len(self._scheduled)

if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and

self._timer_cancelled_count / sched_count >

_MIN_CANCELLED_TIMER_HANDLES_FRACTION):

# Remove delayed calls that were cancelled if their number

# is too high

new_scheduled = []

for handle in self._scheduled:

if handle._cancelled:

handle._scheduled = False

else:

new_scheduled.append(handle)

heapq.heapify(new_scheduled)

self._scheduled = new_scheduled

self._timer_cancelled_count = 0

else:

# Remove delayed calls that were cancelled from head of queue.

while self._scheduled and self._scheduled[0]._cancelled:

self._timer_cancelled_count -= 1

handle = heapq.heappop(self._scheduled)

handle._scheduled = False

timeout = None

if self._ready or self._stopping:

timeout = 0

elif self._scheduled:

# Compute the desired timeout.

when = self._scheduled[0]._when

timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

if self._debug and timeout != 0:

t0 = self.time()

# 这里是在检查 epoll 事件

event_list = self._selector.select(timeout)

dt = self.time() - t0

if dt >= 1.0:

level = logging.INFO

else:

level = logging.DEBUG

nevent = len(event_list)

if timeout is None:

logger.log(level, 'poll took %.3f ms: %s events',

dt * 1e3, nevent)

elif nevent:

logger.log(level,

'poll %.3f ms took %.3f ms: %s events',

timeout * 1e3, dt * 1e3, nevent)

elif dt >= 1.0:

logger.log(level,

'poll %.3f ms took %.3f ms: timeout',

timeout * 1e3, dt * 1e3)

else:

event_list = self._selector.select(timeout)

#这里将从epoll中获取到可读可写的事件后,添加回掉函数到self._ready这个列表中,这个列表同时也包含了用户添加的异步任务,那么在什么时候添加进来的呢?

self._process_events(event_list)

# Handle 'later' callbacks that are ready.

# 定时任务,可以使得异步任务在未来的某个事件点运行,用堆实现了优先级队列,按照时间排序

end_time = self.time() + self._clock_resolution

while self._scheduled:

handle = self._scheduled[0]

if handle._when >= end_time:

break

handle = heapq.heappop(self._scheduled)

handle._scheduled = False

self._ready.append(handle)

# This is the only place where callbacks are actually *called*.

# All other places just add them to ready.

# Note: We run all currently scheduled callbacks, but not any

# callbacks scheduled by callbacks run this time around --

# they will be run the next time (after another I/O poll).

# Use an idiom that is thread-safe without using locks.

ntodo = len(self._ready)

for i in range(ntodo):

handle = self._ready.popleft()

if handle._cancelled:

continue

if self._debug:

try:

self._current_handle = handle

t0 = self.time()

#我们的异步任务对应的回掉函数被封装成了 handler 实例了,这个实例是协程安全的,

handle._run()

dt = self.time() - t0

if dt >= self.slow_callback_duration:

logger.warning('Executing %s took %.3f seconds',

_format_handle(handle), dt)

finally:

self._current_handle = None

else:

print("hanle %s", handle)

handle._run()

handle = None # Needed to break cycles when an exception occurs.

总结一下,asyncio 将异步任务和epoll获取来的可读可写的回掉事件都放到了 self._ready 这个列表中统一运行。那么异步任务什么时候被放到 self._ready 这个列表中来的呢

asyncio/base_events.py

#谁在调用这个函数呢,前文说过我们的异步任务都会被封装成 asyncio 中的 Task 类的 task 类的 __init__ 中的这个方法调用了 call_soon , 那么 call_at 这种未来执行的任务呢?当然最终也会调用 call_soon 的,在运行时间到的时候。

def call_soon(self, callback, *args, context=None):

"""Arrange for a callback to be called as soon as possible.

This operates as a FIFO queue: callbacks are called in the

order in which they are registered. Each callback will be

called exactly once.

Any positional arguments after the callback will be passed to

the callback when it is called.

"""

self._check_closed()

if self._debug:

self._check_thread()

self._check_callback(callback, 'call_soon')

handle = self._call_soon(callback, args, context)

if handle._source_traceback:

del handle._source_traceback[-1]

return handle

# 没错就是在这里进行添加和封装成 handler 的

def _call_soon(self, callback, args, context):

print("register")

handle = events.Handle(callback, args, self, context)

if handle._source_traceback:

del handle._source_traceback[-1]

self._ready.append(handle)

return handle

以上的一些说明只是讲解了 asyncio 如何运行用户侧 task 以及异步事件的,其实用户侧异步task,被隐藏在了epoll的概念中,这也是 asyncio 很高明之处。

到此仅仅说明了 asyncio 是如何调度 task 和 epoll 事件的回掉的执行。但是异步task的回掉在这里很重要,也就是上文提到的 _step 这个方法,这个方法在 Task 类中。这也关系到了 aio-libs 中这些python的异步库如何改造,下文会说如何自己实现python异步库的改造和编写。

asyncio/tasks.py

#方法很重要,不进行删减

def __step(self, exc=None):

if self.done():

raise futures.InvalidStateError(

f'_step(): already done: {self!r}, {exc!r}')

if self._must_cancel:

if not isinstance(exc, futures.CancelledError):

exc = futures.CancelledError()

self._must_cancel = False

# 这个coro 是我们添加进来的异步任务

coro = self._coro

self._fut_waiter = None

_enter_task(self._loop, self)

# Call either coro.throw(exc) or coro.send(None).

try:

if exc is None:

# We use the `send` method directly, because coroutines

# don't have `__iter__` and `__next__` methods.

#触发异步任务运行

result = coro.send(None)

else:

result = coro.throw(exc)

# 协程运行结束会抛出这个异常

except StopIteration as exc:

if self._must_cancel:

# Task is cancelled right before coro stops.

self._must_cancel = False

super().set_exception(futures.CancelledError())

else:

# future(不打算翻译成中文啦,也不解释在编程语言或者python中什么意思),抛出 StopIteration 异常代表异步任务运行结束,设置结果给 future,很重要,不设置结果异步任务就停不下来了(哈哈!)

super().set_result(exc.value)

except futures.CancelledError:

super().cancel() # I.e., Future.cancel(self).

except Exception as exc:

super().set_exception(exc)

except BaseException as exc:

super().set_exception(exc)

raise

else:

#result 是一个future实例

blocking = getattr(result, '_asyncio_future_blocking', None)

if blocking is not None:

# Yielded Future must come from Future.__iter__().

if futures._get_loop(result) is not self._loop:

new_exc = RuntimeError(

f'Task {self!r} got Future '

f'{result!r} attached to a different loop')

self._loop.call_soon(

self.__step, new_exc, context=self._context)

elif blocking:

if result is self:

new_exc = RuntimeError(

f'Task cannot await on itself: {self!r}')

self._loop.call_soon(

self.__step, new_exc, context=self._context)

else:

#注册wakeup到future的callback中,这个wakeup是用来提取future中的结果用的

result._asyncio_future_blocking = False

result.add_done_callback(

self.__wakeup, context=self._context)

self._fut_waiter = result

if self._must_cancel:

if self._fut_waiter.cancel():

self._must_cancel = False

#小面除了 result 为none的分之外都是出现异常

else:

new_exc = RuntimeError(

f'yield was used instead of yield from '

f'in task {self!r} with {result!r}')

print("call soon")

self._loop.call_soon(

self.__step, new_exc, context=self._context)

elif result is None:

# Bare yield relinquishes control for one event loop iteration.

self._loop.call_soon(self.__step, context=self._context)

elif inspect.isgenerator(result):

# Yielding a generator is just wrong.

new_exc = RuntimeError(

f'yield was used instead of yield from for '

f'generator in task {self!r} with {result!r}')

self._loop.call_soon(

self.__step, new_exc, context=self._context)

else:

# Yielding something else is an error.

new_exc = RuntimeError(f'Task got bad yield: {result!r}')

self._loop.call_soon(

self.__step, new_exc, context=self._context)

finally:

_leave_task(self._loop, self)

self = None # Needed to break cycles when an exception occurs.

_run_once 方法是调度,调度 task 和 epoll 事件,_step 是在处理 await something() 这种语句,_step方法不是很好理解,用一段话总结一下 _step 做的事情。当然描述起来也是很抽象,

用户写的协程函数,会被 asyncio 封装成 task,协程函数作为 Task 类中 _step方法中的一个属性,_step又会被封装成 handler 作为异步事件被调用,每一个协程函数都有一个 future 和 wakeup 与之绑定,函数运行结果会设置到 future 中,wakeup 作为 future 的回掉被调用(真正调用的还是事件循环),当设置好结果后 wakeup 唤醒协程函数来提取结果,

async def a():

await b() # 协程函数 b 有个future 和 wakeup 与之绑定

asyncio 无论是使用还是理解原理都是很难的,不像 golang 这种原生支持协程,python的协程经历了很慢长的步伐,不去理解背后的原理在使用过程会出现很多问题。asyncio 的生态也还不完善,有时需要自己去实现异步改造。所以理解 asyncio 背后的原理很重要,只有知道原理后才知道如何自己去改造或者写出与 asyncio 配套的工具。

参考

python asyncio原理_Asyncio 源码分析相关推荐

  1. ConcurrentHashMap实现原理及源码分析

    ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现(若对HashMap的实现原理还不甚了解,可参考我的另一篇文章HashMap实现原理及源码分析),Con ...

  2. concurrenthashmap_ConcurrentHashMap实现原理及源码分析

    ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现(若对HashMap的实现原理还不甚了解,可参考我的另一篇文章HashMap实现原理及源码分析),Con ...

  3. SIFT原理与源码分析:DoG尺度空间构造

    <SIFT原理与源码分析>系列文章索引:http://blog.csdn.net/xiaowei_cqu/article/details/8069548 尺度空间理论 自然界中的物体随着观 ...

  4. 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析

    在博文<深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析 >中我们提到了: 使用Sort等对数据进行排序,其中用到了TimSort 这篇博文我们就来 ...

  5. 深入理解Spark 2.1 Core (十一):Shuffle Reduce 端的原理与源码分析

    我们曾经在<深入理解Spark 2.1 Core (一):RDD的原理与源码分析 >讲解过: 为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RD ...

  6. 深入理解Spark 2.1 Core (十):Shuffle Map 端的原理与源码分析

    在上一篇<深入理解Spark 2.1 Core (九):迭代计算和Shuffle的原理与源码分析>提到经过迭代计算后, SortShuffleWriter.write中: // 根据排序方 ...

  7. 深入理解Spark 2.1 Core (八):Standalone模式容错及HA的原理与源码分析

    第五.第六.第七篇博文,我们讲解了Standalone模式集群是如何启动的,一个App起来了后,集群是如何分配资源,Worker启动Executor的,Task来是如何执行它,执行得到的结果如何处理, ...

  8. 深入理解Spark 2.1 Core (七):Standalone模式任务执行的原理与源码分析

    这篇博文,我们就来讲讲Executor启动后,是如何在Executor上执行Task的,以及其后续处理. 执行Task 我们在<深入理解Spark 2.1 Core (三):任务调度器的原理与源 ...

  9. 深入理解Spark 2.1 Core (六):Standalone模式运行的原理与源码分析

    我们讲到了如何启动Master和Worker,还讲到了如何回收资源.但是,我们没有将AppClient是如何启动的,其实它们的启动也涉及到了资源是如何调度的.这篇博文,我们就来讲一下AppClient ...

最新文章

  1. C#与RSS亲密接触
  2. Ubuntu14.04 64位上配置终端显示git分支名称
  3. python【蓝桥杯vip练习题库】ALGO-77 斜率计算
  4. matlab未定义函数lp,matlab未定义函数或变量(附上源代码)
  5. Kali下JDK1.8的安装过程
  6. 同样的电器,为什么官网能比实体店的价格便宜那么多?
  7. 移动端 IP 优选方案
  8. mac11.14 mysql_mysql 5.7 11 章 数据类型(1)
  9. android setflag找不到_Android面试题4–Activity之Intent的Flag
  10. 可以用来做ppt的网页效果
  11. 邬贺铨/余晓晖/田溯宁…千家从业者,数十位行业大咖共同烹制了一场怎样的AIoT“盛宴”?
  12. 面向服务的架构SOA
  13. 链表c++语言 解析,C++ 单链表的基本操作(详解)
  14. Redis内存回收机制(Redis 过期策略、淘汰策略)
  15. 财富提升成都IT产业吸引力
  16. 翠竹林 Java 实现对Sql语句解析
  17. C++ 填入数字1-9 使数学等式成立
  18. Javasctipt面试题整理
  19. 可以修饰的基团有:氨基类,NHBOC类,Fmoc类不等,DSPE-PEG7-Mal
  20. STL容器底层实现数据结构

热门文章

  1. java 浮点类型声明_java 浮点数据类型
  2. 次世代游戏建模工作流程是什么?
  3. 001 扩展传感器分类介绍
  4. kali中安装arachni出现的问题
  5. 计算机鸣响的十大故障,电脑运行的时候呲呲啦啦的响
  6. lisp修改断面属性_HEC-RAS精要(三)——断面附加属性
  7. 【大模型】Lamini:用于快速定制模型的 LLM 引擎 | Introducing Lamini, the LLM Engine for Rapidly Customizing Models
  8. java有画图的库吗,Java基础之画图板
  9. 阿里开源流程引擎,轻松干掉 activity 、flowable
  10. pyecharts 实现双 Y 轴