ornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。

源码组织:

  |---__init__.py

   ---auth.py

   ---......

   ---epoll.c

   ---ioloop.py

   ---iostream.py

   ---...

  tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

  ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

  iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.

这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import subprocess
import tornado.ioloop
import time
import fcntl
import functools
import os
class GenericSubprocess (object):
    def __init__ ( self, timeout=-1, **popen_args ):
        self.args = dict()
        self.args["stdout"] = subprocess.PIPE
        self.args["stderr"] = subprocess.PIPE
        self.args["close_fds"] = True
        self.args.update(popen_args)
        self.ioloop = None
        self.expiration = None
        self.pipe = None
        self.timeout = timeout
        self.streams = []
        self.has_timed_out = False
    def start(self):
        """Spawn the task.
        Throws RuntimeError if the task was already started."""
        if not self.pipe is None:
            raise RuntimeError("Cannot start task twice")
        self.ioloop = tornado.ioloop.IOLoop.instance()
        if self.timeout > 0:
            self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
        self.pipe = subprocess.Popen(**self.args)
        self.streams = [ (self.pipe.stdout.fileno(), []),
                             (self.pipe.stderr.fileno(), []) ]
        for fd, d in self.streams:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
            fcntl.fcntl( fd, fcntl.F_SETFL, flags)
            self.ioloop.add_handler( fd,
                                     self.stat,
                                     self.ioloop.READ|self.ioloop.ERROR)
    def on_timeout(self):
        self.has_timed_out = True
        self.cancel()
    def cancel (self ) :
        """Cancel task execution
        Sends SIGKILL to the child process."""
        try:
            self.pipe.kill()
        except:
            pass
    def stat( self, *args ):
        '''Check process completion and consume pending I/O data'''
        self.pipe.poll()
        if not self.pipe.returncode is None:
            '''cleanup handlers and timeouts'''
            if not self.expiration is None:
                self.ioloop.remove_timeout(self.expiration)
            for fd, dest in  self.streams:
                self.ioloop.remove_handler(fd)
            '''schedulle callback (first try to read all pending data)'''
            self.ioloop.add_callback(self.on_finish)
        for fd, dest in  self.streams:
            while True:
                try:
                    data = os.read(fd, 4096)
                    if len(data) == 0:
                        break
                    dest.extend([data])
                except:
                    break
    @property
    def stdout(self):
        return self.get_output(0)
    @property
    def stderr(self):
        return self.get_output(1)
    @property
    def status(self):
        return self.pipe.returncode
    def get_output(self, index ):
        return "".join(self.streams[index][1])
    def on_finish(self):
        raise NotImplemented()
