highlight: androidstudio

前言

  • 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精髓!从而提高自己!分享他人!

  • 作为一款优秀的网络通信框架,Netty经历过无数的生产验证,今天我们就一窥究竟,研究下Netty的源码,如果你对Netty的工作原理不清楚,或者对NIO不清楚,那么我建议你去好好补补空缺,或者看下我的前两篇文章【Netty系列1】Netty简介与I/O&线程模型 【Netty系列2】Netty线程模型与核心组件解析(上)


在学习源码前我,我们还是看下Netty工作的线程模型 (心中牢记此图,方可事半功倍)

分析源码前我们先看一段经典的Netty客户端&服务端代码

下面我们来看一段比较经典的Netty服务端代码,实际在开发Netty服务器时,基本也是以这段代码为模板,增添我们业务上的东西罢了(比如自定义的channelHandler或者编解码器等等)。

  • Netty 服务端 ```java private static final int PORT = 8000;

public static void main(String[] args) { //设置boss线程组用于accept NioEventLoopGroup boosGroup = new NioEventLoopGroup(); //设置worker线程组用于 read write 处理业务逻辑等 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap //1.设置 boss线程组和worker线程组 .group(boosGroup, workerGroup) //2.指定Netty的工作方式为NIO方式 .channel(NioServerSocketChannel.class) //服务端 accept 队列的大小 option可以设置很多参数这里就不一一罗列了 .option(ChannelOption.SOBACKLOG, 1024) //TCP Keepalive 机制,实现 TCP 层级的心跳保活功能 用于保持长连接 .childOption(ChannelOption.SOKEEPALIVE, true) //初始化子channel(对应每一条连接) .childHandler(new ChannelInitializer() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new MessageRequestHandler()); } }); //绑定端口 进行监听 serverBootstrap.bind(PORT).addListener(future -> { if (future.isSuccess()) { System.out.println(new Date() + ": 端口[" + PORT + "]绑定成功!"); } else { System.err.println("端口[" + PORT + "]绑定失败!"); } }); } finally { //优雅关闭 boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } ```

  • Netty 客户端 ```java

private static final String HOST = "127.0.0.1"; private static final int PORT = 8000; public static void main(String[] args) { NioEventLoopGroup workerGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MessageResponseHandler());}});connect(bootstrap, HOST, PORT);

} private static void connect(Bootstrap bootstrap, String host, int port) { bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { System.out.println(new Date() + ": 连接成功"); //todo do something } else { System.err.println(new Date() + ": 连接失败"); } }); } ```

1、源码分析之channel(此处仅针对NioServerSocketChannel)

  • 说明:可能在我的文章中有的地方也叫父channel或者服务端channel与之对应的则是SocketChannel我们有时也叫其子channel或者客户端channel

1.1、创建服务端channel的准备工作--->>> (赋值)这个阶段我也喜欢叫它装配serverBootStrapt

  • 调用AbstractBootstrapchannel(Class<? extends C> channelClass)方法设置channel的 clazz 为NioServerSocketChannel.class
  • 将传入的clazz赋值给ReflectiveChannelFactory的clazz属性

ReflectiveChannelFactory实例赋值给AbstractBootstrapchannelFactory成员变量

到此赋值完成后即返回了this即ServerBootstrap类型的对象,以便后续的其他参数的设置,比如option childOption,childHandler 等等。

1.2、服务端channel的创建

  • 在这里我们直接挑明服务端channel的创建是在serverBootstrap.bind()阶段

  • 接下来我们跟进去

- 在dobind方法中,我们看到有个initAndRegister方法,从命名可知其功能是初始化和注册channel

  • 通过反射创建服务端channel

  • 为了更好的理解ServerSocketChannel,我们再一步进入其空参构造方法中去

- 看下newSocket是干什么,其实是调用底层函数,创建了个新的连接

Net.serverSocket(true)对应底层代码为sun.nio.ch.Net类的native方法 socket0 java private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse, boolean fastLoopback);

  • 现在我们跳出newSocket方法 ,进入public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel)方法

  • 我们一直点super,可以看到在最顶级的父类AbstractChannel中设置了些公共属性

- 然后在AbstractNioChannel中设置channel为非阻塞模式 - 最后在设置一些其他config

java 具体的属性如下 protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { this.allocator = ByteBufAllocator.DEFAULT; this.msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR; this.connectTimeoutMillis = 30000; this.writeSpinCount = 16; this.autoRead = 1; this.autoClose = true; this.writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; this.pinEventExecutor = true; this.setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; }

  • 到此服务端channel已经创建完成,接下来就是初始化channel了

1.3、服务端channel的初始化

  • ServerBootstrap类的init方法比较长,我们先粘贴出来

