IOStream对tornado的高效起了很大的作用,他封装了socket的非阻塞IO的读写操作。大体上可以这么说,当连接建立后,服务端与客户端的请求响应的读写都是基于IOStream的,也就是说:IOStream是用来处理对连接的读写,当然IOStream是异步的读写而且可以有很多花样的读写。

接下来说一下有关接收请求的大体流程:

  当连接建立,服务器端会产生一个对应该连接的socket,同时将该socket封装至IOStream实例中(这代表着IOStream的初始化)。

  我们知道tornado是基于IO多路复用的(就拿epoll来说),此时将socket进行register,事件为READABLE,这一步与IOStream没有多大关系。

  当该socket事件发生时,也就是意味着有数据从连接发送到了系统缓冲区中,这时就需要将chunk读入到我们在内存中为其开辟的_read_buffer中,在IOStream中使用deque作为buffer。_read_buffer表示读缓冲,当然也有_write_buffer,并且在读的过程中也会检测总尺寸是否大于我们设定的最大缓冲尺寸。不管是读缓冲还是写缓冲本质上就是tornado进程开辟的一段用来存储数据的内存。

  而这些chunk一般都是客户端发送的请求了,但是我们还需要对这些chunk作进一步操作,比如这个chunk中可能包含了多个请求,如何把请求分离?(每个请求首部的结束符是b'\r\n\r\n'),这里就用到read_until来分离请求并设置callback了。同时会将被分离的请求数据从_read_buffer中移除。

  然后就是将callback以及他的参数(被分离的请求数据)添加至IOLoop._callbacks中,等待下一次IOLoop的执行,届时会迭代_callbacks并执行回调函数。 

  补充: tornado是水平触发,所以假如读完一次chunk后系统缓存区中依然还有数据,那么下一次的epoll.poll()依然会返回该socket。

在iostream中有一个类叫做:IOStream

有几个较为重要的属性:

def __init__():self.socket = socket           # 封装socket self.socket.setblocking(False) # 设置socket为非阻塞self.io_loop = io_loop or ioloop.IOLoop.current()    self._read_buffer = deque()    # 读缓冲self._write_buffer = deque()   # 写缓冲 self._read_callback = None     # 读到指定字节数据时,或是指定标志字符串时,需要执行的回调函数self._write_callback = None    # 发送完_write_buffer的数据时,需要执行的回调函数

有几个较为重要的方法

class IOStream(object):def read_until(self, delimiter, callback): def read_bytes(self, num_bytes, callback, streaming_callback=None): def read_until_regex(self, regex, callback): def read_until_close(self, callback, streaming_callback=None): def write(self, data, callback=None):

以上所有的方法都需要一个可选的callback参数,如果该参数为None则该方法会返回一个Future对象。

以上所有的读方法本质上都是读取该socket所发送来的数据,然后当读到指定分隔符或者标记或者条件触发的时候,停止读,然后将该分隔符以及其前面的数据作为callback(如果没有callback,则将数据设置为Future对象的result)的参数,然后将callback添加至IOLoop._callbacks中。当然其中所有的"读"操作是非阻塞的!

  像read_until  read_until_regex 这两个方法相差不大,原理都是差不多的,都是在buffer中找指定的字符或者字符样式。

  而read_bytes则是设置读取字节数,达到这些字节就会触发并运行回调函数(当然这些回调函数不是立刻运行,而是被送到ioloop中的_callbacks中),该方法主要是用来读取包含content-length或者分块传输编码的具有主体信息的请求或者响应。

  而read_until_close则是主要被用在非持久连接上,因为非持久连接响应的结束标志就是连接关闭。

read_bytes和read_until_close这两个方法都有streaming_callback这个参数,假如指定了该参数,那么只要read_buffer中有数据,则将数据作为参数调用该函数

就拿比较常见的read_until方法来说,下面是代码简化版:

    def read_until(self, delimiter, callback=None, max_bytes=None):future = self._set_read_callback(callback)     # 可能是Future对象,也可能是Noneself._read_delimiter = delimiter          # 设置分隔符self._read_max_bytes = max_bytes          # 设置最大读字节数self._try_inline_read()return future

