事件处理模式

在《面向模式的软件体系架构卷2:用于并发和网络化对象模式》中,总结了对于当前比较流行的事件处理模式的四种基本模式,分别是反应器模式、主动器模式、异步完成标记和接收器-连接器模式。

  1. 反应器模式,该模式引入的结构将事件驱动的应用可以多路分解并分配从一个或者多个客户机发送应用的服务请求,该模式逆转了应用程序中的控制流,也就是好莱坞原则(不要打电话给我们、我们会打电话给你的),即当有事件准备完成之后就会通过应用程序,有事件准备好可以执行,然后应用程序调用对应的回调函数来处理对应的事件,这样应用程序只需要实现具体事件处理程序来配合多路分解机制和分配机制,虽然该模式相对直观但是该模式还是收到了一定的性能限制,特别是它还是不能同时支持大量的客户机或者耗时长的客户机请求,因为它在事件多路分解层串行化了所有的事件处理程序的处理过程,处理性能并不是很高,当然现在也有好多反应器模式的变种来提高处理性能。
  2. 主动器模式,是事件驱动应用能有效的多路分解和分配由完成的异步操作所触发的服务请求,在一定情况下,它获得了并发的性能优势,在该模式中,客户机和完成处理程序所代表的应用程序称为主动性主体。与被动地等待指示事件的到达并作出响应的反应器模式不同,主动器模式中的客户机和完成处理程序通过在一个异步操作处理器中主动地初始化一个或者多个异步操作请求,引起应用程序内部的控制流和数据流,异步操作完成后,异步操作处理器和指定的主动器组件协作,将产生的完成事件多路分解给相关的完成处理程序,并分配这些处理程序的回调处理方法,完成处理程序处理一个完成事件后,就主动地激活一个异步操作请求。限制就是异步操作还需要操作系统支持,如果操作系统不支持则需要通过多线程等其他方式来模拟实现。
  3. 异步完成标记模式,使应用程序能对它在服务中调用异步操作而引起的响应进行有效地多路分解和处理,从而提高异步处理的效率,主要是对主动器模式中任务的多路分解的优化。
  4. 接收器-接收器模式,该模式经常和反应器模式结合使用,将网络化系统中同级服务的连接和协作初始化与随后进行的处理分开,该模式允许应用程序配置它们的连接拓扑结构,进行这种配置不依赖于应用程序所提供的服务。

本文主要就是介绍最常用的反应器模式,该模式主要就是通过将事件进行多路分解然后通过不同的回调函数处理不同的事件。

反应器模式

当前的C10k问题,高性能的服务端的实现模型,基本上都是选用的反应器模式来实现,通过多路IO复用来进行处理请求的处理,在处理网络请求的过程中主要就是通过connect、accept、read、write等操作接受网络请求,接受网络数据,发送处理结果等操作,首先来查看最基础的反应器模式的实现方法。

单线程反应器模式
客户端服务端初始化事件循环并监听事件连接服务端connect事件接受请求发送数据读事件处理并返回数据写事件将数据返回客户端服务端

该时序图就是简易的描述了反应器模式所有事件的执行都是通过读事件或者写事件进行驱动的。

初始化事件循环机制
新进来连接
传入数据请求
写入待发送数据
循环等待事件
服务端
事件循环
读事件
写事件
接受该连接并监听读数据事件
接受数据处理并将返回数据通过写事件返回
响应会客户端

单线程模式下的服务端代码相对比较简单,如下所示;

