tornado和subprocess实现程序的非堵塞异步处理
ornado 是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。
源码组织:
|---__init__.py
---auth.py
---......
---epoll.c
---ioloop.py
---iostream.py
---...
tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。
ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理
iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.
这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。
下面是个例子,有tornado基础的朋友,一看就懂的~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import subprocess
import tornado.ioloop
import time
import fcntl
import functools
import os
class GenericSubprocess (object):
def __init__ ( self, timeout=- 1 , **popen_args ):
self.args = dict()
self.args[ "stdout" ] = subprocess.PIPE
self.args[ "stderr" ] = subprocess.PIPE
self.args[ "close_fds" ] = True
self.args.update(popen_args)
self.ioloop = None
self.expiration = None
self.pipe = None
self.timeout = timeout
self.streams = []
self.has_timed_out = False
def start(self):
"" "Spawn the task.
Throws RuntimeError if the task was already started. "" "
if not self.pipe is None:
raise RuntimeError( "Cannot start task twice" )
self.ioloop = tornado.ioloop.IOLoop.instance()
if self.timeout > 0 :
self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
self.pipe = subprocess.Popen(**self.args)
self.streams = [ (self.pipe.stdout.fileno(), []),
(self.pipe.stderr.fileno(), []) ]
for fd, d in self.streams:
flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
fcntl.fcntl( fd, fcntl.F_SETFL, flags)
self.ioloop.add_handler( fd,
self.stat,
self.ioloop.READ|self.ioloop.ERROR)
def on_timeout(self):
self.has_timed_out = True
self.cancel()
def cancel (self ) :
"" "Cancel task execution
Sends SIGKILL to the child process. "" "
try :
self.pipe.kill()
except:
pass
def stat( self, *args ):
'' 'Check process completion and consume pending I/O data' ''
self.pipe.poll()
if not self.pipe.returncode is None:
'' 'cleanup handlers and timeouts' ''
if not self.expiration is None:
self.ioloop.remove_timeout(self.expiration)
for fd, dest in self.streams:
self.ioloop.remove_handler(fd)
'' 'schedulle callback (first try to read all pending data)' ''
self.ioloop.add_callback(self.on_finish)
for fd, dest in self.streams:
while True:
try :
data = os.read(fd, 4096 )
if len(data) == 0 :
break
dest.extend([data])
except:
break
@property
def stdout(self):
return self.get_output( 0 )
@property
def stderr(self):
return self.get_output( 1 )
@property
def status(self):
return self.pipe.returncode
def get_output(self, index ):
return "" .join(self.streams[index][ 1 ])
def on_finish(self):
raise NotImplemented()
class Subprocess (GenericSubprocess):
def __init__ ( self, callback, *args, **kwargs):
self.callback = callback
self.done_callback = False
GenericSubprocess.__init__(self, *args, **kwargs)
def on_finish(self):
if not self.done_callback:
self.done_callback = True
'' 'prevent calling callback twice' ''
self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if __name__ == "__main__" :
ioloop = tornado.ioloop.IOLoop.instance()
def print_timeout( status, stdout, stderr, has_timed_out) :
assert(status!= 0 )
assert(has_timed_out)
print "OK status:" , repr(status), "stdout:" , repr(stdout), "stderr:" , repr(stderr), "timeout:" , repr(has_timed_out)
def print_ok( status, stdout, stderr, has_timed_out) :
assert(status== 0 )
assert(not has_timed_out)
print "OK status:" , repr(status), "stdout:" , repr(stdout), "stderr:" , repr(stderr), "timeout:" , repr(has_timed_out)
def print_error( status, stdout, stderr, has_timed_out):
assert(status!= 0 )
assert(not has_timed_out)
print "OK status:" , repr(status), "stdout:" , repr(stdout), "stderr:" , repr(stderr), "timeout:" , repr(has_timed_out)
def stop_test():
ioloop.stop()
t1 = Subprocess( print_timeout, timeout= 3 , args=[ "sleep" , "5" ] )
t2 = Subprocess( print_ok, timeout= 3 , args=[ "ip" , "a" ] )
t3 = Subprocess( print_ok, timeout= 3 , args=[ "sleepdsdasdas" , "1" ] )
t4 = Subprocess( print_error, timeout= 3 , args=[ "cat" , "/etc/sdfsdfsdfsdfsdfsdfsdf" ] )
t1.start()
t2.start()
try :
t3.start()
assert( false )
except:
print "OK"
t4.start()
ioloop.add_timeout(time.time() + 10 , stop_test)
ioloop.start()
|
tornado和subprocess实现程序的非堵塞异步处理相关推荐
- Windows Sockets2 详解2——堵塞与非堵塞模式
Socket有两种调用模式:堵塞与非堵塞模式. 顾名思义,堵塞模式就是线程在调用windows socket API后,被挂起直到该函数执行完毕.在(1)中我们知道socket函数调用并不是立即完成的 ...
- IO模式——同步(堵塞、非堵塞)、异步
为什么IO模式非常重要?由于现代的计算机和操作系统的架构决定了CPU是稀缺资源,大家都要来一起竞争.而IO(特别是网络相关的IO)的速度往往较慢.所以怎样进行IO就有了多种模式,包含同步.异步.堵塞. ...
- python模拟多线程http请求_用python实现自己的http服务器——多进程、多线程、协程、单进程非堵塞版、epoll版...
了解http协议 http请求头 GET / HTTP/1.1 Host: www.baidu.com Connection: keep-alive Pragma: no-cache Cache-Co ...
- recv/send堵塞和非堵塞
recv/send堵塞和非堵塞理解 TCP之深入浅出send和recv 需要理解的3个概念 实例详解send() send函数 recv函数 参考: TCP之深入浅出send和recv linux下非 ...
- 同步、异步、堵塞、非堵塞和函数调用及I/O之间的组合概念
在我们工作和学习中,经常会接触到"同步"."异步"."堵塞"和"非堵塞"这些概念,但是并不是每个人都能将它们的关系和区别 ...
- Apache Artemis —— 非堵塞 Java 嵌入消息服务
Apache ActiveMQ Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器.其核心只依赖一个 netty.jar 文件.该项目的目的是为你的 Java 应用提供一 ...
- linux父进程循环,LINUX C 父进程建立多个子进程循环非堵塞回收列子
下面 代码主要用于复习,留于此 点击(此处)折叠或打开 /*********************************************************************** ...
- 单进程服务器-非堵塞模式(python版)
单进程服务器-非堵塞模式 服务器 #coding=utf-8 from socket import * import time# 用来存储所有的新链接的socket g_socketList = [] ...
- 为什么中国程序员非要用英文编程,而不用汉字编程
为什么中国程序员非要用英文编程,而不用汉字编程? 看完下面这个视频 你就知道啦 编程是编写程序的中文简称,在21世纪的互联网时代,计算机使用越来越广泛,编程也是一大热门.而大家都知道现在的编程软件都是 ...
最新文章
- 基于thinkphp的省略图便捷函数
- 【yii2】 yii框架如果控制器和方法都是多个单词组成应该怎样写请求链接
- 每天一道LeetCode-----计算给定范围内所有数的与运算结果
- Qt::WindowType、Qt::WidgetAttribute各个标志含义汇总
- CAN总线技术在船舶监控系统的应用
- java age_这些Java9 超牛的新特性,你竟然还没用过?
- 如何免密操作 github、gitee 远程仓库
- 在Hisi3531环境中为wm8978芯片添加音量调节功能及测试
- 0门槛操作SEO快排代做项目 无需SEO基础
- html怎么导入桌面上的图片,html怎么导入图片
- Suspense组件的使用
- OPA277/OPA2277/OPA4277 High Precision Operational Amplifiers 高精度运放
- 高质量 ppt 免费下载网站
- java安卓天气结课作业_安卓天气预报毕设和毕业论文
- 【牛客刷题22】数根与星际密码
- c语言求寻找假币的次数,C语言经典算法 - 八枚银币问题
- 彭于晏简单网页制作(HTML和css)
- 线性失真与非线性失真
- java生成缩略图例子_具体介绍java生成缩略图的方法示例代码
- 编译OpenBLAS