2019独角兽企业重金招聘Python工程师标准>>>

在了解Reactor模式之前, 首先了解什么是NIO.

java.nio全称java non-blocking IO 即非阻塞IO.这个地方要明白,非阻塞不等于异步。

非阻塞:当一个线上读取数据,没有数据时该线程可以干其他的事情。就是调用了之后立马返回。

异步IO: 在一个IO操作中,用户态的线程完全不用考虑数据的读取过程,都交给操作系统完成,完成之后通知用户线程即可。这才是真正的异步操作。

同步IO  每个请求必须逐个的被处理,一个流程的处理会导致整个流程的暂时等待。

阻塞:  某个请求发出后,该请求操作需要的条件不满足,请求会一直阻塞,不会返回,直到条件满足。

其中java NIO 中的 Select 在Linux中基于epoll实现。基于IO多路复用。就是一个线程来管理多个IO.

epoll全称eventpoll 是linux内核针对IO多路复用的实现。在linux中,和epoll类似的由select和poll。

其中epoll监听的fd集合是一直在内核存在的,有三个系统调用:epoll_create epoll_wait epoll_ctl 通过epoll_wait可以多次监听同一个fd结合,只返回可读写的那部分。

select只有一个系统调用,就是每次都需要将要监听的所有集合都传给操作系统,当有事件发生时。操作系统在返回给你整个集合。

NIO核心包含三个部分: Channels Buffers Selectors.

Channel: 在NIO中,所有的IO过程都是从建立一个Channel开始的,数据可以从channel中读取到Buffer中 也可以从Buffer中写入到channel中。channel就好像BIO中的流。但是channel时双向的,我感觉这样更贴近于现实,毕竟TCP连接是全双工的。

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

channel分为这四种,分别对应着文件,UDP TCP网络IO.

Buffer buffer即为缓冲区,也就是数据块。

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

java基础数据类型中除了boolean都有对应的buffer实现。

Selector (选择器): 他是NIO中的关键所在,我们在程序中可以通过它来实现一个线程同时处理多个Channel 也就是多个连接。

如上图,一个Selectot监听五个通道,在使用时首先需要将通道以及对应感兴趣的事件(Accept   read  writer等 )注册到Selector上 。当发生对应的事件时,操作系统回通知我们的程序。在Selector中可以读取到对应的Channel 根据事件类型做出相应的操作。

零拷贝

java NIO中提供的FileChannel拥有transferTo和transferFrom两个方法,可以直接把FileChannel中的数据拷贝到另一个Channel,或者把另一个Channel中的数据拷贝到FileChannel .在操作系统的支持下,通过这个方法传输数据不需要将原数据从内核态拷贝到用户态,再从用户态拷贝到内核态。

Reactor实现一个简单的Echo服务器  基于单个线程同时处理多个连接。这样一个Selector同时完成Accept  Read Write事件的监听,同时业逻辑也和Selector在同一个线程中执行。这里可以优化一下将业务逻辑在新的线程中执行。

public class EchoService {private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector, SelectionKey.OP_ACCEPT);while (true){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iteratorKey=keys.iterator();while (iteratorKey.hasNext()){SelectionKey key=iteratorKey.next();if (key.isAcceptable()){ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel();SocketChannel socketChannel=serverChannel.accept();socketChannel.configureBlocking(false).register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024));}if (key.isReadable()){SocketChannel sc= (SocketChannel) key.channel();ByteBuffer buffer= (ByteBuffer) key.attachment();buffer.clear();int readCount= sc.read(buffer);if (readCount<0){iteratorKey.remove();continue;}buffer.flip();sc.write(buffer);System.out.print(new String(buffer.array(),0,readCount));}iteratorKey.remove();}}} catch (Exception e) {e.printStackTrace();}finally {System.out.println("exit");}}
}

现在计算机的核数越来越多,仅仅用一个核心来处理IO连接有点让费系统资源,因此我们可以多见几个Reactor  .其中住Reactor负责TCP的连接(Accept),连接之后分配到子Reactor来处理IO的读写事件。

并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作自始至终旨在一个线程处理。这样保证了同一个请求的所有状态和上下文在同一个线程中,方便监控请求相应状态。

具体代码实现 EchoService为例:

https://github.com/WJ1020/reactor

