asyncio.Future

第三次更新,2020-02-13

Future 的作用

负责终止 loop 的循环。

1、loop 停止循环的唯一条件为 loop._stopping = True

2、将 loop 的 _stopping 设置为 True 的活,是 future 负责干,其实就是 future 的 callbacks 中实现了终止 loop 的函数,而在给 future 对象 set_result() 时,会将 callbacks 中的函数注册到 loop 对象的 _ready 队列中,故 loop 在轮询执行之后,其状态被设置为 _stopping

事件轮询图

3、比如官方 sample 里

import asyncio

# 定义一个协程

async def slow_operation(future):

await asyncio.sleep(1)

future.set_result('Future is done!')

# 获得全局循环事件

loop = asyncio.get_event_loop()

# 实例化期物对象

future = asyncio.Future()

asyncio.ensure_future(slow_operation(future))

# loop 的 run_until_complete 会将 _run_until_complete_cb 添加到 future 的完成回调列表中。而 _run_until_complete_cb 中会执行 loop.stop() 方法

loop.run_until_complete(future)

print(future.result())

# 关闭事件循环对象

loop.close()

4、可以发现,loop 事件循环执行最外层一般是 Future 期物对象,期物会包裹其他 task 对象,在 future 对象的状态为 PENDING 时,会一直对 future 包裹的 task 对象调用 send(None) 方法,直到报出 StopIteration 方法。期物包裹的每一个 task 对象在执行完成后都会执行回调,如果期物包裹的所有 task 均执行完毕,则执行期物的 future.set_result() 方法,此方法不仅会将期物的状态设置为 FINISHED,还会执行期物的完成回调方法,执行 loop.stop() 方法。此时 loop 停止循环。

Future 的关键属性和方法

属性

_state = _PENDING # 表征状态,5 颗星

_result = None # 存储结果值,5 颗星

_exception

_loop # 5 颗星

_source_traceback

_asyncio_future_blocking = False # 用以在 task._step() 方法中判断是否为期物,以及期物是否已执行完毕,重要指数 5 颗星

_log_traceback

核心方法

__iter__ 方法 # 重要指数,5 颗星

def __iter__(self):

if not self.done():

# 如果期物的状态非 FINISHED,则将其 _asyncio_future_blocking 属性设置为 True,并且 yield self,这个设置是告诉 task,期物未执行完毕,期物的值应该在期物执行完毕后再获取。

self._asyncio_future_blocking = True

yield self # This tells Task to wait for completion.

assert self.done(), "yield from wasn't used with future" # 此处用于在期物没有执行完下强行获取期物值的情况。

return self.result() # May raise too.

set_result 方法,4.5 颗星

def set_result(self, result):

if self._state != _PENDING:

raise InvalidStateError('{}: {!r}'.format(self._state, self))

self._result = result # 给期物的结果值属性赋值

self._state = _FINISHED # 设置期物的状态为 FINISHED

self._schedule_callbacks() # 执行期物的完成回调方法,即将回调方法注册到 loop 对象的 _ready 队列中

add_done_callback 方法,4 颗星

def add_done_callback(self, fn):

if self._state != _PENDING:

self._loop.call_soon(fn, self) # 如果非 pending 状态,则立即将回调方法注册到 loop 对象的 _ready 队列中

else:

self._callbacks.append(fn) # 如果 pending 状态,则将回调添加到回调列表中,等待给期物设置 result 是调用执行

其他的方法

__init__ 方法:初始化 loop 对象和 _callbacks 列表

cancel 方法:将 future 的状态变更为 CANCELLED, 执行完成回调列表

_schedule_callbacks 方法:执行 _callbacks 列表的回调,即往 loop 对象的 _ready 队列中注册方法

总结(期物的设计思想)

1、通过 _state 标记其当前状态

2、将标记 loop._stopping 为 True 的动作或者其他一些标记动作放到 _callbacks 中,在期物设置其结果值set_result()时,将回调函数注册到 loop 对象的 _ready 队列中,在后续轮询时执行从而达到终止 loop 循环的目的。

3、期物因为其__iter__方法的实现方式,

在其 _state 为 PENDING 时,通过标记 _asyncio_future_blocking 为 True,且 yield self 告知 _step()当前期物的状态为 PENDING,需要将 _step() 操作添加到期物的 _callbacks 中,等期物的状态为 FINISHED 时,再获取期物的值。

