好久没写博客了,最近打算花些时间把Netty的源码好好读一读,下面是本人在学习的过程中的一些笔记,不能确保自己思考的完全是正确的,如果有错误,欢迎大家指正。

由于本人的语文功底烂的很,通篇使用大白话来讲解0.0,有一些概念上的东西,博主可能不会明确的给出定义,建议使用过Netty的同学一起来研究。

好了,我们一起来看下吧。

Netty 是一款用于快速开发的高性能的网络应用程序的Java框架。说到Netty, 我们先对几种I/O模型进行一下比对:

那么伪异步IO是啥呢?

其实就是加入了线程池(ThreadPoolExecutor),对接入的客户端的Socket封装成task,实现了Runnable接口,然后投递到线程池中处理,这样就避免了BIO那种一个客户端连接一个IO线程的情况,防止资源耗尽和宕机。但是这种方式底层的通信依然采用了同步阻塞模型,无法从根本上解决问题。

那么AIO又是啥呢?

NIO2.0 引入了新的一步通道的概念,并提供了异步文件通道和异步套接字的实现。它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,属于真正意义上的异步非阻塞IO。

1、通过java.util.concurrent.Future 类来异步获取操作的结果。

2、在执行异步操作的时候传入一个CompletionHandler接口的实现类,作为操作完成的回调。

接口有以下两个方法。

 1 /**
 2  * Invoked when an operation has completed.
 3  *
 4  * @param   result
 5  *          The result of the I/O operation.
 6  * @param   attachment
 7  *          The object attached to the I/O operation when it was initiated.
 8  */
 9 void completed(V result, A attachment);
10
11 /**
12  * Invoked when an operation fails.
13  *
14  * @param   exc
15  *          The exception to indicate why the I/O operation failed
16  * @param   attachment
17  *          The object attached to the I/O operation when it was initiated.
18  */
19 void failed(Throwable exc, A attachment);

好的,下面也稍微回顾一下NIO,以及NIO涉及的几个关键组件:

  • 缓冲区 Buffer
  • 通道 Channel
  • 多路复用器 Selector
  1. Buffer : 看什么都不如看官方文档来的更准确,下面是官方Buffer javadoc内容,我们来看下:

里面讲述了,buffer抽象类 是一个数据容器,除了内容,还有一些属性,capacity、limit、position。

capacity 是容器的容量,这个值一旦被创建,就无法修改。 limit 是 不应该被读或写的第一个元素的位置。 position 是指下一个将会被读或写的位置,这个值一定小于等于limit。

另外javadoc中还提到了mark和reset, 其中mark其实就是打一个标记,把当前的position赋给mark。  那么 reset 的 描述是这样的 把当前的position 改成之前mark的位置。

ok,由上面的文档可以得出下面的顺序  0 <= mark <= position <= limit <= capacity

其实Buffer中还有一个非常重要的方法必须要说一下,那就是 flip() ,看下javadoc

这个其实就是把 当前的limit = position, position = 0, 当然如果之前有mark也会失效,设置成-1, 当你往buffer中写了数据的时候,只有执行flip()方法, 才可以正确的读取数据,  doc中还指出这个方法经常和compact()方法连着用。同样,贴出javadoc:

相当于什么呢,就相当于是清理掉已经读取过得数据,比如 position = 5 , limit = 10,前5个数据经读取过了,那么将新建一个buffer,将当前position到limit的数据拷贝到一个新的Buffer中,那么新的buffer的postion = limit-postion, limit = capacity, 好了,看源码是这样的,接下来就是验证一下了:

 1 ByteBuffer buffer = ByteBuffer.allocate(10);
 2 buffer.put("helloworld".getBytes());
 3 System.out.println(buffer.position() + ":" + buffer.limit());
 4 buffer.flip();
 5 System.out.println(buffer.position() + ":" + buffer.limit());
 6 byte[] bytes = new byte[buffer.limit() + 1];
 7 for(int i=0; i<6; i++) {
 8     bytes[i] = buffer.get();
 9 }
10 System.out.println(new String(bytes));
11 System.out.println(buffer.position() + ":" + buffer.limit());
12 System.out.println(buffer);
13 buffer.compact();
14 System.out.println(buffer.position() + ":" + buffer.limit());
15 System.out.println(buffer);

