上文说到单线程的reactor模式 reactor模式:单线程的reactor模式

单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的reactor模式引入线程池的概念,把耗时的IO操作交由线程池处理,处理完了之后再同步到selectionkey中,服务器架构图如下

上文(reactor模式:单线程的reactor模式)提到,以read和send阶段IO最为频繁,所以多线程的reactor版本里,把这2个阶段单独拎出来。

下面看看代码实现

 1 // Reactor線程 (该类与单线程的处理基本无变动)
 2     package server;
 3
 4     import java.io.IOException;
 5     import java.net.InetSocketAddress;
 6     import java.nio.channels.SelectionKey;
 7     import java.nio.channels.Selector;
 8     import java.nio.channels.ServerSocketChannel;
 9     import java.util.Iterator;
10     import java.util.Set;
11
12     public class TCPReactor implements Runnable {
13
14         private final ServerSocketChannel ssc;
15         private final Selector selector;
16
17         public TCPReactor(int port) throws IOException {
18             selector = Selector.open();
19             ssc = ServerSocketChannel.open();
20             InetSocketAddress addr = new InetSocketAddress(port);
21             ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口
22             ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞
23             SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key
24             sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象
25         }
26
27         @Override
28         public void run() {
29             while (!Thread.interrupted()) { // 在線程被中斷前持續運行
30                 System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
31                 try {
32                     if (selector.select() == 0) // 若沒有事件就緒則不往下執行
33                         continue;
34                 } catch (IOException e) {
35                     // TODO Auto-generated catch block
36                     e.printStackTrace();
37                 }
38                 Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
39                 Iterator<SelectionKey> it = selectedKeys.iterator();
40                 while (it.hasNext()) {
41                     dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度
42                     it.remove();
43                 }
44             }
45         }
46
47         /*
48          * name: dispatch(SelectionKey key)
49          * description: 調度方法,根據事件綁定的對象開新線程
50          */
51         private void dispatch(SelectionKey key) {
52             Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程
53             if (r != null)
54                 r.run();
55         }
56
57     }  

 1  // 接受連線請求線程
 2     package server;
 3
 4     import java.io.IOException;
 5     import java.nio.channels.SelectionKey;
 6     import java.nio.channels.Selector;
 7     import java.nio.channels.ServerSocketChannel;
 8     import java.nio.channels.SocketChannel;
 9
10     public class Acceptor implements Runnable {
11
12         private final ServerSocketChannel ssc;
13         private final Selector selector;
14
15         public Acceptor(Selector selector, ServerSocketChannel ssc) {
16             this.ssc=ssc;
17             this.selector=selector;
18         }
19
20         @Override
21         public void run() {
22             try {
23                 SocketChannel sc= ssc.accept(); // 接受client連線請求
24                 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
25
26                 if(sc!=null) {
27                     sc.configureBlocking(false); // 設置為非阻塞
28                     SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key
29                     selector.wakeup(); // 使一個阻塞住的selector操作立即返回
30                     sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象
31                 }
32
33             } catch (IOException e) {
34                 // TODO Auto-generated catch block
35                 e.printStackTrace();
36             }
37         }
38
39
40     }  

 1     // Handler線程
 2     package server;
 3
 4     import java.io.IOException;
 5     import java.nio.channels.SelectionKey;
 6     import java.nio.channels.SocketChannel;
 7     import java.util.concurrent.LinkedBlockingQueue;
 8     import java.util.concurrent.ThreadPoolExecutor;
 9     import java.util.concurrent.TimeUnit;
