Tornado实现多线程、多进程HTTP服务
2019独角兽企业重金招聘Python工程师标准>>>
背景
线上有一个相关百科的服务,返回一个query中提及的百科词条。该服务是用python实现的,以前通过thrift接口访问,现要将其改为通过HTTP访问。之前没有搭建HTTPServer的经验,因此想用python的web Framework来做这件事,于是有了下面的工作。第一部分是框架选择,这一部分没有太仔细考虑,只是大概看了一些文章。第二部分是根据所需要的功能,学习及测试在框架上应该如何实现。第三部分是实际的代码。第四部分是下一步的学习。
框架选择
python有很多开源的web framework。从知乎上找了几篇综述型的简介,大体包括:Django、Bottle、Flask、web2py、Tornado。看中了介绍中提及Tornado的速度与并发量,于是打算用tornado来实现。所以按目前的了解,或许Tornado并非实现本工作的最佳方案,只是一个可行方案。
学习与测试
用tornado开发web服务的基本流程
tornado具有web framework的功能,因此用它开发web服务非常方便:
- 实现处理请求的Handler,该类继承自
tornado.web.RequestHandler
,实现用于处理请求的对应方法如:get、post等。返回内容用self.write
方法输出。- 实例化一个Application。构造函数的参数是一个Handlers列表,通过正则表达式,将请求与Handler对应起来。通过dict将Handler需要的其他对象以参数的方式传递给Handler的initialize方法。
- 初始化一个
tornado.httpserver.HTTPServer
对象,构造函数的参数是上一步的Application对象。- 为HTTPServer对象绑定一个端口。
- 开始IOLoop。
原服务的特点
原服务是一个内存占用大,IO密集,计算量适中的服务。
- 内存占用大。需要加载一个比较大的词表,其中每个词对应一个id列表,这一部分是C++实现的,通过boost.python封装为python可调用的so。原服务单进程占用内存超过5G。
- IO密集。计算过程中大量访问redis读取term及baikeid的属性信息,用于过滤及rank计算。也访问在线分词服务,获取各term的NLP分析。
- 计算量适中。划词匹配、rank计算有一定计算量,但是总体来看计算量不是特别大。python单进程每天500多万的访问量,单CPU利用率也就40%-50%之间。
关于服务的分析:
- 内存占用大。内存占用大,但绝大部分是只读的。不适合独立启动多个进程,适合多线程或用子进程。
- IO密集。适合将IO操作都变为异步请求,或者用多线程模型。
- 计算量适中。由于python解释器使用GIL,多线程只能提高IO的并发能力,不能提高计算的并发能力。因此可以考虑通过子进程的方式,适当增加提供服务的进程数,提高整个系统服务能力的上限。
需要用到的特性
由于tornado的亮点是异步请求,所以这里首先想到的是将所有请求都改造为异步的。但是这里遇到一个问题,就是异步函数内一定不能有阻塞调用出现,否则整个IOLoop都会被卡住。这就要求彻底地去改造服务,将所有IO或是用时较长的请求都改造为异步函数。这个工程量是非常大的,需要去修改已有的代码。因此,我们考虑用线程池的方式去实现。当一个线程阻塞在某个请求或IO时,其他线程或IOLoop会继续执行。
另外一个瓶颈就是GIL限制了CPU的并发数量,因此考虑用子进程的方式增加进程数,提高服务能力上限。
综合上面的分析,大致用以下方案:
- 通过子进程的方式复制多个进程,使子进程中的只读页指向同一个物理页。
- 线程池。回避异步改造的工作量,增加IO的并发量。
测试代码
首先测试线程池,测试用例为:
对sleep页面同时发出两个请求:
- 在线程池中运行的函数(这里是
self.block_task
)能够同时执行。表现为在控制台交替打印出数字。- 两个get请求几乎同时返回,在浏览器上显示返回的内容。
线程池的测试代码如下:
import os
import sys
import timeimport tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, optionsclass HasBlockTaskHandler(tornado.web.RequestHandler):executor = ThreadPoolExecutor(20) #起线程池,由当前RequestHandler持有 @tornado.gen.coroutinedef get(self):strTime = time.strftime("%Y-%m-%d %H:%M:%S")print "in get before block_task %s" % strTimeresult = yield self.block_task(strTime)print "in get after block_task"self.write("%s" % (result)) @run_on_executordef block_task(self, strTime):print "in block_task %s" % strTimefor i in range(1, 16):time.sleep(1)print "step %d : %s" % (i, strTime)return "Finish %s" % strTimeif __name__ == "__main__":tornado.options.parse_command_line()app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)http_server = tornado.httpserver.HTTPServer(app)http_server.bind(8888)tornado.ioloop.IOLoop.instance().start()
整个代码里有几个位置值得关注:
executor = ThreadPoolExecutor(20)
。这是给Handler类初始化了一个线程池。其中concurrent.futures
不属于tornado,是python的一个独立模块,在python3中是内置模块,python2.7需要自己安装。- 修饰符
@run_on_executor
。这个修饰符将同步函数改造为在executor(这里是线程池)上运行的异步函数,内部实现是将被修饰的函数submit到executor,返回一个Future对象。- 修饰符
@tornado.gen.coroutine
。被这个修饰符修饰的函数,是一个以同步函数方式编写的异步函数。原本通过callback方式编写的异步代码,有了这个修饰符,可以通过yield一个Future的方式来写。被修饰的函数在yield了一个Future对象后将会被挂起,Future对象的结果返回后继续执行。
运行代码后,在两个不同浏览器上访问sleep页面,得到了想要的效果。这里有一个小插曲,就是如果在同一浏览器的两个tab上进行测试,是无法看到想要的效果。第二个get请求会被block,直到第一个get请求返回,服务端才开始处理第二个get请求。这让我一度觉得多线程没有生效,用了半天时间查了很多资料,才看到是浏览器把相同的第二个请求block了,具体链接参考这里。
由于tornado很方便地支持多进程模型,多进程的使用要简单很多,在以上例子中,只需要对启动部分稍作改动即可。具体代码如下所示:
if __name__ == "__main__":tornado.options.parse_command_line()app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)http_server = tornado.httpserver.HTTPServer(app)http_server.bind(8888)print tornado.ioloop.IOLoop.initialized()http_server.start(5)tornado.ioloop.IOLoop.instance().start()
需要注意的地方有两点:
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
,在生成Application对象时,要将autoreload和debug两个参数至为False。也就是需要保证在fork子进程之前IOLoop是未被初始化的。这个可以通过tornado.ioloop.IOLoop.initialized()
函数来跟。http_server.start(5)
在启动IOLoop之前通过start函数设置进程数量,如果设置为0表示每个CPU都启动一个进程。
最后的效果是可以看到n+1个进程在运行,且公用同一个端口。
实际代码
大部分逻辑代码是封装好的,服务的代码如下:
import os
import sys
import jsonimport tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.httpclient
import tornado.web
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
from tornado.options import define, optionsimport rela_baike_server
from rela_baike_server import RelaBaikeRequest, RelaBaikeResult, RelaBaikeServerimport logging
from logging.handlers import TimedRotatingFileHandler
logging.basicConfig()import pdbg_log_prefix = '../log/rela_baike_tornado.'def getLogger(strPrefixBase):strPrefix = "%s%d" % (strPrefixBase, os.getpid())logger = logging.getLogger("RELA_BAIKE")logger.propagate = Falsehandler = TimedRotatingFileHandler(strPrefix, 'H', 1)handler.suffix = "%Y%m%d_%H%M%S.log"formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(logging.INFO)return loggerdef makeResponseBody(retCode, errReason, dicSummary):dicRes = {}dicRes['retCode'] = retCodeif retCode != 0:dicRes['error'] = errReasonelse:dicRes['data'] = dicSummaryreturn json.dumps(dicRes)class RelaBaikeHandler(tornado.web.RequestHandler):executor = ThreadPoolExecutor(50)def initialize(self, relaServer, logger):self.__serverRelaBaike = relaServerself.__logger = logger @tornado.gen.coroutinedef get(self):lstSummary = []retCode = 0errReason = ""try:utfQuery = self.get_argument('query').encode('utf8').strip()except:errorReason = 'Query encoding not utf-8.'strRes = makeResponseBody(-1, errorReason, lstSummary)self.write(strRes)returnif utfQuery == "":strRes = makeResponseBody(0, '', lstSummary)self.write(strRes)returnerror, errReason, lstSummary = yield self.getRelaBaike(utfQuery)strRes = makeResponseBody(error, errReason, lstSummary)self.write(strRes)def __logResponse(self, utfQuery, relaResult):succ = relaResult.isSuccess()if succ:self.__logger.info("%s\tSucc\t%s" % (utfQuery, "|".join([str(item[0]) for item in relaResult])))else:self.__logger.info("%s\tError:%d" % (utfQuery, relaResult.getError())) @run_on_executordef getRelaBaike(self, utfQuery):error = 0lstSummary = []relaBaikeRequest = RelaBaikeRequest(content=utfQuery)relaBaikeResult = self.__serverRelaBaike.getRelaBaike(relaBaikeRequest)self.__logResponse(utfQuery, relaBaikeResult)if relaBaikeResult.isSuccess():for item in relaBaikeResult:baikeid = item[0]try:dicSummary = json.loads(item[1])except:return -2, 'summary format error' ,lstSummarylstSummary.append(dicSummary)else:return relaBaikeResult.getError(), rela_baike_server.g_dic_error.get(relaBaikeResult.getError(), 'other error') ,lstSumm
aryreturn 0, 'success',lstSummarydef start():port = int(sys.argv[1])serverRelaBaike = rela_baike_server.getRelaBaikeServer()logger = getLogger(g_log_prefix)app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])http_server = tornado.httpserver.HTTPServer(app)http_server.bind(port)http_server.start(2)tornado.ioloop.IOLoop.instance().start()if __name__ == "__main__":start()
代码所涉及的特性基本上不超过前面的测试例子,除了下两几点:
- 在*Handler类里增加了一个
def initialize(self, relaServer, logger)
函数。这是为了把一些初始化好的对象传到Handler类里。app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])
。前面handler的initialize函数参数,对应于Application初始化时,每个handler对应的dict。
转载于:https://my.oschina.net/yagami1983/blog/1942415
Tornado实现多线程、多进程HTTP服务相关推荐
- 学习C++项目—— 搭建多进程网络服务框架,增加业务和日志,心跳机制
学习计算机网络编程 一.思路和学习方法 本文学习于:C语言技术网(www.freecplus.net),在 b 站学习于 C 语言技术网,并加以自己的一些理解和复现,如有侵权会删除. 接下来对 ...
- 【Python】单线程异步多线程多进程实例
上一篇文章主要介绍了多任务场景下单线程异步.多线程.多进程如何选择,链接:多任务场景下单线程异步多线程多进程 这里主要通过三个实例去验证一下简单的多任务场景下,三种方式的耗时情况,假设有10个互不关联 ...
- SparkMapReduce的区别、多线程多进程的区别
Spark&MapReduce的区别.多线程&多进程的区别 1.spark与hadoop区别: 2.Spark多线程运行, MR多进程运行 3.MR存在的问题: 4.线程和进程的区别: ...
- linux 进程间界面嵌套,WPF 同一窗口内的多线程/多进程 UI(使用 SetParent 嵌入另一个窗口)...
WPF 的 UI 逻辑只在同一个线程中,这是学习 WPF 开发中大家几乎都会学习到的经验.如果希望做不同线程的 UI,大家也会想到使用另一个窗口来实现,让每个窗口拥有自己的 UI 线程.然而,就不能让 ...
- Python多线程多进程、异步、异常处理等高级用法
文章目录 前言 多线程多进程 多线程 多进程 协程 总结 异步 基本概念 异步编程 asyncio aiohttp 异常 常见异常 异常处理 自定义异常 lambda表达式 lambda表达式用法 高 ...
- python:一文搞懂多线程,多进程,异步协程的使用场景
本文将使用场景化为案例,将单线程,多线程,多进程,异步协程的速度进行对比 对比速度时, >表示 速度快于 >>表示速度远快于 =表示速度差不多 >>>> ...
- 超详细c语言简化tcp通信接口(多线程实现一个服务端处理多个客户端服务)
超详细c语言tcp通信接口 1.可下载源码(客户端 || 服务端通信) 2.说明 3.接口代码 4.客户端通信main_client_demo.c 5.服务端通信main_server_demo.c ...
- 实习手册八(Python基于Tornado框架的接口响应服务)最终章
目录 前言 Tornado_program common handler_base mysql_base sqlalchemy_base redis_base model log user serve ...
- 实习手册一(Python基于Tornado框架的接口响应服务)软件下载与环境配置
目录 一.软件下载及环境配置: 1.Homebrew 2.python3.9 3.PyCharm专业版 4.MySQL 5.Navicat for MySQL 6.Redis 7.Postman 此次 ...
- PyQt5界面多线程多进程爬虫(爬了600w张网页, 出现了一些问题)
有个问题没解决: 将运行过程显示在右边的界面框里的光标问题(光标问题容易解决), 及异常退出的问题. 错误代码: QObject::connect: Cannot queue arguments of ...
最新文章
- Java的LockSupport.park()实现分析
- Logistic回归主要应用领域
- java解析csv文件工具类,java操作CSV文件工具类
- 采用docker安装部署Nginx
- 从0开始html前端页面开发_CSS设置图像透明度
- Java下载安装详细教程(超详细版)
- 【遥感影像处理与分析】遥感影像校正详解-辐射校正、几何校正流程与方法比较
- FFMPEG详解(完整版)
- google四件套之Dagger2。从入门到爱不释手,之:Dagger2进阶知识及在Android中使用
- Adobe FLASH CS6 安装错误解决方法
- 阿里矢量图库 iconfont 引入项目使用方法
- 图书馆用户信息表单生成
- c语言中char ch和getchar()是什么意思?
- 微信小程序-加载图片
- 修复损坏图片的c语言,免费修复损坏的JPEG照片和图像
- 第二章:Java面向对象:抽象(abstract)类、模板方法设计模式、接口(interface)、关键字-implements(实现)、代理模式
- 在官网上下载慢解决办法
- 一. Mybits简单使用
- VR视频拍摄手法学习笔记
- xp系统修复 sfc /scannow 免光盘技巧
热门文章
- 制作一个四轮四驱底盘【内附资料下载链接】
- 数学黑洞(二)任何数都逃不出的西西弗斯黑洞
- Docker容器运行GUI程序的方法(直接进入Docker容器运行或通过SSH连接Docker容器运行)
- python 画椭圆_一篇文章教会你使用SVG ellipse 画椭圆
- 用Win10自带SSH实现免密登录Linux
- 【Linux学习】信号——信号保存 | 信号处理 | 不可重入函数,volatile,SIGCHLD信号
- 出生年(组成年份的数字种类)
- 双十一小马哥背后的女人们
- excel在一个单元格输入内容,在其他单元格同步显示
- 微信小程序——关于时间