import selectors
import socketselector = selectors.DefaultSelector()def application():return "test response"class RequestHandler(object):def __init__(self, stream, address, server):self.application = applicationself.stream = streamself.stream.setblocking(False)self.address = addressself.server = serverself._recv_buff = ""self._write_buff = b""self.state = selectors.EVENT_READselector.register(self.stream, selectors.EVENT_READ, self._handle_event)def parse_request(self):try:response = self.application()resp = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)except Exception as e:response = "error"resp = "HTTP/1.1 500 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)self._write_buff += resp.encode(encoding="utf-8")def _handle_event(self, fd, mask):if mask & selectors.EVENT_READ:self._handle_read()elif mask & selectors.EVENT_WRITE:self._handle_write()state = 0if self._recv_buff:state |= selectors.EVENT_READif self._write_buff:state |= selectors.EVENT_WRITE# print(" state  and check state  ", state, self.state, mask)if state != 0 and state != self.state:self.state = stateself.modify_state(state)def _handle_read(self):data = self.stream.recv(1024)if data:self._recv_buff += data.decode("utf-8")self.parse_request()else:self._handle_close()def modify_state(self, state):selector.modify(self.stream, state, self._handle_event)def _handle_write(self):while self._write_buff:try:length = self.stream.send(self._write_buff)self._write_buff = self._write_buff[length:]except Exception as e:print("write error {0}".format(e))def _handle_close(self):print("handle close")selector.unregister(self.stream)try:self.stream.close()except Exception:passclass Server(object):address_family = socket.AF_INETsocket_type = socket.SOCK_STREAMrequest_queue_size = 5def __init__(self, server_bind, handle_class=RequestHandler):self.__shutdown_request = Falseself.allow_reuse_address = Trueself.socket = Noneself.handle_class = handle_classself.server_address = server_bindself.socket = socket.socket(self.address_family,self.socket_type)self.server_bind()def server_bind(self):if self.allow_reuse_address:self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.socket.bind(self.server_address)self.server_address = self.socket.getsockname()self.socket.listen(self.request_queue_size)def serve_forever(self, poll_interval=0.5):selector.register(self.socket, selectors.EVENT_READ, self._handle_request_noblock)while True:ready = selector.select(poll_interval)if self.__shutdown_request:breakfor key, mask in ready:callback = key.datacallback(key.fileobj, mask)def _handle_request_noblock(self, fd, mask):try:conn, address = self.socket.accept()except Exception:returntry:self.handle_class(conn, address, self)except Exception as e:print(" handle_class Error {0}".format(e))def main():server = Server(("127.0.0.1", 5555))server.serve_forever()if __name__ == '__main__':main()

这段代码就是简单的单线程的反应器模式的简单实现,当运行该脚本之后,在终端中或者浏览器中访问http://127.0.0.1:5555就会得到如下返回;

curl  127.0.0.1:5555
test response

这行返回数据就是脚本中application函数返回的内容,因为该脚本只是做原理性的说明,故没有按照http协议的标准来解析数据只是做了简单的数据返回而已,从该端结构也可看出所有的响应请求都是阻塞执行,事件的请求都是阻塞在异步事件驱动框架中进行。进行压测查看一下性能。

 wrk -t4 -c1024 -d90s -T5  --latency http://127.0.0.1:5555
Running 2m test @ http://127.0.0.1:55554 threads and 1024 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency    59.51ms   14.10ms 225.26ms   75.76%Req/Sec     3.88k   448.64     5.62k    72.11%Latency Distribution50%   63.10ms75%   66.94ms90%   73.14ms99%   83.63ms1388445 requests in 1.50m, 103.28MB readSocket errors: connect 0, read 1887, write 35, timeout 0
Requests/sec:  15421.53
Transfer/sec:      1.15MB
多线程单事件驱动改进

在单线程反应器模式中,由于一个线程进行事件的驱动,并在驱动的过程中来处理业务逻辑,此时我们尝试改造成多个线程等待进来的请求,事件驱动模式还是单事件驱动。