class Subprocess (GenericSubprocess):
    def __init__ ( self, callback, *args, **kwargs):
        self.callback = callback
        self.done_callback = False
        GenericSubprocess.__init__(self, *args, **kwargs)
    def on_finish(self):
        if not self.done_callback:
            self.done_callback = True
            '''prevent calling callback twice'''
            self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if __name__ == "__main__":
    ioloop = tornado.ioloop.IOLoop.instance()
    def print_timeout( status, stdout, stderr, has_timed_out) :
        assert(status!=0)
        assert(has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_ok( status, stdout, stderr, has_timed_out) :
        assert(status==0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_error( status, stdout, stderr, has_timed_out):
        assert(status!=0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def stop_test():
        ioloop.stop()
    t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep","5"] )
    t2 = Subprocess( print_ok, timeout=3, args=[ "ip""a" ] )
    t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas""1" ] )
    t4 = Subprocess( print_error, timeout=3, args=[ "cat""/etc/sdfsdfsdfsdfsdfsdfsdf" ] )
    t1.start()
    t2.start()
    try:
        t3.start()
        assert(false)
    except:
        print "OK"
    t4.start()
    ioloop.add_timeout(time.time() + 10, stop_test)
    ioloop.start()

本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1236330,如需转载请自行联系原作者

tornado和subprocess实现程序的非堵塞异步处理相关推荐

  1. Windows Sockets2 详解2——堵塞与非堵塞模式

    Socket有两种调用模式:堵塞与非堵塞模式. 顾名思义,堵塞模式就是线程在调用windows socket API后,被挂起直到该函数执行完毕.在(1)中我们知道socket函数调用并不是立即完成的 ...

  2. IO模式——同步(堵塞、非堵塞)、异步

    为什么IO模式非常重要?由于现代的计算机和操作系统的架构决定了CPU是稀缺资源,大家都要来一起竞争.而IO(特别是网络相关的IO)的速度往往较慢.所以怎样进行IO就有了多种模式,包含同步.异步.堵塞. ...

  3. python模拟多线程http请求_用python实现自己的http服务器——多进程、多线程、协程、单进程非堵塞版、epoll版...

    了解http协议 http请求头 GET / HTTP/1.1 Host: www.baidu.com Connection: keep-alive Pragma: no-cache Cache-Co ...

  4. recv/send堵塞和非堵塞

    recv/send堵塞和非堵塞理解 TCP之深入浅出send和recv 需要理解的3个概念 实例详解send() send函数 recv函数 参考: TCP之深入浅出send和recv linux下非 ...

  5. 同步、异步、堵塞、非堵塞和函数调用及I/O之间的组合概念

    在我们工作和学习中,经常会接触到"同步"."异步"."堵塞"和"非堵塞"这些概念,但是并不是每个人都能将它们的关系和区别 ...

  6. Apache Artemis —— 非堵塞 Java 嵌入消息服务

    Apache ActiveMQ Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器.其核心只依赖一个 netty.jar 文件.该项目的目的是为你的 Java 应用提供一 ...

  7. linux父进程循环,LINUX C 父进程建立多个子进程循环非堵塞回收列子

    下面 代码主要用于复习,留于此 点击(此处)折叠或打开 /*********************************************************************** ...

  8. 单进程服务器-非堵塞模式(python版)

    单进程服务器-非堵塞模式 服务器 #coding=utf-8 from socket import * import time# 用来存储所有的新链接的socket g_socketList = [] ...

  9. 为什么中国程序员非要用英文编程,而不用汉字编程

    为什么中国程序员非要用英文编程,而不用汉字编程? 看完下面这个视频 你就知道啦 编程是编写程序的中文简称,在21世纪的互联网时代,计算机使用越来越广泛,编程也是一大热门.而大家都知道现在的编程软件都是 ...

最新文章

  1. 基于thinkphp的省略图便捷函数
  2. 【yii2】 yii框架如果控制器和方法都是多个单词组成应该怎样写请求链接
  3. 每天一道LeetCode-----计算给定范围内所有数的与运算结果
  4. Qt::WindowType、Qt::WidgetAttribute各个标志含义汇总
  5. CAN总线技术在船舶监控系统的应用
  6. java age_这些Java9 超牛的新特性,你竟然还没用过?
  7. 如何免密操作 github、gitee 远程仓库
  8. 在Hisi3531环境中为wm8978芯片添加音量调节功能及测试
  9. 0门槛操作SEO快排代做项目 无需SEO基础
  10. html怎么导入桌面上的图片,html怎么导入图片
  11. Suspense组件的使用
  12. OPA277/OPA2277/OPA4277 High Precision Operational Amplifiers 高精度运放
  13. 高质量 ppt 免费下载网站
  14. java安卓天气结课作业_安卓天气预报毕设和毕业论文
  15. 【牛客刷题22】数根与星际密码
  16. c语言求寻找假币的次数,C语言经典算法 - 八枚银币问题
  17. 彭于晏简单网页制作(HTML和css)
  18. 线性失真与非线性失真
  19. java生成缩略图例子_具体介绍java生成缩略图的方法示例代码
  20. 编译OpenBLAS

热门文章

  1. VB实现指示窗口中拖动方框的程序
  2. AI大牛Jerry Kaplan:AGI?没有技术和工程基础
  3. Linux日志被删处理方法
  4. 美国国家安全原因致金沙江暂停收购飞利浦LED
  5. unity, destroy gameObject destroy all children
  6. KVM的概念和云计算
  7. JavaScript ES5之Object.create函数详解
  8. window7梦幻桌面
  9. SQL Server中 缓冲和池的不同点
  10. HDU 5970 CCPC2016合肥 求等差数列整除整数下取整求和