趁着三天假期,把Java NIO和Reactor模式整理总结了下,文章特别细节的知识点没有写,如一些API的具体实现。类似数据读到Buffer后再写出时,为什么需要复位操作,这些都属于NIO基础知识,是学习Reactor模式的前置条件。

1. 原始Ractor模式

相关组件的解释

Handle(句柄或是描述符):本质上表示一种资源,是操作系统提供的;该资源用于表示一个个事件,比如文件描述符,或者是针对于网络编程中的Socket描述符。事件既可以来自于外部,也可以来自内部;外部事件比如说客户端的连接请求,客户端发送过来数据等;内部事件比如说操纵系统产生的定时器事件等。它本质上就是一个文件描述符。Handle是事件产生的发源地。

Synchronous Event Demultiplexer(同步事件分离器):它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/O多路复用机制,比如说select、poll、epoll等。在Java NIO中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。

Event Handler(事件处理器) 本身由多个回调方法构成,这些回调构成了与应用相关的对于某个事件的反馈机制。Netty相比于Java NIO来说,在事件处理器这个角色上进行一个升级,它为我们开发者提供了大量的回调方法,供我们在待定事件产生时实现相应的回调方法进行业务逻辑的处理。

Concrete Event Handler(具体事件处理器):它本身实现了事件处理所提供的各个回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。

Initiation Dispatcher(初始分发器):实际上就是Reactor角色。它本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等。Initiation Dispatcher会通过同步事件分离器来等待事件的发生,一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理事件。

执行流程分析

当应用像Initiation Dispatcher注册具体的事件处理器时,应用会标识出事件处理器希望Initiation Dispatcher在某个事件发生时向其通知该事件,该事件与Handle关联。

Initiation Dispatcher会要求每个事件向其传递内部的Handle。该Handle向操作系统标识了事件处理器。

当所有事件处理器注册完毕后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环。这时,Initiation Dispatcher会将每个注册的事件管理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。比如说,TCP协议层使用select同步事件分离器操作来等待客户端发送的数据到达连接的socker handle上。

当与某个事件源对应的Handle变为ready状态时(比如说,TCP socker变为等待读状态时),同步事件分离器就会通知Initiation Dispatcher。

Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。Initiation Dispatcher会回调事件处理器的handle_events回调方法来执行特定于应用的功能(开发者自己所编写的功能),从而响应这个事件。所发生的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的功能。

以上描述的内容似乎和本文的标题不大,其实不然,它正是下面介绍的内容的开端。

2. 通过一个例子拉近与Java NIO的距离

/**

* @Author CoderJiA

* @Description NIOServer

* @Date 13/2/19 下午4:59

**/