在 _state 为 FINISHED 时,直接获取期物的 result 值。

4、上述官方 sample 中:

_run_until_complete_cb 即负责设置 loop._stopping=True,在 loop.run_until_complete(future) 方法中,会将_run_until_complete_cb方法添加到 future 的 _callbacks 中,当 future 有结果值(set_result())时,执行回调,终止 loop 的执行。

Note:asyncio.sleep() 解析

源码:

@coroutine

def sleep(delay, result=None, *, loop=None):

"""Coroutine that completes after a given time (in seconds)."""

if delay == 0:

yield

return result

if loop is None:

loop = events.get_event_loop()

future = loop.create_future()

h = future._loop.call_later(delay,

futures._set_result_unless_cancelled,

future, result)

try:

return (yield from future)

finally:

h.cancel()

1、当 delay 为0时,再次使用 send(None) 迭代执行获取 result 值;当 delay 不为0时,将 _set_result_unless_cancelled方法添加到延时 delay 后执行,_set_result_unless_cancelled是设置 future 的结果值。

2、return (yield from future) 相当于

data = None

for _tmp in future:

data = yield _tmp

return data

每个future 都会至少一次,至多两次执行 yield 语句来获取其结果值。

如何高效处理异步协程

1、关于 sleep(duration)

sleep 的设计思想是将回调绑定当前时间+duration实例化 TimeHandle 对象。

将所有的 TimeHandle 对象添加到堆列表 leap 中,TimeHandle 对象使用其时间(time)属性比较大小。

loop 在循环执行 leap 列表时,会判定当前时间 current_time 是否小于 leap 的第一个值 near_time,如果小于则 time.sleep(near_time-current_time) ,然后执行第一个回调,否则立即执行第一个回调。

2、关于 I/O

生成 socket 对象绑定 ip 和 port 然后进行监听,并且将 socket 对象及其回调注册到 selector 对象中,当 socket 收到可读或可写消息时(socket注册到selector时,可选择读消息或写消息或读写消息),执行回调。

回调的应用:一般会在回调中使用 accept 获取连接到当前 socket 的 cli_socket,然后将 cli_socket 注册到 selector 对象,客户端即可以和 cli_socket 进行通信。

分割线,第一次更新,18-03-03

Future

class Future:

"""

这个类同 concurrent.futures.Future基本上是兼容的

Difference:

- result() 和 exception() 不提供 timeout 参数,并且如果期物没有执行完毕会 raise 异常

- 使用 add_done_callback() 注册的回调只能通过循环事件的 call_soon_threadsafe() 执行

- 同 wait() 和 as_completed() 方法不兼容

(Python 3.4 以及后面可能会统一这一块的处理)

"""

# Class variables serving as defaults for instance variables.

_state = _PENDING

_result = None

_exception = None

_loop = None

_source_traceback = None

_blocking = False # proper use of future (yield vs yield from)

_log_traceback = False # Used for Python 3.4 and later

_tb_logger = None # Used for Python 3.3 only

def __init__(self, *, loop=None):

"""

使用 loop 初始化期物实例

"""

if loop is None:

self._loop = events.get_event_loop()

else:

self._loop = loop

self._callbacks = []

if self._loop.get_debug():

self._source_traceback = traceback.extract_stack(sys._getframe(1))

# 格式化展示回调函数

def __format_callbacks(self):

cb = self._callbacks

size = len(cb)

if not size:

cb = ''

def format_cb(callback):

return events._format_callback_source(callback, ())

if size == 1:

cb = format_cb(cb[0])

elif size == 2:

cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))

elif size > 2:

cb = '{}, , {}'.format(format_cb(cb[0]),

size-2,

format_cb(cb[-1]))

return 'cb=[%s]' % cb

# 格式化展示信息

def _repr_info(self):

info = [self._state.lower()]

if self._state == _FINISHED:

if self._exception is not None:

info.append('exception={!r}'.format(self._exception))

else:

# use reprlib to limit the length of the output, especially

# for very long strings

result = reprlib.repr(self._result)

info.append('result={}'.format(result))

if self._callbacks:

info.append(self.__format_callbacks())

if self._source_traceback:

frame = self._source_traceback[-1]

info.append('created at %s:%s' % (frame[0], frame[1]))

return info

def __repr__(self):