测试结果如下:

10:10
0:10
hellow
6:10
java.nio.HeapByteBuffer[pos=6 lim=10 cap=10]
4:10
java.nio.HeapByteBuffer[pos=4 lim=10 cap=10]

好了,Buffer的源码看到这里也算是差不多了。

2、Channel

Channel是一个通道, 它就像自来水管一样,网络数据通过Channel读取与写入,通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutStream的子类),而通道可以用于读、写或者二者同时进行, 属于全双工。

这里我们也来看下源码吧,就看ServerSocketChannel

提供了几个比较重要的api:

public static ServerSocketChannel open() throws IOException; // 通过该方法创建一个Channel

看下javadoc , 明确说明了 新创建的channel是没有任何绑定的,在进行accepted之前需要绑定一个地址。

public final ServerSocketChannel bind(SocketAddress local);// 绑定一个端口号

public abstract SocketChannel accept() throws IOException; // 接收新的客户端

3、Selector 多路复用器 ,简单来说呢,Selector 会不断的轮训注册在其上的Channel, 如果某个Channel上面发生了读写等事件,这个Channel就会处理就绪状态, 会被Selector轮训出来,然后拿到SelectionKey Set集合,从而获取到每一个就绪状态的Channel,进行后续的I/O操作。

由于JDK使用了epoll() 代替传统的select实现,所以没有最大句柄的1024/2048的限制, 只需要一个线程负责Selector的轮训,就可以接入成千上万的客户端。NB

channel将会通过一个SelectionKey注册到一个selector上,一个selector 通过 open方法去创建。

这一段着重指出,selectionKey集合只能通过 set 集合的 remove() 方法 或者 一个迭代器的 remove() 方法来移除。其余的方法都不可以修改 selected-key 。

好了,看到这里,有些朋友可能似懂非懂,但是看下下面的单元测试一下子就懂了。

这段代码实现了Nio的服务器端,接收到客户端消息后,然后通知所有的客户端。

 1     private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap();
 2
 3     public static void main(String[] args) {
 4
 5         try {
 6             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // 创建一个Channel
 7             serverSocketChannel.configureBlocking(false); // 设置为非阻塞
 8             serverSocketChannel.bind(new InetSocketAddress(8899)); // 绑定端口
 9
10             Selector selector = Selector.open(); // 创建一个Selector
11             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将Channel注册到Selector上,设置selectionKey 为 accept, 准备接收新的客户端连接
12
13             while (true) { // 死循环不断轮训,查看 是否有准备就绪的channel
14                 selector.select();  // 阻塞等到就绪的channel
15                 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取到就绪的selectionKeys集合
16                 selectionKeys.forEach(value -> {
17                     try {
18                         if(value.isAcceptable()) { // 接收新的客户端事件
19                             ServerSocketChannel channel = (ServerSocketChannel)value.channel(); // 获取channel
20                             SocketChannel clientChannel = channel.accept(); // 获取客户端的 socketChannel
21                             clientChannel.configureBlocking(false); // 设置为非阻塞
22                             String clientId = UUID.randomUUID().toString();
23                             System.out.println("客户端接入" + clientId);
24                             clientMap.put(clientId, clientChannel);
25                             clientChannel.register(selector, SelectionKey.OP_READ); // 这里重点说下, 当接收到新的客户端后,接下来就是准备接收数据,所以这里就是注册的是Read事件                                                                                       // 并且这里注册到selector上的是客户端对应的SocketChannel, 而不是ServerSocketChannel,                                                                                       // 因为ServerScoketChannel只负责接收新的客户端
26                         } else if(value.isReadable()) { // 接收到read事件
27                             SocketChannel clientChannel = (SocketChannel)value.channel();  // 所以这里是SocketChannel
28                             ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配内存
29                             int count = clientChannel.read(buffer); // 写channel中的数据到Buffer中
30                             if (count > 0) {
31                                 buffer.flip(); // 写完之后,一定要执行flip。转化成读
32                                 Charset charset = Charset.forName("utf-8");
33                                 String receiveMsg = String.valueOf(charset.decode(buffer).array());
34                                 System.out.println("receiveMsg = " +receiveMsg);
35                                 Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator();
36                                 String sendClient = null;
37                                 while (it.hasNext()) {
38                                     Map.Entry<String, SocketChannel> next = it.next();
39                                     if(next.getValue() == clientChannel) {
40                                         sendClient = next.getKey();
41                                         break;
42                                     }
43                                 }
44                                 it = clientMap.entrySet().iterator();
45                                 ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
46                                 while (it.hasNext()) {
47                                     SocketChannel socketChannel = it.next().getValue();
48                                     writeBuffer.clear();
49                                     writeBuffer.put(("sendClient:" + sendClient + "发送了消息").getBytes());
50                                     writeBuffer.flip();
51                                     socketChannel.write(writeBuffer);
52                                 }
53                             }
54                         }
55                     } catch (Exception e) {
56                         e.printStackTrace();
57                     }
58                 });
59                 selectionKeys.clear(); // 每次处理完这一批selectionKeys,一定要清空掉集合。
60             }
61
62         } catch (IOException e) {
63             e.printStackTrace();
64         } finally {
65         }
66     }

