多线程Reactor模式
文章目录
- 1.1 主服务器
- 2.1 IO请求handler+线程池
- 3.1 客户端
多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。
让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度
1.1 主服务器
package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;class MultiThreadEchoServerReactor {ServerSocketChannel serverSocket;AtomicInteger next = new AtomicInteger(0);Selector bossSelector = null;Reactor bossReactor = null;//selectors集合,引入多个selector选择器//多个选择器可以更好的提高通道的并发量Selector[] workSelectors = new Selector[2];//引入多个子反应器//如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。//每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量Reactor[] workReactors = null;MultiThreadEchoServerReactor() throws IOException {bossSelector = Selector.open();//初始化多个selector选择器workSelectors[0] = Selector.open();workSelectors[1] = Selector.open();serverSocket = ServerSocketChannel.open();InetSocketAddress address =new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);serverSocket.socket().bind(address);//非阻塞serverSocket.configureBlocking(false);//第一个selector,负责监控新连接事件SelectionKey sk =serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);//附加新连接处理handler处理器到SelectionKey(选择键)sk.attach(new AcceptorHandler());//处理新连接的反应器bossReactor = new Reactor(bossSelector);//第一个子反应器,一子反应器负责一个选择器Reactor subReactor1 = new Reactor(workSelectors[0]);//第二个子反应器,一子反应器负责一个选择器Reactor subReactor2 = new Reactor(workSelectors[1]);workReactors = new Reactor[]{subReactor1, subReactor2};}private void startService() {new Thread(bossReactor).start();// 一子反应器对应一条线程new Thread(workReactors[0]).start();new Thread(workReactors[1]).start();}//反应器class Reactor implements Runnable {//每条线程负责一个选择器的查询final Selector selector;public Reactor(Selector selector) {this.selector = selector;}public void run() {try {while (!Thread.interrupted()) {//单位为毫秒//每隔一秒列出选择器感应列表selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();if (null == selectedKeys || selectedKeys.size() == 0) {//如果列表中的通道注册事件没有发生那就继续执行continue;}Iterator<SelectionKey> it = selectedKeys.iterator();while (it.hasNext()) {//Reactor负责dispatch收到的事件SelectionKey sk = it.next();dispatch(sk);}//清楚掉已经处理过的感应事件,防止重复处理selectedKeys.clear();}} catch (IOException ex) {ex.printStackTrace();}}void dispatch(SelectionKey sk) {Runnable handler = (Runnable) sk.attachment();//调用之前attach绑定到选择键的handler处理器对象if (handler != null) {handler.run();}}}// Handler:新连接处理器class AcceptorHandler implements Runnable {public void run() {try {SocketChannel channel = serverSocket.accept();Logger.info("接收到一个新的连接");if (channel != null) {int index = next.get();Logger.info("选择器的编号:" + index);Selector selector = workSelectors[index];new MultiThreadEchoHandler(selector, channel);}} catch (IOException e) {e.printStackTrace();}if (next.incrementAndGet() == workSelectors.length) {next.set(0);}}}public static void main(String[] args) throws IOException {MultiThreadEchoServerReactor server =new MultiThreadEchoServerReactor();server.startService();}}
按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。
这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。
//第一个selector,负责监控新连接事件SelectionKey sk =serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);//附加新连接处理handler处理器到SelectionKey(选择键)sk.attach(new AcceptorHandler());
但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。
int index = next.get();Logger.info("选择器的编号:" + index);Selector selector = workSelectors[index];new MultiThreadEchoHandler(selector, channel);
2.1 IO请求handler+线程池
package com.crazymakercircle.ReactorModel;import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class MultiThreadEchoHandler implements Runnable {final SocketChannel channel;final SelectionKey sk;final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);static final int RECIEVING = 0, SENDING = 1;int state = RECIEVING;//引入线程池static ExecutorService pool = Executors.newFixedThreadPool(4);MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {channel = c;channel.configureBlocking(false);//唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞bossselector.wakeup();//仅仅取得选择键,后设置感兴趣的IO事件sk = channel.register(selector, 0);//将本Handler作为sk选择键的附件,方便事件dispatchsk.attach(this);//向sk选择键注册Read就绪事件sk.interestOps(SelectionKey.OP_READ);//唤醒选择,是的OP_READ生效selector.wakeup();Logger.info("新的连接 注册完成");}public void run() {//异步任务,在独立的线程池中执行pool.execute(new AsyncTask());}//异步任务,不在Reactor线程中执行public synchronized void asyncRun() {try {if (state == SENDING) {//写入通道channel.write(byteBuffer);//写完后,准备开始从通道读,byteBuffer切换成写模式byteBuffer.clear();//写完后,注册read就绪事件sk.interestOps(SelectionKey.OP_READ);//写完后,进入接收的状态state = RECIEVING;} else if (state == RECIEVING) {//从通道读int length = 0;while ((length = channel.read(byteBuffer)) > 0) {Logger.info(new String(byteBuffer.array(), 0, length));}//读完后,准备开始写入通道,byteBuffer切换成读模式byteBuffer.flip();//读完后,注册write就绪事件sk.interestOps(SelectionKey.OP_WRITE);//读完后,进入发送的状态state = SENDING;}//处理结束了, 这里不能关闭select key,需要重复使用//sk.cancel();} catch (IOException ex) {ex.printStackTrace();}}//异步任务的内部类class AsyncTask implements Runnable {public void run() {MultiThreadEchoHandler.this.asyncRun();}}}
在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。
3.1 客户端
package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Dateutil;
import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;/*** create by 尼恩 @ 疯狂创客圈**/
public class EchoClient {public void start() throws IOException {InetSocketAddress address =new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);// 1、获取通道(channel)SocketChannel socketChannel = SocketChannel.open(address);Logger.info("客户端连接成功");// 2、切换成非阻塞模式socketChannel.configureBlocking(false);//不断的自旋、等待连接完成,或者做一些其他的事情while (!socketChannel.finishConnect()) {}Logger.tcfo("客户端启动成功!");//启动接受线程Processer processer = new Processer(socketChannel);new Thread(processer).start();}static class Processer implements Runnable {final Selector selector;final SocketChannel channel;Processer(SocketChannel channel) throws IOException {//Reactor初始化selector = Selector.open();this.channel = channel;channel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE);}public void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selected = selector.selectedKeys();Iterator<SelectionKey> it = selected.iterator();while (it.hasNext()) {SelectionKey sk = it.next();if (sk.isWritable()) {ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);Scanner scanner = new Scanner(System.in);Logger.tcfo("请输入发送内容:");if (scanner.hasNext()) {SocketChannel socketChannel = (SocketChannel) sk.channel();String next = scanner.next();buffer.put((Dateutil.getNow() + " >>" + next).getBytes());buffer.flip();// 操作三:发送数据socketChannel.write(buffer);buffer.clear();}}if (sk.isReadable()) {// 若选择键的IO事件是“可读”事件,读取数据SocketChannel socketChannel = (SocketChannel) sk.channel();//读取数据ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int length = 0;while ((length = socketChannel.read(byteBuffer)) > 0) {byteBuffer.flip();Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));byteBuffer.clear();}}//处理结束了, 这里不能关闭select key,需要重复使用//selectionKey.cancel();}selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}}public static void main(String[] args) throws IOException {new EchoClient().start();}
}
多线程Reactor模式相关推荐
- Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式
本文介绍了Java中的四种I/O模型,同步阻塞,同步非阻塞,多路复用,异步阻塞.同时将NIO和BIO进行了对比,并详细分析了基于NIO的Reactor模式,包括经典单线程模型以及多线程模式和多Reac ...
- Reactor模式!
文章目录 Reactor模式介绍 什么是Reactor模式 ? 为什么使用Reactor模式 ? Reactor模式的演进过程 单Reactor单线程 单Reactor多线程 多Reactor多线程 ...
- Reactor模式与NIO
文章目录 Reactor模式与NIO 简单的NIO编程方式 Reactor模式 单线程Reactor模式 多线程Reactor模式 主从Reactor多线程模型 Reactor模式与NIO NIO是非 ...
- Netty基础入门——Reactor模式
文章目录 1. 前言 2. 单线程Reactor模式 3. 多线程Reactor模式 4. Reactor模式和观察者模式 1. 前言 Reactor模式是高性能.高并发技术中非常重要的基础知识,只有 ...
- 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )
文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...
- reactor多线程模型_Netty运用Reactor模式到极致
常见的reactor模式有以下三种 单线程reactor 多线程reactor 主从reactor 1.单线程reactor ractor 单线程模式是指所有的I/O操作都在一个NIO线程完成,该线程 ...
- reactor模式:多线程的reactor模式
上文说到单线程的reactor模式 reactor模式:单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的reactor模式引入线程池的概念, ...
- 高性能IO之Reactor模式
讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty.Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢? 最最原始的网络编程思路就是服务器用一个w ...
- 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...
最新文章
- 扫描自己进入VR中 SLAM Scan 3D引擎将做到
- WebMvcConfigurerAdapter过时了替代的方法
- kettle 遇到 解决Incorrect integer value: '' for column 'id' at row 1 完美解决-费元星
- datareader对象直接转化为int_Integer、new Integer() 和 int 比较的面试题
- 年末巨制:知识图谱嵌入方法研究总结
- 51ctopython自动化测试工程师课程价格,Python自动化测试开发实战 一门能就业的测试课...
- 大华股份携手阿里云计算 涉足智能家居
- 【转】cron表达式详解
- windows系统TLQ8安装时提示载入java vm时windows出现错误
- 戏人看戏,苏旭博客网-学无止尽
- 川农计算机分数线,2016四川农业大学录取分数线(附15年调档线)
- android+锁屏显示农历,在手机锁屏界面上显示农历日期和天气
- Error: [vuex] do not mutate vuex store state outside mutation handlers.报错的解决方法
- 大数据解读《旅行青蛙》崛起之谜
- 注入漏洞-sql注入
- Graphics2D进行后台绘图
- CU4C字符集检测和转换,C++版本
- 浅析运动健身APP开发的四种模式
- 爬虫数据分析实战——腾讯视频《奔跑吧》第九季弹幕数据分析
- 为用户设计舒适的姿势
热门文章
- 无圆角的四方孔的机构原理及运动仿真
- 使用POI技术往Excel中写入图片并以附件的形式发送给对方
- wifi中2.4GHz、5GHz、db信道介绍
- 南瓜做法大全家常菜 南瓜怎么做好吃
- JDBC系列(九):JDBC与数据库连接池(Druid-德鲁伊)使用步骤
- 清华朱军团队开源首个基于Transformer的多模态扩散大模型
- 一个叫搜索引擎的家伙
- gnupg,gnupg2 and gnupg1 do not seem to be installed,but one of them is required for this operation
- 不可错过的吃鸡攻略:多位抖音吃鸡主播关于刺激战场的神级操作技巧汇总
- 说话之道学习笔记1-转述赞美