前面的代码在服务器端都是单线程配合一个 Selector 选择器来管理多个 channel 上的事件

这样有两个缺点

  1. 现在都是多核CPU,一个线程只能用一个核心,其他的核心会白白浪费
  2. 单线程是可以处理多个事件,但是如果某个事件耗时较长,就会造成其他事件的等待

如果让你设计一种较为合理的架构,你会怎样设计呢?

首先一点要把多核CPU 充分利用起来,第二就是每个线程对应自己的职责,例如,店小二负责接待,厨师负责炒菜,服务员负责记录菜单

代码实现

服务器

package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 创建固定数量的 work 并初始化Work work = new Work("work0");work.register();while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}",sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}",sc.getRemoteAddress());sc.register(work.selector,SelectionKey.OP_READ,null);log.debug("after register...{}",sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread;   // 独立的线程private Selector selector;  // 独立的监听器private String name; // work  的名字private volatile boolean start; // 刚开始线程还未初始化public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register() throws IOException {if (!start) {thread = new Thread(this, name);  // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}}@Overridepublic void run() {while (true){try {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();if (key.isReadable()){ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}",channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

客户端

package com.zhao.c1;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;/*** @Auther: HackerZhao* @Date: 2021/11/2 13:17* @Description: 客户端*/
public class TestClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));sc.write(Charset.defaultCharset().encode("1234567890abcdef"));System.in.read();}}

一切看似风平浪静,但是客户端已经向服务器发送了数据,服务器却没有打印,只显示了连接

原因是线程的先后顺序不一致导致的

注册事件

阻塞事件

如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。

如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。

这个解决起来稍微有一点复杂,因为你想让注册事件发生在阻塞事件前面,阻塞事件是在另一个线程里边,所以得想办法把注册事件也放在这个线程里边,并且先于阻塞事件发生。

这可以通过一个队列来解决,先把我要做的事情存放到一个消息队列里边,执行另一个线程的 run 方法之前,先执行我消息队列的事件,这样一来就解决了 阻塞事件阻塞注册事件的问题。

package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 创建固定数量的 work 并初始化Work work = new Work("work0");while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}", sc.getRemoteAddress());work.register(sc);log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread;   // 独立的线程private Selector selector;  // 独立的监听器private String name; // work  的名字private volatile boolean start; // 刚开始线程还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();  // 两个线程通道public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {thread = new Thread(this, name);  // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup();  // 通知阻塞放行}@Overridepublic void run() {while (true) {try {selector.select(); // work0,阻塞,使用 wakeup 唤醒Runnable task = queue.poll();if (task != null) {task.run();   // 执行了 sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

设置多个 Worker,Worker 的个数根据 Cpu 的核数定的,可以用轮询做到

具体实现

创建一个 Workers 数组,容量为 CPU 的核数

创建一个原子 Integer 类

获得workers 数组中的 其中一个元素 = index.getAndIncrement() % works.length

package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));Work[] works = new Work[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < works.length; i++) {works[i] = new Work("worker- "+i);}AtomicInteger index = new AtomicInteger();// 1. 创建固定数量的 work 并初始化while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}", sc.getRemoteAddress());// round robin 轮询works[index.getAndIncrement() % works.length].register(sc);log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread;   // 独立的线程private Selector selector;  // 独立的监听器private String name; // work  的名字private volatile boolean start; // 刚开始线程还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();  // 两个线程通道public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {thread = new Thread(this, name);  // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup();  // 通知阻塞放行}@Overridepublic void run() {while (true) {try {selector.select(); // work0,阻塞,使用 wakeup 唤醒Runnable task = queue.poll();if (task != null) {task.run();   // 执行了 sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

NIO vs BIO

stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)

  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用

  • 二者均为全双工,即读写可以同时进行

IO 模型

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞 

当一个应用程序调用了 read 方法引发的事件

阻塞IO

一旦等待数据处发生阻塞,这个线程就什么也做不了了,只能老老实实的等待数据。不光用户态要等,内核态也要等。就像是一个单道环,没有回头路。

张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,等一天或者等一年,张三都在等,一直等到货来了,买到了,才回去。

非阻塞IO

一旦用户态发现内核态没有数据,就会立刻返回。下一次 where(true) 循环的时候,再去查看有无数据,一旦有数据了,就不会立刻返回。而是复制数据并返回。

张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,张三一听转身就走,第二次,张三又去了,老板说有货,张三就买到货并返回。张三买货的期间是不能做其他事情的

多路复用

