多线程改进Selector
前面的代码在服务器端都是单线程配合一个 Selector 选择器来管理多个 channel 上的事件
这样有两个缺点
- 现在都是多核CPU,一个线程只能用一个核心,其他的核心会白白浪费
- 单线程是可以处理多个事件,但是如果某个事件耗时较长,就会造成其他事件的等待
如果让你设计一种较为合理的架构,你会怎样设计呢?
首先一点要把多核CPU 充分利用起来,第二就是每个线程对应自己的职责,例如,店小二负责接待,厨师负责炒菜,服务员负责记录菜单
代码实现
服务器
package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 创建固定数量的 work 并初始化Work work = new Work("work0");work.register();while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}",sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}",sc.getRemoteAddress());sc.register(work.selector,SelectionKey.OP_READ,null);log.debug("after register...{}",sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread; // 独立的线程private Selector selector; // 独立的监听器private String name; // work 的名字private volatile boolean start; // 刚开始线程还未初始化public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register() throws IOException {if (!start) {thread = new Thread(this, name); // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}}@Overridepublic void run() {while (true){try {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()){SelectionKey key = iter.next();iter.remove();if (key.isReadable()){ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}",channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
客户端
package com.zhao.c1;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;/*** @Auther: HackerZhao* @Date: 2021/11/2 13:17* @Description: 客户端*/
public class TestClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));sc.write(Charset.defaultCharset().encode("1234567890abcdef"));System.in.read();}}
一切看似风平浪静,但是客户端已经向服务器发送了数据,服务器却没有打印,只显示了连接
原因是线程的先后顺序不一致导致的
注册事件
阻塞事件
如果先发生注册事件,再发生阻塞事件,客户端此时发送消息过来,run 方法中是不会再发生阻塞,会把这个事件处理完毕。
如果先发生阻塞事件,再发生注册事件,客户端此时发送消息过来,你的 read 事件还没注册到 Selector 上面,selector.select()会认为你现在没有事件要处理,会一直阻塞。
这个解决起来稍微有一点复杂,因为你想让注册事件发生在阻塞事件前面,阻塞事件是在另一个线程里边,所以得想办法把注册事件也放在这个线程里边,并且先于阻塞事件发生。
这可以通过一个队列来解决,先把我要做的事情存放到一个消息队列里边,执行另一个线程的 run 方法之前,先执行我消息队列的事件,这样一来就解决了 阻塞事件阻塞注册事件的问题。
package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 创建固定数量的 work 并初始化Work work = new Work("work0");while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}", sc.getRemoteAddress());work.register(sc);log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread; // 独立的线程private Selector selector; // 独立的监听器private String name; // work 的名字private volatile boolean start; // 刚开始线程还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>(); // 两个线程通道public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {thread = new Thread(this, name); // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup(); // 通知阻塞放行}@Overridepublic void run() {while (true) {try {selector.select(); // work0,阻塞,使用 wakeup 唤醒Runnable task = queue.poll();if (task != null) {task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
设置多个 Worker,Worker 的个数根据 Cpu 的核数定的,可以用轮询做到
具体实现
创建一个 Workers 数组,容量为 CPU 的核数
创建一个原子 Integer 类
获得workers 数组中的 其中一个元素 = index.getAndIncrement() % works.length
package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;/*** @Auther: HackerZhao* @Date: 2021/11/2 12:05* @Description: 多线程管理多个 channel*/@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {// 设置main 名称为 BossThread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));Work[] works = new Work[Runtime.getRuntime().availableProcessors()];for (int i = 0; i < works.length; i++) {works[i] = new Work("worker- "+i);}AtomicInteger index = new AtomicInteger();// 1. 创建固定数量的 work 并初始化while (true) {// 判断是否有事件发生,如果四种事件其中一种发生了,会向下运行。没有发生,则阻塞boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {// accept 建立与客户端连接,SocketChannel 用来与客户端之间通信(菜单)SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 如果 work 创建在连接里边,那么每创建一个连接就会创建一个work,太浪费资源了(work应该是独立的,它只负责自己的读写事件)// 2. 关联 selector,此时就不能关联 boss 这个 selector 了,应当关联对象 work 中的 selectorlog.debug("before register...{}", sc.getRemoteAddress());// round robin 轮询works[index.getAndIncrement() % works.length].register(sc);log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Work implements Runnable {private Thread thread; // 独立的线程private Selector selector; // 独立的监听器private String name; // work 的名字private volatile boolean start; // 刚开始线程还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>(); // 两个线程通道public Work(String name) {this.name = name;}// 初始化 Thread 和 Selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {thread = new Thread(this, name); // 创建一个新线程,就是当前类thread.start();selector = Selector.open();start = true;}queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});selector.wakeup(); // 通知阻塞放行}@Overridepublic void run() {while (true) {try {selector.select(); // work0,阻塞,使用 wakeup 唤醒Runnable task = queue.poll();if (task != null) {task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();// 读取到了 channel 中的数据log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);// 切换至读模式buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}
NIO vs BIO
stream vs channel
stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
二者均为全双工,即读写可以同时进行
IO 模型
同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞
当一个应用程序调用了 read 方法引发的事件
阻塞IO
一旦等待数据处发生阻塞,这个线程就什么也做不了了,只能老老实实的等待数据。不光用户态要等,内核态也要等。就像是一个单道环,没有回头路。
张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,等一天或者等一年,张三都在等,一直等到货来了,买到了,才回去。
非阻塞IO
一旦用户态发现内核态没有数据,就会立刻返回。下一次 where(true) 循环的时候,再去查看有无数据,一旦有数据了,就不会立刻返回。而是复制数据并返回。
张三去买一包 程序员安慰剂,到了药店,药店老板说要等一阵子才有货,张三一听转身就走,第二次,张三又去了,老板说有货,张三就买到货并返回。张三买货的期间是不能做其他事情的
多路复用
用户态调用 select 方法,查看内核是否有事件,没有事件则阻塞,有事件内核会通知用户态有新事件发生。用户线程就会调用 read 方法进行读取,需要复制数据,就会发生阻塞
张三去买一包 程序员安慰剂,但是他不清楚药店有没有货,就打电话给药店老板,老板说现在没有,等有了我告诉你。过了一两天,有货了,老板打电话给张三,让他来拿货。于是张三过去买了一包又回来了。
与阻塞IO不同的是,多路复用每次获取的是一批事件。
例如:张三需要吃饭,吃完饭后还要写代码,写完代码后又要去吃饭。这之中发生了等待时间就只能在这里等待了。例如,去吃饭饭店没菜了,厨子去买菜,饭什么时候可以做好,谁也说不清楚。写代码,停电了,什么时候来电,谁也不知道。再次期间就只能干等。多路复用则不同,没菜了?我不去,等你有菜了,来电了我再做事情,例如,饭店打电话说现在没菜了,那张三就会呆在家,哪里也不去,公司打电话说断电了,张三也哪里都不会去。直到饭店说买到菜了,公司说来电了,张三才会起身去吃饭,去公司上班。他获取的是一批事件,不会陷入某一个事件中等待。
通过上面的案例,可以得出一个结论,就是当用户线程去处理请求,说明此时已经有一个或多个请求要处理了,等待的时间已经在 select 处等待过了,一次性的处理多个事件。
同步:线程自己去获取结果(一个线程)
异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)
阻塞IO :自己调用read 方法,自己接收请求。同步阻塞
阻塞IO :自己调用read 方法,自己接收请求。同步非阻塞
多路复用:自己调用read 方法,自己接收请求。同步
异步非阻塞:当前线程调用完 Thread1 后会向下继续运行,不会阻塞
张三想买一包 程序员安慰剂,但自己又很忙,于是他叫来自己的小弟李四去买,张三去上班了,李四买完药后,回到家,拿给张三。
package com.zhao.c1;import com.zhao.io.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;/*** @Auther: HackerZhao* @Date: 2021/11/3 15:01* @Description: 异步io*/@Slf4j
public class AioFileChannel {public static void main(String[] args) throws IOException {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {// 参数1:ByteBuffer// 参数2:读取的起始位置// 参数3:附件// 参数4:回调对象ByteBuffer buffer = ByteBuffer.allocate(16);log.debug("read begin");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {// read 成功调用此方法/*** 参数1: 实际读取的字节数* 参数2: buffer 对象*/@Overridepublic void completed(Integer result, ByteBuffer attachment) {log.debug("read completed");attachment.flip();ByteBufferUtil.debugAll(attachment);}// read 失败调用此方法@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read end");} catch (IOException e) {e.printStackTrace();}System.in.read();}}
主线程打印完 read begin 和 read end 后,Thread 1 才开始运行.
零拷贝
package com.zhao.c1;import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;public class ClientDemo {public static void main(String[] args) throws IOException {//创建客户端的Socket对象(Socket)//Socket(String host, int port) 创建流套接字并将其连接到指定主机上的指定端口号Socket socket = new Socket("192.168.1.66",10000);File f = new File("data.txt");RandomAccessFile file = new RandomAccessFile(f,"r");byte[] buf = new byte[(int)f.length()];file.read(buf);//获取输出流,写数据socket.getOutputStream().write(buf);//释放资源socket.close();}
}
以上程序用户态和内核态之间的转换如下图
程序调用read 方法后,会有用户态转换成内核态。方法调用结束会有内核态转换为用户态,返回结果。会发生 1,2步骤的内容
程序调用 write 方法,会有用户态转换为内核态,会发生 3,4步骤的内容
程序运行,会发生3次 用户态与内核态的转换,4次 数据复制
二次优化
在 Netty基础 NIO中有介绍过使用 ByteBuffer.allocateDirect(16),创建的 buffer 使用的是直接内存,会减少一次拷贝。
buffer使用的是操作系统中的内存,当磁盘想缓冲区写入时,byte 数组和内核缓冲区用的是同一块内存。减少了一次拷贝
三次优化
无论是使用原始读取还是使用直接内存的 ByteBuffer,都绕不开java程序,linux 2.1 后提供的 sendFile 方法 ,java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
现在只需要 一次用户态到内核态的切换,是在 transferTo 方法发生后,3次 数据复制
四次优化
linux 2.4
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
零拷贝,不需要在java 层面发生拷贝,拷贝只发生在网络与操作系统之间
优点和适用场景
更少的用户态与内核态的切换
不利用 cpu 计算,减少 cpu 缓存伪共享
零拷贝适合小文件传输
多线程改进Selector相关推荐
- 【UDP通过多线程改进,在一个窗口中同时接收又发送】
package com.yjf.esupplier.common.test;import java.net.DatagramSocket; import java.net.SocketExceptio ...
- Netty框架之多线程的Selector优化
Netty框架之多线程的Selector优化 1.一个Boss线程,一个Woker线程实现服务器 2.一个Boss线程,多个Woker线程实现服务器 学习Selector之后,我们发现单个Select ...
- Java基础知识强化之网络编程笔记05:UDP之多线程实现聊天室案例
1. 通过多线程改进刚才的聊天程序,这样我就可以实现在一个窗口发送和接收数据了 2. 代码示例: (1)SendThread.java,如下: 1 package com.himi.udpDemo2 ...
- 后端开发-Reactor设计模式
在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,Proactor用于异步I/O操作. Reactor模式称之为响应器模式,通常用 ...
- Netty学习笔记二网络编程
Netty学习笔记二 二. 网络编程 1. 阻塞模式 阻塞主要表现为: 连接时阻塞 读取数据时阻塞 缺点: 阻塞单线程在没有连接时会阻塞等待连接的到达,连接到了以后,要进行读取数据,如果没有数据,还要 ...
- 【Netty】第二章 网络编程和 IO 概念剖析
[Netty]第二章 网络编程 文章目录 [Netty]第二章 网络编程 一.网络编程 1.模拟阻塞模式下服务器单线程处理请求 2.模拟非阻塞模式下服务器单线程处理请求 3.使用 Selector 改 ...
- 大牛实战归纳——Kafka架构原理
作者:qq_41534566 https://blog.csdn.net/qq_41534566/article/details/81210496 对于kafka的架构原理我们先提出几个问题? 1.K ...
- python network_python network(非常好)
关于网络编程以及socket 等一些概念和函数介绍就不再重复了,这里示例性用python 编写客户端和服务器端. 一.最简单的客户端流程: 1. Create a socket 2. Connect ...
- Python最佳代码实践:性能、内存和可用性!
作者:Satwik Kansal,译者:Prodesire 英文原文:https://dwz.cn/r4N2hvht 译文:https://zhuanlan.zhihu.com/p/28675694 ...
最新文章
- Hutool,一个贼好用的 Java 工具类库,用过都说好~
- tensorflow 的输入层和输出层维度注意事项
- 盘点丨毕业年薪34万,高校人工智能研究哪家强?
- Java创建线程的3种方式
- 检测客户pc电脑端VC++环境并安装
- jvm 调优_Java架构—JVM调优
- Django项目部署到阿里云服务器上无法发送邮件STMP
- php 并发 100 压测,简单PHP把握站点并发数
- 傅里叶级数的数学推导
- android超级管理员权限作用,Android获取超级管理员权限的实现
- phpstrom php cli,在docker中的PhpStorm 2017.1远程php-cli:配置php.ini文件不存在
- php 页面上显示xls文档,phpExcel输出xls文档显示乱码的解决方法
- Eclipse 添加书签
- 在 可编辑的 Div 的 光标位置 插入 文字 或 HTML
- 天正双击墙体不能编辑_天正CAD绘图必须要知道的技巧
- TOPSIS(优劣解距离法)
- 计算机图形和ps的区别,PS CC和PS CC有什么区别?
- qq远程控制无法连接服务器,win10系统QQ远程协助,无法控制对方电脑,怎么办?...
- 全新织梦DEDE CMS模板-精仿qq技术导航网站源码
- VR乒乓球项目Unity3D 开发经验整理,4简单而有效的AI