public class NIOServer {

public static void main(String[] args) throws Exception{

// 1.创建ServerSocketChannel

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

ServerSocket serverSocket = serverSocketChannel.socket();

serverSocket.bind(new InetSocketAddress(8899));

// 2.创建Selector,并ServerSocketChannel注册OP_ACCEPT事件,接收连接。

Selector selector = Selector.open();

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 3.开启轮询

while (selector.select() > 0) {

// 从selector所有事件就绪的key,并遍历处理。

Set selectionKeys = selector.selectedKeys();

selectionKeys.forEach(selectionKey -> {

SocketChannel client;

try {

if (selectionKey.isAcceptable()) { // 接受事件就绪

// 获取serverSocketChannel

ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();

// 接收连接

client = server.accept();

client.configureBlocking(false);

client.register(selector, SelectionKey.OP_READ);

} else if (selectionKey.isReadable()) { // 读事件就绪

// 获取socketChannel

client = (SocketChannel) selectionKey.channel();

// 创建buffer,并将获取socketChannel中的数据读入到buffer中

ByteBuffer readBuf = ByteBuffer.allocate(1024);

int readCount = client.read(readBuf);

if (readCount <= 0) {

return;

}

Charset charset = Charset.forName(StandardCharsets.UTF_8.name());

readBuf.flip();

System.out.println(String.valueOf(charset.decode(readBuf).array()));

}

} catch (IOException e) {

e.printStackTrace();

}

selectionKeys.remove(selectionKey);

});

}

}

复制代码通过这个例子,与原始Reactor模式相对应的理解,比如同步事件分离器对应着Selector的select()方法,再比如ServerSocketChannel注册给Selector的OP_ACCEPT,还有SocketChannel的OP_READ与OP_WRITE,这些事件保存在操作系统上,其实就是原始Reactor中的Handle。

四个重要api

Channel:Connections to files,sockets etc that support non-blocking reads.

Buffer:Array-like objects that can be directly read or written by Channels.

Selector:Tell which of a set of Channels have IO events.

SelectionKeys:Maintain IO event status and bingdings.

3.用Java NIO对Reactor模式的应用。

3.1 Single threaded version

/**

* @Author CoderJiA

* @Description Reactor

* @Date 5/4/19 下午2:25

**/

public abstract class Reactor implements Runnable{

protected final Selector selector;

protected final ServerSocketChannel serverSocket;

protected final long port;

protected final long timeout;

public Reactor(int port, long timeout) throws IOException {

this.port = port;

this.timeout = timeout;

selector = Selector.open();

serverSocket = ServerSocketChannel.open();

serverSocket

.socket()

.bind(new InetSocketAddress(port));

serverSocket.configureBlocking(false);

SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

sk.attach(newAcceptor(selector));

}

@Override

public void run() {

try {

while (!Thread.interrupted()) {

if (selector.select(timeout) > 0) {

Set selected = selector.selectedKeys();

selected.forEach(sk -> {

dispatch(sk);

selected.remove(sk);

});

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

private void dispatch(SelectionKey sk) {

Runnable r = (Runnable)(sk.attachment());

if (Objects.nonNull(r)) {

r.run();

}

}

public abstract Acceptor newAcceptor(Selector selector);

}

复制代码/**

* @Author CoderJiA

* @Description Acceptor

* @Date 5/4/19 下午2:58

**/

public class Acceptor implements Runnable {

private final Selector selector;

private final ServerSocketChannel serverSocket;

public Acceptor(Selector selector, ServerSocketChannel serverSocket) {

this.selector = selector;

this.serverSocket = serverSocket;

}

@Override

public void run() {

try {

SocketChannel socket = serverSocket.accept();

if (Objects.nonNull(socket)) {

new Handler(selector, socket);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

复制代码/**

* @Author CoderJiA

* @Description Handler

* @Date 5/4/19 下午4:25

**/

public class Handler implements Runnable {

private static final int MB = 1024 * 1024;

protected final SocketChannel socket;

protected final SelectionKey sk;

protected final ByteBuffer input = ByteBuffer.allocate(MB);

protected final ByteBuffer output = ByteBuffer.allocate(MB);

private static final int READING = 0, SENDING = 1;

private int state = READING;

public Handler(Selector selector, SocketChannel socket) throws IOException {

this.socket = socket;

socket.configureBlocking(false);

sk = socket.register(selector, SelectionKey.OP_READ);

sk.attach(this);

}

@Override

public void run() {

try {

if (state == READING) read();

else if (state == SENDING) send();

} catch (Exception e) {

e.printStackTrace();

}

}

private void read() throws IOException {

socket.read(input);

if (inputIsComplete()) {

state = SENDING;

sk.interestOps(SelectionKey.OP_WRITE);

}

input.clear();

}

private void send() throws IOException {

socket.write(output);

if (outputIsComplete()) {

sk.cancel();

}

}

private boolean inputIsComplete() {

return input.position() > 0;

}

private boolean outputIsComplete() {

return !output.hasRemaining();

}

}

复制代码/**

* @Author CoderJiA

* @Description EchoReactor

* @Date 5/4/19 下午5:01

**/

public class EchoReactor extends Reactor {

private static final int PORT = 9999;

private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);

public EchoReactor(int port, long timeout) throws IOException {

super(port, timeout);

}

@Override

public Acceptor newAcceptor(Selector selector) {

return new Acceptor(selector, this.serverSocket);

}

public static void main(String[] args) throws IOException {

new EchoReactor(PORT, TIME_OUT).run();

}

}

复制代码

核心组件组件分析

Reactor等同于原始Reactor模式的Initiation Dispatcher,它负责所有就绪事件统一分发到事件处理器,如Acceptor和Hanlder。

Acceptor用于将接收到的SocketChannel交给Handler处理。

Handler处理读写操作。

这是Reactor的单线程版本,这个版本一个线程处理客户端的接收和数据处理以及读写操作,数据处理往往就是我们实际开发中的业务处理,是比较耗时的。如果一个处理过程处于阻塞,那么这个模型所表现出的就处于阻塞,所以一个数据处理的阻塞会导致不能处理客户端连接的接收。因此衍生出来下面的多工作线程版本来优化Handler。

3.2 Worker Threads version

调整下Handler

package cn.coderjia.nio.douglea.reactor2;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/**

* @Author CoderJiA

* @Description Handler

* @Date 5/4/19 下午4:25

**/

public class Handler implements Runnable {

private static final int MB = 1024 * 1024;

protected final SocketChannel socket;

protected final SelectionKey sk;

protected final ByteBuffer input = ByteBuffer.allocate(MB);

protected final ByteBuffer output = ByteBuffer.allocate(MB);

private static final int READING = 0, SENDING = 1, PROCESSING = 3;

private int state = READING;

private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

public Handler(Selector selector, SocketChannel socket) throws IOException {

this.socket = socket;

socket.configureBlocking(false);

sk = socket.register(selector, SelectionKey.OP_READ);

sk.attach(this);

}

@Override

public void run() {

try {

if (state == READING) read();

else if (state == SENDING) send();

} catch (Exception e) {

e.printStackTrace();

}

}

private void read() throws IOException {

socket.read(input);

if (inputIsComplete()) {

state = PROCESSING;

EXECUTOR_SERVICE.execute(new Processer());

}

input.clear();

}

private void send() throws IOException {

socket.write(output);

if (outputIsComplete()) {

sk.cancel();

}

}

private void process() {

System.out.println("Handler.process()...");

}

private boolean inputIsComplete() {

return input.position() > 0;

}

private boolean outputIsComplete() {

return !output.hasRemaining();

}

class Processer implements Runnable {

public void run() {

processAndHandOff();

}

}

synchronized void processAndHandOff() {

process();

state = SENDING;

sk.interestOps(SelectionKey.OP_WRITE);

}

}

复制代码Handler多工作线程版本将耗时的process(),创建线程去处理。这个版本Reactor既负责客户端的接收事件,又负责读写事件,因为对于高并发场景连接数巨大,Reactor可能有时候会力不从心。因此衍生出下面的主从Reactor模型。

3.3 Multiple Reactors Version

调整Acceptor

/**

* @Author CoderJiA

* @Description Acceptor3

* @Date 6/4/19 下午6:51

**/

public class Acceptor3 implements Runnable {

private final ServerSocketChannel serverSocket;

public Acceptor3(ServerSocketChannel serverSocket) {

this.serverSocket = serverSocket;

}

@Override

public void run() {

try {

SocketChannel socket = serverSocket.accept();

if (Objects.nonNull(socket)) {

new Handler(EchoReactor.nextSubReactor().selector, socket);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

复制代码

调整Reactor

/**

* @Author CoderJiA

* @Description Reactor3

* @Date 6/4/19 下午6:51

**/

public abstract class Reactor3 implements Runnable {

protected Selector selector;

protected ServerSocketChannel serverSocket;

protected final int port;

protected final long timeout;

protected final boolean isMainReactor;

public Reactor3(int port, long timeout, boolean isMainReactor) {

this.port = port;

this.timeout = timeout;

this.isMainReactor = isMainReactor;

}

@Override

public void run() {

try {

init();

while (!Thread.interrupted()) {

if (selector.select(timeout) > 0) {

System.out.println("isMainReactor:" + isMainReactor);

Set selected = selector.selectedKeys();

selected.forEach(sk -> {

dispatch(sk);

selected.remove(sk);

});

selected.clear();

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

private void init() throws IOException {

selector = Selector.open();

if (isMainReactor) {

serverSocket = ServerSocketChannel.open();

serverSocket

.socket()

.bind(new InetSocketAddress(port));

serverSocket.configureBlocking(false);

SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

sk.attach(newAcceptor());

}

}

private void dispatch(SelectionKey sk) {

Runnable r = (Runnable)(sk.attachment());

if (Objects.nonNull(r)) {

r.run();

}

}

public abstract Acceptor3 newAcceptor();

}

复制代码/**

* @Author CoderJiA

* @Description EchoReactor

* @Date 6/4/19 下午5:35

**/

public class EchoReactor extends Reactor3 {

private static final int PORT = 9999;

private static final long TIME_OUT = TimeUnit.MILLISECONDS.toMillis(10);

private static final int SUB_REACTORS_SIZE = 2;

private static final Reactor3[] SUB_REACTORS = new Reactor3[SUB_REACTORS_SIZE];

private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0);

static {

// 初始化子Reactor

IntStream.range(0, SUB_REACTORS_SIZE).forEach(i -> SUB_REACTORS[i] = new EchoReactor(PORT, TIME_OUT, false));

}

public static Reactor3 nextSubReactor(){

int curIdx = NEXT_INDEX.getAndIncrement();

if(curIdx >= SUB_REACTORS_SIZE){

NEXT_INDEX.set(0);

curIdx = 0;

}

return SUB_REACTORS[(curIdx % SUB_REACTORS_SIZE)];

}

public EchoReactor(int port, long timeout, boolean isMainReactor) {

super(port, timeout, isMainReactor);

}

@Override

public Acceptor3 newAcceptor() {

return new Acceptor3(this.serverSocket);

}

public static void main(String[] args) {

Reactor3 mainReactor = new EchoReactor(PORT, TIME_OUT, true);

// 启动主Reactor

new Thread(mainReactor).start();

// 启动子Reactor

IntStream.range(0, SUB_REACTORS_SIZE).forEach(i -> new Thread(SUB_REACTORS[i]).start());

}

}

复制代码主从Reactor模型,主Reactor用于处理客户端连接的接收转发给Acceptor处理,子Reactor处理读写事件的接收转发给Handler处理。

参考文章

Scalable IO in Java

源码地址

java nio doug_深入的聊聊 Java NIO相关推荐

  1. java整段标记_聊聊JAVA GC系列(7) - 标记整理算法

    在介绍"平平无奇"的标记清除算法时, 还留下了另一个问题, 就是内存碎片的问题. 内存碎片的问题是指, 每次回收的内存都是比较分散的, 可以加起来是一个比较大的数值, 但是由于可用 ...

  2. java中字节码_聊聊Java的字节码

    本文为作者原创,转载请注明出处(http://www.cnblogs.com/mar-q/)by 负赑屃 巴山楚水凄凉地,二十三年弃置身. 怀旧空吟闻笛赋,到乡翻似烂柯人. 沉舟侧畔千帆过,病树前头万 ...

  3. java gc机制 优点_聊聊Java的GC机制

    作者 某人Valar 如需转载请保留原文链接 部分图片来自百度,如有侵权请联系删除 本文目录 什么是GC JVM内存结构简单介绍 可达性分析与GC Roots 常见的垃圾收集算法 1. 什么是GC G ...

  4. java简单对称加密_聊聊java中的对称加密机制

    对数据的加密算是一个老生常谈的话题了,加密的方式很多,java也为这些加密算法提供了支持.今天就来聊聊对称加密算法的java实现方式.本文对加密算法不了解也没关系,我会从0开始讲解,保证小白也能看懂. ...

  5. java线程深入_深入聊聊Java多线程

    一.背景 在没有学习Java多线程以前,总觉得多线程是个很神秘的东西,只有那些大神才能驾驭,新年假期没事就来学习和了解一下Java的多线程,本篇博客我们就来从头说一下多线程到底是怎么回事. 二.概述 ...

  6. java string hash变量_聊聊 Java 中 HashMap 初始化的另一种方式

    如果你接触过不同的语言,从语法和代码层面来说,Java 是一种不折不扣的"臃肿.啰嗦"的语言,从另一方面来说这种臃肿和啰嗦也体现了它严谨的一面,作为适合构建大型.复杂项目的理由之一 ...

  7. java nio使用_什么时候使用NIO?

    一.前言 学习了Java IO 和 NIO之后,肯定会问:我们到底什么时候该使用 IO,什么时候该使用 NIO? 在下文中我会尝试用例子阐述java NIO 和IO的区别,以及它们对你的设计会有什么影 ...

  8. Java传统的io和nio区别_Java中IO和NIO的本质和区别

    简介 终于要写到java中最最让人激动的部分了IO和NIO.IO的全称是input output,是java程序跟外部世界交流的桥梁,IO指的是java.io包中的所有类,他们是从java1.0开始就 ...

  9. Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

    本文介绍了Java中的四种I/O模型,同步阻塞,同步非阻塞,多路复用,异步阻塞.同时将NIO和BIO进行了对比,并详细分析了基于NIO的Reactor模式,包括经典单线程模型以及多线程模式和多Reac ...

最新文章

  1. jdbc封装mysql_实用JDBC数据库查询封装
  2. java mysql failover_MySQL MMM 双主在Failover时挂起
  3. qt中的数据库可以创建在主函数中吗_在qt中怎么建立数据库
  4. python 拓扑排序 dfs bfs_拓扑排序的DFS和BFS
  5. 回退进度_【蜕变】V7账号发展进度第47期:回归宝箱开个都是啥呀!瞬间无爱了...
  6. 【Python之路Day17】Python Web框架之 Django
  7. React版本更新及升级须知(持续更新)
  8. 大规模微服务利器:eBPF + Kubernetes
  9. 1、mybatis是什么?为什么要用mybatis?
  10. Linux 普通用户拿到root权限及使用szrz命令上传下载文件
  11. 出于一些原因的考虑,即日起,一步一步SharePoint 2007系列文章将暂停发布
  12. 使用XMLHttpRequest发送POST数据
  13. Atitit mysql 存储kv 以及php js接口目录kv_mysql.js 1Set.php 1Get.php 2CREATE TABLE `cfg` ( `k`
  14. 2节串联锂电池充电管理IC芯片,5V,12V升降压解决方案
  15. BLEU——机器翻译评测
  16. Python eval() 函数看这里就够了
  17. 2020年12月-第02阶段-前端基础-CSS Day07
  18. 财经365零基础学投资:用江恩展望下半年股市
  19. JAVA通过poi实现excel表格制作并且将图片放入到指定的单元格中(可以循环插入)
  20. 云服务器、VPS、虚拟主机三者之间的区别?

热门文章

  1. Liunx之nginx代理
  2. 路飞学城Python-Day171
  3. 搭建Hexo博客(一)-创建Hexo环境
  4. 数据库设计-规范化规则
  5. C Builder中如何利用消息
  6. SQLite的使用(二):数据增删改查
  7. iOS 的 XMPPFramework 简介
  8. MySQL 授权远程登录(Ubuntu 环境)
  9. source insight常用命令--实际使用中比较常用的
  10. poj 1015(dp)