用户态调用 select 方法,查看内核是否有事件,没有事件则阻塞,有事件内核会通知用户态有新事件发生。用户线程就会调用 read 方法进行读取,需要复制数据,就会发生阻塞

张三去买一包 程序员安慰剂,但是他不清楚药店有没有货,就打电话给药店老板,老板说现在没有,等有了我告诉你。过了一两天,有货了,老板打电话给张三,让他来拿货。于是张三过去买了一包又回来了。

与阻塞IO不同的是,多路复用每次获取的是一批事件。

例如:张三需要吃饭,吃完饭后还要写代码,写完代码后又要去吃饭。这之中发生了等待时间就只能在这里等待了。例如,去吃饭饭店没菜了,厨子去买菜,饭什么时候可以做好,谁也说不清楚。写代码,停电了,什么时候来电,谁也不知道。再次期间就只能干等。多路复用则不同,没菜了?我不去,等你有菜了,来电了我再做事情,例如,饭店打电话说现在没菜了,那张三就会呆在家,哪里也不去,公司打电话说断电了,张三也哪里都不会去。直到饭店说买到菜了,公司说来电了,张三才会起身去吃饭,去公司上班。他获取的是一批事件,不会陷入某一个事件中等待。

通过上面的案例,可以得出一个结论,就是当用户线程去处理请求,说明此时已经有一个或多个请求要处理了,等待的时间已经在 select 处等待过了,一次性的处理多个事件。

同步:线程自己去获取结果(一个线程)

异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

阻塞IO :自己调用read 方法,自己接收请求。同步阻塞

阻塞IO :自己调用read 方法,自己接收请求。同步非阻塞

多路复用:自己调用read 方法,自己接收请求。同步

异步非阻塞:当前线程调用完 Thread1 后会向下继续运行,不会阻塞

张三想买一包 程序员安慰剂,但自己又很忙,于是他叫来自己的小弟李四去买,张三去上班了,李四买完药后,回到家,拿给张三。