初始化事件循环机制
新进来连接
处理业务逻辑
事件循环的写事件
服务端
事件循环
读事件
线程池
注册写事件
写事件
响应客户端
import selectors
import socket
import queue
from threading import Threadselector = selectors.DefaultSelector()def application():return "test response"class RequestHandler(object):def __init__(self, stream, address, server):self.application = applicationself.stream = streamself.stream.setblocking(False)self.address = addressself.server = serverself._recv_buff = ""self._write_buff = b""self.state = selectors.EVENT_READselector.register(self.stream, selectors.EVENT_READ, self._handle_event)def parse_request(self):try:response = self.application()resp = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)except Exception as e:response = "error"resp = "HTTP/1.1 500 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)self._write_buff += resp.encode(encoding="utf-8")def _handle_event(self, fd, mask):if mask & selectors.EVENT_READ:self._handle_read()elif mask & selectors.EVENT_WRITE:self._handle_write()state = 0if self._recv_buff:state |= selectors.EVENT_READif self._write_buff:state |= selectors.EVENT_WRITE# print(" state  and check state  ", state, self.state, mask)if state != 0 and state != self.state:self.state = stateself.modify_state(state)def _handle_read(self):data = self.stream.recv(1024)if data:self._recv_buff += data.decode("utf-8")self.parse_request()else:self._handle_close()def modify_state(self, state):selector.modify(self.stream, state, self._handle_event)def _handle_write(self):while self._write_buff:try:length = self.stream.send(self._write_buff)self._write_buff = self._write_buff[length:]except Exception as e:print("write error {0}".format(e))def _handle_close(self):print("handle close")selector.unregister(self.stream)try:self.stream.close()except Exception:passclass Server(object):address_family = socket.AF_INETsocket_type = socket.SOCK_STREAMrequest_queue_size = 5def __init__(self, server_bind, handle_class=RequestHandler):self.__shutdown_request = Falseself.allow_reuse_address = Trueself.socket = Noneself.handle_class = handle_classself.server_address = server_bindself.socket = socket.socket(self.address_family,self.socket_type)self.server_bind()self.work_queue = queue.Queue()self.start_worker()def start_worker(self):for i in range(10):t = Thread(target=self.spawn_worker, args=(i, ))t.start()def spawn_worker(self, num):while not self.__shutdown_request:try:conn, address = self.work_queue.get()except Exception as e:print("spawn_worrker get  {0}".format(e))returnprint("worker  thread  num  :  {0}".format(num))try:self.handle_class(conn, address, self)except Exception as e:print(" handle_class Error {0}".format(e))def server_bind(self):if self.allow_reuse_address:self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.socket.bind(self.server_address)self.server_address = self.socket.getsockname()self.socket.listen(self.request_queue_size)def serve_forever(self, poll_interval=0.5):selector.register(self.socket, selectors.EVENT_READ, self._handle_request_noblock)while True:ready = selector.select(poll_interval)if self.__shutdown_request:breakfor key, mask in ready:callback = key.datacallback(key.fileobj, mask)def _handle_request_noblock(self, fd, mask):try:conn, address = self.socket.accept()except Exception:returnself.work_queue.put((conn, address))def main():server = Server(("127.0.0.1", 5555))server.serve_forever()if __name__ == '__main__':main()

通过加入线程池解决了并发响应客户端数据的性能,但由于python本身在多线程中有GIL锁的存在故利用线程池的解决方案可能性能未必有很好的提升,而且在响应方案中由于加入了线程安全的队列,这也加重了在多线程条件下的抢占的开销,改进后的压测数据如下所示,通过对比可知加入多线程的解决方案的性能还略有下降。

 wrk -t4 -c1024 -d90s -T5  --latency http://127.0.0.1:5555
Running 2m test @ http://127.0.0.1:55554 threads and 1024 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency    58.89ms   17.19ms 242.36ms   74.64%Req/Sec     3.77k   446.73     5.53k    70.19%Latency Distribution50%   64.36ms75%   68.54ms90%   74.92ms99%   85.79ms1349814 requests in 1.50m, 100.41MB readSocket errors: connect 0, read 1970, write 60, timeout 0
Requests/sec:  14987.46
Transfer/sec:      1.11MB
多事件驱动加多线程处理的反应器模式

在该模式中,新增加多个事件驱动模式,主事件驱动只需要接受新接受的连接请求,剩余的连接的事件驱动都由子事件驱动来进行交互,从而比单事件驱动提高了事件驱动的效率。