info = self._repr_info()

return '' % (self.__class__.__name__, ' '.join(info))

# On Python 3.3 and older, objects with a destructor part of a reference

# cycle are never destroyed. It's not more the case on Python 3.4 thanks

# to the PEP 442.

if compat.PY34:

def __del__(self):

if not self._log_traceback:

# set_exception() was not called, or result() or exception()

# has consumed the exception

return

exc = self._exception

context = {

'message': ('%s exception was never retrieved'

% self.__class__.__name__),

'exception': exc,

'future': self,

}

if self._source_traceback:

context['source_traceback'] = self._source_traceback

self._loop.call_exception_handler(context)

def cancel(self):

"""

取消期物,并且执行回调。如果期物已经 done 或者 cancelled,返回 False。

变更期物的状态,通知事件循环去安排回调,并且返回 True

"""

if self._state != _PENDING:

return False

self._state = _CANCELLED

self._schedule_callbacks()

return True

def _schedule_callbacks(self):

"""

通知事件循环尽快的执行回调,清空 callback 列表

"""

callbacks = self._callbacks[:]

if not callbacks:

return

self._callbacks[:] = []

for callback in callbacks:

self._loop.call_soon(callback, self)

def cancelled(self):

return self._state == _CANCELLED

# Don't implement running(); see http://bugs.python.org/issue18699

def done(self):

"""

期物已经运行结束或取消都是 done 的状态

"""

return self._state != _PENDING

def result(self):

"""

不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。如果 _exception 值存在,则 raise 对应的值,否则返回 _result值

"""

if self._state == _CANCELLED:

raise CancelledError

if self._state != _FINISHED:

raise InvalidStateError('Result is not ready.')

self._log_traceback = False

if self._tb_logger is not None:

self._tb_logger.clear()

self._tb_logger = None

if self._exception is not None:

raise self._exception

return self._result

def exception(self):

"""

不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。返回 _exception 值

"""

if self._state == _CANCELLED:

raise CancelledError

if self._state != _FINISHED:

raise InvalidStateError('Exception is not set.')

self._log_traceback = False

if self._tb_logger is not None:

self._tb_logger.clear()

self._tb_logger = None

return self._exception

def add_done_callback(self, fn):

"""

给期物运行完毕添加方法回调

如果期物已经运行完毕,则立即回调

"""

if self._state != _PENDING:

self._loop.call_soon(fn, self)

else:

self._callbacks.append(fn)

# New method not in PEP 3148.

def remove_done_callback(self, fn):

"""

移除指定回调

"""

filtered_callbacks = [f for f in self._callbacks if f != fn]

removed_count = len(self._callbacks) - len(filtered_callbacks)

if removed_count:

self._callbacks[:] = filtered_callbacks

return removed_count

# So-called internal methods (note: no set_running_or_notify_cancel()).

def set_result(self, result):

"""

标记期物运行结束,设置 _result 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError

"""

if self._state != _PENDING:

raise InvalidStateError('{}: {!r}'.format(self._state, self))

self._result = result

self._state = _FINISHED

self._schedule_callbacks() # 运行回调

def set_exception(self, exception):

"""

标记期物运行结束,设置 _exception 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError

"""idStateError.

if self._state != _PENDING:

raise InvalidStateError('{}: {!r}'.format(self._state, self))

if isinstance(exception, type):

exception = exception()

if type(exception) is StopIteration:

raise TypeError("StopIteration interacts badly with generators "

"and cannot be raised into a Future")

self._exception = exception

self._state = _FINISHED

self._schedule_callbacks()

if compat.PY34:

self._log_traceback = True

else:

self._tb_logger = _TracebackLogger(self, exception)

# Arrange for the logger to be activated after all callbacks

# have had a chance to call result() or exception().

self._loop.call_soon(self._tb_logger.activate)

def __iter__(self):

if not self.done():

self._blocking = True

yield self # This tells Task to wait for completion.

assert self.done(), "yield from wasn't used with future"

return self.result() # May raise too.

if compat.PY35:

# 兼容 async/await 方法

__await__ = __iter__ # make compatible with 'await' expression