package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;/*** @Auther: HackerZhao* @Date: 2021/11/3 15:01* @Description: 异步io*/@Slf4j
public class AioFileChannel {public static void main(String[] args) throws IOException {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {// 参数1:ByteBuffer// 参数2:读取的起始位置// 参数3:附件// 参数4:回调对象ByteBuffer buffer = ByteBuffer.allocate(16);log.debug("read begin");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {// read 成功调用此方法/*** 参数1: 实际读取的字节数* 参数2: buffer 对象*/@Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug("read completed");attachment.flip();ByteBufferUtil.debugAll(attachment);}// read 失败调用此方法@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read end");} catch (IOException e) {e.printStackTrace();}System.in.read();}}

主线程打印完 read begin 和 read end 后,Thread 1 才开始运行.

零拷贝

package com.zhao.c1;import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;public class ClientDemo {public static void main(String[] args) throws IOException {//创建客户端的Socket对象(Socket)//Socket(String host, int port) 创建流套接字并将其连接到指定主机上的指定端口号Socket socket = new Socket("192.168.1.66",10000);File f = new File("data.txt");RandomAccessFile file = new RandomAccessFile(f,"r");byte[] buf = new byte[(int)f.length()];file.read(buf);//获取输出流,写数据socket.getOutputStream().write(buf);//释放资源socket.close();}
}

以上程序用户态和内核态之间的转换如下图

程序调用read 方法后,会有用户态转换成内核态。方法调用结束会有内核态转换为用户态,返回结果。会发生 1,2步骤的内容

程序调用 write 方法,会有用户态转换为内核态,会发生 3,4步骤的内容

程序运行,会发生3次 用户态与内核态的转换,4次 数据复制

二次优化

在 Netty基础 NIO中有介绍过使用 ByteBuffer.allocateDirect(16),创建的 buffer 使用的是直接内存,会减少一次拷贝。

buffer使用的是操作系统中的内存,当磁盘想缓冲区写入时,byte 数组和内核缓冲区用的是同一块内存。减少了一次拷贝

三次优化

无论是使用原始读取还是使用直接内存的 ByteBuffer,都绕不开java程序,linux 2.1 后提供的 sendFile 方法 ,java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

现在只需要 一次用户态到内核态的切换,是在  transferTo 方法发生后,3次 数据复制

四次优化

linux 2.4

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

零拷贝,不需要在java 层面发生拷贝,拷贝只发生在网络与操作系统之间

优点和适用场景

  • 更少的用户态与内核态的切换

  • 不利用 cpu 计算,减少 cpu 缓存伪共享

  • 零拷贝适合小文件传输

多线程改进Selector相关推荐

  1. 【UDP通过多线程改进,在一个窗口中同时接收又发送】

    package com.yjf.esupplier.common.test;import java.net.DatagramSocket; import java.net.SocketExceptio ...

  2. Netty框架之多线程的Selector优化

    Netty框架之多线程的Selector优化 1.一个Boss线程,一个Woker线程实现服务器 2.一个Boss线程,多个Woker线程实现服务器 学习Selector之后,我们发现单个Select ...

  3. Java基础知识强化之网络编程笔记05:UDP之多线程实现聊天室案例

    1. 通过多线程改进刚才的聊天程序,这样我就可以实现在一个窗口发送和接收数据了 2.  代码示例: (1)SendThread.java,如下: 1 package com.himi.udpDemo2 ...

  4. 后端开发-Reactor设计模式

    在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,Proactor用于异步I/O操作. Reactor模式称之为响应器模式,通常用 ...

  5. Netty学习笔记二网络编程

    Netty学习笔记二 二. 网络编程 1. 阻塞模式 阻塞主要表现为: 连接时阻塞 读取数据时阻塞 缺点: 阻塞单线程在没有连接时会阻塞等待连接的到达,连接到了以后,要进行读取数据,如果没有数据,还要 ...

  6. 【Netty】第二章 网络编程和 IO 概念剖析

    [Netty]第二章 网络编程 文章目录 [Netty]第二章 网络编程 一.网络编程 1.模拟阻塞模式下服务器单线程处理请求 2.模拟非阻塞模式下服务器单线程处理请求 3.使用 Selector 改 ...

  7. 大牛实战归纳——Kafka架构原理

    作者:qq_41534566 https://blog.csdn.net/qq_41534566/article/details/81210496 对于kafka的架构原理我们先提出几个问题? 1.K ...

  8. python network_python network(非常好)

    关于网络编程以及socket 等一些概念和函数介绍就不再重复了,这里示例性用python 编写客户端和服务器端. 一.最简单的客户端流程: 1. Create a socket 2. Connect ...

  9. Python最佳代码实践:性能、内存和可用性!

    作者:Satwik Kansal,译者:Prodesire 英文原文:https://dwz.cn/r4N2hvht 译文:https://zhuanlan.zhihu.com/p/28675694 ...

最新文章

  1. Hutool,一个贼好用的 Java 工具类库,用过都说好~
  2. tensorflow 的输入层和输出层维度注意事项
  3. 盘点丨毕业年薪34万,高校人工智能研究哪家强?
  4. Java创建线程的3种方式
  5. 检测客户pc电脑端VC++环境并安装
  6. jvm 调优_Java架构—JVM调优
  7. Django项目部署到阿里云服务器上无法发送邮件STMP
  8. php 并发 100 压测,简单PHP把握站点并发数
  9. 傅里叶级数的数学推导
  10. android超级管理员权限作用,Android获取超级管理员权限的实现
  11. phpstrom php cli,在docker中的PhpStorm 2017.1远程php-cli:配置php.ini文件不存在
  12. php 页面上显示xls文档,phpExcel输出xls文档显示乱码的解决方法
  13. Eclipse 添加书签
  14. 在 可编辑的 Div 的 光标位置 插入 文字 或 HTML
  15. 天正双击墙体不能编辑_天正CAD绘图必须要知道的技巧
  16. TOPSIS(优劣解距离法)
  17. 计算机图形和ps的区别,PS CC和PS CC有什么区别?
  18. qq远程控制无法连接服务器,win10系统QQ远程协助,无法控制对方电脑,怎么办?...
  19. 全新织梦DEDE CMS模板-精仿qq技术导航网站源码
  20. VR乒乓球项目Unity3D 开发经验整理,4简单而有效的AI

热门文章

  1. 汽车UDS工具链实现自动化测试
  2. UE4从外部修改材质参数
  3. win7驱动备份_彻底解决WIN7宽带连接错误651问题的办法
  4. BUUCTF-小明的保险箱
  5. 海康威视控件覆盖dom元素问题下拉菜单被覆盖
  6. ISME:海洋氨氧化古菌的营养能量策略决定其生物地理学分布格局
  7. Java实现 LeetCode 516 最长回文子序列
  8. springboot+vue中小学生课外知识学习网站
  9. 招财进宝手势锁,Android手势密码的实现
  10. FX5U与MT8101IE通信设置