初始化事件循环机制
初始化子事件驱动
新进来连接
选择一个子事件驱动
处理业务逻辑
子事件循环的写事件
服务端
事件循环
子事件驱动集
读事件
线程池
注册写事件
写事件
响应客户端
import selectors
import socket
import queue
from threading import Thread, Lock
import randomselector = selectors.DefaultSelector()def application():return "test response"class RequestHandler(object):def __init__(self, stream, address, server, sel):self.application = applicationself.stream = streamself.stream.setblocking(False)self.address = addressself.server = serverself._recv_buff = ""self._write_buff = b""self.state = selectors.EVENT_READself.sel = selself.sel.register(self.stream, selectors.EVENT_READ, self._handle_event)def parse_request(self):try:response = self.application()resp = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)except Exception as e:response = "error"resp = "HTTP/1.1 500 OK\r\nContent-Type: text/plain\r\nContent-Length: {0}\r\n\r\n{1}".format(len(response), response)self._write_buff += resp.encode(encoding="utf-8")def _handle_event(self, fd, mask):if mask & selectors.EVENT_READ:self._handle_read()elif mask & selectors.EVENT_WRITE:self._handle_write()state = 0if self._recv_buff:state |= selectors.EVENT_READif self._write_buff:state |= selectors.EVENT_WRITE# print(" state  and check state  ", state, self.state, mask)if state != 0 and state != self.state:self.state = stateself.modify_state(state)def _handle_read(self):data = self.stream.recv(1024)if data:self._recv_buff += data.decode("utf-8")self.parse_request()else:self._handle_close()def modify_state(self, state):self.sel.modify(self.stream, state, self._handle_event)def _handle_write(self):while self._write_buff:try:length = self.stream.send(self._write_buff)self._write_buff = self._write_buff[length:]except Exception as e:print("write error {0}".format(e))def _handle_close(self):print("handle close")self.sel.unregister(self.stream)try:self.stream.close()except Exception:passclass Server(object):address_family = socket.AF_INETsocket_type = socket.SOCK_STREAMrequest_queue_size = 5def __init__(self, server_bind, handle_class=RequestHandler):self.__shutdown_request = Falseself.allow_reuse_address = Trueself.socket = Noneself.handle_class = handle_classself.server_address = server_bindself.socket = socket.socket(self.address_family,self.socket_type)self.server_bind()self.work_queue = queue.Queue()self.start_worker()self.sels = []self.lock = Lock()self.start_sels()def start_sels(self):for i in range(5):t = Thread(target=self.sub_forever)t.start()def start_worker(self):for i in range(10):t = Thread(target=self.spawn_worker, args=(i, ))t.start()def spawn_worker(self, num):while not self.__shutdown_request:try:conn, address = self.work_queue.get()except Exception as e:print("spawn_worrker get  {0}".format(e))returnprint("worker  thread  num  :  {0}".format(num))rand_index_sel = random.randint(0, 4)print("random  sels  index  : {0}".format(rand_index_sel))sel = self.sels[rand_index_sel]try:self.handle_class(conn, address, self, sel)except Exception as e:print(" handle_class Error {0}".format(e))def server_bind(self):if self.allow_reuse_address:self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.socket.bind(self.server_address)self.server_address = self.socket.getsockname()self.socket.listen(self.request_queue_size)def sub_forever(self, poll_interval=0.5):selector_sub = selectors.DefaultSelector()print("start sub_selector_sub")with self.lock:self.sels.append(selector_sub)print("current  sels ", self.sels)while True:ready = selector_sub.select(poll_interval)if self.__shutdown_request:breakfor key, mask in ready:print("sub ready  : {0}".format(key))callback = key.datacallback(key.fileobj, mask)def serve_forever(self, poll_interval=0.5):selector.register(self.socket, selectors.EVENT_READ, self._handle_request_noblock)while True:ready = selector.select(poll_interval)if self.__shutdown_request:breakfor key, mask in ready:print("main  selector  events : {0}".format(key))callback = key.datacallback(key.fileobj, mask)def _handle_request_noblock(self, fd, mask):try:conn, address = self.socket.accept()except Exception:returnself.work_queue.put((conn, address))def main():server = Server(("127.0.0.1", 5555))server.serve_forever()if __name__ == '__main__':main()

在该模式的改进下,主要通过新增多个子线程来,并在每个子线程中初始化一个事件驱动并单独执行事件驱动,每个子事件驱动互相独立,从而提高了事件驱动的响应效率。

 wrk -t4 -c1024 -d90s -T5  --latency http://127.0.0.1:5555
Running 2m test @ http://127.0.0.1:55554 threads and 1024 connectionsThread Stats   Avg      Stdev     Max   +/- StdevLatency   181.72ms   57.96ms 471.36ms   72.76%Req/Sec     1.19k   280.01     2.27k    71.15%Latency Distribution50%  194.47ms75%  216.44ms90%  239.99ms99%  303.04ms425025 requests in 1.50m, 31.62MB readSocket errors: connect 0, read 2101, write 221, timeout 0
Requests/sec:   4719.33
Transfer/sec:    359.48KB

从压测的效果来看,无疑简单粗暴的修改为这种形式效果很差,选用这种模式需要优化的点还有很多,而且因为在Python中新加了几个线程来执行,无疑更加重了调度的成本,后续有时间可继续优化该模式的响应性能。

总结

