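A Brief Analysis of the Basic Principles of the asyncio Module in the Python Standard Library

Environment for this article: Python 3.7.0

Implementation approach of the asyncio module

Many programming languages now simplify asynchronous programming at the language level. Python does the same: it provides the two keywords await and async to simplify and encapsulate asynchronous code. This article walks through how the asyncio standard library wraps asynchronous programming by following the execution flow of one asyncio example. Asynchronous programming in Python is, at its core, built on coroutines based on yield. For the concepts involved, such as Task and Future, see the earlier article "异步爬虫框架与协程浅析" (A Brief Analysis of an Asynchronous Crawler Framework and Coroutines), which shows how to build coroutine-based asynchronous code directly with yield. That process works in essentially the same way as the asyncio module analyzed here; asyncio simply adds support for the await and async keywords.

A simple example of the await and async keywords

First, a basic example of the two keywords await and async: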

class TestAsync(object):

    def __await__(self):
        yield self
        print("await return value")
        return 'TestAsync'


async def test():
    print("start")
    data = await TestAsync()
    print("over ", data)
    return 'test return'


if __name__ == '__main__':
    c = test()
    res = c.send(None)
    print('res ', res)
    try:
        c.send(None)
    except StopIteration as exc:
        print(" c last return ", exc.value)

The example produces the following output:

start
res  <__main__.TestAsync object at 0x7f29e8ebd240>
await return value
over  TestAsync
 c last return  test return

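The output shows the following sequence. Calling test() first creates a coroutine object c. Sending None into c runs it up to the yield inside the __await__ method of the TestAsync instance; that yield hands the instance itself back to the caller, which is what gets printed as res. Sending None a second time resumes __await__ after the yield and runs the rest of that method. When __await__ returns, a StopIteration is raised internally and its value is the return value of __await__, so the code after await TestAsync() in test continues with data set to that value. When test itself finishes, it returns 'test return', which surfaces to the caller as a StopIteration whose value is 'test return'. That completes this simple example of the await and async keywords. The same effect can be achieved without these keywords by using yield and yield from; interested readers can try this themselves.

Principles of the asyncio module and example code

Asynchronous programming is essentially event-driven. It relies on I/O multiplexing to perform non-blocking operations: a callback is registered for a read or write event, and when the event fires the callback runs and the program logic continues. Readers can consult other material for the details of I/O multiplexing. The asyncio module is likewise built around event registration on top of I/O multiplexing, but combines it with coroutines. Because a coroutine preserves the execution state of the current function, business logic can be written more simply and concisely.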
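As a rough illustration of the yield from remark above, here is a minimal sketch of the same flow written without async and await. The names TestYield and drive are made up for this illustration and do not come from asyncio.

```python
class TestYield:
    """Hypothetical counterpart of TestAsync for plain generators."""

    def __iter__(self):
        # Hand this object to whoever drives the generator, then finish
        # with a return value once the generator is resumed.
        yield self
        return 'TestYield'


def test():
    # `yield from` plays the role that `await` plays in the async version.
    data = yield from TestYield()
    return 'test return, data=' + data


def drive(gen):
    """Drive the generator by hand, as the example above does with send()."""
    yielded = gen.send(None)          # runs up to the first yield
    try:
        gen.send(None)                # resumes and runs to completion
    except StopIteration as exc:
        return yielded, exc.value     # the return value travels in exc.value
    raise RuntimeError("generator did not finish")


if __name__ == '__main__':
    yielded, value = drive(test())
    print(type(yielded).__name__, value)
```

The driver sees the same two stages as before: first the yielded object, then a StopIteration carrying the final return value.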
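The register-a-callback-and-wait pattern described above can be sketched with the standard selectors module. This is only an illustration under simple assumptions: a local socket pair stands in for a real network connection, and run_demo and on_readable are hypothetical names.

```python
import selectors
import socket


def run_demo():
    sel = selectors.DefaultSelector()
    left, right = socket.socketpair()      # local pair instead of a real peer
    left.setblocking(False)
    right.setblocking(False)
    received = []

    def on_readable(sock):
        # Callback invoked once the selector reports the socket as readable.
        received.append(sock.recv(1024))
        sel.unregister(sock)

    # Register interest in the read event and attach the callback as data.
    sel.register(right, selectors.EVENT_READ, on_readable)
    left.send(b"ping")

    # A tiny event loop: poll for events and dispatch the stored callbacks.
    for _ in range(10):
        if not sel.get_map():
            break
        for key, mask in sel.select(timeout=1):
            key.data(key.fileobj)

    sel.close()
    left.close()
    right.close()
    return received


if __name__ == '__main__':
    print(run_demo())
```

asyncio follows the same idea, except that the callback usually resumes a coroutine instead of running plain function logic.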

import asyncio
import urllib.parse


def test():
    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print(f'HTTP header> {line}')
        writer.close()

    url = 'https://www.baidu.com'
    asyncio.run(print_http_headers(url))


if __name__ == '__main__':
    test()

Running this function produces the following output:

HTTP header> HTTP/1.0 200 OK
HTTP header> Accept-Ranges: bytes
HTTP header> Cache-Control: private, no-cache, no-store, proxy-revalidate, no-transform
HTTP header> Content-Length: 277
HTTP header> Content-Type: text/html
HTTP header> Date: Tue, 22 Jan 2019 00:40:02 GMT
HTTP header> Etag: "575e1f80-115"
HTTP header> Last-Modified: Mon, 13 Jun 2016 02:50:40 GMT
HTTP header> Pragma: no-cache
HTTP header> Server: bfe/1.0.8.18

Next, this article briefly analyzes the execution flow of this example.

How the asyncio example code executes

First, look at the asyncio.run() function used in the example:

def run(main, *, debug=False):
    """Run a coroutine.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    # If a loop is already running in the current thread, raise an error.
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    # main must be a coroutine object; otherwise raise an error.
    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    # Create the loop instance; this is the I/O multiplexing loop.
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)                 # set it as the current loop
        loop.set_debug(debug)                       # enable or disable debug mode
        return loop.run_until_complete(main)        # run until main has finished
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

This function mainly performs a few checks, obtains a loop instance, and then calls the instance's run_until_complete method. Next, look at the new_event_loop() function:

def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()     # delegate to the policy


def _init_event_loop_policy():
    global _event_loop_policy                           # module-level global
    with _lock:                                         # take the lock
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            # Instantiate the default DefaultEventLoopPolicy.
            _event_loop_policy = DefaultEventLoopPolicy()


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:                      # not initialized yet
        _init_event_loop_policy()                       # initialize it
    return _event_loop_policy                           # return the policy instance

So loop is the instance returned by calling new_event_loop on the DefaultEventLoopPolicy instance. On Linux, DefaultEventLoopPolicy is _UnixDefaultEventLoopPolicy. Continue with the code of that class:

class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop      # class used to instantiate the loop

    ...

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                isinstance(threading.current_thread(), threading._MainThread)):
            self._watcher.attach_loop(loop)

    ...


class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    _loop_factory = None

    class _Local(threading.local):
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()             # thread-local storage

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        # In the main thread, if no loop exists and none was ever set,
        # create a new loop and set it.
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())

        # If there is still no loop, raise an error.
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop                # return the loop instance

    def set_event_loop(self, loop):
        """Set the event loop."""
        self._local._set_called = True          # remember that a loop was set
        # loop must be None or an AbstractEventLoop instance.
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop                # store the loop instance

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()             # instantiate the loop class

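At this point loop is an instance of _UnixSelectorEventLoop. Continue with that class's run_until_complete method.

_UnixSelectorEventLoop inherits from selector_events.BaseSelectorEventLoop, which inherits from base_events.BaseEventLoop, which in turn inherits from events.AbstractEventLoop.

The method actually called is therefore BaseEventLoop.run_until_complete: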

def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.
    """
    self._check_closed()                                # the loop must not be closed

    new_task = not futures.isfuture(future)             # True if the argument is not already a future
    future = tasks.ensure_future(future, loop=self)     # wrap it in a Task if necessary
    if new_task:                                        # a new Task was created here
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)    # stop the loop when the future is done
    try:
        self.run_forever()                              # run the loop
    except:
        # On error, check whether the task finished and was not cancelled.
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()                          # mark the exception as retrieved
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)   # remove the callback
    if not future.done():                               # the loop stopped too early
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()                              # return the future's result

First, look at tasks.ensure_future to see how the Task is created:
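As a small usage sketch of the method above, the following creates a fresh loop by hand and passes a coroutine to run_until_complete, which wraps it in a Task and returns its result. The helper names add and run_once are illustrative only; asyncio.run() additionally sets the current loop and cleans up pending tasks and async generators, which this sketch skips.

```python
import asyncio


async def add(a, b):
    # Yield to the event loop once before producing the result.
    await asyncio.sleep(0)
    return a + b


def run_once(coro):
    """Hypothetical helper: run one coroutine on a fresh event loop."""
    loop = asyncio.new_event_loop()
    try:
        # The coroutine is wrapped in a Task; the call returns its result.
        return loop.run_until_complete(coro)
    finally:
        loop.close()


if __name__ == '__main__':
    print(run_once(add(1, 2)))
```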

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):          # a coroutine, as in this example
        if loop is None:                                # no loop was passed in
            loop = events.get_event_loop()              # use the current event loop
        task = loop.create_task(coro_or_future)         # create a task
        if task._source_traceback:
            del task._source_traceback[-1]
        return task                                     # return the task instance
    elif futures.isfuture(coro_or_future):              # already a future
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):           # some other awaitable
        # Wrap it in a coroutine and create a task from that.
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        # None of the above: raise an error.
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

In this example main is a coroutine, so loop.create_task(coro_or_future) is called directly to create a task instance. This invokes the create_task method of base_events.BaseEventLoop:

def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    self._check_closed()                        # the loop must not be closed
    if self._task_factory is None:              # no custom factory: use tasks.Task
        task = tasks.Task(coro, loop=self)      # create the task instance
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)   # use the configured _task_factory
    return task                                 # return the task instance

Now analyze how tasks.Task is initialized:

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    ...

    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()      # capture the execution context

        # Schedule __step through the loop's call_soon method.
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    ...

    def __step(self, exc=None):
        if self.done():                                 # the task must not be done already
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro                               # the wrapped coroutine
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                # result is whatever the coroutine yields, normally a future.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                # The coroutine finished: store its return value.
                super().set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)                  # store the exception on the task
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            # The coroutine yielded: inspect _asyncio_future_blocking.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    # The future belongs to a different loop: error.
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:                  # a task cannot await itself
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        # Resume this task through __wakeup once the future is done.
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result       # remember the awaited future
                        if self._must_cancel:           # a cancellation is pending
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # Any other case is reported as an error.
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)               # this task is no longer the current one
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()                             # check the outcome of the future
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)                            # pass the exception into the coroutine
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()                               # resume the coroutine
        self = None  # Needed to break cycles when an exception occurs.

The initializer calls the call_soon method of base_events.BaseEventLoop:

def call_soon(self, callback, *args, context=None):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    self._check_closed()                                # the loop must not be closed
    if self._debug:                                     # extra checks in debug mode
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)   # wrap the callback in a handle
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle


def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)   # create a Handle instance
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)                          # append it to the _ready queue
    return handle

To summarize: a Task instance is created first, and during initialization it calls the loop's call_soon method to schedule its __step method. Once the loop runs, __step starts the coroutine passed in as main by sending it None, and execution proceeds until it reaches asyncio.open_connection. That code is analyzed next:
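The interplay between the ready queue and the task step can be sketched with a toy model. This is a deliberately simplified illustration, not asyncio's implementation: ToyLoop, ToyTask and YieldOnce are hypothetical names, and futures, cancellation and I/O are left out.

```python
from collections import deque


class ToyLoop:
    """A FIFO queue of callbacks, loosely mirroring the _ready queue."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run(self):
        while self._ready:
            callback, args = self._ready.popleft()
            callback(*args)


class ToyTask:
    """Drives a coroutine one step at a time via send(None)."""

    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self.done = False
        self.result = None
        loop.call_soon(self._step)        # the first step is only scheduled

    def _step(self):
        try:
            self._coro.send(None)
        except StopIteration as exc:
            self.done = True
            self.result = exc.value       # the coroutine's return value
        else:
            # The coroutine yielded: give other callbacks a turn first.
            self._loop.call_soon(self._step)


class YieldOnce:
    """Awaitable that gives up control for one loop iteration."""

    def __await__(self):
        yield


async def worker(name, log):
    log.append(f"{name} start")
    await YieldOnce()
    log.append(f"{name} end")
    return name


if __name__ == '__main__':
    log = []
    loop = ToyLoop()
    ToyTask(worker("a", log), loop)
    ToyTask(worker("b", log), loop)
    loop.run()
    print(log)
```

Because each step is queued rather than run immediately, the two workers interleave instead of running one after the other.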

async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    if loop is None:                                        # no loop was passed in
        loop = events.get_event_loop()                      # use the current loop
    reader = StreamReader(limit=limit, loop=loop)           # create the reader
    protocol = StreamReaderProtocol(reader, loop=loop)      # create the protocol wrapping the reader
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)               # create the connection
    writer = StreamWriter(transport, protocol, reader, loop)    # create the writer
    return reader, writer                                   # return the reader and writer

The function first creates the reader and the protocol, and then enters loop.create_connection to create the connection:

async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    # server_hostname only makes sense together with ssl.
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    # With ssl and no server_hostname, fall back to host.
    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    # ssl_handshake_timeout only makes sense together with ssl.
    if ssl_handshake_timeout is not None and not ssl:
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:            # host or port was given
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the host information.
        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        exceptions = []
        for family, type, proto, cname, address in infos:
            try:
                sock = socket.socket(family=family, type=type, proto=proto)    # create the socket
                sock.setblocking(False)                 # make it non-blocking
                if local_addr is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)            # bind to the local address
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        sock.close()                    # no bind succeeded: close the socket
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)  # establish the connection
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    # Create the transport on top of the connected socket.
    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol                          # return the connection

Next, analyze the self.sock_connect function:

async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.
    """
    # In debug mode, reject blocking sockets.
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
        resolved = await self._ensure_resolved(
            address, family=sock.family, proto=sock.proto, loop=self)
        _, _, _, _, address = resolved[0]               # the resolved address

    fut = self.create_future()                          # create a future
    self._sock_connect(fut, sock, address)              # start the connection
    return await fut                                    # wait for the future


def _sock_connect(self, fut, sock, address):
    fd = sock.fileno()                                  # file descriptor of the socket

    try:
        sock.connect(address)                           # connect to the address
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        # Once the future is done, _sock_connect_done removes the writer.
        fut.add_done_callback(
            functools.partial(self._sock_connect_done, fd))
        # Register a write event on the loop with _sock_connect_cb as callback.
        self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
    except Exception as exc:
        fut.set_exception(exc)                          # any other error fails the future
    else:
        fut.set_result(None)                            # connected immediately: let the task continue


def _sock_connect_done(self, fd, fut):
    self.remove_writer(fd)                              # stop watching the file descriptor


def _sock_connect_cb(self, fut, sock, address):
    if fut.cancelled():                                 # the future was cancelled
        return

    try:
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)   # outcome of the connect
        if err != 0:
            # Jump to any except clause below.
            raise OSError(err, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        # Not connected yet: leave the future pending.
        pass
    except Exception as exc:
        fut.set_exception(exc)                          # any other error fails the future
    else:
        fut.set_result(None)                            # connected: let the task continue

If the connection cannot be completed immediately (connect raises BlockingIOError or InterruptedError), a write event is registered on the loop. That covers what happens after the first send. Control then returns to run_until_complete, which proceeds to self.run_forever(). The code executed there is as follows:
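The non-blocking connect pattern above can be tried outside asyncio with a plain socket and a selector. The sketch below assumes a throwaway listening socket on 127.0.0.1 so that it runs without network access; nonblocking_connect is a hypothetical helper name.

```python
import selectors
import socket


def nonblocking_connect():
    # A local listening socket stands in for the remote server.
    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    address = server.getsockname()

    client = socket.socket()
    client.setblocking(False)
    sel = selectors.DefaultSelector()
    try:
        try:
            client.connect(address)
        except (BlockingIOError, InterruptedError):
            # The connection is in progress: wait until the socket is writable.
            sel.register(client, selectors.EVENT_WRITE)
            if not sel.select(timeout=5):
                raise TimeoutError("connect did not complete")
            sel.unregister(client)
        # SO_ERROR reports the outcome of the connect: 0 means success.
        return client.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    finally:
        sel.close()
        client.close()
        server.close()


if __name__ == '__main__':
    print(nonblocking_connect())
```

asyncio does the same thing, except that instead of blocking in select() right away it registers _sock_connect_cb as a writer callback and lets the event loop do the polling.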

    def run_forever(self):"""Run until stop() is called."""self._check_closed()                                                    # 检查是否关闭if self.is_running():                                                   # 检查是否已经在运行raise RuntimeError('This event loop is already running')if events._get_running_loop() is not None:                              # 获取Loopraise RuntimeError('Cannot run the event loop while another loop is running')self._set_coroutine_origin_tracking(self._debug)                        # 设置是否为调试模式self._thread_id = threading.get_ident()                                 # 获取当前线程idold_agen_hooks = sys.get_asyncgen_hooks()sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,finalizer=self._asyncgen_finalizer_hook)try:events._set_running_loop(self)                                      # 设置当前运行的loopwhile True:self._run_once()                                                # 运行if self._stopping:breakfinally:self._stopping = Falseself._thread_id = Noneevents._set_running_loop(None)self._set_coroutine_origin_tracking(False)sys.set_asyncgen_hooks(*old_agen_hooks)def _run_once(self):"""Run one full iteration of the event loop.This calls all currently ready callbacks, polls for I/O,schedules the resulting callbacks, and finally schedules'call_later' callbacks."""sched_count = len(self._scheduled)                                          # 获取定时器相关的跳读if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES andself._timer_cancelled_count / sched_count >_MIN_CANCELLED_TIMER_HANDLES_FRACTION):                             # 检查待执行的定时任务并重新移动到一个调度列表中# Remove delayed calls that were cancelled if their number# is too highnew_scheduled = []for handle in self._scheduled:if handle._cancelled:handle._scheduled = Falseelse:new_scheduled.append(handle)heapq.heapify(new_scheduled)self._scheduled = new_scheduledself._timer_cancelled_count = 0else:# Remove delayed calls that were cancelled from head of queue.while self._scheduled and self._scheduled[0]._cancelled:                # 
            # Drop cancelled timer handles from the head of the heap.
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:                 # callbacks are already waiting
        timeout = 0                                   # so poll without blocking
    elif self._scheduled:                             # otherwise, if timers exist
        # Compute the desired timeout.
        when = self._scheduled[0]._when               # due time of the earliest timer
        timeout = max(0, when - self.time())          # time left until it is due, never negative

    if self._debug and timeout != 0:                  # debug mode and a poll that may block
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        event_list = self._selector.select(timeout)   # the I/O multiplexing call
    self._process_events(event_list)                  # handle read/write events

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution   # current time plus clock resolution
    while self._scheduled:                            # walk the timer heap
        handle = self._scheduled[0]                   # earliest timer
        if handle._when >= end_time:                  # not due yet, stop
            break
        handle = heapq.heappop(self._scheduled)       # due, so pop it
        handle._scheduled = False
        self._ready.append(handle)                    # and move it to the _ready queue

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)                          # snapshot of the queue length
    for i in range(ntodo):
        handle = self._ready.popleft()                # pop the next handle
        if handle._cancelled:                         # skip cancelled handles
            continue
        if self._debug:                               # debug mode: time the callback
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()                             # run the callback
    handle = None  # Needed to break cycles when an exception occurs.

The call to _process_events here resolves to the method on BaseSelectorEventLoop:

def _process_events(self, event_list):
    for key, mask in event_list:                                  # iterate over the ready events
        fileobj, (reader, writer) = key.fileobj, key.data         # unpack the registered handles
        if mask & selectors.EVENT_READ and reader is not None:    # readable
            if reader._cancelled:                                 # reader handle was cancelled
                self._remove_reader(fileobj)                      # stop watching this file object
            else:
                self._add_callback(reader)                        # otherwise queue its callback
        if mask & selectors.EVENT_WRITE and writer is not None:   # writable, same logic
            if writer._cancelled:
                self._remove_writer(fileobj)
            else:
                self._add_callback(writer)

That in turn calls the _add_callback method of BaseEventLoop:

def _add_callback(self, handle):
    """Add a Handle to _scheduled (TimerHandle) or _ready."""
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if handle._cancelled:                                         # cancelled handles are dropped
        return
    assert not isinstance(handle, events.TimerHandle)
    self._ready.append(handle)                                    # append to the _ready queue
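To make the flow of _run_once, _process_events and _add_callback easier to follow, here is a minimal sketch of the same idea: pick a poll timeout, poll the selector, queue the callbacks of ready file objects, move due timers onto the ready queue, then run what was queued. MiniLoop, demo and every name in them are made up for illustration. This is not how asyncio is implemented, and it leaves out cancellation, debug timing, error handling and the clock-resolution adjustment.

```python
import heapq
import selectors
import socket
import time
from collections import deque


class MiniLoop:
    """Toy loop (hypothetical) with a selector, a ready queue and a timer heap."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._ready = deque()   # callbacks to run on the current pass
        self._scheduled = []    # heap of (when, seq, callback) timers
        self._seq = 0           # tie-breaker so callbacks are never compared

    def call_soon(self, callback):
        self._ready.append(callback)

    def call_later(self, delay, callback):
        self._seq += 1
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, self._seq, callback))

    def add_reader(self, sock, callback):
        # The callback travels as the selector key's data.
        self._selector.register(sock, selectors.EVENT_READ, callback)

    def close(self):
        self._selector.close()

    def run_once(self):
        # 1. Poll timeout: 0 if work is waiting, else time to the next timer.
        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0][0] - time.monotonic())

        # 2. Poll, then queue (not call) the callback of each ready object.
        for key, _mask in self._selector.select(timeout):
            self._ready.append(key.data)

        # 3. Move timers that are due onto the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _when, _seq, callback = heapq.heappop(self._scheduled)
            self._ready.append(callback)

        # 4. Run only what is queued right now; later additions wait a pass.
        for _ in range(len(self._ready)):
            self._ready.popleft()()


def demo():
    loop = MiniLoop()
    log = []
    left, right = socket.socketpair()   # local pair, no network needed

    loop.add_reader(left, lambda: log.append(("read", left.recv(16))))
    loop.call_soon(lambda: log.append("soon"))
    loop.call_later(0.01, lambda: log.append("timer"))
    right.send(b"ping")

    while len(log) < 3:
        loop.run_once()

    loop.close()
    left.close()
    right.close()
    return log


if __name__ == "__main__":
    print(demo())
```

The point to notice is step 2: the poll result never calls user code directly. It only appends to the ready queue, and step 4 is the single place where callbacks actually run, which mirrors the comment in _run_once.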

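Because the write event has already been registered with the selector, the loop only has to wait for the I/O multiplexer to report it. When the notification arrives, _process_events handles the event and _add_callback appends the woken handle to the _ready list. Once the connection can be completed, the callback runs fut.set_result(None). Now look at the definition of the Future class: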

def set_result(self, result):
    """Mark the future done and set its result.

    If the future is already done when this method is called, raises
    InvalidStateError.
    """
    if self._state != _PENDING:                       # the future must still be pending
        raise InvalidStateError('{}: {!r}'.format(self._state, self))
    self._result = result                             # store the result
    self._state = _FINISHED                           # mark the future as finished
    self.__schedule_callbacks()                       # queue the done callbacks

def __schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]                    # copy the registered callbacks
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:                   # hand each one to the loop's ready queue
        self._loop.call_soon(callback, self, context=ctx)

def result(self):
    """Return the result this future represents.

    If the future has been cancelled, raises CancelledError.  If the
    future's result isn't yet available, raises InvalidStateError.  If
    the future is done and has an exception set, this exception is raised.
    """
    if self._state == _CANCELLED:
        raise CancelledError
    if self._state != _FINISHED:
        raise InvalidStateError('Result is not ready.')
    self.__log_traceback = False
    if self._exception is not None:
        raise self._exception
    return self._result                               # return the stored result

def __await__(self):
    if not self.done():                               # not finished yet
        self._asyncio_future_blocking = True          # flag that a Task is waiting on this future
        yield self  # This tells Task to wait for completion.
    if not self.done():                               # resumed before completion is an error
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.            # finally return the result set on the future

__iter__ = __await__  # make compatible with 'yield from'.
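The __await__ method above can be seen at work by driving a coroutine by hand, in the same style as the TestAsync example at the start of this article. The sketch below uses the real asyncio Future but never runs the event loop: the code itself plays the part of the Task and of the I/O callback. The names waiter and drive_by_hand are made up for this illustration.

```python
import asyncio


async def waiter(fut):
    # `await fut` delegates to fut.__await__(), which yields fut itself
    # while the future is still pending.
    value = await fut
    return value * 2


def drive_by_hand():
    # The loop is only needed to create the future; it is never run.
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        coro = waiter(fut)

        # First send: the coroutine runs until the pending future is yielded.
        yielded = coro.send(None)
        assert yielded is fut

        # Play the role of the I/O callback and complete the future.
        fut.set_result(21)

        # Second send: __await__ resumes, sees the future is done and
        # returns its result, so the coroutine runs to completion.
        try:
            coro.send(None)
        except StopIteration as exc:
            return exc.value
    finally:
        loop.close()


if __name__ == "__main__":
    print(drive_by_hand())
```

In real use nobody calls send by hand. That is exactly the job the Task takes over, as described next.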

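During Task initialization, __step is handed to the loop through call_soon. When __step sends None into the coroutine and gets a fut back, it registers __wakeup on that fut as a done callback. Calling set_result therefore passes __wakeup to call_soon, the loop runs it on its next pass, and __wakeup calls __step again, which sends into the coroutine and advances it to the next step. In other words, the callback registered with the I/O multiplexer calls set_result on the fut, and that call is what resumes the Task's coroutine. This is the core idea behind the asynchronous programming that Python supports. The later read and write calls follow the same flow, so they are not analyzed further in this article.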
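The hand-off between the step and wakeup callbacks described above can be reproduced in a few dozen lines. The following is a toy sketch, not asyncio's code: ToyLoop, ToyFuture, ToyTask, fetch and demo are all hypothetical names, and exceptions, cancellation and contexts are left out on purpose.

```python
from collections import deque


class ToyLoop:
    """Ready queue only; stands in for the event loop."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_until_idle(self):
        while self._ready:
            callback, args = self._ready.popleft()
            callback(*args)


class ToyFuture:
    def __init__(self, loop):
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def result(self):
        return self._result

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            self._loop.call_soon(callback, self)   # queued, not called inline

    def __await__(self):
        if not self._done:
            yield self            # hand the future to whoever drives the coroutine
        return self._result


class ToyTask(ToyFuture):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        loop.call_soon(self._step)                 # the first step is scheduled, not run

    def _step(self):
        try:
            awaited = self._coro.send(None)        # run up to the next pending await
        except StopIteration as exc:
            self.set_result(exc.value)             # the coroutine has finished
        else:
            awaited.add_done_callback(self._wakeup)

    def _wakeup(self, future):
        self._step()                               # the awaited future is done: resume


async def fetch(fut, log):
    log.append("waiting")
    data = await fut
    log.append(f"got {data}")
    return data.upper()


def demo():
    loop = ToyLoop()
    log = []
    fut = ToyFuture(loop)
    task = ToyTask(fetch(fut, log), loop)
    # Stand-in for an I/O callback that fires on a later pass of the loop.
    loop.call_soon(fut.set_result, "headers")
    loop.run_until_idle()
    return log, task.result()


if __name__ == "__main__":
    print(demo())
```

The coroutine is never resumed from inside set_result. Resumption always goes through the ready queue, which keeps callbacks from nesting inside one another.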

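Summary

The asyncio module that Python provides is a higher-level wrapper around I/O multiplexing and coroutines. Underneath, it still relies on the yield keyword to run coroutines and move them forward, which makes asynchronous programs much easier to write. The whole module is built on this principle, and the other I/O operations it offers are extensions of the same idea; interested readers can study the source code for the details. My own knowledge is limited, so corrections for anything I have missed or got wrong are welcome.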
