Reactor模式是什么:

反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

为何要用Reactor:

常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。

1、BIO比如我们采用BIO的方式来维护和客户端的连接:

// 主线程维护连接  public void run() {      try {          while (true) {              Socket socket = serverSocket.accept();              //提交线程池处理              executorService.submit(new Handler(socket));          }      } catch (Exception e) {          e.printStackTrace();      }  }  // 处理读写服务  class Handler implements Runnable {      public void run() {          try {              //获取Socket的输入流,接收数据              BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));              String readData = buf.readLine();              while (readData != null) {                  readData = buf.readLine();                  System.out.println(readData);              }          } catch (Exception e) {              e.printStackTrace();          }      }  }

很明显,为了避免资源耗尽,我们采用线程池的方式来处理读写服务。但是这么做依然有很明显的弊端:

  1. 同步阻塞IO,读写阻塞,线程等待时间过长

  2. 在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。

  3. 多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展

    状态数据以及其他需要保持一致的数据,需要采用并发同步控制

2、NIO,那么可以有其他方式来更好的处理么,我们可以采用NIO来处理,NIO中支持的基本机制:

public NIOServer(int port) throws Exception {      selector = Selector.open();      serverSocket = ServerSocketChannel.open();      serverSocket.socket().bind(new InetSocketAddress(port));      serverSocket.configureBlocking(false);      serverSocket.register(selector, SelectionKey.OP_ACCEPT);  }  @Override  public void run() {      while (!Thread.interrupted()) {          try {              //阻塞等待事件              selector.select();              // 事件列表              Set selected = selector.selectedKeys();              Iterator it = selected.iterator();              while (it.hasNext()) {                  it.remove();                  //分发事件                  dispatch((SelectionKey) (it.next()));              }          } catch (Exception e) {          }      }  }  private void dispatch(SelectionKey key) throws Exception {      if (key.isAcceptable()) {          register(key);//新链接建立,注册      } else if (key.isReadable()) {          read(key);//读事件处理      } else if (key.isWritable()) {          wirete(key);//写事件处理      }  }  private void register(SelectionKey key) throws Exception {      ServerSocketChannel server = (ServerSocketChannel) key              .channel();      // 获得和客户端连接的通道      SocketChannel channel = server.accept();      channel.configureBlocking(false);      //客户端通道注册到selector 上      channel.register(this.selector, SelectionKey.OP_READ);  }

我们可以看到上述的NIO例子已经差不多拥有reactor的影子了

  1. 基于事件驱动-> selector(支持对多个socketChannel的监听)

  2. 统一的事件分派中心-> dispatch

  3. 事件处理服务-> read & write

3、Reactor

首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

  • Reactor 将I/O事件分派给对应的Handler

  • Acceptor 处理客户端新连接,并分派请求到处理器链中

  • Handlers 执行非阻塞读/写 任务

1、单Reactor单线程模型

 /**    * 等待事件到来,分发事件处理    */  class Reactor implements Runnable {      private Reactor() throws Exception {          SelectionKey sk =                  serverSocket.register(selector,                          SelectionKey.OP_ACCEPT);          // attach Acceptor 处理新连接          sk.attach(new Acceptor());      }      public void run() {          try {              while (!Thread.interrupted()) {                  selector.select();                  Set selected = selector.selectedKeys();                  Iterator it = selected.iterator();                  while (it.hasNext()) {                      it.remove();                      //分发事件处理                      dispatch((SelectionKey) (it.next()));                  }              }          } catch (IOException ex) {              //do something          }      }      void dispatch(SelectionKey k) {          // 若是连接事件获取是acceptor          // 若是IO读写事件获取是handler          Runnable runnable = (Runnable) (k.attachment());          if (runnable != null) {              runnable.run();          }      }  }    /**    * 连接事件就绪,处理连接事件    */  class Acceptor implements Runnable {      @Override      public void run() {          try {              SocketChannel c = serverSocket.accept();              if (c != null) {// 注册读写                  new Handler(c, selector);              }          } catch (Exception e) {          }      }  }

2、单Reactor多线程模型

/**    * 多线程处理读写业务逻辑    */  class MultiThreadHandler implements Runnable {      public static final int READING = 0, WRITING = 1;      int state;      final SocketChannel socket;      final SelectionKey sk;      //多线程处理业务逻辑      ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());      public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {          this.state = READING;          this.socket = socket;          sk = socket.register(selector, SelectionKey.OP_READ);          sk.attach(this);          socket.configureBlocking(false);      }      @Override      public void run() {          if (state == READING) {              read();          } else if (state == WRITING) {              write();          }      }      private void read() {          //任务异步处理          executorService.submit(() -> process());          //下一步处理写事件          sk.interestOps(SelectionKey.OP_WRITE);          this.state = WRITING;      }      private void write() {          //任务异步处理          executorService.submit(() -> process());          //下一步处理读事件          sk.interestOps(SelectionKey.OP_READ);          this.state = READING;      }      /**        * task 业务处理        */      public void process() {          //do IO ,task,queue something      }  }

3、多Reactor多线程模型

/**    * 多work 连接事件Acceptor,处理连接事件    */  class MultiWorkThreadAcceptor implements Runnable {      // cpu线程数相同多work线程      int workCount =Runtime.getRuntime().availableProcessors();      SubReactor[] workThreadHandlers = new SubReactor[workCount];      volatile int nextHandler = 0;      public MultiWorkThreadAcceptor() {          this.init();      }      public void init() {          nextHandler = 0;          for (int i = 0; i < workThreadHandlers.length; i++) {              try {                  workThreadHandlers[i] = new SubReactor();              } catch (Exception e) {              }          }      }      @Override      public void run() {          try {              SocketChannel c = serverSocket.accept();              if (c != null) {// 注册读写                  synchronized (c) {                      // 顺序获取SubReactor,然后注册channel                       SubReactor work = workThreadHandlers[nextHandler];                      work.registerChannel(c);                      nextHandler++;                      if (nextHandler >= workThreadHandlers.length) {                          nextHandler = 0;                      }                  }              }          } catch (Exception e) {          }      }  }   /**    * 多work线程处理读写业务逻辑    */  class SubReactor implements Runnable {      final Selector mySelector;      //多线程处理业务逻辑      int workCount =Runtime.getRuntime().availableProcessors();      ExecutorService executorService = Executors.newFixedThreadPool(workCount);      public SubReactor() throws Exception {          // 每个SubReactor 一个selector           this.mySelector = SelectorProvider.provider().openSelector();      }      /**        * 注册chanel        *        * @param sc        * @throws Exception        */      public void registerChannel(SocketChannel sc) throws Exception {          sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);      }      @Override      public void run() {          while (true) {              try {              //每个SubReactor 自己做事件分派处理读写事件                  selector.select();                  Set keys = selector.selectedKeys();                  Iterator iterator = keys.iterator();                  while (iterator.hasNext()) {                      SelectionKey key = iterator.next();                      iterator.remove();                      if (key.isReadable()) {                          read();                      } else if (key.isWritable()) {                          write();                      }                  }              } catch (Exception e) {              }          }      }      private void read() {          //任务异步处理          executorService.submit(() -> process());      }      private void write() {          //任务异步处理          executorService.submit(() -> process());      }      /**        * task 业务处理        */      public void process() {          //do IO ,task,queue something      }  }

c++ socket线程池_Netty(3)——Reactor线程模型相关推荐

  1. 线程池版本的mysql_MySQL线程池内幕

    摘要 MySQL线程池在初始化的时刻根据宿主机的CPU核心数设置thread_pool_size,这也就是线程池的线程组的个数.每个线程组在初始化之后会经由过程底层的IO库分派一个收集特别的句柄与之接 ...

  2. java线程池是如何复用线程_线程池如何复用一个线程-- ThreadPoolExecutor的实现(未完)...

    任务是一组逻辑工作单元,而线程则是使任务异步执行的机制.在Java中,Runnable对象代表一个任务,Thread对象负责创建一个线程执行这个任务. 前提:1. 程序需要处理大量任务 2. 任务的执 ...

  3. 一文详解java线程池 详解Java线程池的七个参数 详解池化技术 java如何选择核心线程数 详解Java线程池的拒绝策略

    目录 引言 线程池使用场景 加快请求响应(响应时间优先) 加快处理大任务(吞吐量优先) 特殊说明 线程池的池化技术 线程池的创建 手动创建 创建newFixedThreadPool线程池 创建newS ...

  4. springboot 线程池_Spring boot 2 线程池怎么配置

    线程池 在java 中线程池,就是ThreadPoolExecutor来构造,简单看下线程池包含的方法, corePoolSize:初始化线程.线程池中保留的线程数量. maximumPoolSize ...

  5. java 线程池 初始大小,Java线程池ThreadPoolExecutor的实现和参数

    接文章Java8线程池--底层为LinkedBlockingQueue的ThreadPoolExecutor,文章中简单介绍了线程池保持线程,并且从阻塞队列中获取任务执行的流程.本篇文章详细介绍线程池 ...

  6. C#如何判断线程池中所有的线程是否已经完成(转)

    其 实很简单用ThreadPool.RegisterWaitForSingleObject方法注册一个定时检查线程池的方法,在检查线程的方法内调用 ThreadPool.GetAvailableThr ...

  7. [.Net线程处理系列]专题二:线程池中的工作者线程

    目录: 一.上节补充 二.CLR线程池基础 三.通过线程池的工作者线程实现异步 四.使用委托实现异步 五.任务 六.小结 一.上节补充 对于Thread类还有几个常用方法需要说明的. 1.1 Susp ...

  8. 【Android 异步操作】线程池 ( 线程池使用示例 | 自定义线程池使用流程 | 自定义任务拒绝处理策略 | 完整代码示例 )

    文章目录 一.自定义线程池使用流程 二.自定义任务拒绝处理策略 三.完整代码示例 在博客 [Android 异步操作]线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTas ...

  9. 【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 )

    文章目录 一.线程池 reject 拒绝任务 二.线程池 addWorker 添加任务 在上一篇博客 [Android 异步操作]线程池 ( 线程池 execute 方法源码解析 ) 中 , 讲解 线 ...

  10. 【Android 异步操作】线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTask 使用线程池示例 )

    文章目录 一.线程池简介 二.线程池初始化方法简介 三.线程池使用示例 一.线程池简介 线程池一般是实现了 ExecutorService 接口的类 , 一般使用 ThreadPoolExecutor ...

最新文章

  1. html5 canvas雨点打到窗玻璃动画
  2. 1067. 试密码(20)
  3. Redis Primer(1)基于JedisPool的Redis hset并发性能测试 - @钟超 · 技术博客专栏 - 博客频道 - CSDN.NET...
  4. TikTok英国市场你不能不知道的10大数据
  5. 022_Vue购物车
  6. 402.移掉K位数字,使得剩下数字最小
  7. stm32的语音识别_免费开源基于STM32的智能垃圾桶之舵机控制(HAL库)
  8. 如何使用VisualVM监视服务器上的多个JVM
  9. swagger core 和 swagger ui 如何关联【窥探】
  10. 云云协同解决方案全景图发布 华为云助力科技企业云上创新
  11. linux core 永久生效,【调试】Core Dump是什么?Linux下如何正确永久开启?
  12. linux centos ppp限速,Centos7限速和测速
  13. LeetCode每周刷题(2019.6.24-2019.6.30)
  14. TCP协议和UDP协议
  15. Java汉字转拼音实现方式
  16. 植物大战僵尸 php,植物大战僵尸Online
  17. 网络基础之冲突域和广播域
  18. 对分易教学平台考勤漏洞探索,批量签到app制作杂谈
  19. SQLI DUMB SERIES-10
  20. OpenStack 2014 用户调查解析——中国部署规模仅次于美国

热门文章

  1. 【语音识别】基于matlab DWT算法0~9数字语音识别【含Matlab源码 1726期】
  2. 【优化求解】基于matlab遗传算法求解道路流量优化问题【含Matlab源码 1480期】
  3. 【协同任务】基于matlab多无人机协同任务【含Matlab源码 1273期】
  4. SPSS遇到缺失值怎么办?删除还是替换?【SPSS 067期】
  5. mysql资源估算_关于数据库查询要耗费的服务器资源估算!高手进~
  6. sql外键需要输入吗_sql_外键(foreignkey)
  7. 从头实现linux操作系统_从头开始实现您的第一个人工神经元
  8. php 时间 拼接,PHP关于时间的时段的重合、 整合的方法
  9. wpf 引用的图片文件打包后找不到_PyQT5打包:用PyInstaller遇到的坑
  10. (原創) 這學期C++完美的Ending (C/C++) (日記)