本文主要是总结了反应器模式常用的一些示例,几种不同的模式下的响应都不相同,故所面对的响应性能也有所差别,本文主要是原理性的示例而已,其中具体的优化的措施或者示例代码有不对的地方并没有做过多的考虑,单线程的反应器模式是目前应用比较广泛的一种模式,例如Redis的事件驱动也采用该种模式。。由于本人才疏学浅,如有错误请批评指正。

模式设计概述:反应器(Reactor)模式相关推荐

  1. 模式设计概述:代理者模式

    分布式系统模式 分布式相关的模式设计有大概三种模式,分布式系统与集中式系统相比需要完全不同的软件.管道和过滤器模式,微核和代理者模式. 代理者模式 代理者模式体系结构的强制条件是 组件应该能够访问其他 ...

  2. 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )

    文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...

  3. reactor模式:多线程的reactor模式

    上文说到单线程的reactor模式 reactor模式:单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的reactor模式引入线程池的概念, ...

  4. ACE反应器(Reactor)模式

    1.ACE反应器框架简介 反应器(Reactor):用于事件多路分离和分派的体系结构模式 通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞.所谓阻塞方式的意思是指, 当试图对 ...

  5. 高性能IO设计中的Reactor模式与Proactor模式

    为什么80%的码农都做不了架构师?>>>    在高性能的IO设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proacto ...

  6. 两种IO模式:Proactor与Reactor模式

    在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作. 在比较这两个模式之前,我们首先的搞明白 ...

  7. java reactor模式例子_回顾 Reactor 模式

    Reactor 作为网络编程库的核心模式的 Reactor 模式是网络编程中的最常用的模式,反应器 Reactor 又名分派器 Dispatcher, 或通知器 Notifier, 重温一下 POSA ...

  8. 反模式设计_设计模式:模式或反模式,这就是问题

    反模式设计 我最近遇到了Wiki页面" Anti-pattern" ,其中包含详尽的反模式列表. 其中一些对我来说很明显. 他们中的一些让我想了一下,其他的让我想了更多. 然后,我 ...

  9. UML+模式设计概述

    转自于:http://blog.csdn.net/rexuefengye/article/details/13020225 工程学:工程庞大到一定程度必须是用工程学方法,好比直接用水泥沙子建设实用的摩 ...

最新文章

  1. 安卓高手之路之 WindowManager
  2. 电路基础知识 -- 三态
  3. C 语言 方法外部的数组与普通变量传入方法内部时的区别
  4. opencv 阈值分割_CVPR2019实例分割Mask Scoring RCNN
  5. 少儿编程100讲轻松学python(一)-python怎么打开
  6. 199-Pycharm相关
  7. SAP 严重漏洞可导致供应链攻击
  8. SylixOS中AARCH64跳转表实现原理
  9. 分位数回归-Quantile regression
  10. F: Shattered Cake
  11. android 自动跳转市场,js判断设备,跳转app应用、android市场或者AppStore
  12. 安装gooreplacer插件为含有googlefonts api的网页提速
  13. Linux 中的 -rwxr-xr-x 权限代表什么意思 Linux 中的权限
  14. 51单片机LED 8*8点阵屏显示图形
  15. maven 问题解决(Failed to look for file: http://)
  16. 使用sqlyog连接阿里云rds数据库
  17. matlab m序列扩频,基于matlab的移位寄存器法m序列的产生
  18. C# 串口CRC CCITT-FALSE 校验
  19. 计算机维修高级工考试员题库,职业技能鉴定国家题库统一试卷高级计算机维修工知识试题...
  20. Python机器学习基础

热门文章

  1. “一键”部署分布式训练,微软“群策MARO”上新集群管理助手
  2. 关于2021年及未来,人工智能的5大趋势预测
  3. 4场直播,哈工大、亚马逊等大咖为你带来机器学习与知识图谱的内容盛宴
  4. 如何更新你的机器学习模型?手把手带你设计一个可持续的预测模型!
  5. 简单粗暴理解与实现机器学习之逻辑回归:逻辑回归介绍、应用场景、原理、损失以及优化...
  6. 挑战弱监督学习的三大热门问题 AutoWSL2019挑战赛正式开赛
  7. Space X的火箭上天,Tesla的业绩落地
  8. 超实用总结:AI实践者需要用到的10个深度学习方法
  9. 面试官:MySQL 表设计要注意什么?
  10. 我的读论文经验总结!