ok, 上面是我自己的一些理解,如果有问题欢迎大家指正。下一篇,我们将开始学习Netty的源码。

转载于:https://www.cnblogs.com/huxipeng/p/10714404.html

Netty源码分析--NIO(一)相关推荐

  1. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  2. Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...

  3. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  4. Netty源码分析系列之服务端Channel的端口绑定

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...

  5. Netty源码分析第6章(解码器)----第4节: 分隔符解码器

    Netty源码分析第6章(解码器)---->第4节: 分隔符解码器 Netty源码分析第六章: 解码器 第四节: 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecode ...

  6. Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder

    Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...

  7. Netty源码分析第5章(ByteBuf)----第5节: directArena分配缓冲区概述

    Netty源码分析第5章(ByteBuf)---->第5节: directArena分配缓冲区概述 Netty源码分析第五章: ByteBuf 第五节: directArena分配缓冲区概述 上 ...

  8. Netty源码分析(六)—Future和Promis分析

    Netty源码分析(六)-Future和Promis分析 Future用来在异步执行中获取提前执行的结果 个人主页:tuzhenyu's page 原文地址:Netty源码分析(六)-Future和P ...

  9. netty源码分析系列——EventLoop

    2019独角兽企业重金招聘Python工程师标准>>> 前言 EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面 ...

最新文章

  1. Udacity机器人软件工程师课程笔记(二十一) - 对点云进行集群可视化 - 聚类的分割 - K-means|K均值聚类, DBSCAN算法
  2. 从电视到网络,vivo营销之变
  3. 【干货】从小米发布会看:雷军的七个产品思维
  4. PHP的ob多级缓冲设置
  5. 在Linux中如何查看文件的修改日期
  6. win下配置的ES中的数据在哪里可以看到?三种方式你看那种更加高大上!!!(win_Elasticsearch)
  7. _Linux软件安装
  8. Android之解决Gigaset手机不能设置DeviceOwner权限提示already provisioned问题
  9. Vista系统自带IIS 7.0设置技巧详解
  10. Shell——echo命令
  11. nodejs express 学习
  12. win10主题美化(单)
  13. XC1004四轴SPI运动控制芯片,bc014四轴电子凸轮运动控制模块
  14. 产品管理系统(MVC设计模式)——第一个Javaee项目
  15. 三维空间中直线间距离的计算
  16. 购物计算小程序,遍历所有情况。
  17. 啮齿类动物大尺度功能网络
  18. 【ARM开发】交叉编译Qt源码之(4)添加xcb支持
  19. ubuntu企业微信解决无法查看、发送图片
  20. Vue-cli 脚手架一

热门文章

  1. dos删除文件与文件夹
  2. XAML实例教程系列 – 对象和属性
  3. Windows Phone 7 自定义控件库
  4. Bochs调试及相关仿真工具的使用方法
  5. 关于request.setAttribute多页传值,多页取值
  6. document.forms用法示例介绍
  7. struts2 hibernate登录
  8. .jsp与servlet之间页面跳转及参数传递实例
  9. Y分钟学clojure
  10. golang中的条件变量