Selector的使用

文章目录

  • Selector的使用
    • 一、阻塞 & 非阻塞
      • 1. 阻塞
      • 2. 非阻塞
    • 二、selector 介绍及常用API
      • 1. 多路复用
      • 2. 常用API
    • 三、处理 accept 事件
    • 四、处理 read 事件
      • 1. 为什么事件必须删除
      • 2. 处理客户端断开问题
        • 2.1 客户端强制断开
        • 2.2 客户端正常断开
      • 3. 处理消息边界问题
    • 五、selector 何时不阻塞
    • 六、使用多线程优化

一、阻塞 & 非阻塞

通过多个客户端向服务器发送数据的例子理解阻塞和非阻塞。

1. 阻塞

服务器端:

// 1. 定义ByteBuffer,存放客户端发来的数据
ByteBuffer buffer = ByteBuffer.allocate(16);// 2. 创建服务器channel
ServerSocketChannel ssc = ServerSocketChannel.open();// 3. 服务器绑定监听端口
ssc.bind(new InetSocketAddress(8080));// 4. 客户端channel的连接集合
List<SocketChannel> channels = new ArrayList<>();//服务器不停的接收客户端的请求
while (true) {// 5. 服务器调用accept建立与客户端的连接// 如果客户端发起了连接请求,且服务器端调用了accept方法,则双方会通过三次握手建立连接// SocketChannel用来与客户端之间通信SocketChannel sc = ssc.accept(); // 阻塞方法,如果没有客户端发来请求,线程停止运行,客户端连接建立之后,继续运行channels.add(sc); //将客户端channel添加到channel集合中for (SocketChannel channel : channels) {// 6. 接收客户端发送的数据//读取的数据存放在bytebuffer中channel.read(buffer); // 阻塞方法,如果客户端仅连接,但是没有发送数据,则阻塞,客户端发送数据之后,线程继续运行}
}

客户端:

SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080)); //建立连接
sc.write(StandardCharsets.UTF_8.encode("你好");) //发送数据

如果某个客户端成功向服务器发送了信息,这个客户端再次发送数据会失败,因为服务器会等待客户端连接,连接之后的客户端再次发送数据并没有建立新连接,所以发送失败。也就是说,如果想要接收新的数据,必须建立新的连接。

缺点:

单线程模式下,一个方法的执行会影响另外一个方法,比如上述,一个服务器线程处理多个客户端连接,等待连接时无法读取另一个客户端的数据,读取数据时无法连接另一个客户端,一个客户端处理完毕后才可以处理另一个客户端。

2. 非阻塞

服务器端:

所有的 channel 都要设置为非阻塞模式。

ByteBuffer buffer = ByteBuffer.allocate(16);ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 切换成非阻塞模式,默认为阻塞模式ssc.bind(new InetSocketAddress(8080));List<SocketChannel> channels = new ArrayList<>();//持续等待客户端的连接
while (true) {SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,则sc是null(如果没有连接,会不断的生成null)//客户端有连接if (sc != null) {sc.configureBlocking(false); // 设置为非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {int read = channel.read(buffer);// 非阻塞,如果没有读到数据,线程仍然会继续运行,read值为0}
}

即使连接成功一个客户端,服务器还是会不停的尝试连接(生成null),连接好的客户端可以执行读取数据的操作,也就是说,读取和连接两个操作互不影响。

缺点:

即使没有连接建立和可读数据,线程仍然在不断运行,白白浪费了 cpu。

二、selector 介绍及常用API

1. 多路复用

一个线程配合 selector 就可以监控多个 channel 的事件,事件发生时线程才去处理,事件没有发生则线程处于阻塞状态(多路复用),避免非阻塞模式下CPU空转的问题。

好处:

  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换的次数

2. 常用API

创建selector

//调用静态方法
Selector selector = Selector.open();

绑定channel事件

建立 selector 与 channel 之间的联系,也称之为注册事件,只有绑定之后的事件 selector 才会关心 。

channel.configureBlocking(false); //设置channel为非阻塞模式//将channel注册到selector上,并且关注某一事件类型
SelectionKey key = channel.register(selector, 绑定事件类型);
//返回值表示事件发生后,通过SelectionKey可以得知是什么事件,并且得知是哪个channel发生的事件
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用,一般用于网络编程,而不是文件编程
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 客户端发起连接请求时触发
    • read - 客户端数据可读入时触发,存在因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出至客户端时触发,存在因为发送能力弱,数据暂不能写出的情况

监听channel事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件:

方法1,阻塞直到绑定事件发生