其中_set_read_callback会根据callback是否存在返回None或者Future对象(存在返回None,否则返回一个Future实例对象)

如果我们
再来看_try_inline_read方法的简化版:

def _try_inline_read(self):"""尝试从_read_buffer中读取所需数据"""# 查看是否我们已经在之前的读操作中得到了数据self._run_streaming_callback() # 检查字符流回调,如果调用read_bytes和read_until_close并指定了streaming_callback参数就会造成这个回调pos = self._find_read_pos()       # 尝试在_read_buffer中找到分隔符的位置。找到则返回分隔符末尾所处的位置,如果不能,则返回None。if pos is not None:self._read_from_buffer(pos)returnself._check_closed()           # 检查当前IOStream是否关闭pos = self._read_to_buffer_loop()  # 从系统缓冲中读取一个chunk,检查是否含有分隔符,没有则继续读取一个chunk,合并两个chunk,再次检查是否函数分隔符…… 如果找到了分隔符,会返回分隔符末尾在_read_buffer中所处的位置if pos is not None:                # 如果找到了分隔符,self._read_from_buffer(pos)    # 将所需的数据从_read_buffer中移除,并将其作为callback的参数,然后将callback封装后添加至IOLoop._callbacks中     return# 没找到分隔符,要么关闭IOStream,要么为该socket在IOLoop中注册事件if self.closed():     self._maybe_run_close_callback()else:self._add_io_state(ioloop.IOLoop.READ)

上面的代码被我用空行分为了三部分,每一部分顺序的对应下面每一句话

分析该方法:

  1 首先在_read_buffer第一项中找分隔符,找到了就将分隔符以及其前的数据从_read_buffer中移除并将其作为参数传入回调函数,没找到就将第二项与第一项合并然后继续找……;

  2 如果在_read_buffer所有项中都没找到的话就把系统缓存中的数据读取至_read_buffer,然后合并再次查找,

  3 如果把系统缓存中的数据都取完了都还没找到,那么就等待下一次该socket发生READ事件后再找,这时的找则就是:将系统缓存中的数据读取到_read_buffer中然后找,也就是执行第2步。

来看一看这三部分分别调用了什么方法:

第一部分中的_find_read_pos以及_read_from_buffer

前者主要是在_read_buffer中查找分隔符,并返回分隔符的位置,后者则是将分隔符以及分隔符前面的所有数据从_read_buffer中取出并将其作为callback的参数,然后将callback封装后添加至IOLoop._callbacks中

来看_find_read_pos方法的简化版:

def _find_read_pos(self): # 尝试在_read_buffer中寻找分隔符。找到则返回分隔符末尾所处的位置,如果不能,则返回None。if self._read_delimiter is not None:if self._read_buffer:    # 查看_read_buffer中是否有之前未处理的数据while True:loc = self._read_buffer[0].find(self._read_delimiter) # 查找分隔符所出现的首部位置if loc != -1:     # 在_read_buffer的首项中找到了delimiter_len = len(self._read_delimiter)self._check_max_bytes(self._read_delimiter, loc + delimiter_len)return loc + delimiter_len    # 分隔符末尾的位置if len(self._read_buffer) == 1:break_double_prefix(self._read_buffer)self._check_max_bytes(self._read_delimiter, len(self._read_buffer[0]))return None

_find_read_pos

def _read_from_buffer(self, pos): # 将所需的数据从_read_buffer中移除,并将其作为callback的参数,然后将callback封装后添加至IOLoop._callbacks中 self._read_bytes = self._read_delimiter = self._read_regex = Noneself._read_partial = Falseself._run_read_callback(pos, False)

_read_from_buffer

来看_run_read_callback源码简化版:def _run_read_callback(self, size, streaming):if streaming:callback = self._streaming_callbackelse:callback = self._read_callbackself._read_callback = self._streaming_callback = Noneif self._read_future is not None:        # 这里将_read_future进行set_resultassert callback is Nonefuture = self._read_futureself._read_future = Nonefuture.set_result(self._consume(size))if callback is not None:assert (self._read_future is None) or streamingself._run_callback(callback, self._consume(size))    # 将后者作为前者的参数,然后将前者进行封装后添加至IOLoop._callbacks中