python asyncio future_Python 期物之 asyncio.Future相关推荐

  1. python asyncio future_Python中的asyncio模块中的Future和Task的区别?

    问题一 按照官方文档的描述,Task是Futrue的一个subclass,标准库中也分别提供了create_task和create_future.请问这两者有功能上的什么区别? 问题二 对于ensur ...

  2. python asyncio future_Python asyncio.isfuture方法代码示例

    本文整理汇总了Python中asyncio.isfuture方法的典型用法代码示例.如果您正苦于以下问题:Python asyncio.isfuture方法的具体用法?Python asyncio.i ...

  3. python asyncio future_Python asyncio.ensure_future方法代碼示例

    本文整理匯總了Python中asyncio.ensure_future方法的典型用法代碼示例.如果您正苦於以下問題:Python asyncio.ensure_future方法的具體用法?Python ...

  4. 【Python】【容器 | 迭代对象 | 迭代器 | 生成器 | 生成器表达式 | 协程 | 期物 | 任务】...

    Python 的 asyncio 类似于 C++ 的 Boost.Asio. 所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知. Asyn ...

  5. python并发编程:协程asyncio、多线程threading、多进程multiprocessing

    python并发编程:协程.多线程.多进程 CPU密集型计算与IO密集型计算 多线程.多进程与协程的对比 多线程 创建多线程的方法 多线程实现的生产者-消费者爬虫 Lock解决线程安全问题 使用线程池 ...

  6. python asyncio理解_深入理解asyncio(二)

    Asyncio.gather vs asyncio.wait 在上篇文章已经看到多次用asyncio.gather了,还有另外一个用法是asyncio.wait,他们都可以让多个协程并发执行.那为什么 ...

  7. python异步asy_Python 异步编程之asyncio【转载】

    一.协程的认识 协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术. 简而言之,其实就是通过一个线程实现代码块相互切换执行.例如:deffunc1():print(1) . ...

  8. python协程asyncio 应用_Python-如何使用asyncio同时运行多个协程?

    TL:DR使用^{}同时运行多个协程.Maybe this scenario requires a framework based on events/callbacks rather than on ...

  9. python视频学习002(2017年老男孩最新全栈python第2期视频教程)---电脑简史

    申明 计算机简史 申明 最近开始系统的学习python,跟着老男孩的python教程一起学习并记录下相关内容,2017年老男孩最新全栈python第2期视频教程 相关视频下载请参考我上一篇博客,上面介 ...

最新文章

  1. app如何打开了request url_手机日历app内如何打开节日提醒功能?支持提前提醒节日的云便签...
  2. 图像孔洞填充与小连通域的删除
  3. php实现单个用户禁止重复登录,防止同一用户同时登陆
  4. python牛顿法寻找极值_python使用梯度下降和牛顿法寻找Rosenbrock函数最小值实例...
  5. datagridview选中获取行号_DataGridView控件显示行号的正确代码及分析
  6. 沈阳大学计算机考研,计算机专业考研成功经验谈:掌握方法,以不变应万变
  7. Android笔记 xml补间动画
  8. mysql判断可用性,MySQL -- 主從復制的可靠性與可用性
  9. java 定义三维列表_java 多维数据定义
  10. 八.nginx网站服务实践应用
  11. [x-means] 1.x-means简介
  12. 最近amd.dll闹的很火
  13. C# 类2010-11-07
  14. windows配置Hadoop开发环境
  15. HTML和CSS面试题—整理过的48题,关注收藏,持续更新
  16. Git 2.29.2 64位安装包
  17. tp1900芯片对比7621a_MT7621A /MT7620N / MT7620A单频双频刷机教程(区别于高通芯片刷机过程)...
  18. 性能测试七种常用方法,以及四大应用领域
  19. 【软件测试】可以写进简历的项目实战内容
  20. 飞塔防火墙命令行终端修改输出长度

热门文章

  1. android 记录血糖的折线图_撼高组血压血糖app下载-撼高组血压血糖安卓版 v1.0.4 - 安下载...
  2. APP制作:APP设计的过程与方法
  3. Android 编程之第三方开发 MaoZhuaWeiBo微博开发示例-1
  4. RIGOL示波器使用
  5. 基于云开发的二手书交易微信小程序
  6. 超强跑得快机器人智能算法深度研究与设计
  7. 微信分享链接含敏感词被屏蔽的问题
  8. 微信支付--敏感词加密
  9. bp神经网络算法的优缺点,bp神经网络与bp算法区别
  10. TCL彩电总线调整进入宝典――1