public class EchoService {private static final Logger logger= LoggerFactory.getLogger(EchoService.class);private final String ip;private final int port;public EchoService(String ip, int port) {this.ip = ip;this.port = port;}public void start(){logger.info("echo service start......");try {Selector selector=Selector.open();ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(ip,port)).configureBlocking(false).register(selector,SelectionKey.OP_ACCEPT);int coreNum = Runtime.getRuntime().availableProcessors();Processor[] processors = new Processor[coreNum];for (int i = 0; i < processors.length; i++) {logger.info("creat processor :{}",i+1);processors[i] = new Processor();}int index=0;while (Status.running){selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey selectionKey=iterator.next();iterator.remove();if (selectionKey.isAcceptable()){ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel=currServerSocketChannel.accept();socketChannel.configureBlocking(false);logger.info("Accept request from {}",socketChannel.getRemoteAddress());Processor processor=processors[(++index)%coreNum];processor.addChannel(socketChannel);}}}} catch (IOException e) {logger.error("io exception {}",e.getMessage());}}}
public class Processor {private static final Logger logger= LoggerFactory.getLogger(Processor.class);private static final ExecutorService service=Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());private final Selector selector;private volatile boolean running=true;public Processor() throws IOException {this.selector= SelectorProvider.provider().openSelector();}public void addChannel(SocketChannel socketChannel){try {socketChannel.register(this.selector, SelectionKey.OP_READ);if (running){running=false;start();}wakeup();} catch (ClosedChannelException e) {logger.error("register channel error :{}",e.getMessage());}}private void wakeup(){this.selector.wakeup();}private void start(){service.submit(new ProcessorTask(selector));}
}
public class ProcessorTask implements Runnable {private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class);private Selector selector;ProcessorTask(Selector selector) {this.selector = selector;}@Overridepublic void run() {logger.info("{}\tsub reactor start listener",Thread.currentThread().getName());while (Status.running){try {selector.select();Set<SelectionKey> keys=selector.selectedKeys();Iterator<SelectionKey> iterator=keys.iterator();while (iterator.hasNext()){SelectionKey key=iterator.next();iterator.remove();if (key.isReadable()){ByteBuffer buffer= ByteBuffer.allocate(1024);SocketChannel socketChannel= (SocketChannel) key.channel();int count=socketChannel.read(buffer);if (count<0){socketChannel.close();key.cancel();logger.info("{}\t Read ended",socketChannel);}else if (count==0){logger.info("{}\t Message size is 0",socketChannel);}else {buffer.flip();socketChannel.write(buffer);logger.info("{}\t Read message{}",socketChannel,new String(buffer.array()));}}}} catch (IOException e) {logger.error("select error :{}",e.getMessage());}}}
}

在EchoService中 ,主Reactor接受到新的连接后,将channel注册到subReactor的Selector中。每个子Reactor都有一个自己的Selector对象,并有独立的一个线程处理。

转载于:https://my.oschina.net/wang520/blog/3036562

高性能IO -Reactor模式的实现相关推荐

  1. 高性能IO设计中的Reactor模式与Proactor模式

    为什么80%的码农都做不了架构师?>>>    在高性能的IO设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proacto ...

  2. 高性能IO之Reactor模式

    讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty.Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢? 最最原始的网络编程思路就是服务器用一个w ...

  3. 高性能实践IO之Reactor模式

    讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty.Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢? 最最原始的网络编程思路就是服务器用一个w ...

  4. 高性能IO设计的Reactor和Proactor模式(转)

    在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作. 在比较这两个模式之前,我们首先的搞明白 ...

  5. 设计模式:高性能IO之Reactor模式

    讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty.Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢? 最最原始的网络编程思路就是服务器用一个w ...

  6. 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )

    文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...

  7. 两种IO模式:Proactor与Reactor模式

    在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作. 在比较这两个模式之前,我们首先的搞明白 ...

  8. I/O模型系列之四:两种高性能IO设计模式 Reactor 和 Proactor

    不同的操作系统实现的io策略可能不一样,即使是同一个操作系统也可能存在多重io策略,常见如linux上的select,poll,epoll,面对这么多不同类型的io接口,这里需要一层抽象api来完成, ...

  9. 模型描述的关系模式_框架篇:见识一下linux高性能网络IO+Reactor模型

    前言 网络I/O,可以理解为网络上的数据流.通常我们会基于socket与远端建立一条TCP或者UDP通道,然后进行读写.单个socket时,使用一个线程即可高效处理:然而如果是10K个socket连接 ...

最新文章

  1. Silverlight 预定义颜色速查表
  2. Python循环中的变量作用域的灵异现象
  3. 计算机网络相关知识点
  4. xcode添加Cocos2d
  5. 搜索+回溯问题(DFS\BFS详解)
  6. 协同过滤推荐算法-----向量之间的相似度
  7. color是css3新增属性吗,CSS进阶篇--你用过css3的这个currentColor新属性吗?使用与兼容性...
  8. 记一次nodemanager无法启动的情况
  9. python可以替代java吗_Python 并非最好的编程语言,它无法取代 C/C++ 和 Java
  10. 关闭谷歌浏览器右下角的广告弹窗
  11. 无法定位程序输入点 getHostNameW 于动态链接库 WS2_32.dll
  12. java 最大的整数_java中最大的整数
  13. oracle 查询字符代码dump,字符集问题(Linux、oracle、终端等,导入导出数据)
  14. VNCTF2021 Ez_game
  15. 软件测试费用是多少,国内软件检测机构排名
  16. 正则表达式(一)认识正则表达式
  17. 如何解决 【eclipse】中注释时乱码的问题
  18. 你的Mac支持更新macOS Monterey吗
  19. IDEA插件Free Mybatis plugin
  20. 华为EC122在HiSi3110E上移植

热门文章

  1. c语言malloc函数用法_小白对c语言数组的基础总结
  2. 操作VR界面仅需眼神,Eyefluence眼控技术解放你的双手
  3. sublimeText3 工具
  4. 日志采集框架Flume以及Flume的安装部署(一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统)...
  5. 一次针对SAP系统的完美渗透测试
  6. PHP convet class to json data
  7. java计算代码执行时间
  8. rsync+inotify-tools实现文件的实时同步
  9. 第一个程序 - Windows程序设计(SDK)001
  10. 一款Octopress插件用于同步博客到其他站点