int count = selector.select();
//没有事件发生,就会让线程阻塞,事件发生了才会让线程继续向下运行
//解决了cpu空转的问题

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件发生

int count = selector.selectNow();

三、处理 accept 事件

客户端代码(对以下各种事件都通用):

public class Client {public static void main(String[] args) {try (Socket socket = new Socket("localhost", 8080)) {//客户端发送的数据socket.getOutputStream().write("world".getBytes()); } catch (IOException e) {e.printStackTrace();}}
}

服务器端代码:

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));//创建SelectorSelector selector = Selector.open();channel.configureBlocking(false);//注册事件SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {//阻塞直到事件发生int count = selector.select();// 获取所有可用的事件,set集合Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理(使用迭代器)Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {//得到某一个SelectionKeySelectionKey key = iter.next();// 判断事件类型if (key.isAcceptable()) {//获取发生这个事件的channel(强转,默认的返回类型为SelectableChannel)ServerSocketChannel c = (ServerSocketChannel) key.channel();// 与客户端建立连接SocketChannel sc = c.accept();// 获取事件以后必须处理,如果没有处理,selector认为有事件没有处理,就不会阻塞,cpu持续被占用//如果不想处理这个事件,可以调用key.cancel方法,取消事件后,会自动的将事件移除,selector依然会阻塞// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}}
}

四、处理 read 事件

服务器端代码:

@Slf4j
public class ChannelDemo6 {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {channel.bind(new InetSocketAddress(8080));Selector selector = Selector.open();channel.configureBlocking(false);SelectionKey sscKey = channel.register(selector, SelectionKey.OP_ACCEPT);while (true) {int count = selector.select();// 获取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍历所有事件,逐一处理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 判断事件类型//accept事件if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必须处理SocketChannel sc = c.accept();sc.configureBlocking(false); //连接建立之后,必须设置为非阻塞模式//将所要管理的channel注册到selector上SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);//可读事件} else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();//读取channel中的数据,写入到buffer中ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);//如果客户端正常断开,或者读取完毕,将key删除if(read == -1) {key.cancel();} else {buffer.flip();//读取buffer中的数据}}// 处理完毕,必须将事件移除iter.remove();}}} catch (IOException e) {e.printStackTrace();}}
}

1. 为什么事件必须删除

因为 selector 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要自己编码删除。比如上述代码:

  • 第一次触发了 accept 事件,假如没有移除事件,还存在于 selectedKeys 集合
  • 第二次触发了 read 事件,但这时 selectedKeys 中还有上次的 accept 事件 ,在遍历时,首次遍历到的元素依然是上次的 accept 事件,进入 if 分支,但此时已经建立了连接,所以 accept 方法返回的是 null,继续向下执行会出现空指针异常

2. 处理客户端断开问题

2.1 客户端强制断开

客户端强制断开后,如果服务器正在从中读取数据,服务器会抛出 IOException,如果不处理异常,会导致服务器停止运行,此时应该捕获异常,处理异常时将此 key 删除:

2.2 客户端正常断开

连接通道调用 close() 方法,会让客户端正常断开,调用 read() 读取数据时,返回值为-1,所以应该判断,如果值为 -1,将 key 删除。

3. 处理消息边界问题

如果给buffer申请的空间较小,比如4个字节,客户端发送 “中国” 两个汉字,共占用6个字节(utf-8),需要读取两次才可将消息读入,第一次读取4个字节,获取的是 “中” 以及 “国” 的前三分之一,第二次获取 “国” 的后三分之二,“国” 字会被截断(黏包、半包问题),出现乱码,如下:

三种情况:

  • 时刻1表示消息的长度大于buffer的大小,buffer需扩容
  • 时刻2对应半包现象
  • 时刻3对应黏包现象

三种处理方式:

  • 一种思路是客户端和服务器约定消息长度,所有数据包大小一样,服务器按预定长度读取,客户端的消息也按照预定长度发送,缺点是浪费带宽(不够的长度会填充至约定的长度)。
  • 另一种思路是按分隔符拆分,服务器根据分隔符来确定一条完整的消息,缺点是效率低,在服务器端需要一个字节一个字节对比。
  • 第三种思路是数据使用 LTV 格式,即 Length 长度、Type 类型、Value 数据。发送的数据分成三部分,第一部分表示此数据的长度,第二部分是数据的类型,第三部分是实际的数据,服务器根据第一部分在长度已知的情况下,就可以很方便的确定消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

第二种思路的使用:

假设服务器设置的buffer大小为16个字节,客户端发送的数据为20个字节,则第一次读事件只能传输16个字节,所以需要对buffer进行扩容,扩容之后,第二次读事件把剩余的消息读入,生成一个完整的消息,如下图:

服务器端代码:

private static void split(ByteBuffer source) {//切换到读模式source.flip();for (int i = 0; i < source.limit(); i++) {// 找到一条完整消息if (source.get(i) == '\n') {int length = i + 1 - source.position();// 把这条完整消息存入新的ByteBufferByteBuffer target = ByteBuffer.allocate(length);// 从source读,向target写for (int j = 0; j < length; j++) {target.put(source.get());}}}//只有找到分隔符,才会读取,如果没有找到分割符,则不会读取,也就是压缩之后,position位于limit处,都是16source.compact();
}public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//关注accept事件SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while (true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 如果是accept事件if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); //将channel数据通道注册到selector上//第三个参数表示将一个 byteBuffer 作为附件关联到 selectionKey 上//表示这个buffer只能由这个channel使用,此buffer与SelectionKey的生命周期一致SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, buffer);// 如果是read事件} else if (key.isReadable()) { try {// 拿到触发事件的channelSocketChannel channel = (SocketChannel) key.channel(); // 获取 selectionKey 上关联的附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read方法返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 如果position和limit相等,则表示不是一条完整的消息,需要扩容if (buffer.position() == buffer.limit()) {//设定扩容机制为2倍ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); //将旧buffer内容写入到新扩容后的buffer中buffer.flip();newBuffer.put(buffer); //将新的buffer作为selectionKey的附加条件,替换旧的bufferkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel();  }}//移除事件iter.remove();}}
}

五、selector 何时不阻塞

  • 事件发生时

    • 客户端发起连接请求时,会触发 accept 事件
    • 客户端发送数据、客户端正常 / 异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次 read 事件
    • channel 可写时,会触发 write 事件
    • 在 Linux 下发生 nio bug
  • 调用 selector.wakeup()

  • 调用 selector.close()

  • selector 所在线程 interrupt

六、使用多线程优化

上述过程只有一个 selector,没有充分利用多核 cpu,现利用多线程进行优化。

分两组 selector:

  • 单线程配一个 selector,专门处理 accept 事件(起名为boss)
  • 创建 CPU 核心数的线程,每个线程配一个 selector,轮流处理 read/write 事件(起名为worker)

服务器端代码:

/*** 流程:* 1. 客户端发起连接,触发accept事件,在accept事件中给worker绑定read事件* 2. 需要保证绑定read事件在worker调用select方法之前,阻塞之后必须得发生一个别的事件才能绑定,耗费时间
*/public class MultiThreadServer {//worker用来处理读写事件static class Worker implements Runnable{private Thread thread; // 不同的worker有不同的线程private Selector selector; // 不同的worker有不同的selectorprivate String name; // 不同的worker有不同的名字private volatile boolean start = false; // worker对应的thread和selector是否已经初始化//队列用来保证绑定read事件在worker调用select方法之前//并发队列用来在多个线程之间传递数据private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();// 构造器,定义worker的名字public Worker(String name) {this.name = name;}// 初始化Thread和selectorpublic void register(SocketChannel sc) throws IOException {//如果已经初始化过了,就不需要再初始化//要保证worker对应一个thread、一个selector,不能每次调用register方法都创建新的if(!start) {selector = Selector.open();thread = new Thread(this, name); // 一个线程对应一个workerthread.start();start = true;}//将绑定操作添加到队列中,此时还没有执行queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null); //绑定read事件} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup();//main线程唤醒selector,防止worker线程中selector阻塞导致read事件无法绑定}//worker的职责,监听读写事件@Overridepublic void run() {while(true) {try {selector.select();//从队列中取出绑定操作并执行Runnable task = queue.poll();if (task != null) {task.run(); //执行队列中的任务,即绑定操作}/*** 这样做的原因:* main线程调用worker的register方法绑定read事件,* 但是select方法的执行不在main线程中,而在worker自己的线程中,* 由于是两个线程,所以并不能保证先后顺序,* 所以利用队列,将绑定read事件和worker调用select方法放在一个线程中,保证先后顺序,*/Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();channel.read(buffer);}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}//main线程当作仅处理accept事件的线程public static void main(String[] args) throws IOException {//boss线程(main线程)专门用来监听accept事件Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();//绑定accept事件SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));//创建cpu核心数的worker并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}//用来保证多个客户端平均的连接到worker上AtomicInteger index = new AtomicInteger();while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);//在accept事件中给worker绑定read事件,调用worker的register方法绑定//保证平均的访问到workerworkers[index.getAndIncrement() % workers.length].register(sc);}}}}
}

NIO多路复用之Selector的使用相关推荐