10
11     public class TCPHandler implements Runnable {
12
13         private final SelectionKey sk;
14         private final SocketChannel sc;
15         private static final int THREAD_COUNTING = 10;
16         private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
17                 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
18                 new LinkedBlockingQueue<Runnable>()); // 線程池
19
20         HandlerState state; // 以狀態模式實現Handler
21
22         public TCPHandler(SelectionKey sk, SocketChannel sc) {
23             this.sk = sk;
24             this.sc = sc;
25             state = new ReadState(); // 初始狀態設定為READING
26             pool.setMaximumPoolSize(32); // 設置線程池最大線程數
27         }
28
29         @Override
30         public void run() {
31             try {
32                 state.handle(this, sk, sc, pool);
33
34             } catch (IOException e) {
35                 System.out.println("[Warning!] A client has been closed.");
36                 closeChannel();
37             }
38         }
39
40         public void closeChannel() {
41             try {
42                 sk.cancel();
43                 sc.close();
44             } catch (IOException e1) {
45                 e1.printStackTrace();
46             }
47         }
48
49         public void setState(HandlerState state) {
50             this.state = state;
51         }
52     }
53
54  

 1     package server;
 2
 3     import java.io.IOException;
 4     import java.nio.channels.SelectionKey;
 5     import java.nio.channels.SocketChannel;
 6     import java.util.concurrent.ThreadPoolExecutor;
 7
 8     public interface HandlerState {
 9
10         public void changeState(TCPHandler h);
11
12         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
13                 ThreadPoolExecutor pool) throws IOException ;
14     }  

 1     package server;
 2
 3     import java.io.IOException;
 4     import java.nio.ByteBuffer;
 5     import java.nio.channels.SelectionKey;
 6     import java.nio.channels.SocketChannel;
 7     import java.util.concurrent.ThreadPoolExecutor;
 8
 9     public class ReadState implements HandlerState{
10
11         private SelectionKey sk;
12
13         public ReadState() {
14         }
15
16         @Override
17         public void changeState(TCPHandler h) {
18             // TODO Auto-generated method stub
19             h.setState(new WorkState());
20         }
21
22         @Override
23         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
24                 ThreadPoolExecutor pool) throws IOException { // read()
25             this.sk = sk;
26             // non-blocking下不可用Readers,因為Readers不支援non-blocking
27             byte[] arr = new byte[1024];
28             ByteBuffer buf = ByteBuffer.wrap(arr);
29
30             int numBytes = sc.read(buf); // 讀取字符串
31             if(numBytes == -1)
32             {
33                 System.out.println("[Warning!] A client has been closed.");
34                 h.closeChannel();
35                 return;
36             }
37             String str = new String(arr); // 將讀取到的byte內容轉為字符串型態
38             if ((str != null) && !str.equals(" ")) {
39                 h.setState(new WorkState()); // 改變狀態(READING->WORKING)
40                 pool.execute(new WorkerThread(h, str)); // do process in worker thread
41                 System.out.println(sc.socket().getRemoteSocketAddress().toString()
42                         + " > " + str);
43             }
44
45         }
46
47         /*
48          * 執行邏輯處理之函數
49          */
50         synchronized void process(TCPHandler h, String str) {
51             // do process(decode, logically process, encode)..
52             // ..
53             h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
54             this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
55             this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
56         }
57
58         /*
59          * 工作者線程
60          */
61         class WorkerThread implements Runnable {
62
63             TCPHandler h;
64             String str;
65
66             public WorkerThread(TCPHandler h, String str) {
67                 this.h = h;
68                 this.str=str;
69             }
70
71             @Override
72             public void run() {
73                 process(h, str);
74             }
75
76         }
77     }  

 1  package server;
 2
 3     import java.io.IOException;
 4     import java.nio.channels.SelectionKey;
 5     import java.nio.channels.SocketChannel;
 6     import java.util.concurrent.ThreadPoolExecutor;
 7
 8     public class WorkState implements HandlerState {
 9
10         public WorkState() {
11         }
12
13         @Override
14         public void changeState(TCPHandler h) {
15             // TODO Auto-generated method stub
16             h.setState(new WriteState());
17         }
18
19         @Override
20         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
21                 ThreadPoolExecutor pool) throws IOException {
22             // TODO Auto-generated method stub
23
24         }
25
26     }  

 1     package server;
 2
 3     import java.io.IOException;
 4     import java.nio.ByteBuffer;
 5     import java.nio.channels.SelectionKey;
 6     import java.nio.channels.SocketChannel;
 7     import java.util.concurrent.ThreadPoolExecutor;
 8
 9     public class WriteState implements HandlerState{
10
11         public WriteState() {
12         }
13
14         @Override
15         public void changeState(TCPHandler h) {
16             // TODO Auto-generated method stub
17             h.setState(new ReadState());
18         }
19
20         @Override
21         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
22                 ThreadPoolExecutor pool) throws IOException { // send()
23             // get message from message queue
24
25             String str = "Your message has sent to "
26                     + sc.socket().getLocalSocketAddress().toString() + "\r\n";
27             ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
28
29             while (buf.hasRemaining()) {
30                 sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
31             }
32
33             h.setState(new ReadState()); // 改變狀態(SENDING->READING)
34             sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
35             sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
36         }
37     }  

 1     package server;
 2
 3     import java.io.IOException;
 4
 5     public class Main {
 6
 7
 8         public static void main(String[] args) {
 9             // TODO Auto-generated method stub
10             try {
11                 TCPReactor reactor = new TCPReactor(1333);
12                 reactor.run();
13             } catch (IOException e) {
14                 // TODO Auto-generated catch block
15                 e.printStackTrace();
16             }
17         }
18
19     }  

总的来说,多线程版本的reactor是为了解决单线程reactor版本的IO和CPU处理速度不匹配问题,从而达到高效处理的目的

参考文章:

https://blog.csdn.net/yehjordan/article/details/51017025

转载于:https://www.cnblogs.com/billmiao/p/9872221.html