来看_consume的源码:def _consume(self, loc): # 将self._read_buffer 的首项改为 原首项[loc:] ,然后返回 原首项[:loc]if loc == 0:return b""_merge_prefix(self._read_buffer, loc)  # 将双端队列(deque)的首项调整为指定大小。self._read_buffer_size -= locreturn self._read_buffer.popleft()来看_run_callback源码简化版:def _run_callback(self, callback, *args):# 将callback封装后添加至ioloop._callbacks中def wrapper():self._pending_callbacks -= 1try:return callback(*args)finally:self._maybe_add_error_listener()with stack_context.NullContext():self._pending_callbacks += 1self.io_loop.add_callback(wrapper)    # 将callback添加至IOLoop._callbacks中

_run_read_callback

这里面还用到一个很有意思的函数:_merge_prefix ,这个函数的作用就是将deque的首项调整为指定大小

def _merge_prefix(deque, size):"""Replace the first entries in a deque of strings with a singlestring of up to size bytes.>>> d = collections.deque(['abc', 'de', 'fghi', 'j'])>>> _merge_prefix(d, 5); print(d)deque(['abcde', 'fghi', 'j'])Strings will be split as necessary to reach the desired size.>>> _merge_prefix(d, 7); print(d)deque(['abcdefg', 'hi', 'j'])>>> _merge_prefix(d, 3); print(d)deque(['abc', 'defg', 'hi', 'j'])>>> _merge_prefix(d, 100); print(d)deque(['abcdefghij'])"""if len(deque) == 1 and len(deque[0]) <= size:returnprefix = []remaining = sizewhile deque and remaining > 0:chunk = deque.popleft()if len(chunk) > remaining:deque.appendleft(chunk[remaining:])chunk = chunk[:remaining]prefix.append(chunk)remaining -= len(chunk)if prefix:deque.appendleft(type(prefix[0])().join(prefix))if not deque:deque.appendleft(b"")

_merge_prefix

第二部分的_read_to_buffer_loop

来看_read_to_buffer_loop简化版:系统缓冲中的data可能十分长,为了查找指定的字符,我们应该先读一个chunk,检查其中是否有指定的字符,若有则返回分隔符末尾所处的位置若没有则继续读第二个chunk,然后将这两个chunk合并(多字节分隔符(例如“\ r \ n”)可能跨读取缓冲区中的两个块),重复查找过程def _read_to_buffer_loop(self):try:next_find_pos = 0self._pending_callbacks += 1while not self.closed():if self._read_to_buffer() == 0:    # 从系统缓冲中读一个chunk并将其添加至_read_buffer中,然后返回chunk的大小,如果无数据则返回0breakself._run_streaming_callback()    if self._read_buffer_size >= next_find_pos:    # _read_buffer_size 表示_read_buffer的大小pos = self._find_read_pos()    # 尝试在_read_buffer中寻找分隔符。找到则返回分隔符末尾所处的位置,如果不能,则返回None。 if pos is not None:return posnext_find_pos = self._read_buffer_size * 2return self._find_read_pos()finally:self._pending_callbacks -= 1

_read_to_buffer_loop

第三部分_add_io_state,该函数和ioloop异步相关

def _add_io_state(self, state):if self.closed():    # 连接已经关闭returnif self._state is None:self._state = ioloop.IOLoop.ERROR | state    with stack_context.NullContext():self.io_loop.add_handler(self.fileno(), self._handle_events, self._state) # 为对应socket的文件描述符添加事件及其处理函数,elif not self._state & state:self._state = self._state | stateself.io_loop.update_handler(self.fileno(), self._state)# self._handle_events 是根据events选择对应的处理函数,在这里我们假设处理函数是_handle_readdef _handle_read(self):try:pos = self._read_to_buffer_loop()except UnsatisfiableReadError:raiseexcept Exception as e:gen_log.warning("error on read: %s" % e)self.close(exc_info=True)returnif pos is not None:self._read_from_buffer(pos)returnelse:self._maybe_run_close_callback()

_add_io_state

参考:

  http://www.nowamagic.net/academy/detail/13321051

转载于:https://www.cnblogs.com/MnCu8261/p/6703778.html