  1. epoll 浅析以及 nio 中的 Selector

    转载自 epoll 浅析以及 nio 中的 Selector 首先介绍下epoll的基本原理,网上有很多版本,这里选择一个个人觉得相对清晰的讲解(详情见reference): 首先我们来定义流的概念, ...

  2. epoll浅析以及nio中的Selector

    出处: https://my.oschina.net/hosee/blog/730598 首先介绍下epoll的基本原理,网上有很多版本,这里选择一个个人觉得相对清晰的讲解(详情见reference) ...

  3. nio学习之Selector选择器

    nio学习之Selector选择器 Selector选择器 三个相关的类 如何创建选择器 SelectionKey选择键相关的方法 选择器的使用 服务器端模板代码 selector.select()方 ...

  4. java nio多路复用_Java NIO系列教程(六) 多路复用器Selector

    多路复用器Selector是Java NIO编程的基础,熟练地掌握Selector对于掌握NIO编程至关重要.多路复用器提供选择已经就绪的任务的能力.简单来讲,Selector会不断地轮询注册在其上的 ...

  5. epoll nio区别_【总结】两种 NIO 实现:Selector 与 Epoll

    我想用这个话题小结下最近这一阶段的各种测试和开发.其实文章的内容主要还是想总结一下 NIO Socket ,以及两种不同操作系统实现 NIO 的方式, selector 和 epoll . 问题应该从 ...

  6. java nio 多路复用_8分钟深入浅出搞懂BIO、NIO、AIO

    在高性能的IO体系设计中,BIO.NIO.AIO的概念,常常会让我们感到困惑不解.在Java面试中,我们也经常会被问到这个问题.譬如: BIO.NIO.AIO 的概念 同步/异步.阻塞/非阻塞的区别 ...

  7. java nio attachment_7. 彤哥说netty系列之Java NIO核心组件之Selector

    --日拱一卒,不期而至! 你好,我是彤哥,本篇是netty系列的第七篇. 简介 上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何 ...

  8. netty:NIO模型--选择器(Selector)

    1. java的NIO,用非阻塞的IO方式,可以用一个线程,处理多个的客户连接,就会使用到Selector(选择器). 2. Selector能够检测多个注册的通道上是否有事件发生(注意: 多个Cha ...

  9. 计算机IO系列(二)BIO/NIO/多路复用实现

    一.什么是IO? 我们都知道Liux世界里.一切皆文件.而文件是什么呢?文件就是一串二进制流而已.不管socket.还是FIFO.管道.终端.对我们来说.一切都是文件.一切都是流.在信息交换的过程中. ...

最新文章

  1. 图像合成与风格转换实战
  2. springboot 简单自定义starter - beetl
  3. 关于NSString,NSMutableString,NSArray,NSMutableArray,NSDictionary,NSMutableDictionary
  4. C#学习小记14求助一道让我头疼的C#小题
  5. XidianOJ 1019 自然数的秘密
  6. 优秀!33岁博士,拟作为县长人选!
  7. Windows 软件安全---注入安全
  8. 企业发展如何启动云的力量
  9. C# WPF ASP.net 上传多文件和数据
  10. 深入了解字符集和编码
  11. 设计模式之十三:适配器模式(Adapter)
  12. adb小技巧之实现近似vim编辑器功能编辑android系统内部的文本文件
  13. 苹果软件测试的电池损耗准确,苹果官方维修如何查看苹果iPhone电池损耗和寿命...
  14. 方正飞鸿:构建中小企业“两化融合”新模式
  15. 什么是模拟信号?数字信号?区别是什么?它们又是如何完成转换的?
  16. 多项式与快速傅立叶变换
  17. 解读UDS协议中NRC以及NRC优先级
  18. 学习笔记-使用python进行数据分析
  19. QQ2009卸载程序QQ2009Preview.msi文件
  20. C++常见十六进制数组转换char数组方法

热门文章

  1. AI会改变什么?不会改变什么?ChatGPT之父对人工智能未来的判断
  2. 表白/生日浪漫樱花HTML礼物
  3. 录像音视频同步原理分析及PTS计算公式
  4. 【Leetcode】122. Best Time to Buy and Sell Stock II买卖股票的最佳时机 II
  5. MAC快捷键---5
  6. 2020php木马文件,绕过waf的另类木马文件攻击方法
  7. iPhone 10.X 越狱后,抹除设备导致Cydia所有源无法安装使用彻底解决方案及遇到的问题
  8. 动态规划——最长回文子序列
  9. 如何用cmd命令加密文件夹
  10. 关于as608指纹模块的学习心得