c++ socket线程池_Netty(3)——Reactor线程模型
Reactor模式是什么:
反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
为何要用Reactor:
常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。
1、BIO比如我们采用BIO的方式来维护和客户端的连接:
// 主线程维护连接 public void run() { try { while (true) { Socket socket = serverSocket.accept(); //提交线程池处理 executorService.submit(new Handler(socket)); } } catch (Exception e) { e.printStackTrace(); } } // 处理读写服务 class Handler implements Runnable { public void run() { try { //获取Socket的输入流,接收数据 BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream())); String readData = buf.readLine(); while (readData != null) { readData = buf.readLine(); System.out.println(readData); } } catch (Exception e) { e.printStackTrace(); } } }
很明显,为了避免资源耗尽,我们采用线程池的方式来处理读写服务。但是这么做依然有很明显的弊端:
同步阻塞IO,读写阻塞,线程等待时间过长
在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。
多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展
状态数据以及其他需要保持一致的数据,需要采用并发同步控制
2、NIO,那么可以有其他方式来更好的处理么,我们可以采用NIO来处理,NIO中支持的基本机制:
public NIOServer(int port) throws Exception { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while (!Thread.interrupted()) { try { //阻塞等待事件 selector.select(); // 事件列表 Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { it.remove(); //分发事件 dispatch((SelectionKey) (it.next())); } } catch (Exception e) { } } } private void dispatch(SelectionKey key) throws Exception { if (key.isAcceptable()) { register(key);//新链接建立,注册 } else if (key.isReadable()) { read(key);//读事件处理 } else if (key.isWritable()) { wirete(key);//写事件处理 } } private void register(SelectionKey key) throws Exception { ServerSocketChannel server = (ServerSocketChannel) key .channel(); // 获得和客户端连接的通道 SocketChannel channel = server.accept(); channel.configureBlocking(false); //客户端通道注册到selector 上 channel.register(this.selector, SelectionKey.OP_READ); }
我们可以看到上述的NIO例子已经差不多拥有reactor的影子了
基于事件驱动-> selector(支持对多个socketChannel的监听)
统一的事件分派中心-> dispatch
事件处理服务-> read & write
3、Reactor
首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:
Reactor 将I/O事件分派给对应的Handler
Acceptor 处理客户端新连接,并分派请求到处理器链中
Handlers 执行非阻塞读/写 任务
1、单Reactor单线程模型
/** * 等待事件到来,分发事件处理 */ class Reactor implements Runnable { private Reactor() throws Exception { SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // attach Acceptor 处理新连接 sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { it.remove(); //分发事件处理 dispatch((SelectionKey) (it.next())); } } } catch (IOException ex) { //do something } } void dispatch(SelectionKey k) { // 若是连接事件获取是acceptor // 若是IO读写事件获取是handler Runnable runnable = (Runnable) (k.attachment()); if (runnable != null) { runnable.run(); } } } /** * 连接事件就绪,处理连接事件 */ class Acceptor implements Runnable { @Override public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) {// 注册读写 new Handler(c, selector); } } catch (Exception e) { } } }
2、单Reactor多线程模型
/** * 多线程处理读写业务逻辑 */ class MultiThreadHandler implements Runnable { public static final int READING = 0, WRITING = 1; int state; final SocketChannel socket; final SelectionKey sk; //多线程处理业务逻辑 ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception { this.state = READING; this.socket = socket; sk = socket.register(selector, SelectionKey.OP_READ); sk.attach(this); socket.configureBlocking(false); } @Override public void run() { if (state == READING) { read(); } else if (state == WRITING) { write(); } } private void read() { //任务异步处理 executorService.submit(() -> process()); //下一步处理写事件 sk.interestOps(SelectionKey.OP_WRITE); this.state = WRITING; } private void write() { //任务异步处理 executorService.submit(() -> process()); //下一步处理读事件 sk.interestOps(SelectionKey.OP_READ); this.state = READING; } /** * task 业务处理 */ public void process() { //do IO ,task,queue something } }
3、多Reactor多线程模型
/** * 多work 连接事件Acceptor,处理连接事件 */ class MultiWorkThreadAcceptor implements Runnable { // cpu线程数相同多work线程 int workCount =Runtime.getRuntime().availableProcessors(); SubReactor[] workThreadHandlers = new SubReactor[workCount]; volatile int nextHandler = 0; public MultiWorkThreadAcceptor() { this.init(); } public void init() { nextHandler = 0; for (int i = 0; i < workThreadHandlers.length; i++) { try { workThreadHandlers[i] = new SubReactor(); } catch (Exception e) { } } } @Override public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) {// 注册读写 synchronized (c) { // 顺序获取SubReactor,然后注册channel SubReactor work = workThreadHandlers[nextHandler]; work.registerChannel(c); nextHandler++; if (nextHandler >= workThreadHandlers.length) { nextHandler = 0; } } } } catch (Exception e) { } } } /** * 多work线程处理读写业务逻辑 */ class SubReactor implements Runnable { final Selector mySelector; //多线程处理业务逻辑 int workCount =Runtime.getRuntime().availableProcessors(); ExecutorService executorService = Executors.newFixedThreadPool(workCount); public SubReactor() throws Exception { // 每个SubReactor 一个selector this.mySelector = SelectorProvider.provider().openSelector(); } /** * 注册chanel * * @param sc * @throws Exception */ public void registerChannel(SocketChannel sc) throws Exception { sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT); } @Override public void run() { while (true) { try { //每个SubReactor 自己做事件分派处理读写事件 selector.select(); Set keys = selector.selectedKeys(); Iterator iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { read(); } else if (key.isWritable()) { write(); } } } catch (Exception e) { } } } private void read() { //任务异步处理 executorService.submit(() -> process()); } private void write() { //任务异步处理 executorService.submit(() -> process()); } /** * task 业务处理 */ public void process() { //do IO ,task,queue something } }
c++ socket线程池_Netty(3)——Reactor线程模型相关推荐
- 线程池版本的mysql_MySQL线程池内幕
摘要 MySQL线程池在初始化的时刻根据宿主机的CPU核心数设置thread_pool_size,这也就是线程池的线程组的个数.每个线程组在初始化之后会经由过程底层的IO库分派一个收集特别的句柄与之接 ...
- java线程池是如何复用线程_线程池如何复用一个线程-- ThreadPoolExecutor的实现(未完)...
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制.在Java中,Runnable对象代表一个任务,Thread对象负责创建一个线程执行这个任务. 前提:1. 程序需要处理大量任务 2. 任务的执 ...
- 一文详解java线程池 详解Java线程池的七个参数 详解池化技术 java如何选择核心线程数 详解Java线程池的拒绝策略
目录 引言 线程池使用场景 加快请求响应(响应时间优先) 加快处理大任务(吞吐量优先) 特殊说明 线程池的池化技术 线程池的创建 手动创建 创建newFixedThreadPool线程池 创建newS ...
- springboot 线程池_Spring boot 2 线程池怎么配置
线程池 在java 中线程池,就是ThreadPoolExecutor来构造,简单看下线程池包含的方法, corePoolSize:初始化线程.线程池中保留的线程数量. maximumPoolSize ...
- java 线程池 初始大小,Java线程池ThreadPoolExecutor的实现和参数
接文章Java8线程池--底层为LinkedBlockingQueue的ThreadPoolExecutor,文章中简单介绍了线程池保持线程,并且从阻塞队列中获取任务执行的流程.本篇文章详细介绍线程池 ...
- C#如何判断线程池中所有的线程是否已经完成(转)
其 实很简单用ThreadPool.RegisterWaitForSingleObject方法注册一个定时检查线程池的方法,在检查线程的方法内调用 ThreadPool.GetAvailableThr ...
- [.Net线程处理系列]专题二:线程池中的工作者线程
目录: 一.上节补充 二.CLR线程池基础 三.通过线程池的工作者线程实现异步 四.使用委托实现异步 五.任务 六.小结 一.上节补充 对于Thread类还有几个常用方法需要说明的. 1.1 Susp ...
- 【Android 异步操作】线程池 ( 线程池使用示例 | 自定义线程池使用流程 | 自定义任务拒绝处理策略 | 完整代码示例 )
文章目录 一.自定义线程池使用流程 二.自定义任务拒绝处理策略 三.完整代码示例 在博客 [Android 异步操作]线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTas ...
- 【Android 异步操作】线程池 ( 线程池 reject 拒绝任务 | 线程池 addWorker 添加任务 )
文章目录 一.线程池 reject 拒绝任务 二.线程池 addWorker 添加任务 在上一篇博客 [Android 异步操作]线程池 ( 线程池 execute 方法源码解析 ) 中 , 讲解 线 ...
- 【Android 异步操作】线程池 ( 线程池简介 | 线程池初始化方法 | 线程池种类 | AsyncTask 使用线程池示例 )
文章目录 一.线程池简介 二.线程池初始化方法简介 三.线程池使用示例 一.线程池简介 线程池一般是实现了 ExecutorService 接口的类 , 一般使用 ThreadPoolExecutor ...
最新文章
- html5 canvas雨点打到窗玻璃动画
- 1067. 试密码(20)
- Redis Primer(1)基于JedisPool的Redis hset并发性能测试 - @钟超 · 技术博客专栏 - 博客频道 - CSDN.NET...
- TikTok英国市场你不能不知道的10大数据
- 022_Vue购物车
- 402.移掉K位数字,使得剩下数字最小
- stm32的语音识别_免费开源基于STM32的智能垃圾桶之舵机控制(HAL库)
- 如何使用VisualVM监视服务器上的多个JVM
- swagger core 和 swagger ui 如何关联【窥探】
- 云云协同解决方案全景图发布 华为云助力科技企业云上创新
- linux core 永久生效,【调试】Core Dump是什么?Linux下如何正确永久开启?
- linux centos ppp限速,Centos7限速和测速
- LeetCode每周刷题(2019.6.24-2019.6.30)
- TCP协议和UDP协议
- Java汉字转拼音实现方式
- 植物大战僵尸 php,植物大战僵尸Online
- 网络基础之冲突域和广播域
- 对分易教学平台考勤漏洞探索,批量签到app制作杂谈
- SQLI DUMB SERIES-10
- OpenStack 2014 用户调查解析——中国部署规模仅次于美国
热门文章
- 【语音识别】基于matlab DWT算法0~9数字语音识别【含Matlab源码 1726期】
- 【优化求解】基于matlab遗传算法求解道路流量优化问题【含Matlab源码 1480期】
- 【协同任务】基于matlab多无人机协同任务【含Matlab源码 1273期】
- SPSS遇到缺失值怎么办?删除还是替换?【SPSS 067期】
- mysql资源估算_关于数据库查询要耗费的服务器资源估算!高手进~
- sql外键需要输入吗_sql_外键(foreignkey)
- 从头实现linux操作系统_从头开始实现您的第一个人工神经元
- php 时间 拼接,PHP关于时间的时段的重合、 整合的方法
- wpf 引用的图片文件打包后找不到_PyQT5打包:用PyInstaller遇到的坑
- (原創) 這學期C++完美的Ending (C/C++) (日記)