Tornado Source Code Analysis

This analysis is based on the Tornado 1.0 source.

The source ships with a helloworld example:

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")


def main():
    tornado.options.parse_command_line()                      # parse command-line options
    application = tornado.web.Application([                   # register the handlers and build the app
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)  # wrap the app in a server
    http_server.listen(options.port)                          # bind the listening port
    tornado.ioloop.IOLoop.instance().start()                  # start the event loop

if __name__ == "__main__":
    main()

As the bundled example shows, Tornado's request handling is built from three main classes: Application, HTTPServer, and IOLoop.
The Application class is analyzed first:

class Application(object):
    def __init__(self, handlers=None, default_host="", transforms=None,
                 wsgi=False, **settings):
        if transforms is None:
            self.transforms = []
            if settings.get("gzip"):
                self.transforms.append(GZipContentEncoding)   # gzip the response body
            self.transforms.append(ChunkedTransferEncoding)   # chunked transfer encoding
        else:
            self.transforms = transforms
        self.handlers = []                                    # registered handler groups
        self.named_handlers = {}
        self.default_host = default_host
        self.settings = settings                              # application settings
        self.ui_modules = {}                                  # UI modules
        self.ui_methods = {}                                  # UI methods
        self._wsgi = wsgi
        self._load_ui_modules(settings.get("ui_modules", {})) # load UI modules
        self._load_ui_methods(settings.get("ui_methods", {})) # load UI methods
        if self.settings.get("static_path"):                  # static-file configuration
            path = self.settings["static_path"]               # static-file directory
            handlers = list(handlers or [])
            static_url_prefix = settings.get("static_url_prefix",  # URL prefix for static files
                                             "/static/")
            handlers = [
                (re.escape(static_url_prefix) + r"(.*)", StaticFileHandler,  # static-file route
                 dict(path=path)),
                (r"/(favicon\.ico)", StaticFileHandler, dict(path=path)),    # favicon route
                (r"/(robots\.txt)", StaticFileHandler, dict(path=path)),     # robots.txt route
            ] + handlers
        if handlers:
            self.add_handlers(".*$", handlers)                # register the handlers

        # Automatically reload modified modules
        if self.settings.get("debug") and not wsgi:           # reload when files change
            import autoreload
            autoreload.start()

    def add_handlers(self, host_pattern, host_handlers):
        """Appends the given handlers to our handler list."""
        if not host_pattern.endswith("$"):
            host_pattern += "$"
        handlers = []
        # The handlers with the wildcard host_pattern are a special
        # case - they're added in the constructor but should have lower
        # precedence than the more-precise handlers added later.
        # If a wildcard handler group exists, it should always be last
        # in the list, so insert new groups just before it.
        if self.handlers and self.handlers[-1][0].pattern == '.*$':  # keep the '.*$' wildcard group last
            self.handlers.insert(-1, (re.compile(host_pattern), handlers))
        else:
            self.handlers.append((re.compile(host_pattern), handlers))
        for spec in host_handlers:                            # walk the handler specs
            if type(spec) is type(()):                        # each spec is a tuple
                assert len(spec) in (2, 3)                    # (pattern, handler[, kwargs])
                pattern = spec[0]                             # URL pattern
                handler = spec[1]                             # handler class for that pattern
                if len(spec) == 3:
                    kwargs = spec[2]                          # extra kwargs (e.g. the static path)
                else:
                    kwargs = {}
                spec = URLSpec(pattern, handler, kwargs)      # wrap the three parts in a URLSpec
            handlers.append(spec)
            if spec.name:                                     # track named handlers
                if spec.name in self.named_handlers:
                    logging.warning(
                        "Multiple handlers named %s; replacing previous value",
                        spec.name)
                self.named_handlers[spec.name] = spec

    def add_transform(self, transform_class):
        """Adds the given OutputTransform to our transform list."""
        self.transforms.append(transform_class)               # register an output transform

    def _get_host_handlers(self, request):
        host = request.host.lower().split(':')[0]             # host name from the request
        for pattern, handlers in self.handlers:               # find the handlers for this host
            if pattern.match(host):
                return handlers
        # Look for default host if not behind load balancer (for debugging)
        if "X-Real-Ip" not in request.headers:
            for pattern, handlers in self.handlers:
                if pattern.match(self.default_host):
                    return handlers
        return None

    def _load_ui_methods(self, methods):
        if type(methods) is types.ModuleType:
            self._load_ui_methods(dict((n, getattr(methods, n))
                                       for n in dir(methods)))
        elif isinstance(methods, list):
            for m in methods:
                self._load_ui_methods(m)
        else:
            for name, fn in methods.iteritems():
                if not name.startswith("_") and hasattr(fn, "__call__") \
                   and name[0].lower() == name[0]:
                    self.ui_methods[name] = fn

    def _load_ui_modules(self, modules):
        if type(modules) is types.ModuleType:
            self._load_ui_modules(dict((n, getattr(modules, n))
                                       for n in dir(modules)))
        elif isinstance(modules, list):
            for m in modules:
                self._load_ui_modules(m)
        else:
            assert isinstance(modules, dict)
            for name, cls in modules.iteritems():
                try:
                    if issubclass(cls, UIModule):
                        self.ui_modules[name] = cls
                except TypeError:
                    pass

    def __call__(self, request):
        """Called by HTTPServer to execute the request."""
        transforms = [t(request) for t in self.transforms]    # instantiate every transform for this request
        handler = None
        args = []
        kwargs = {}
        handlers = self._get_host_handlers(request)           # handlers matching the request's host
        if not handlers:
            handler = RedirectHandler(
                request, "http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)        # match the request path
                if match:
                    # None-safe wrapper around urllib.unquote to handle
                    # unmatched optional groups correctly
                    def unquote(s):
                        if s is None:
                            return s
                        return urllib.unquote(s)
                    handler = spec.handler_class(self, request, **spec.kwargs)  # instantiate the matching handler
                    # Pass matched groups to the handler.  Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().iteritems())  # named URL arguments
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, 404)
        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                map(lambda loader: loader.reset(),
                    RequestHandler._templates.values())
            RequestHandler._static_hashes = {}
        handler._execute(transforms, *args, **kwargs)         # run the handler
        return handler

    def reverse_url(self, name, *args):
        """Returns a URL path for handler named `name`

        The handler must be added to the application as a named URLSpec
        """
        if name in self.named_handlers:
            return self.named_handlers[name].reverse(*args)
        raise KeyError("%s not found in named urls" % name)

Application's main job is to register the request handlers, match each incoming URL against its handler to dispatch the request, configure the static-file path, and set up the output transforms (gzip, chunked encoding, and so on).
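The routing idea can be sketched without Tornado: compile each URL pattern, try them in order, and hand the matched groups to the handler, just as Application.__call__ does with spec.regex.match(request.path). The names below (routes, match, the handler strings) are illustrative, not Tornado's API.

```python
import re

# Patterns are tried in registration order, mirroring Application's handler list.
routes = [
    (re.compile(r"^/$"), "MainHandler"),
    (re.compile(r"^/static/(.*)$"), "StaticFileHandler"),
]

def match(path):
    # Return (handler_name, positional args) for the first matching pattern;
    # fall back to an error handler, like Application's 404 ErrorHandler.
    for regex, handler in routes:
        m = regex.match(path)
        if m:
            return handler, list(m.groups())
    return "ErrorHandler", []
```

A path with a capture group, such as "/static/app.css", yields the captured remainder as the handler's argument, which is how StaticFileHandler learns which file to serve.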

The HTTPServer class is analyzed next:

class HTTPServer(object):
    def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
                 xheaders=False, ssl_options=None):
        """Initializes the server with the given request callback.

        If you use pre-forking/start() instead of the listen() method to
        start your server, you should not pass an IOLoop instance to this
        constructor. Each pre-forked child process will create its own
        IOLoop instance after the forking process.
        """
        self.request_callback = request_callback   # the Application instance
        self.no_keep_alive = no_keep_alive         # whether to disable keep-alive
        self.io_loop = io_loop                     # the event loop
        self.xheaders = xheaders
        self.ssl_options = ssl_options             # HTTPS certificate configuration
        self._socket = None                        # the server's listening socket
        self._started = False

    def listen(self, port, address=""):
        self.bind(port, address)                   # bind the port
        self.start(1)                              # start a single process

    def bind(self, port, address=""):
        assert not self._socket                    # only bind once
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
        flags |= fcntl.FD_CLOEXEC                  # close the fd automatically on exec in forked children
        fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow address reuse
        self._socket.setblocking(0)                # non-blocking server socket
        self._socket.bind((address, port))         # bind the port
        self._socket.listen(128)                   # listen backlog

    def start(self, num_processes=1):
        assert not self._started                   # can only be started once
        self._started = True
        if num_processes is None or num_processes <= 0:
            # Use sysconf to detect the number of CPUs (cores)
            try:
                num_processes = os.sysconf("SC_NPROCESSORS_CONF")  # number of CPUs
            except ValueError:
                logging.error("Could not get num processors from sysconf; "
                              "running with one process")          # fall back to one process
                num_processes = 1
        if num_processes > 1 and ioloop.IOLoop.initialized():      # cannot pre-fork once the IOLoop singleton exists
            logging.error("Cannot run in multiple processes: IOLoop instance "
                          "has already been initialized. You cannot call "
                          "IOLoop.instance() before calling start()")
            num_processes = 1
        if num_processes > 1:
            logging.info("Pre-forking %d server processes", num_processes)
            for i in range(num_processes):
                if os.fork() == 0:
                    self.io_loop = ioloop.IOLoop.instance()        # each child gets its own IOLoop
                    self.io_loop.add_handler(
                        self._socket.fileno(), self._handle_events,
                        ioloop.IOLoop.READ)                        # watch the listening socket for read events
                    return
            os.waitpid(-1, 0)                                      # parent waits for children
        else:
            if not self.io_loop:                                   # single-process mode
                self.io_loop = ioloop.IOLoop.instance()
            self.io_loop.add_handler(self._socket.fileno(),
                                     self._handle_events,
                                     ioloop.IOLoop.READ)

    def stop(self):
        self.io_loop.remove_handler(self._socket.fileno())
        self._socket.close()

    def _handle_events(self, fd, events):
        while True:                                                # accept until the queue drains
            try:
                connection, address = self._socket.accept()        # accept a new connection
            except socket.error, e:
                if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:                       # SSL configured
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                connection = ssl.wrap_socket(
                    connection, server_side=True, **self.ssl_options)  # wrap the connection in TLS
            try:
                stream = iostream.IOStream(connection, io_loop=self.io_loop)  # wrap the socket in an IOStream
                HTTPConnection(stream, address, self.request_callback,        # hand the stream to an HTTPConnection
                               self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)

By default HTTPServer starts as a single process: it binds the listening port, creates the server socket, and registers that socket with the IOLoop so the loop watches it for read (accept) events.
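The listening-socket setup in bind() can be reproduced standalone. This is a minimal sketch of the same steps (SO_REUSEADDR, non-blocking mode, bind, listen backlog of 128), omitting the fcntl FD_CLOEXEC flag for portability; make_listen_socket is an illustrative name, not Tornado's.

```python
import socket

def make_listen_socket(port, address=""):
    # Same options HTTPServer.bind sets on its server socket.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow quick restart on the same port
    sock.setblocking(False)                                     # non-blocking, as the IOLoop requires
    sock.bind((address, port))
    sock.listen(128)                                            # same backlog as Tornado 1.0
    return sock
```

Passing port 0 lets the OS pick a free port, which is handy for experimenting with this snippet.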
When the server accepts a connection, it handles it with:

                stream = iostream.IOStream(connection, io_loop=self.io_loop)  # wrap the socket in an IOStream
                HTTPConnection(stream, address, self.request_callback,        # hand the stream to an HTTPConnection
                               self.no_keep_alive, self.xheaders)

The IOStream class is analyzed below:

class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        self.socket = socket                                # the accepted connection
        self.socket.setblocking(False)                      # non-blocking
        self.io_loop = io_loop or ioloop.IOLoop.instance()  # the event loop
        self.max_buffer_size = max_buffer_size              # maximum read buffer size
        self.read_chunk_size = read_chunk_size              # bytes to read per recv()
        self._read_buffer = ""                              # data read so far
        self._write_buffer = ""                             # data waiting to be written
        self._read_delimiter = None                         # delimiter that ends a read
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._state = self.io_loop.ERROR
        self.io_loop.add_handler(
            self.socket.fileno(), self._handle_events, self._state)
        # _handle_events is registered with the loop; it dispatches the
        # read, write and error events for this socket

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        loc = self._read_buffer.find(delimiter)             # look for the delimiter in the buffer
        if loc != -1:                                       # already buffered: consume and fire
            self._run_callback(callback, self._consume(loc + len(delimiter)))
            return
        self._check_closed()
        self._read_delimiter = delimiter                    # remember the delimiter
        self._read_callback = callback                      # and the callback
        self._add_io_state(self.io_loop.READ)               # ask the loop for read events

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes."""
        assert not self._read_callback, "Already reading"
        if len(self._read_buffer) >= num_bytes:
            callback(self._consume(num_bytes))
            return
        self._check_closed()
        self._read_bytes = num_bytes
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.
        """
        self._check_closed()
        self._write_buffer += data                          # queue the data
        self._add_io_state(self.io_loop.WRITE)              # ask the loop for write events
        self._write_callback = callback

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = callback

    def close(self):
        """Close this stream."""
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())  # stop watching the fd
            self.socket.close()                                # close the connection
            self.socket = None
            if self._close_callback:                           # run the close callback, if any
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return len(self._write_buffer) > 0                     # data still queued for write

    def closed(self):
        return self.socket is None

    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        if events & self.io_loop.READ:
            self._handle_read()                                # handle a readable socket
        if not self.socket:                                    # the read may have closed us
            return
        if events & self.io_loop.WRITE:
            self._handle_write()                               # handle a writable socket
        if not self.socket:
            return
        if events & self.io_loop.ERROR:                        # handle errors
            self.close()
            return
        state = self.io_loop.ERROR                             # recompute the interest set
        if self._read_delimiter or self._read_bytes:           # a read is still pending
            state |= self.io_loop.READ
        if self._write_buffer:                                 # data is left to write
            state |= self.io_loop.WRITE
        if state != self._state:                               # interest set changed
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _run_callback(self, callback, *args, **kwargs):
        try:
            callback(*args, **kwargs)                          # invoke the registered callback
        except:
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc'd, but we don't want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise

    def _handle_read(self):
        try:
            chunk = self.socket.recv(self.read_chunk_size)     # read up to read_chunk_size bytes
        except socket.error, e:
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            else:
                logging.warning("Read error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
        if not chunk:                                          # peer closed the connection
            self.close()
            return
        self._read_buffer += chunk                             # append to the read buffer
        if len(self._read_buffer) >= self.max_buffer_size:     # give up if the buffer overflows
            logging.error("Reached maximum read buffer size")
            self.close()
            return
        if self._read_bytes:                                   # a byte-count read is pending
            if len(self._read_buffer) >= self._read_bytes:     # enough data is buffered
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))  # deliver exactly num_bytes
        elif self._read_delimiter:                             # a delimiter read is pending
            loc = self._read_buffer.find(self._read_delimiter) # look for the delimiter
            if loc != -1:                                      # found it
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))  # deliver everything through the delimiter

    def _handle_write(self):
        while self._write_buffer:
            try:
                num_bytes = self.socket.send(self._write_buffer)     # send what we can
                self._write_buffer = self._write_buffer[num_bytes:]  # keep the unsent remainder
            except socket.error, e:
                if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:          # everything flushed
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)                             # run the write-complete callback

    def _consume(self, loc):
        result = self._read_buffer[:loc]                             # take the first loc bytes
        self._read_buffer = self._read_buffer[loc:]                  # keep the rest for later reads
        return result

    def _check_closed(self):
        if not self.socket:
            raise IOError("Stream is closed")

    def _add_io_state(self, state):
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)  # update the loop's interest set

This class wraps the newly accepted socket: it buffers incoming and outgoing data, delivers it to the registered callbacks, and updates the socket's registered events in the loop as its read/write interest changes.
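The read_until bookkeeping is worth isolating. Below is a hypothetical sketch (BufferSketch and feed are illustrative names, not Tornado's API) of the same logic: append incoming chunks to a buffer and, once the delimiter appears, consume everything through it and fire the callback, leaving any surplus bytes buffered for the next read.

```python
class BufferSketch:
    def __init__(self):
        self.buf = b""            # plays the role of IOStream._read_buffer
        self.delimiter = None     # plays the role of _read_delimiter
        self.callback = None      # plays the role of _read_callback

    def read_until(self, delimiter, callback):
        # Remember what we are waiting for, and check the existing buffer first.
        self.delimiter = delimiter
        self.callback = callback
        self._check()

    def feed(self, chunk):
        # Stands in for _handle_read's recv(): new bytes arrive here.
        self.buf += chunk
        self._check()

    def _check(self):
        if self.delimiter is None:
            return
        loc = self.buf.find(self.delimiter)
        if loc != -1:
            end = loc + len(self.delimiter)
            data, self.buf = self.buf[:end], self.buf[end:]  # _consume()
            cb, self.callback, self.delimiter = self.callback, None, None
            cb(data)
```

This mirrors why HTTPConnection can call read_until("\r\n\r\n", ...) and receive exactly one header block per callback, with any pipelined bytes kept in the buffer.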

The connection is then handled by HTTPConnection:

class HTTPConnection(object):
    """Handles a connection to an HTTP client, executing HTTP requests.

    We parse HTTP headers and bodies, and execute the request callback
    until the HTTP connection is closed.
    """
    def __init__(self, stream, address, request_callback, no_keep_alive=False,
                 xheaders=False):
        self.stream = stream                                  # the IOStream instance
        self.address = address                                # peer address
        self.request_callback = request_callback              # the Application instance
        self.no_keep_alive = no_keep_alive                    # whether to disable keep-alive
        self.xheaders = xheaders
        self._request = None
        self._request_finished = False
        self.stream.read_until("\r\n\r\n", self._on_headers)  # read the request headers first

    def write(self, chunk):
        assert self._request, "Request closed"
        if not self.stream.closed():
            self.stream.write(chunk, self._on_write_complete)  # write, then run _on_write_complete

    def finish(self):
        assert self._request, "Request closed"
        self._request_finished = True
        if not self.stream.writing():                         # write buffer already drained
            self._finish_request()

    def _on_write_complete(self):
        if self._request_finished:
            self._finish_request()

    def _finish_request(self):
        if self.no_keep_alive:                                # keep-alive disabled
            disconnect = True
        else:
            connection_header = self._request.headers.get("Connection")  # the Connection header
            if self._request.supports_http_1_1():             # HTTP/1.1: close only if asked to
                disconnect = connection_header == "close"
            elif ("Content-Length" in self._request.headers   # HTTP/1.0: keep open only for Keep-Alive
                  or self._request.method in ("HEAD", "GET")):
                disconnect = connection_header != "Keep-Alive"
            else:
                disconnect = True
        self._request = None
        self._request_finished = False
        if disconnect:
            self.stream.close()                               # close the connection
            return
        self.stream.read_until("\r\n\r\n", self._on_headers)  # otherwise wait for the next request

    def _on_headers(self, data):
        eol = data.find("\r\n")                               # end of the request line
        start_line = data[:eol]                               # the request line itself
        method, uri, version = start_line.split(" ")          # method, URI, HTTP version
        if not version.startswith("HTTP/"):
            raise Exception("Malformed HTTP version in HTTP Request-Line")
        headers = httputil.HTTPHeaders.parse(data[eol:])      # parse the remaining header lines
        self._request = HTTPRequest(
            connection=self, method=method, uri=uri, version=version,
            headers=headers, remote_ip=self.address[0])       # build the request object
        content_length = headers.get("Content-Length")        # body length, if any
        if content_length:
            content_length = int(content_length)
            if content_length > self.stream.max_buffer_size:  # refuse oversized bodies
                raise Exception("Content-Length too long")
            if headers.get("Expect") == "100-continue":       # client wants permission to send the body
                self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")  # tell it to continue
            self.stream.read_bytes(content_length, self._on_request_body)  # read the body, then parse it
            return
        self.request_callback(self._request)                  # no body: run the application

    def _on_request_body(self, data):
        self._request.body = data
        content_type = self._request.headers.get("Content-Type", "")  # the body's content type
        if self._request.method == "POST":
            if content_type.startswith("application/x-www-form-urlencoded"):  # URL-encoded form data
                arguments = cgi.parse_qs(self._request.body)
                for name, values in arguments.iteritems():
                    values = [v for v in values if v]
                    if values:
                        self._request.arguments.setdefault(name, []).extend(values)
            elif content_type.startswith("multipart/form-data"):  # multipart upload
                if 'boundary=' in content_type:
                    boundary = content_type.split('boundary=', 1)[1]
                    if boundary:
                        self._parse_mime_body(boundary, data)     # split the body on the boundary
                else:
                    logging.warning("Invalid multipart/form-data")
        self.request_callback(self._request)                      # run the application

    def _parse_mime_body(self, boundary, data):
        # The standard allows for the boundary to be quoted in the header,
        # although it's rare (it happens at least for google app engine
        # xmpp).  I think we're also supposed to handle backslash-escapes
        # here but I'll save that until we see a client that uses them
        # in the wild.
        if boundary.startswith('"') and boundary.endswith('"'):   # strip a quoted boundary
            boundary = boundary[1:-1]
        if data.endswith("\r\n"):
            footer_length = len(boundary) + 6
        else:
            footer_length = len(boundary) + 4
        parts = data[:-footer_length].split("--" + boundary + "\r\n")
        for part in parts:
            if not part:
                continue
            eoh = part.find("\r\n\r\n")
            if eoh == -1:
                logging.warning("multipart/form-data missing headers")
                continue
            headers = httputil.HTTPHeaders.parse(part[:eoh])
            name_header = headers.get("Content-Disposition", "")
            if not name_header.startswith("form-data;") or \
               not part.endswith("\r\n"):
                logging.warning("Invalid multipart/form-data")
                continue
            value = part[eoh + 4:-2]
            name_values = {}
            for name_part in name_header[10:].split(";"):
                name, name_value = name_part.strip().split("=", 1)
                name_values[name] = name_value.strip('"').decode("utf-8")
            if not name_values.get("name"):
                logging.warning("multipart/form-data value missing name")
                continue
            name = name_values["name"]
            if name_values.get("filename"):
                ctype = headers.get("Content-Type", "application/unknown")
                self._request.files.setdefault(name, []).append(dict(
                    filename=name_values["filename"], body=value,
                    content_type=ctype))
            else:
                self._request.arguments.setdefault(name, []).append(value)

The connection first registers a read event for its socket with the IOLoop and initially reads only the request headers. After parsing the headers, it checks the request method and Content-Length to decide whether a body follows; if so, the body is read and parsed as well, otherwise parsing the headers is enough.
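The header-block parsing in _on_headers can be sketched in isolation: split off the request line, unpack method/URI/version, then collect "Name: value" pairs. This is a simplified stand-in (parse_headers is an illustrative name); the real code uses httputil.HTTPHeaders.parse, which also handles line continuations and repeated headers.

```python
def parse_headers(data):
    # Split the request line ("GET /path HTTP/1.1") from the header lines.
    eol = data.find("\r\n")
    start_line = data[:eol]
    method, uri, version = start_line.split(" ")
    # Parse the remaining lines as simple "Name: value" pairs.
    headers = {}
    for line in data[eol:].split("\r\n"):
        if ":" in line:
            name, _, value = line.partition(":")
            headers[name.strip()] = value.strip()
    return method, uri, version, headers
```

Given the raw bytes read_until("\r\n\r\n", ...) delivers, this yields exactly the pieces HTTPRequest is constructed from.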
Once a request is fully parsed, the parsed data is wrapped in an HTTPRequest object:

class HTTPRequest(object):
    """A single HTTP request.

    GET/POST arguments are available in the arguments property, which
    maps arguments names to lists of values (to support multiple values
    for individual names). Names and values are both unicode always.

    File uploads are available in the files property, which maps file
    names to list of files. Each file is a dictionary of the form
    {"filename":..., "content_type":..., "body":...}. The content_type
    comes from the provided HTTP header and should not be trusted
    outright given that it can be easily forged.

    An HTTP request is attached to a single HTTP connection, which can
    be accessed through the "connection" attribute. Since connections
    are typically kept open in HTTP/1.1, multiple requests can be handled
    sequentially on a single connection.
    """
    def __init__(self, method, uri, version="HTTP/1.0", headers=None,
                 body=None, remote_ip=None, protocol=None, host=None,
                 files=None, connection=None):
        self.method = method                                  # HTTP method
        self.uri = uri                                        # request URI
        self.version = version                                # HTTP version
        self.headers = headers or httputil.HTTPHeaders()      # header data
        self.body = body or ""                                # request body
        if connection and connection.xheaders:
            # Squid uses X-Forwarded-For, others use X-Real-Ip
            self.remote_ip = self.headers.get(
                "X-Real-Ip", self.headers.get("X-Forwarded-For", remote_ip))  # client IP when behind a proxy
            self.protocol = self.headers.get("X-Scheme", protocol) or "http"
        else:
            self.remote_ip = remote_ip                        # client IP
            self.protocol = protocol or "http"                # protocol
        self.host = host or self.headers.get("Host") or "127.0.0.1"  # fall back to localhost
        self.files = files or {}                              # uploaded files
        self.connection = connection                          # the owning HTTPConnection
        self._start_time = time.time()                        # when handling started
        self._finish_time = None                              # when handling finished
        scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)  # split the URI
        self.path = path
        self.query = query
        arguments = cgi.parse_qs(query)
        self.arguments = {}
        for name, values in arguments.iteritems():
            values = [v for v in values if v]
            if values:
                self.arguments[name] = values                 # keep the non-empty query arguments

    def supports_http_1_1(self):
        """Returns True if this request supports HTTP/1.1 semantics"""
        return self.version == "HTTP/1.1"

    def write(self, chunk):
        """Writes the given chunk to the response stream."""
        assert isinstance(chunk, str)
        self.connection.write(chunk)                          # write through the connection

    def finish(self):
        """Finishes this HTTP request on the open connection."""
        self.connection.finish()                              # finish this request
        self._finish_time = time.time()                       # record the finish time

    def full_url(self):
        """Reconstructs the full URL for this request."""
        return self.protocol + "://" + self.host + self.uri   # the complete URL of this request

    def request_time(self):
        """Returns the amount of time it took for this request to execute."""
        if self._finish_time is None:
            return time.time() - self._start_time
        else:
            return self._finish_time - self._start_time       # elapsed handling time

    def __repr__(self):
        attrs = ("protocol", "host", "method", "uri", "version", "remote_ip",
                 "body")
        args = ", ".join(["%s=%r" % (n, getattr(self, n)) for n in attrs])
        return "%s(%s, headers=%s)" % (
            self.__class__.__name__, args, dict(self.headers))

That completes the path from accepting a request to parsing its data. Next we turn to the IOLoop, which drives all of this event handling and pushes the response data out:
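Before reading the real class, its core pattern can be previewed as a toy (using select instead of epoll for portability): keep an fd-to-handler map, poll for ready fds, then dispatch each ready fd's handler. All names below are illustrative, not Tornado's API.

```python
import select

# fd -> handler map, mirroring IOLoop._handlers.
handlers = {}

def add_handler(fd, handler):
    # Mirrors IOLoop.add_handler: remember which callback owns this fd.
    handlers[fd] = handler

def poll_once(timeout=0.0):
    # One iteration of the loop body: wait for events, then run each
    # ready fd's handler, as IOLoop.start does after self._impl.poll().
    readable, _, _ = select.select(list(handlers), [], [], timeout)
    for fd in readable:
        handlers[fd](fd)
```

The real loop adds the pieces this sketch omits: pending callbacks, timeout deadlines that shrink the poll timeout, the waker pipe, and error-event bookkeeping.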

class IOLoop(object):
    # Constants from the epoll module
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

    def __init__(self, impl=None):
        self._impl = impl or _poll()
        if hasattr(self._impl, 'fileno'):
            self._set_close_exec(self._impl.fileno())
        self._handlers = {}
        self._events = {}
        self._callbacks = set()
        self._timeouts = []
        self._running = False
        self._stopped = False
        self._blocking_log_threshold = None

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        if os.name != 'nt':
            r, w = os.pipe()                                # the "waker" pipe
            self._set_nonblocking(r)
            self._set_nonblocking(w)
            self._set_close_exec(r)
            self._set_close_exec(w)
            self._waker_reader = os.fdopen(r, "r", 0)
            self._waker_writer = os.fdopen(w, "w", 0)
        else:
            self._waker_reader = self._waker_writer = win32_support.Pipe()
            r = self._waker_writer.reader_fd
        self.add_handler(r, self._read_waker, self.READ)    # watch the pipe's read end

    @classmethod
    def instance(cls):
        """Returns a global IOLoop instance.

        Most single-threaded applications have a single, global IOLoop.
        Use this method instead of passing around IOLoop instances
        throughout your code.

        A common pattern for classes that depend on IOLoops is to use
        a default argument to enable programs with multiple IOLoops
        but not require the argument for simpler applications:

            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.instance()
        """
        if not hasattr(cls, "_instance"):                   # the singleton pattern
            cls._instance = cls()
        return cls._instance

    @classmethod
    def initialized(cls):
        return hasattr(cls, "_instance")                    # has the singleton been created?

    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = handler                        # remember the handler for this fd
        self._impl.register(fd, events | self.ERROR)        # register the fd with epoll

    def update_handler(self, fd, events):
        """Changes the events we listen for fd."""
        self._impl.modify(fd, events | self.ERROR)          # update the fd's registered events

    def remove_handler(self, fd):
        """Stop listening for events on fd."""
        self._handlers.pop(fd, None)                        # forget the handler
        self._events.pop(fd, None)                          # drop any pending events for it
        try:
            self._impl.unregister(fd)                       # unregister the fd from epoll
        except (OSError, IOError):
            logging.debug("Error deleting fd from IOLoop", exc_info=True)

    def set_blocking_log_threshold(self, s):
        """Logs a stack trace if the ioloop is blocked for more than s seconds.

        Pass None to disable.  Requires python 2.6 on a unixy platform.
        """
        if not hasattr(signal, "setitimer"):
            logging.error("set_blocking_log_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_log_threshold = s
        if s is not None:
            signal.signal(signal.SIGALRM, self._handle_alarm)

    def _handle_alarm(self, signal, frame):
        logging.warning('IOLoop blocked for %f seconds in\n%s',
                        self._blocking_log_threshold,
                        ''.join(traceback.format_stack(frame)))

    def start(self):
        """Starts the I/O loop.

        The loop will run until one of the I/O handlers calls stop(), which
        will make the loop stop after the current event iteration completes.
        """
        if self._stopped:
            self._stopped = False
            return
        self._running = True
        while True:                                         # the main loop
            # Never use an infinite timeout here - it can stall epoll
            poll_timeout = 0.2

            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            callbacks = list(self._callbacks)               # snapshot the pending callbacks
            for callback in callbacks:
                # A callback can add or remove other callbacks
                if callback in self._callbacks:
                    self._callbacks.remove(callback)
                    self._run_callback(callback)            # run each pending callback

            if self._callbacks:                             # more callbacks queued: poll without blocking
                poll_timeout = 0.0

            if self._timeouts:                              # scheduled timeouts exist
                now = time.time()
                while self._timeouts and self._timeouts[0].deadline <= now:  # fire the expired ones
                    timeout = self._timeouts.pop(0)
                    self._run_callback(timeout.callback)    # run the timeout's callback
                if self._timeouts:
                    milliseconds = self._timeouts[0].deadline - now
                    poll_timeout = min(milliseconds, poll_timeout)  # never sleep past the next deadline

            if not self._running:                           # stop() was called
                break

            if self._blocking_log_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                event_pairs = self._impl.poll(poll_timeout)  # wait for I/O events
            except Exception, e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if (getattr(e, 'errno') == errno.EINTR or
                    (isinstance(getattr(e, 'args'), tuple) and
                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
                    logging.warning("Interrupted system call", exc_info=1)
                    continue
                else:
                    raise

            if self._blocking_log_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_log_threshold, 0)

            # Pop one fd at a time from the set of pending fds and run
            # its handler.
Since that handler may perform actions on# other file descriptors, there may be reentrant calls to# this IOLoop that update self._eventsself._events.update(event_pairs)                                    # 更新未处理时间到列表中while self._events:fd, events = self._events.popitem()                             # 处理第一个事件try:self._handlers[fd](fd, events)                              # 处理对应连接的回调函数 对应iostream中的_handle_events方法except (KeyboardInterrupt, SystemExit):raiseexcept (OSError, IOError), e:if e[0] == errno.EPIPE:# Happens when the client closes the connectionpasselse:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)except:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)# reset the stopped flag so another start/stop pair can be issuedself._stopped = Falseif self._blocking_log_threshold is not None:signal.setitimer(signal.ITIMER_REAL, 0, 0)def stop(self):"""Stop the loop after the current event loop iteration is complete.If the event loop is not currently running, the next call to start()will return immediately.To use asynchronous methods from otherwise-synchronous code (such asunit tests), you can start and stop the event loop like this:ioloop = IOLoop()async_method(ioloop=ioloop, callback=ioloop.stop)ioloop.start()ioloop.start() will return after async_method has run its callback,whether that callback was invoked before or after ioloop.start."""self._running = Falseself._stopped = Trueself._wake()def running(self):"""Returns true if this IOLoop is currently running."""return self._runningdef add_timeout(self, deadline, callback):"""Calls the given callback at the time deadline from the I/O loop."""timeout = _Timeout(deadline, callback)                                      # 添加过期时间执行的回调函数bisect.insort(self._timeouts, timeout)                                      # 把该实例添加到列表中return timeoutdef remove_timeout(self, timeout):self._timeouts.remove(timeout)                                              # 移除该过期的事件def add_callback(self, 
callback):"""Calls the given callback on the next I/O loop iteration."""self._callbacks.add(callback)                                               # 添加回调函数self._wake()def remove_callback(self, callback):"""Removes the given callback from the next I/O loop iteration."""self._callbacks.remove(callback)                                            # 移除回调函数def _wake(self):try:self._waker_writer.write("x")except IOError:passdef _run_callback(self, callback):try:callback()                                                              # 执行回调函数except (KeyboardInterrupt, SystemExit):raiseexcept:self.handle_callback_exception(callback)def handle_callback_exception(self, callback):"""This method is called whenever a callback run by the IOLoopthrows an exception.By default simply logs the exception as an error.  Subclassesmay override this method to customize reporting of exceptions.The exception itself is not passed explicitly, but is availablein sys.exc_info."""logging.error("Exception in callback %r", callback, exc_info=True)def _read_waker(self, fd, events):try:while True:self._waker_reader.read()except IOError:passdef _set_nonblocking(self, fd):flags = fcntl.fcntl(fd, fcntl.F_GETFL)fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)                     # 设置成非阻塞方式def _set_close_exec(self, fd):flags = fcntl.fcntl(fd, fcntl.F_GETFD)fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)                  # 设置成在子进程执行就关闭设置class _Timeout(object):"""An IOLoop timeout, a UNIX timestamp and a callback"""# Reduce memory overhead when there are lots of pending callbacks__slots__ = ['deadline', 'callback']def __init__(self, deadline, callback):self.deadline = deadlineself.callback = callbackdef __cmp__(self, other):return cmp((self.deadline, id(self.callback)),(other.deadline, id(other.callback)))                          # 比较最后需要死亡的时间

The IOLoop polls for I/O events, accepts incoming requests, and dispatches them by invoking the callback registered for each file descriptor.
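The register/poll/dispatch cycle, including the pipe-based wakeup trick from IOLoop.__init__, can be sketched with a plain select()-based loop. This is a simplified stand-in, not Tornado code; the name MiniLoop and the single-argument handler signature are invented for illustration:

```python
import os
import select

class MiniLoop(object):
    """A toy select()-based loop mirroring IOLoop's register/poll/dispatch."""
    def __init__(self):
        self._handlers = {}   # fd -> handler, like IOLoop._handlers
        self._running = False

    def add_handler(self, fd, handler):
        self._handlers[fd] = handler

    def stop(self):
        self._running = False

    def start(self):
        self._running = True
        while self._running:
            # poll with a finite timeout, as IOLoop does (never infinite)
            readable, _, _ = select.select(list(self._handlers), [], [], 0.2)
            for fd in readable:
                self._handlers[fd](fd)   # dispatch to the registered handler

# Wake the loop through a pipe, the same trick IOLoop uses for _wake()
r, w = os.pipe()
received = []

def on_readable(fd):
    received.append(os.read(fd, 1))
    loop.stop()

loop = MiniLoop()
loop.add_handler(r, on_readable)
os.write(w, b"x")
loop.start()
assert received == [b"x"]
```

Writing a byte to the pipe makes select() return immediately, which is exactly how the real IOLoop interrupts an idle poll when add_callback() or stop() is called from elsewhere.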

When the last line of _on_headers or _on_request_body in the HTTPConnection class executes

 self.request_callback(self._request) 

the __call__ method of the Application class is invoked:

    def __call__(self, request):
        """Called by HTTPServer to execute the request."""
        transforms = [t(request) for t in self.transforms]            # instantiate the output transforms for this request
        handler = None
        args = []
        kwargs = {}
        handlers = self._get_host_handlers(request)                   # find the handler specs matching the request's host
        if not handlers:
            handler = RedirectHandler(
                request, "http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)                # match the request path against the route regex
                if match:
                    # None-safe wrapper around urllib.unquote to handle
                    # unmatched optional groups correctly
                    def unquote(s):
                        if s is None: return s
                        return urllib.unquote(s)
                    handler = spec.handler_class(self, request, **spec.kwargs)  # instantiate the matching handler
                    # Pass matched groups to the handler.  Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().iteritems())  # extract named URL parameters
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, 404)
        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                map(lambda loader: loader.reset(),
                    RequestHandler._templates.values())
            RequestHandler._static_hashes = {}

        handler._execute(transforms, *args, **kwargs)                 # run the handler
        return handler
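The extraction of URL parameters above (match, then groupdict or groups, with None-safe unquoting) can be reproduced in isolation. This is a simplified sketch of that logic, not Tornado's URLSpec; match_route is an invented name:

```python
import re
try:
    from urllib.parse import unquote as _unquote  # Python 3
except ImportError:
    from urllib import unquote as _unquote        # Python 2

def match_route(pattern, path):
    """Return (args, kwargs) the way Application.__call__ extracts them."""
    match = re.compile(pattern).match(path)
    if not match:
        return None
    def unquote(s):
        return s if s is None else _unquote(s)    # None-safe, like the original
    kwargs = dict((k, unquote(v)) for k, v in match.groupdict().items())
    # use either named groups or positional groups, never both
    args = [] if kwargs else [unquote(s) for s in match.groups()]
    return args, kwargs

# positional groups become *args, named groups become **kwargs
assert match_route(r"/post/(\d+)", "/post/42") == (["42"], {})
assert match_route(r"/user/(?P<name>[^/]+)", "/user/a%20b") == ([], {"name": "a b"})
assert match_route(r"/post/(\d+)", "/other") is None
```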

When execution reaches

handler._execute(transforms, *args, **kwargs)

the handler matching the request URI has been located, and its _execute method is called to run the request.
Let us now analyze the handler class:
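The dispatch step inside _execute, getattr(self, self.request.method.lower()), is plain attribute lookup on the handler instance. A minimal sketch of that pattern (BaseHandler and EchoHandler are hypothetical names, not from the source):

```python
class BaseHandler(object):
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT")

    def execute(self, method, *args, **kwargs):
        # reject unsupported verbs first, as _execute does with HTTPError(405)
        if method not in self.SUPPORTED_METHODS:
            raise ValueError("405 Method Not Allowed")
        # look up get()/post()/... by name and call it with the URL groups
        return getattr(self, method.lower())(*args, **kwargs)

class EchoHandler(BaseHandler):
    def get(self, name):
        return "Hello, %s" % name

assert EchoHandler().execute("GET", "world") == "Hello, world"
```

This is why defining get() or post() on a RequestHandler subclass is all that is needed: the base class finds the method by the lowercased HTTP verb.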

class RequestHandler(object):                                                        # 处理handler"""Subclass this class and define get() or post() to make a handler.If you want to support more methods than the standard GET/HEAD/POST, youshould override the class variable SUPPORTED_METHODS in yourRequestHandler class."""SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT")def __init__(self, application, request, transforms=None):self.application = applicationself.request = requestself._headers_written = Falseself._finished = Falseself._auto_finish = Trueself._transforms = transforms or []self.ui = _O((n, self._ui_method(m)) for n, m inapplication.ui_methods.iteritems())self.ui["modules"] = _O((n, self._ui_module(n, m)) for n, m inapplication.ui_modules.iteritems())self.clear()# Check since connection is not available in WSGIif hasattr(self.request, "connection"):self.request.connection.stream.set_close_callback(self.on_connection_close)@propertydef settings(self):return self.application.settingsdef head(self, *args, **kwargs):raise HTTPError(405)def get(self, *args, **kwargs):raise HTTPError(405)def post(self, *args, **kwargs):raise HTTPError(405)def delete(self, *args, **kwargs):raise HTTPError(405)def put(self, *args, **kwargs):raise HTTPError(405)def prepare(self):"""Called before the actual handler method.Useful to override in a handler if you want a common bottleneck forall of your requests."""passdef on_connection_close(self):"""Called in async handlers if the client closed the connection.You may override this to clean up resources associated withlong-lived connections.Note that the select()-based implementation of IOLoop does not detectclosed connections and so this method will not be called untilyou try (and fail) to produce some output.  
The epoll- and kqueue-based implementations should detect closed connections even whilethe request is idle."""passdef clear(self):"""Resets all headers and content for this response."""self._headers = {"Server": "TornadoServer/1.0","Content-Type": "text/html; charset=UTF-8",}if not self.request.supports_http_1_1():if self.request.headers.get("Connection") == "Keep-Alive":self.set_header("Connection", "Keep-Alive")self._write_buffer = []self._status_code = 200def set_status(self, status_code):"""Sets the status code for our response."""assert status_code in httplib.responsesself._status_code = status_code                                         # 设置响应状态码def set_header(self, name, value):"""Sets the given response header name and value.If a datetime is given, we automatically format it according to theHTTP specification. If the value is not a string, we convert it toa string. All header values are then encoded as UTF-8."""if isinstance(value, datetime.datetime):t = calendar.timegm(value.utctimetuple())                          # 转换时间格式value = email.utils.formatdate(t, localtime=False, usegmt=True)elif isinstance(value, int) or isinstance(value, long):value = str(value)                                                 # 转换数字类型else:value = _utf8(value)# If \n is allowed into the header, it is possible to inject# additional headers or split the request. 
Also cap length to# prevent obviously erroneous values.safe_value = re.sub(r"[\x00-\x1f]", " ", value)[:4000]            # 确保头部没有空格等影响http协议解析的字符if safe_value != value:raise ValueError("Unsafe header value %r", value)self._headers[name] = value                                           # 存入数据_ARG_DEFAULT = []def get_argument(self, name, default=_ARG_DEFAULT, strip=True):"""Returns the value of the argument with the given name.If default is not provided, the argument is considered to berequired, and we throw an HTTP 404 exception if it is missing.If the argument appears in the url more than once, we return thelast value.The returned value is always unicode."""args = self.get_arguments(name, strip=strip)if not args:if default is self._ARG_DEFAULT:raise HTTPError(404, "Missing argument %s" % name)return defaultreturn args[-1]def get_arguments(self, name, strip=True):"""Returns a list of the arguments with the given name.If the argument is not present, returns an empty list.The returned values are always unicode."""values = self.request.arguments.get(name, [])# Get rid of any weird control charsvalues = [re.sub(r"[\x00-\x08\x0e-\x1f]", " ", x) for x in values]values = [_unicode(x) for x in values]if strip:values = [x.strip() for x in values]return values@propertydef cookies(self):"""A dictionary of Cookie.Morsel objects."""if not hasattr(self, "_cookies"):self._cookies = Cookie.BaseCookie()                                         # 设置存储cookieif "Cookie" in self.request.headers:                                        # 如果头部文件中有Cookietry:self._cookies.load(self.request.headers["Cookie"])                  # 将头部信息中的cookie信息保存在请求handler中except:self.clear_all_cookies()return self._cookiesdef get_cookie(self, name, default=None):"""Gets the value of the cookie with the given name, else default."""if name in self.cookies:return self.cookies[name].valuereturn defaultdef set_cookie(self, name, value, domain=None, expires=None, path="/",expires_days=None, **kwargs):"""Sets the 
given cookie name/value with the given options.Additional keyword arguments are set on the Cookie.Morseldirectly.See http://docs.python.org/library/cookie.html#morsel-objectsfor available attributes."""name = _utf8(name)                                                  # 设置cookievalue = _utf8(value)if re.search(r"[\x00-\x20]", name + value):# Don't let us accidentally inject bad stuffraise ValueError("Invalid cookie %r: %r" % (name, value))if not hasattr(self, "_new_cookies"):                               # 如果没有_new_cookies则设置为空列表self._new_cookies = []new_cookie = Cookie.BaseCookie()self._new_cookies.append(new_cookie)                                # 添加到_new_cookies列表中new_cookie[name] = valueif domain:new_cookie[name]["domain"] = domain                             # 设置域名if expires_days is not None and not expires:                        # 过期时间expires = datetime.datetime.utcnow() + datetime.timedelta(days=expires_days)if expires:timestamp = calendar.timegm(expires.utctimetuple())new_cookie[name]["expires"] = email.utils.formatdate(timestamp, localtime=False, usegmt=True)if path:                                                            # 路径new_cookie[name]["path"] = pathfor k, v in kwargs.iteritems():new_cookie[name][k] = v                                         # 如果cookie中有多个值存入def clear_cookie(self, name, path="/", domain=None):"""Deletes the cookie with the given name."""expires = datetime.datetime.utcnow() - datetime.timedelta(days=365)self.set_cookie(name, value="", path=path, expires=expires,domain=domain)                                      # 将cookie的时间设置成负数则该cookie过期def clear_all_cookies(self):"""Deletes all the cookies the user sent with this request."""for name in self.cookies.iterkeys():self.clear_cookie(name)                                         # 清除所有的cookiedef set_secure_cookie(self, name, value, expires_days=30, **kwargs):"""Signs and timestamps a cookie so it cannot be forged.You must specify the 'cookie_secret' setting in your Applicationto 
use this method. It should be a long, random sequence of bytesto be used as the HMAC secret for the signature.To read a cookie set with this method, use get_secure_cookie()."""timestamp = str(int(time.time()))value = base64.b64encode(value)                                    # 设置安全的cookie,base64转码内容signature = self._cookie_signature(name, value, timestamp)         # cookie签名value = "|".join([value, timestamp, signature])                    # 将值进行拼接,设置到cookie中self.set_cookie(name, value, expires_days=expires_days, **kwargs)def get_secure_cookie(self, name, include_name=True, value=None):"""Returns the given signed cookie if it validates, or None.In older versions of Tornado (0.1 and 0.2), we did not include thename of the cookie in the cookie signature. To read these old-stylecookies, pass include_name=False to this method. Otherwise, allattempts to read old-style cookies will fail (and you may log allyour users out whose cookies were written with a previous Tornadoversion)."""if value is None: value = self.get_cookie(name)                         # 解析加密的cookieif not value: return Noneparts = value.split("|")if len(parts) != 3: return Noneif include_name:signature = self._cookie_signature(name, parts[0], parts[1])else:signature = self._cookie_signature(parts[0], parts[1])if not _time_independent_equals(parts[2], signature):logging.warning("Invalid cookie signature %r", value)return Nonetimestamp = int(parts[1])if timestamp < time.time() - 31 * 86400:logging.warning("Expired cookie %r", value)return Nonetry:return base64.b64decode(parts[0])                                   # 返回解密后的数据except:return Nonedef _cookie_signature(self, *parts):self.require_setting("cookie_secret", "secure cookies")hash = hmac.new(self.application.settings["cookie_secret"],digestmod=hashlib.sha1)                        # 加密cookiefor part in parts: hash.update(part)                           return hash.hexdigest()def redirect(self, url, permanent=False):"""Sends a redirect to the given 
(optionally relative) URL."""if self._headers_written:raise Exception("Cannot redirect after headers have been written")     # 如果没有头部信息则报错self.set_status(301 if permanent else 302)                                 # 设置响应头的状态值# Remove whitespaceurl = re.sub(r"[\x00-\x20]+", "", _utf8(url))                              #去除多余空格信息self.set_header("Location", urlparse.urljoin(self.request.uri, url))       # 将重定向设置到头部信息中self.finish()                                                              # 将数据发送出去def write(self, chunk):"""Writes the given chunk to the output buffer.To write the output to the network, use the flush() method below.If the given chunk is a dictionary, we write it as JSON and setthe Content-Type of the response to be text/javascript."""assert not self._finishedif isinstance(chunk, dict):chunk = escape.json_encode(chunk)self.set_header("Content-Type", "text/javascript; charset=UTF-8")chunk = _utf8(chunk)                                                       # 转换内容格式self._write_buffer.append(chunk)                                           # 添加到写缓冲区中def render(self, template_name, **kwargs):"""Renders the template with the given arguments as the response."""html = self.render_string(template_name, **kwargs)                          # 模板渲染# Insert the additional JS and CSS added by the modules on the pagejs_embed = []js_files = []css_embed = []css_files = []html_heads = []html_bodies = []for module in getattr(self, "_active_modules", {}).itervalues():embed_part = module.embedded_javascript()if embed_part: js_embed.append(_utf8(embed_part))file_part = module.javascript_files()if file_part:if isinstance(file_part, basestring):js_files.append(file_part)else:js_files.extend(file_part)embed_part = module.embedded_css()if embed_part: css_embed.append(_utf8(embed_part))file_part = module.css_files()if file_part:if isinstance(file_part, basestring):css_files.append(file_part)else:css_files.extend(file_part)head_part = module.html_head()if head_part: 
html_heads.append(_utf8(head_part))body_part = module.html_body()if body_part: html_bodies.append(_utf8(body_part))if js_files:# Maintain order of JavaScript files given by modulespaths = []unique_paths = set()for path in js_files:if not path.startswith("/") and not path.startswith("http:"):path = self.static_url(path)if path not in unique_paths:paths.append(path)unique_paths.add(path)js = ''.join('<script src="' + escape.xhtml_escape(p) +'" type="text/javascript"></script>'for p in paths)sloc = html.rindex('</body>')html = html[:sloc] + js + '\n' + html[sloc:]if js_embed:js = '<script type="text/javascript">\n//<![CDATA[\n' + \'\n'.join(js_embed) + '\n//]]>\n</script>'sloc = html.rindex('</body>')html = html[:sloc] + js + '\n' + html[sloc:]if css_files:paths = set()for path in css_files:if not path.startswith("/") and not path.startswith("http:"):paths.add(self.static_url(path))else:paths.add(path)css = ''.join('<link href="' + escape.xhtml_escape(p) + '" ''type="text/css" rel="stylesheet"/>'for p in paths)hloc = html.index('</head>')html = html[:hloc] + css + '\n' + html[hloc:]if css_embed:css = '<style type="text/css">\n' + '\n'.join(css_embed) + \'\n</style>'hloc = html.index('</head>')html = html[:hloc] + css + '\n' + html[hloc:]if html_heads:hloc = html.index('</head>')html = html[:hloc] + ''.join(html_heads) + '\n' + html[hloc:]if html_bodies:hloc = html.index('</body>')html = html[:hloc] + ''.join(html_bodies) + '\n' + html[hloc:]self.finish(html)def render_string(self, template_name, **kwargs):"""Generate the given template with the given arguments.We return the generated string. 
To generate and write a templateas a response, use render() above."""# If no template_path is specified, use the path of the calling filetemplate_path = self.get_template_path()                                    # 获取配置的模板文件路径if not template_path:frame = sys._getframe(0)web_file = frame.f_code.co_filenamewhile frame.f_code.co_filename == web_file:frame = frame.f_backtemplate_path = os.path.dirname(frame.f_code.co_filename)if not getattr(RequestHandler, "_templates", None):                         # 如果获取为空,则创建一个字典RequestHandler._templates = {}if template_path not in RequestHandler._templates:                          # 如果模板文件不再字典中则添加对应的名称和loader进该字典loader = self.application.settings.get("template_loader") or\template.Loader(template_path)RequestHandler._templates[template_path] = loader                       # 要么使用配置的Loader要么使用默认的loadt = RequestHandler._templates[template_path].load(template_name)args = dict(handler=self,request=self.request,current_user=self.current_user,locale=self.locale,_=self.locale.translate,static_url=self.static_url,xsrf_form_html=self.xsrf_form_html,reverse_url=self.application.reverse_url)args.update(self.ui)args.update(kwargs)return t.generate(**args)def flush(self, include_footers=False):"""Flushes the current output buffer to the nextwork."""if self.application._wsgi: raise Exception("WSGI applications do not support flush()")        chunk = "".join(self._write_buffer)                                 # 拼接处理完成的数据self._write_buffer = []                                             # 将写缓存清空if not self._headers_written:                                       # 如果头部为空self._headers_written = True                                    for transform in self._transforms:self._headers, chunk = transform.transform_first_chunk(self._headers, chunk, include_footers)                  # 处理压缩转换的数据headers = self._generate_headers()                              # 获取拼接的头部文件字符串else:for transform in self._transforms:chunk = transform.transform_chunk(chunk, 
include_footers)headers = ""# Ignore the chunk and only write the headers for HEAD requestsif self.request.method == "HEAD":if headers: self.request.write(headers)returnif headers or chunk:self.request.write(headers + chunk)def finish(self, chunk=None):"""Finishes this response, ending the HTTP request."""assert not self._finishedif chunk is not None: self.write(chunk)                                    # 调用完成后调用,此时写入传入值# Automatically support ETags and add the Content-Length header if# we have not flushed any content yet.if not self._headers_written:                                              # 如果头部长度为空,自动添加长度等返回信息if (self._status_code == 200 and self.request.method == "GET" and      # 第一次请求数据的时候,"Etag" not in self._headers):                                      # 详情可参考:https://www.cnblogs.com/softidea/p/5986339.htmlhasher = hashlib.sha1()                               for part in self._write_buffer:hasher.update(part)etag = '"%s"' % hasher.hexdigest()                                 # 对将要发送的数据进行sha1计算inm = self.request.headers.get("If-None-Match")                    if inm and inm.find(etag) != -1:self._write_buffer = []self.set_status(304)                                           # 如果该数据已经发送,则不发送该数据else:self.set_header("Etag", etag)                                  # 如果首次就设置etagif "Content-Length" not in self._headers:content_length = sum(len(part) for part in self._write_buffer)self.set_header("Content-Length", content_length)                  # 设置返回长度信息if hasattr(self.request, "connection"):# Now that the request is finished, clear the callback we# set on the IOStream (which would otherwise prevent the# garbage collection of the RequestHandler when there# are keepalive connections)self.request.connection.stream.set_close_callback(None)if not self.application._wsgi:self.flush(include_footers=True)                                    # 将处理的数据写出self.request.finish()                                               # 调用request.finish()self._log()self._finished = 
Truedef send_error(self, status_code=500, **kwargs):"""Sends the given HTTP error code to the browser.We also send the error HTML for the given error code as returned byget_error_html. Override that method if you want custom error pagesfor your application."""if self._headers_written:logging.error("Cannot send error response after headers written")if not self._finished:self.finish()returnself.clear()self.set_status(status_code)message = self.get_error_html(status_code, **kwargs)self.finish(message)def get_error_html(self, status_code, **kwargs):"""Override to implement custom error pages.If this error was caused by an uncaught exception, theexception object can be found in kwargs e.g. kwargs['exception']"""return "<html><title>%(code)d: %(message)s</title>" \"<body>%(code)d: %(message)s</body></html>" % {"code": status_code,"message": httplib.responses[status_code],}@propertydef locale(self):"""The local for the current session.Determined by either get_user_locale, which you can override toset the locale based on, e.g., a user preference stored in adatabase, or get_browser_locale, which uses the Accept-Languageheader."""if not hasattr(self, "_locale"):self._locale = self.get_user_locale()if not self._locale:self._locale = self.get_browser_locale()assert self._localereturn self._localedef get_user_locale(self):"""Override to determine the locale from the authenticated user.If None is returned, we use the Accept-Language header."""return Nonedef get_browser_locale(self, default="en_US"):"""Determines the user's locale from Accept-Language header.See http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.4"""if "Accept-Language" in self.request.headers:                                   # 获取语言字段languages = self.request.headers["Accept-Language"].split(",")locales = []for language in languages:parts = language.strip().split(";")if len(parts) > 1 and parts[1].startswith("q="):try:score = float(parts[1][2:])except (ValueError, TypeError):score = 0.0else:score = 
1.0locales.append((parts[0], score))if locales:locales.sort(key=lambda (l, s): s, reverse=True)codes = [l[0] for l in locales]return locale.get(*codes)return locale.get(default)@propertydef current_user(self):"""The authenticated user for this request.Determined by either get_current_user, which you can override toset the user based on, e.g., a cookie. If that method is notoverridden, this method always returns None.We lazy-load the current user the first time this method is calledand cache the result after that."""if not hasattr(self, "_current_user"):self._current_user = self.get_current_user()return self._current_userdef get_current_user(self):"""Override to determine the current user from, e.g., a cookie."""return Nonedef get_login_url(self):"""Override to customize the login URL based on the request.By default, we use the 'login_url' application setting."""self.require_setting("login_url", "@tornado.web.authenticated")return self.application.settings["login_url"]def get_template_path(self):"""Override to customize template path for each handler.By default, we use the 'template_path' application setting.Return None to load templates relative to the calling file."""return self.application.settings.get("template_path")@propertydef xsrf_token(self):"""The XSRF-prevention token for the current user/session.To prevent cross-site request forgery, we set an '_xsrf' cookieand include the same '_xsrf' value as an argument with all POSTrequests. 
    If the two do not match, we reject the form submission
    as a potential forgery.

    See http://en.wikipedia.org/wiki/Cross-site_request_forgery
    """
    if not hasattr(self, "_xsrf_token"):                                    # lazily create the XSRF token
        token = self.get_cookie("_xsrf")
        if not token:
            token = binascii.b2a_hex(uuid.uuid4().bytes)
            expires_days = 30 if self.current_user else None
            self.set_cookie("_xsrf", token, expires_days=expires_days)
        self._xsrf_token = token
    return self._xsrf_token

def check_xsrf_cookie(self):
    """Verifies that the '_xsrf' cookie matches the '_xsrf' argument.

    To prevent cross-site request forgery, we set an '_xsrf' cookie
    and include the same '_xsrf' value as an argument with all POST
    requests. If the two do not match, we reject the form submission
    as a potential forgery.

    See http://en.wikipedia.org/wiki/Cross-site_request_forgery
    """
    if self.request.headers.get("X-Requested-With") == "XMLHttpRequest":    # XHR requests are exempt from the check
        return
    token = self.get_argument("_xsrf", None)
    if not token:
        raise HTTPError(403, "'_xsrf' argument missing from POST")
    if self.xsrf_token != token:
        raise HTTPError(403, "XSRF cookie does not match POST argument")

def xsrf_form_html(self):
    """An HTML <input/> element to be included with all POST forms.

    It defines the _xsrf input value, which we check on all POST
    requests to prevent cross-site request forgery. If you have set
    the 'xsrf_cookies' application setting, you must include this
    HTML within all of your HTML forms.

    See check_xsrf_cookie() above for more information.
    """
    return '<input type="hidden" name="_xsrf" value="' + \
        escape.xhtml_escape(self.xsrf_token) + '"/>'                        # hidden XSRF field for HTML forms

def static_url(self, path):
    """Returns a static URL for the given relative static file path.

    This method requires you set the 'static_path' setting in your
    application (which specifies the root directory of your static
    files).

    We append ?v=<signature> to the returned URL, which makes our
    static file handler set an infinite expiration header on the
    returned content. The signature is based on the content of the
    file.

    If this handler has a "include_host" attribute, we include the
    full host for every static URL, including the "http://". Set
    this attribute for handlers whose output needs non-relative static
    path names.
    """
    self.require_setting("static_path", "static_url")                       # static_path must be configured
    if not hasattr(RequestHandler, "_static_hashes"):
        RequestHandler._static_hashes = {}
    hashes = RequestHandler._static_hashes
    if path not in hashes:
        try:
            f = open(os.path.join(
                self.application.settings["static_path"], path))
            hashes[path] = hashlib.md5(f.read()).hexdigest()                # cache an MD5 of the file content
            f.close()
        except:
            logging.error("Could not open static file %r", path)
            hashes[path] = None
    base = self.request.protocol + "://" + self.request.host \
        if getattr(self, "include_host", False) else ""
    static_url_prefix = self.settings.get('static_url_prefix', '/static/')
    if hashes.get(path):
        return base + static_url_prefix + path + "?v=" + hashes[path][:5]   # version the URL by content hash
    else:
        return base + static_url_prefix + path

def async_callback(self, callback, *args, **kwargs):
    """Wrap callbacks with this if they are used on asynchronous requests.

    Catches exceptions and properly finishes the request.
    """
    if callback is None:
        return None
    if args or kwargs:
        callback = functools.partial(callback, *args, **kwargs)
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception, e:
            if self._headers_written:
                logging.error("Exception after headers written",
                              exc_info=True)
            else:
                self._handle_request_exception(e)
    return wrapper

def require_setting(self, name, feature="this feature"):
    """Raises an exception if the given app setting is not defined."""
    if not self.application.settings.get(name):
        raise Exception("You must define the '%s' setting in your "
                        "application to use %s" % (name, feature))

def reverse_url(self, name, *args):
    return self.application.reverse_url(name, *args)

def _execute(self, transforms, *args, **kwargs):
    """Executes this request with the given output transforms."""
    self._transforms = transforms
    try:
        if self.request.method not in self.SUPPORTED_METHODS:               # reject unsupported HTTP methods
            raise HTTPError(405)
        # If XSRF cookies are turned on, reject form submissions without
        # the proper cookie
        if self.request.method == "POST" and \
           self.application.settings.get("xsrf_cookies"):                   # POST requests also get the XSRF check
            self.check_xsrf_cookie()
        self.prepare()                                                      # hook invoked before the request is handled
        if not self._finished:
            getattr(self, self.request.method.lower())(*args, **kwargs)     # dispatch to the matching get/post/... method
            if self._auto_finish and not self._finished:
                self.finish()                                               # finish the request
    except Exception, e:
        self._handle_request_exception(e)

def _generate_headers(self):
    lines = [self.request.version + " " + str(self._status_code) + " " +
             httplib.responses[self._status_code]]                          # first line of the response head
    lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()]) # serialize the headers into lines
    for cookie_dict in getattr(self, "_new_cookies", []):                   # append any newly set cookies
        for cookie in cookie_dict.values():
            lines.append("Set-Cookie: " + cookie.OutputString(None))
    return "\r\n".join(lines) + "\r\n\r\n"                                  # join into the raw header string

def _log(self):
    if self._status_code < 400:
        log_method = logging.info
    elif self._status_code < 500:
        log_method = logging.warning
    else:
        log_method = logging.error
    request_time = 1000.0 * self.request.request_time()
    log_method("%d %s %.2fms", self._status_code,
               self._request_summary(), request_time)

def _request_summary(self):
    return self.request.method + " " + self.request.uri + " (" + \
        self.request.remote_ip + ")"

def _handle_request_exception(self, e):                                     # centralized error handling
    if isinstance(e, HTTPError):
        if e.log_message:
            format = "%d %s: " + e.log_message
            args = [e.status_code, self._request_summary()] + list(e.args)
            logging.warning(format, *args)
        if e.status_code not in httplib.responses:
            logging.error("Bad HTTP status code: %d", e.status_code)
            self.send_error(500, exception=e)
        else:
            self.send_error(e.status_code, exception=e)
    else:
        logging.error("Uncaught exception %s\n%r", self._request_summary(),
                      self.request, exc_info=e)
        self.send_error(500, exception=e)

def _ui_module(self, name, module):                                         # wrap a UI module for template rendering
    def render(*args, **kwargs):
        if not hasattr(self, "_active_modules"):
            self._active_modules = {}
        if name not in self._active_modules:
            self._active_modules[name] = module(self)
        rendered = self._active_modules[name].render(*args, **kwargs)
        return rendered
    return render

def _ui_method(self, method):
    return lambda *args, **kwargs: method(self, *args, **kwargs)
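The XSRF protection above boils down to: issue a random hex token in a cookie, require the same value back as a form argument, and reject the request on any mismatch. Here is a minimal standalone sketch of that logic, with cookie storage reduced to a plain dict and the helper names (`get_xsrf_token`, `check_xsrf`) invented for illustration — they are not tornado's API:

```python
import binascii
import uuid

def get_xsrf_token(cookies):
    """Return the '_xsrf' token, generating and storing one if absent."""
    token = cookies.get("_xsrf")
    if not token:
        # same recipe as tornado 1.0: hex-encode 16 random UUID bytes
        token = binascii.b2a_hex(uuid.uuid4().bytes).decode("ascii")
        cookies["_xsrf"] = token
    return token

def check_xsrf(cookies, form_args):
    """Reject the request unless the form token matches the cookie token."""
    token = form_args.get("_xsrf")
    if not token:
        raise ValueError("'_xsrf' argument missing from POST")
    if get_xsrf_token(cookies) != token:
        raise ValueError("XSRF cookie does not match POST argument")

cookies = {}
token = get_xsrf_token(cookies)
check_xsrf(cookies, {"_xsrf": token})   # matching token: passes silently
```

An attacker's forged form cannot know the cookie value, so any cross-site submission fails the equality check.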

Every handler subclasses RequestHandler and overrides the get, post, etc. methods for the HTTP verbs it needs to serve. The dispatch that invokes those methods lives in _execute:

    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        try:
            if self.request.method not in self.SUPPORTED_METHODS:           # reject unsupported HTTP methods
                raise HTTPError(405)
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method == "POST" and \
               self.application.settings.get("xsrf_cookies"):               # POST requests also get the XSRF check
                self.check_xsrf_cookie()
            self.prepare()                                                  # hook invoked before the request is handled
            if not self._finished:
                getattr(self, self.request.method.lower())(*args, **kwargs) # dispatch to the matching get/post/... method
                if self._auto_finish and not self._finished:
                    self.finish()                                           # finish the request
        except Exception, e:
            self._handle_request_exception(e)
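The core of _execute is just `getattr` on the lowercased HTTP method name. The stripped-down class below illustrates that lifecycle without tornado; `MiniHandler` and its `execute` method are hypothetical stand-ins (HTTPError replaced by ValueError, handler state reduced to the two flags _execute consults):

```python
class MiniHandler:
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT")

    def __init__(self):
        self._finished = False
        self._auto_finish = True

    def prepare(self):
        pass                       # hook called before the method handler

    def finish(self):
        self._finished = True      # stands in for flushing the response

    def get(self):
        return "Hello, world"

    def execute(self, method):
        """Mirror of _execute: validate, prepare, dispatch, auto-finish."""
        if method not in self.SUPPORTED_METHODS:
            raise ValueError(405)  # unsupported verb, like HTTPError(405)
        self.prepare()
        result = None
        if not self._finished:
            # the same getattr dispatch tornado uses
            result = getattr(self, method.lower())()
        if self._auto_finish and not self._finished:
            self.finish()
        return result

h = MiniHandler()
h.execute("GET")   # dispatches to get(), then auto-finishes the request
```

This also shows why an asynchronous handler sets `_auto_finish = False`: otherwise the request is closed as soon as get() returns.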

Once _execute returns, the request has been handled end to end.
At this point the analysis of the basic framework is complete.
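As a supplement, the response head that _generate_headers assembles is plain string work. The sketch below reproduces its output format (status line, headers, Set-Cookie lines, blank line) as a free function; `generate_headers` is an illustrative name, and Python 3's `http.client.responses` stands in for the `httplib.responses` map the Python 2 original uses:

```python
from http.client import responses  # {200: "OK", 404: "Not Found", ...}

def generate_headers(version, status_code, headers, cookies=()):
    """Build a raw HTTP response head the way _generate_headers does."""
    # first line: "HTTP/1.1 200 OK"
    lines = [version + " " + str(status_code) + " " + responses[status_code]]
    # one "Name: value" line per header
    lines.extend("%s: %s" % (n, v) for n, v in headers.items())
    # one Set-Cookie line per newly set cookie string
    for cookie in cookies:
        lines.append("Set-Cookie: " + cookie)
    # CRLF between lines, blank line terminates the head
    return "\r\n".join(lines) + "\r\n\r\n"

head = generate_headers("HTTP/1.1", 200, {"Content-Length": "12"})
# head == 'HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\n'
```

The trailing empty line is what separates the head from the body on the wire, which is why the join ends with a double CRLF.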

The framework's rough run flow:
Server run flow (diagram removed due to a copyright claim)

RequestHandler processing flow (diagram removed due to a copyright claim)

A rough flow diagram of the framework's operation would round this out, but the image is no longer available.
