文章目录

  • 1.1 主服务器
  • 2.1 IO请求handler+线程池
  • 3.1 客户端

多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。
让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度

1.1 主服务器

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;class MultiThreadEchoServerReactor {ServerSocketChannel serverSocket;AtomicInteger next = new AtomicInteger(0);Selector bossSelector = null;Reactor bossReactor = null;//selectors集合,引入多个selector选择器//多个选择器可以更好的提高通道的并发量Selector[] workSelectors = new Selector[2];//引入多个子反应器//如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。//每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量Reactor[] workReactors = null;MultiThreadEchoServerReactor() throws IOException {bossSelector = Selector.open();//初始化多个selector选择器workSelectors[0] = Selector.open();workSelectors[1] = Selector.open();serverSocket = ServerSocketChannel.open();InetSocketAddress address =new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);serverSocket.socket().bind(address);//非阻塞serverSocket.configureBlocking(false);//第一个selector,负责监控新连接事件SelectionKey sk =serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);//附加新连接处理handler处理器到SelectionKey(选择键)sk.attach(new AcceptorHandler());//处理新连接的反应器bossReactor = new Reactor(bossSelector);//第一个子反应器,一子反应器负责一个选择器Reactor subReactor1 = new Reactor(workSelectors[0]);//第二个子反应器,一子反应器负责一个选择器Reactor subReactor2 = new Reactor(workSelectors[1]);workReactors = new Reactor[]{subReactor1, subReactor2};}private void startService() {new Thread(bossReactor).start();// 一子反应器对应一条线程new Thread(workReactors[0]).start();new Thread(workReactors[1]).start();}//反应器class Reactor implements Runnable {//每条线程负责一个选择器的查询final Selector selector;public Reactor(Selector selector) {this.selector = selector;}public void run() {try {while (!Thread.interrupted()) {//单位为毫秒//每隔一秒列出选择器感应列表selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();if (null == selectedKeys || selectedKeys.size() == 0) {//如果列表中的通道注册事件没有发生那就继续执行continue;}Iterator<SelectionKey> it = selectedKeys.iterator();while (it.hasNext()) {//Reactor负责dispatch收到的事件SelectionKey sk = it.next();dispatch(sk);}//清楚掉已经处理过的感应事件,防止重复处理selectedKeys.clear();}} catch (IOException ex) {ex.printStackTrace();}}void dispatch(SelectionKey sk) {Runnable handler = (Runnable) sk.attachment();//调用之前attach绑定到选择键的handler处理器对象if (handler != null) {handler.run();}}}// Handler:新连接处理器class AcceptorHandler implements Runnable {public void run() {try {SocketChannel channel = serverSocket.accept();Logger.info("接收到一个新的连接");if (channel != null) {int index = next.get();Logger.info("选择器的编号:" + index);Selector selector = workSelectors[index];new MultiThreadEchoHandler(selector, channel);}} catch (IOException e) {e.printStackTrace();}if (next.incrementAndGet() == workSelectors.length) {next.set(0);}}}public static void main(String[] args) throws IOException {MultiThreadEchoServerReactor server =new MultiThreadEchoServerReactor();server.startService();}}

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。

这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。

 //第一个selector,负责监控新连接事件SelectionKey sk =serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);//附加新连接处理handler处理器到SelectionKey(选择键)sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。

                   int index = next.get();Logger.info("选择器的编号:" + index);Selector selector = workSelectors[index];new MultiThreadEchoHandler(selector, channel);

2.1 IO请求handler+线程池

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class MultiThreadEchoHandler implements Runnable {final SocketChannel channel;final SelectionKey sk;final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);static final int RECIEVING = 0, SENDING = 1;int state = RECIEVING;//引入线程池static ExecutorService pool = Executors.newFixedThreadPool(4);MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {channel = c;channel.configureBlocking(false);//唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞bossselector.wakeup();//仅仅取得选择键,后设置感兴趣的IO事件sk = channel.register(selector, 0);//将本Handler作为sk选择键的附件,方便事件dispatchsk.attach(this);//向sk选择键注册Read就绪事件sk.interestOps(SelectionKey.OP_READ);//唤醒选择,是的OP_READ生效selector.wakeup();Logger.info("新的连接 注册完成");}public void run() {//异步任务,在独立的线程池中执行pool.execute(new AsyncTask());}//异步任务,不在Reactor线程中执行public synchronized void asyncRun() {try {if (state == SENDING) {//写入通道channel.write(byteBuffer);//写完后,准备开始从通道读,byteBuffer切换成写模式byteBuffer.clear();//写完后,注册read就绪事件sk.interestOps(SelectionKey.OP_READ);//写完后,进入接收的状态state = RECIEVING;} else if (state == RECIEVING) {//从通道读int length = 0;while ((length = channel.read(byteBuffer)) > 0) {Logger.info(new String(byteBuffer.array(), 0, length));}//读完后,准备开始写入通道,byteBuffer切换成读模式byteBuffer.flip();//读完后,注册write就绪事件sk.interestOps(SelectionKey.OP_WRITE);//读完后,进入发送的状态state = SENDING;}//处理结束了, 这里不能关闭select key,需要重复使用//sk.cancel();} catch (IOException ex) {ex.printStackTrace();}}//异步任务的内部类class AsyncTask implements Runnable {public void run() {MultiThreadEchoHandler.this.asyncRun();}}}

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。

3.1 客户端

package com.crazymakercircle.ReactorModel;import com.crazymakercircle.NioDemoConfig;
import com.crazymakercircle.util.Dateutil;
import com.crazymakercircle.util.Logger;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;/*** create by 尼恩 @ 疯狂创客圈**/
public class EchoClient {public void start() throws IOException {InetSocketAddress address =new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT);// 1、获取通道(channel)SocketChannel socketChannel = SocketChannel.open(address);Logger.info("客户端连接成功");// 2、切换成非阻塞模式socketChannel.configureBlocking(false);//不断的自旋、等待连接完成,或者做一些其他的事情while (!socketChannel.finishConnect()) {}Logger.tcfo("客户端启动成功!");//启动接受线程Processer processer = new Processer(socketChannel);new Thread(processer).start();}static class Processer implements Runnable {final Selector selector;final SocketChannel channel;Processer(SocketChannel channel) throws IOException {//Reactor初始化selector = Selector.open();this.channel = channel;channel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE);}public void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selected = selector.selectedKeys();Iterator<SelectionKey> it = selected.iterator();while (it.hasNext()) {SelectionKey sk = it.next();if (sk.isWritable()) {ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);Scanner scanner = new Scanner(System.in);Logger.tcfo("请输入发送内容:");if (scanner.hasNext()) {SocketChannel socketChannel = (SocketChannel) sk.channel();String next = scanner.next();buffer.put((Dateutil.getNow() + " >>" + next).getBytes());buffer.flip();// 操作三:发送数据socketChannel.write(buffer);buffer.clear();}}if (sk.isReadable()) {// 若选择键的IO事件是“可读”事件,读取数据SocketChannel socketChannel = (SocketChannel) sk.channel();//读取数据ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int length = 0;while ((length = socketChannel.read(byteBuffer)) > 0) {byteBuffer.flip();Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));byteBuffer.clear();}}//处理结束了, 这里不能关闭select key,需要重复使用//selectionKey.cancel();}selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}}public static void main(String[] args) throws IOException {new EchoClient().start();}
}

多线程Reactor模式相关推荐

  1. Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

    本文介绍了Java中的四种I/O模型,同步阻塞,同步非阻塞,多路复用,异步阻塞.同时将NIO和BIO进行了对比,并详细分析了基于NIO的Reactor模式,包括经典单线程模型以及多线程模式和多Reac ...

  2. Reactor模式!

    文章目录 Reactor模式介绍 什么是Reactor模式 ? 为什么使用Reactor模式 ? Reactor模式的演进过程 单Reactor单线程 单Reactor多线程 多Reactor多线程 ...

  3. Reactor模式与NIO

    文章目录 Reactor模式与NIO 简单的NIO编程方式 Reactor模式 单线程Reactor模式 多线程Reactor模式 主从Reactor多线程模型 Reactor模式与NIO NIO是非 ...

  4. Netty基础入门——Reactor模式

    文章目录 1. 前言 2. 单线程Reactor模式 3. 多线程Reactor模式 4. Reactor模式和观察者模式 1. 前言 Reactor模式是高性能.高并发技术中非常重要的基础知识,只有 ...

  5. 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )

    文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...

  6. reactor多线程模型_Netty运用Reactor模式到极致

    常见的reactor模式有以下三种 单线程reactor 多线程reactor 主从reactor 1.单线程reactor ractor 单线程模式是指所有的I/O操作都在一个NIO线程完成,该线程 ...

  7. reactor模式:多线程的reactor模式

    上文说到单线程的reactor模式 reactor模式:单线程的reactor模式 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的reactor模式引入线程池的概念, ...

  8. 高性能IO之Reactor模式

    讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty.Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢? 最最原始的网络编程思路就是服务器用一个w ...

  9. 【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )

    文章目录 一. NIO 原生 API 弊端 二. Netty 简介 三. Netty 架构 四. Netty 版本 五. Netty 线程模型 六. 阻塞 IO 线程模型 七. 反应器 ( React ...

最新文章

  1. 扫描自己进入VR中 SLAM Scan 3D引擎将做到
  2. WebMvcConfigurerAdapter过时了替代的方法
  3. kettle 遇到 解决Incorrect integer value: '' for column 'id' at row 1 完美解决-费元星
  4. datareader对象直接转化为int_Integer、new Integer() 和 int 比较的面试题
  5. 年末巨制:知识图谱嵌入方法研究总结
  6. 51ctopython自动化测试工程师课程价格,Python自动化测试开发实战 一门能就业的测试课...
  7. 大华股份携手阿里云计算 涉足智能家居
  8. 【转】cron表达式详解
  9. windows系统TLQ8安装时提示载入java vm时windows出现错误
  10. 戏人看戏,苏旭博客网-学无止尽
  11. 川农计算机分数线,2016四川农业大学录取分数线(附15年调档线)
  12. android+锁屏显示农历,在手机锁屏界面上显示农历日期和天气
  13. Error: [vuex] do not mutate vuex store state outside mutation handlers.报错的解决方法
  14. 大数据解读《旅行青蛙》崛起之谜
  15. 注入漏洞-sql注入
  16. Graphics2D进行后台绘图
  17. CU4C字符集检测和转换,C++版本
  18. 浅析运动健身APP开发的四种模式
  19. 爬虫数据分析实战——腾讯视频《奔跑吧》第九季弹幕数据分析
  20. 为用户设计舒适的姿势

热门文章

  1. 无圆角的四方孔的机构原理及运动仿真
  2. 使用POI技术往Excel中写入图片并以附件的形式发送给对方
  3. wifi中2.4GHz、5GHz、db信道介绍
  4. 南瓜做法大全家常菜 南瓜怎么做好吃
  5. JDBC系列(九):JDBC与数据库连接池(Druid-德鲁伊)使用步骤
  6. 清华朱军团队开源首个基于Transformer的多模态扩散大模型
  7. 一个叫搜索引擎的家伙
  8. gnupg,gnupg2 and gnupg1 do not seem to be installed,but one of them is required for this operation
  9. 不可错过的吃鸡攻略:多位抖音吃鸡主播关于刺激战场的神级操作技巧汇总
  10. 说话之道学习笔记1-转述赞美