```java void init(Channel channel) throws Exception { //获取在装配serverBootStrap时候设置的option Map, Object> options = this.options0(); synchronized(options) { //设置option到channel的config变量中 channel.config().setOptions(options); } //获取attrs Map, Object> attrs = this.attrs0(); synchronized(attrs) { Iterator i$ = attrs.entrySet().iterator();

while(true) {if (!i$.hasNext()) {break;}Entry<AttributeKey<?>, Object> e = (Entry)i$.next();AttributeKey<Object> key = (AttributeKey)e.getKey();//设置 attrschannel.attr(key).set(e.getValue());}
}
//配置服务端pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = this.childGroup;
final ChannelHandler currentChildHandler = this.childHandler;
final Entry[] currentChildOptions;
synchronized(this.childOptions) {currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));
}final Entry[] currentChildAttrs;
synchronized(this.childAttrs) {currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));
}
//添加服务端接收到新连接时的处理器 channelHandler ,channelHandler里边是个ServerBootstrapAcceptor 字面意思是连接应答器,后续我们会对其展开分析。
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {public void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(new ChannelHandler[]{handler});}ch.eventLoop().execute(new Runnable() {public void run() {pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});}});}
}});

} ```

  • 在init方法的前几步,其实很简单就是拿到在装配serverBootStrap时候设置的属性(option childOption等,进行服务端channel的属性填充)如下:

  • 接下来就是比较重要的逻辑,配置服务端的 ChannelPipeline 以及channelHandler

  • 创建ChannelHandler 将其添加进服务端pieline

    • ChannelHandler的元素是一个 ServerBootstrapAcceptor (这个类比较重要,后续我们还会展开分析) > ServerBootstrapAcceptor简介: (其作用是每次新连接接入后,都对新连接做一些配置,而根据什么配置呢?其实就是传入的那四个参数 --->(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs))

  • 可以看到ServerBootstrapAcceptor构造只是赋值而已,后续我们将知道其作用

1.4、服务端channel的注册

  • 开始注册

  • 最后是进入到了AbstractChannel的register方法中进行

  • 调用doRegister0进行真正的注册 重要

    • 注意在注册完成后,会调用几个回调方法 > 服务端channelHandler中的handlerAdded方法 > 服务端channelHandler中的channelRegistered方法 > 服务端channelHandler中的channelActivit方法(但是会有条件判断!debug发现在此处并不会回调channelActivit方法而是在 doBind 阶段)

  • 下面我们看下这个register0方法 ```java private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !this.ensureOpen(promise)) { return; }

    boolean firstRegistration = this.neverRegistered;
    //调用jdk的 注册方法,进行真正的注册操作
    AbstractChannel.this.doRegister();
    this.neverRegistered = false;
    AbstractChannel.this.registered = true;
    //回调自定义的`handlerAdded`方法,注意是服务端channelHandler中的handlerAdded方法
    AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
    this.safeSetSuccess(promise);
    //回调自定义的`channelRegistered`方法,注意是服务端channelHandler中的channelRegistered方法
    AbstractChannel.this.pipeline.fireChannelRegistered();
    //注意此处并不会回调 这个if中是false
    if (AbstractChannel.this.isActive()) {if (firstRegistration) {AbstractChannel.this.pipeline.fireChannelActive();} else if (AbstractChannel.this.config().isAutoRead()) {this.beginRead();}
    }

    } catch (Throwable var3) { this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var3); }

} `` - 调用jdk的 注册方法,进行真正的注册操作AbstractChannel.this.doRegister(); ![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0235109bf6df4b10b167348235bf2a76~tplv-k3u1fbpfcp-watermark.image?) - 为了让我们更好的理解上图使用jdk注册channelselector的逻辑,我们这里给出使用NIO创建服务器` 的模板代码

```java // 将ServerSocketChannel 和 SocketChannel 注册到 选择器中 @Test public void register() throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("localhost", 8086)); ssc.configureBlocking(false);

Selector selector = Selector.open();//!!!!!!!! 注册ServerSocketChannel到Selector !!!!!!!!!SelectionKey sscSelectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);//注册ServerSocketChannelwhile (true) {SocketChannel sc = ssc.accept();if (sc == null) {continue;}sc.configureBlocking(false);//注册SocketChannelSelectionKey scselectionKey = sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

SelectableChannel channel = scselectionKey.channel(); System.out.println(scselectionKey.toString()); //...其他操作 } } ```

1.5、 服务端端口绑定(bind)逻辑&服务端channel监听accept事件

  • 继续点进去

- 继续点进去 - 继续 我们从下图可以看到 this.handler()是一个HeadContext

  • 我们点进HeadContext的bind方法,略过一些传递方法,来到了 AbstractChannelbind方法,可以看出 channel还是未active状态

  • 调用jdk bind方法

  • 回调服务端channelHandler中实现的channelActive方法

  • 因为我们没有自定义服务端channelHandler所以这里只是回调defaultPipelineHeadContextchannelActive方法,而这defaultPipelineheadContext是在创建服务端channel时候设置的,看下图:

- 继续 - 继续 - 继续

- 历经九九八十一道弯,终于来到了为服务端channel设置感兴趣事件的方法了

- 而16代表的其实是accept事件,同时,这个值是在创建服务端channel时就赋值好的,看下图我们回顾下