深入tornado中的IOStream相关推荐

  1. Linux中使用iOStream头文件,在Linux中使用gcc链接iostream.h

    我想在Linux(Linux MINT 8)中运行我的第一个C++程序.我使用gcc或g++,两者都有相同的问题:编译器找不到我要导入的库. 我怀疑我应该复制工作文件夹中的iostream.h文件(我 ...

  2. tornado中的协程是如何工作的

    转自:http://blog.csdn.net/wyx819/article/details/45420017 本文将按以下结构进行组织,说明tornado中协程的执行原理 协程定义 生成器和yiel ...

  3. tornado中log使用与logging模块的关联影响详解

    目录 [问题总结及代码] 参考一:参考点:logging基础内容了解 参考二:参考点:tornado.log 的 配置设置.日志自动分割.自定义格式化输出日志 参考三:参考点:自定义日志输出.日志分割 ...

  4. python模块介绍-Tornado:Tornado中文文档-概述

    2019独角兽企业重金招聘Python工程师标准>>> 快速链接 tornado-4.1.tar.gz, tornado最新版本 tornado源码@github 邮件列表:讨论.最 ...

  5. 在tornado中使用异步mysql操作

    在使用tornado框架进行开发的过程中,发现tornado的mysql数据库操作并不是一步的,造成了所有用户行为的堵塞.tornado本身是一个异步的框架,要求所有的操作都应该是异步的,但是数据库这 ...

  6. peewee的使用与异步peewee-async在tornado中的使用总结

    peewee是数据库ORM操作的第三方库. 文章目录 建表 保存数据 查询 更新和删除 tornado使用异步ORM库peewee-async 建表 from datetime import date ...

  7. 深入tornado中的ioLoop

    本文所剖析的tornado源码版本为4.4.2 ioloop是tornado的关键,是他的最底层. ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._insta ...

  8. 关于tornado中session的总结

    #!/usr/bin/env python# _*_ coding:utf-8 _*_ import tornado.webimport tornado.ioloop container = {} # ...

  9. tornado中数据库ORM操作(二):通过peewee-async集成到tornado中

    github地址:https://github.com/05bit/peewee-async Install Install with pip for PostgreSQL: pip install ...

最新文章

  1. LeetCode刷题记录3——237. Delete Node in a Linked List(easy)
  2. sourcesafe管理phpproj文件的补充说明(downmoon)
  3. 正确的WordPress文件权限[关闭]
  4. 给GAN一句描述,它就能按要求画画,微软CVPR新研究 | 附PyTorch代码
  5. time Machine备份时间间隔
  6. ado批量执行sql mysql_C++ 使用 ADO 批量操作数据库
  7. ChainBuilder Connect SOA平台解决方案的案例研究
  8. Oracle单行函数
  9. 一个AI设计的思考过程(旧文)
  10. python免费学校_清华大学,的python学习路线,免费分享给小白福利
  11. python输出引号内的变量值_引号中的Python变量值
  12. 双十一重磅福利来袭,拯救 “四大皆空” 的你!
  13. Oracle数据库时间戳转date类型进行判断操作
  14. Halcon 学习总结——邮票目录检测(stamp_catalogue)
  15. Hamcrest 断言
  16. 批量修改pdf文件名称的方法
  17. 如何把模糊照片变清晰把相片变高清修图#ps教程#ps学习视频
  18. torrents.php怎么下载,PT站自动收藏免费种下载|PT Add Free Torrents To Bookmark脚本js插件_ - 极光下载站...
  19. 去掉WebView中的白色背景
  20. 七剑下天山?七步搞定DB2查询优化!

热门文章

  1. 详解Struts2 Action名称的搜索顺序
  2. 试了下CommonLisp的WEB开发
  3. windows下安装docker
  4. 算法:最接近的三数之和
  5. mysql常见关键字
  6. shell实例第17讲:连续输入4个100以内的数字,统计和、乘、平均、最小和最大
  7. 如何快速学好Shell脚本?
  8. 安卓自动化测试(1)安卓自动化测试原理概念
  9. oracle sql命令行中上下左右使用
  10. Objective-C中.h、.m、.mm的区别