reactor模式:多线程的reactor模式相关推荐

  1. 【Netty】反应器 Reactor 模式 ( 单反应器 Reactor 单线程 | 单反应器 Reactor 多线程 )

    文章目录 一. 反应器 ( Reactor ) 模式 二. 反应器 ( Reactor ) 模式两大组件 三. 单反应器 ( Reactor ) 单线程 四. 单反应器 ( Reactor ) 单线程 ...

  2. C# 多线程并发锁模式-总结

    开篇: 互斥还是lock Monitor Mutex 模式! Muex Monitor lock AutoEventSet ManualEventSet 后续的 ReaderWriterLock Re ...

  3. 多线程终极模式:生产者-消费者模式

    多线程de小事情 导航不迷路: 程序.进程以及线程的爱恨情仇 最简单实现多线程的方法(Thread) 简单易懂的多线程(通过实现Runnable接口实现多线程) 常用获取线程基本信息的方法(新手专属) ...

  4. 多线程编程反模式_编程反模式

    多线程编程反模式 您是否曾经进行过代码审查,记录了非常高的WTF / m? 您是否想知道所有这些错误代码的原因是什么? 在大多数情况下,导致原因1的原因是使用设计和编码反模式. 如果您喜欢定义,请参见 ...

  5. 多窗口售票:单件模式多线程实现

    多窗口售票:单件模式多线程实现 2017-06-01 场景如下:总共100张票,编号1~100,三个窗口售票,售完为止. Tickets.java 票为单件模式,代码如下: package Teste ...

  6. 并发编程(五)python实现生产者消费者模式多线程爬虫

    并发编程专栏系列博客 并发编程(一)python并发编程简介 并发编程(二)怎样选择多线程多进程和多协程 并发编程(三)Python编程慢的罪魁祸首.全局解释器锁GIL 并发编程(四)如何使用多线程, ...

  7. C#.多线程 (一)多线程(异步模式)与单线程(同步模式)的应用与区别 举例

    先看网上百度的几张图片: 什么是单线程?单线程工作模式也成为同步模式.其就是在一定状态下只能做一件事情,比如我在18:00-18:30时间段可以做饭. 什么是多线程?多线程工作模式也成为异步模式.其就 ...

  8. 从开源框架细节的来分析网络模块的封装丨网络模块|Redis|skynet|多线程|单线程|reactor多核实现|IO多路复用

    从开源框架细节的来分析网络模块的封装 视频讲解如下: 从开源框架细节的来分析网络模块的封装丨网络模块|Redis|skynet|多线程|单线程|reactor多核实现|IO多路复用丨c/c++linu ...

  9. java中reactor模型_Java——Netty Reactor模型(转)

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...

最新文章

  1. LeetCode Length of Last Word(最后一个单词的长度)
  2. unity 实现手机振动_Unity 手机震动插件Vibration
  3. Linux centos查看cpu信息命令
  4. 每个人都必须阅读的10篇Java文章
  5. html超链接使用d,HTML图像的调用和超链接
  6. 使用LiteOS Studio图形化查看LiteOS在STM32上运行的奥秘
  7. 字符集ASCII、GBK、UNICODE、UTF在储存字符时的区别
  8. 软件版本命名规范及各阶段说明
  9. opengl 保留上一帧_历史上第一部长片动画,还真有点重口
  10. android 1024 github,1024 怎么能少了这款高颜值、敲实用的 GitHub 第三方客户端呢?...
  11. Unity 资源加载卸载过程
  12. bfptr算法(即中位数的中位数算法)
  13. PyCharm 激活 截止日期2100年1月
  14. linux系统bcast,关于linux的Bcast的疑问.请大家帮忙看看,谢谢啦
  15. JAVA图片裁剪上传实例______软件开发-帮助类
  16. 给IDEA换个酷炫的主题,这个有点哇塞啊!
  17. 密码学之RSA加密原理解析
  18. 2022年通信工程专业保研:从四非到浙大工院夏令营面试经验分享(前期准备篇/含通信原理面试真题)
  19. SiT5711:±5~±8ppb超高精度Stratum 3E恒温振荡器OCXO,1-60MHz
  20. 解决 docker 中 zsh: command not found: jupyterlab 问题

热门文章

  1. SUSE Linux 维护笔记一
  2. k均值算法 二分k均值算法_如何获得K均值算法面试问题
  3. opencv 创建图像_非艺术家的图像创建(OpenCV项目演练)
  4. 这些贷款冷知识你知道多少?
  5. 日本电影《摇摆》:男人之间的心灵碰撞
  6. 485通信原理_上位机开发之单片机通信实践
  7. html表单赋值提交,jQuery自动给表单赋值
  8. java不同垃圾回收器_细述 Java垃圾回收机制→Types of Java Garbage Collectors
  9. 安卓menu页面跳转_微信安卓版7.0.14内测!“发现小程序”页面大改版
  10. echart中拆线点的偏移_Qt中圆弧和扇形的绘制