![image.png](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6c7bc96ece4347b3920e9056e70cb7b7~tplv-k3u1fbpfcp-watermark.image?)
  • 到此,我们的服务端channel就创建好了,接下来,就是等待client端发起连接请求,服务端监听到有连接建立请求到达时,将被服务端channel监听(因为其是一个accept事件)到,然后就是后续的一系列处理(这个我们放在后续文章中分析)。

1.6、channel源码分析小结

  1. 反射调用jdk底层,创建一个新连接,并设置一些netty需要的属性,比如pipeline,id,config等等

  2. 初始化服务端channel,通过装配serverBootStrap时候设置的各种child属性,来创建一个连接处理器(ServerBootstrapAccepter)用于后续监听到新连接时子channel的创建

  3. 注册服务端channel到EventLoop的事件轮询器 selector上去,回调channelAdded和channelRegistered方法
  4. 对端口进行监听,并在bind成功后,调用defaultPipeline中的headContext和tailContext的channelActivite方法,在headContext方法中,将服务端channel设置为对accept事件感兴趣,用以后续的accept事件监听

结语

  • 学习源码要有十足的耐性!越是封装的完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精髓!从而提高自己!分享他人!

【Netty系列_3】Netty源码分析之服务端channel相关推荐

  1. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  2. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  3. Spring Cloud Eureka 源码分析(一) 服务端启动过程

    2019独角兽企业重金招聘Python工程师标准>>> 一. 前言 我们在使用Spring Cloud Eureka服务发现功能的时候,简单的引入maven依赖,且在项目入口类根据服 ...

  4. zookeeper源码分析之一服务端启动过程

    zookeeper简介 zookeeper是为分布式应用提供分布式协作服务的开源软件.它提供了一组简单的原子操作,分布式应用可以基于这些原子操作来实现更高层次的同步服务,配置维护,组管理和命名.zoo ...

  5. SequoiaDB 系列之六 :源码分析之coord节点

    好久不见. 在上一篇SequoiaDB 系列之五   :源码分析之main函数,有讲述进程开始运行时,会根据自身的角色,来初始化不同的CB(控制块,control block). 在之前的一篇Sequ ...

  6. paho架构_MQTT系列最终章-Paho源码分析(三)-心跳与重连机制

    写在前面 通过之前MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收的介绍,相信仔细阅读过的小伙伴已经对Eclipse.Paho内部发送和订阅消息的流程有了一个较为清晰的认识,今天 ...

  7. 草帽船长(梦想海贼王)全套源码:客户端+服务端+资源+文档

    草帽船长(梦想海贼王)全套源码:客户端+服务端+资源+文档 ,需要帮助搭建联系QQ 2805477110 下载地址:http://www.51xyyx.com/2705.html 梦想海贼王全套源码, ...

  8. 视频直播源码中关于服务端直播开播推送实现

    在视频直播源码中直播app开播时需向客户推送开播消息通知用户,实现方式如下: 1.申请相应的推送服务三方,如下使用极光推送,获取相应的配置资料,并做好相应的配置 2.推送代码如下: /* 极光推送 * ...

  9. 急急急~!求一款源码~!背背恋Android约会交友APP完整源码(客户端+服务端)~!

    急急急!求一款源码!背背恋Android约会交友APP完整源码(客户端+服务端)~!

最新文章

  1. 北京建委breaa.cn宕了
  2. 【3分钟掌握】什么是DNS解析
  3. Java异常处理-----java异常体系
  4. mysql开启profiling
  5. 死磕java_死磕 java同步系列之AQS终篇(面试)
  6. azure 使用_如何使用命令行和Azure自动执行任务
  7. 一文了解 Github 上人气最高的十大 JavaScript 框架!
  8. 硬盘安装android
  9. 项目启动会应该注意的几点
  10. 高德地图全解析--定位篇
  11. 用ipv6搭建文件服务器,ipv6服务器搭建
  12. JavaScript和TypeScript学习总结
  13. ORACLE 几种同步灾备手段(OGG,ADG,DSG,高级复制,流复制,logmnr)
  14. Single-Shot Object Detection with Enriched Semantics 论文笔记
  15. Jetpack Compose - Switch
  16. Kotlin 编程核心基石—高阶函数
  17. 解决 SecureCRT 和 SecureFX 中文乱码
  18. Oracle11gR2(二)-图形安装
  19. qemu 下运行lk
  20. ROS导航小车0 代价地图(仅作个人记录)

热门文章

  1. VC操作word和excel文件,查询与读写[依赖office环境]
  2. IPGuard如何注册
  3. 宝塔面板搭建onenav – 使用PHP开发的简约导航/书签管理系统
  4. 不动产登记证识别API开发文档
  5. 仿酷狗音乐播放器开发日志十八——换肤功能的实现二:改变控件和窗体透明度(附挂件类源码)...
  6. 不动产测绘数据入库_房产基础地理信息数据生产管理与入库更新一体化
  7. 初识 Box2D世界
  8. Python写掷骰子的游戏
  9. WTG Windows系统安装到U盘/移动硬盘
  10. 工业数字智能化常用系统简介