nio server启动的第一步,都是要创建一个serverSocketChannel,我截取一段启动代码,一步步分析:

public void afterPropertiesSet() throws Exception {    // 创建rpc工厂    ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");    //执行worker线程池数量    int parallel = Runtime.getRuntime().availableProcessors() * 2;    // boss    EventLoopGroup boss = new NioEventLoopGroup(1);    // worker    EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory, SelectorProvider.provider());    try {        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)                .childHandler(new MessageRecvChannelInitializer(handlerMap))                .option(ChannelOption.SO_BACKLOG, 128)                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync();        System.out.printf("[author chenjianye] Netty RPC Server start success ip:%s port:%s\n", serverAddress, port);

        // 注册zookeeper服务        serviceRegistry.register(serverAddress + ":" + port);        // wait        future.channel().closeFuture().sync();    } finally {        worker.shutdownGracefully();        boss.shutdownGracefully();    }}入口就在
ChannelFuture future = bootstrap.bind(serverAddress, Integer.valueOf(port)).sync();

顺序往下执行,直到AbstractBootstrap类的doBind方法:
private ChannelFuture doBind(final SocketAddress localAddress) {    final ChannelFuture regFuture = this.initAndRegister();    final Channel channel = regFuture.channel();    if(regFuture.cause() != null) {        return regFuture;    } else if(regFuture.isDone()) {        ChannelPromise promise1 = channel.newPromise();        doBind0(regFuture, channel, localAddress, promise1);        return promise1;    } else {        final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel, null);        regFuture.addListener(new ChannelFutureListener() {            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if(cause != null) {                    promise.setFailure(cause);                } else {                    promise.executor = channel.eventLoop();                }

                AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);            }        });        return promise;    }}=============================================这个方法需要重点分析,我做个标记,doBind方法分析如下:

1.final ChannelFuture regFuture = this.initAndRegister();
final ChannelFuture initAndRegister() {    // 通过channel工厂反射创建ServerSocketChannel,并创建了作用于serverSocketChannel的channelPipeline管道    // 这个管道维护了ctx为元素的双向链表,到目前为止,pipeline的顺序为:head(outBound) ----> tail(inbound)    Channel channel = this.channelFactory().newChannel();

    try {     // 这个init方法第一个主要是设置channel的属性,我不细说了        // 第二个作用是增加了inbound处理器,channelInitializer,里面有一个initChannel()方法会在特定时刻被触发,什么时候被触发,后面我会说到。        this.init(channel);    } catch (Throwable var3) {        channel.unsafe().closeForcibly();        return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);    }

   // 到了这里,就开始执行register逻辑,这个是关键,我把register的代码贴出来,跟着后面的代码继续看,跳转到2。    ChannelFuture regFuture = this.group().register(channel);    if(regFuture.cause() != null) {        if(channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }

    return regFuture;}
2.AbstractChannel里的register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {    if(eventLoop == null) {        throw new NullPointerException("eventLoop");    } else if(AbstractChannel.this.isRegistered()) {        promise.setFailure(new IllegalStateException("registered to an event loop already"));    } else if(!AbstractChannel.this.isCompatible(eventLoop)) {        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));    } else {        AbstractChannel.this.eventLoop = eventLoop;        // 到这里还是主线程启动,所以会执行else,启动了boss线程,register0方法跳转到3        if(eventLoop.inEventLoop()) {            this.register0(promise);        } else {            try {                eventLoop.execute(new OneTimeTask() {                    public void run() {                        AbstractUnsafe.this.register0(promise);                    }                });            } catch (Throwable var4) {                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);                this.closeForcibly();                AbstractChannel.this.closeFuture.setClosed();                this.safeSetFailure(promise, var4);            }        }

    }}
3.register0方法
private void register0(ChannelPromise promise) {    try {        if(!promise.setUncancellable() || !this.ensureOpen(promise)) {            return;        }

        boolean t = this.neverRegistered;        // 这个方法主要是 this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
        AbstractChannel.this.doRegister();        this.neverRegistered = false;        AbstractChannel.this.registered = true;

        // 回调之前doBind方法的listener,boss线程添加了新的任务:bind任务        this.safeSetSuccess(promise);

        // 这个方法东西内容很多        // 第一步:this.head.fireChannelRegistered()没有什么实质内容,最终执行到inbound的next.invokeChannelRegistered()方法        // 第二步:根据上文,目前的pipeline顺序为head---->initializer---->tail        // 第三步:执行initializer        // 第四步:添加一个新的inbound处理器,ServerBootstrapAcceptor,此时的顺序为:head---->initizlizer---->ServerBootstrapAcceptor---->tail        // 第五步:移除initizlizer,此时pipeline顺序为:head---->ServerBootstrapAcceptor---->tail        // 第六步:么有什么实质内容,这个方法就算执行结束了        AbstractChannel.this.pipeline.fireChannelRegistered();

        // 这里channel还没有绑定,所以isActive()方法返回false,不会继续执行,目前boss线程还剩下bind任务        if(t && AbstractChannel.this.isActive()) {            AbstractChannel.this.pipeline.fireChannelActive();        }    } catch (Throwable var3) {        this.closeForcibly();        AbstractChannel.this.closeFuture.setClosed();        this.safeSetFailure(promise, var3);    }

}
4.bind任务bind任务是一个outbound,所以会按照tail---->head的顺序执行,目前只有head是outbound。headHandler最终会执行AbstractUnsafe的bind方法:
AbstractChannel.this.doBind(localAddress);
public final void bind(SocketAddress localAddress, ChannelPromise promise) {    if(promise.setUncancellable() && this.ensureOpen(promise)) {        if(Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {            AbstractChannel.logger.warn("A non-root user can\'t receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");        }

        boolean wasActive = AbstractChannel.this.isActive();

        try {        //由于我们是nioServerSocketChannel,所以:this.javaChannel().socket().bind(localAddress, this.config.getBacklog());

            AbstractChannel.this.doBind(localAddress);

        } catch (Throwable var5) {
            this.safeSetFailure(promise, var5);            this.closeIfClosed();            return;        }

     // 此时已经绑定了,所以isActive()返回true,执行        if(!wasActive && AbstractChannel.this.isActive()) {            // 重新往boss线程加入了任务            this.invokeLater(new OneTimeTask() {                public void run() {                    AbstractChannel.this.pipeline.fireChannelActive();                }            });        }

        this.safeSetSuccess(promise);    }}

public ChannelPipeline fireChannelActive() {    // 来回调用,貌似这里没有什么实质内容    this.head.fireChannelActive();    if(this.channel.config().isAutoRead()) {        // 这个是outBound,最终会触发head     // head会执行 this.unsafe.beginRead(),最终会执行abstractNioChannel里的doBeginRead()方法,最终会执行到5
        this.channel.read();    }

    return this;}
5.
protected void doBeginRead() throws Exception {    if(!this.inputShutdown) {        SelectionKey selectionKey = this.selectionKey;        if(selectionKey.isValid()) {            this.readPending = true;            int interestOps = selectionKey.interestOps();            if((interestOps & this.readInterestOp) == 0) {                selectionKey.interestOps(interestOps | this.readInterestOp);            }

        }    }}
终于看到我们熟悉的东西了,最终把selectionKey的interestOps设置为SelectionKey.OP_ACCEPT。

转载于:https://www.cnblogs.com/cr1719/p/6344132.html

netty源码分析之一:server的启动相关推荐

  1. [android源码分析]sdp Server的启动分析

    SDP server是蓝牙启动过程中的一个非常重要部分.本文简单介绍一下这个函数的实现. [cpp] view plaincopy int start_sdp_server(uint16_t mtu, ...

  2. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  3. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  4. Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...

  5. 【Netty源码分析摘录】(八)新连接的接入

    文章目录 1.问题 2.检测新连接接入 3.创建客户端 channel 4. 绑定 NioEventLoop 4.1 register0 4.1.1 doRegister() 4.1.2 pipeli ...

  6. Netty源码分析系列之服务端Channel的端口绑定

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 微信公众号 问题 本文内容是接着前两篇文章写的,有兴趣的朋友可以先去阅读下两篇文章: Ne ...

  7. Netty源码分析第6章(解码器)----第4节: 分隔符解码器

    Netty源码分析第6章(解码器)---->第4节: 分隔符解码器 Netty源码分析第六章: 解码器 第四节: 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecode ...

  8. Netty源码分析第7章(编码器和写数据)----第2节: MessageToByteEncoder

    Netty源码分析第7章(编码器和写数据)---->第2节: MessageToByteEncoder Netty源码分析第七章: Netty源码分析 第二节: MessageToByteEnc ...

  9. Netty源码分析第5章(ByteBuf)----第5节: directArena分配缓冲区概述

    Netty源码分析第5章(ByteBuf)---->第5节: directArena分配缓冲区概述 Netty源码分析第五章: ByteBuf 第五节: directArena分配缓冲区概述 上 ...

  10. janusgraph源码分析1-下载编译启动 1

    date: 2018-04-26 title: "janusgraph源码分析1-下载编译启动" author: "邓子明" tags: - 源码 - janu ...

最新文章

  1. oracle本地验证,Oracle 本地验证和密码文件
  2. 2020年,语义分割可以在哪些方向进行研究并取得突破?
  3. 30_栈的定义.swf
  4. 洗牌算法汇总以及测试洗牌程序的正确性
  5. 牛客题霸 [ 旋转数组的最小数字] C++题解/答案
  6. asp.net如何生成图片验证码
  7. mysql 取模分区_MySQL分区
  8. PostgreSQL 最佳实践 - 在线增量备份与任意时间点恢复
  9. Ubuntu系统中创建虚拟环境
  10. 2014年07月21日
  11. 网易家居专访柯拉尼陶晓松:有所为 有所不为 争创领军品牌
  12. 布谷鸟算法(C++实现)
  13. gopro7怎么回看视频_gopro7推荐帧数设置 gopro7视频格式设置
  14. Travis Ci 让你的项目轻松加入持续集成测试
  15. Android各种屏幕尺寸
  16. Python之九宫格输入
  17. CSS3 排版属性盒子模型 第二个模块
  18. 算法竞赛入门经典 例题6-21
  19. Windows 系统安装
  20. Vue项目在ie浏览器打不开的解决办法

热门文章

  1. Centos6.5安装Kibana
  2. 数据产品--浅析如何搭建维度指标系统
  3. anasys hpc集群_这可能是最简单的并行方案,如何基于 AWS ParallelCluster 运行 ANSYS Fluent...
  4. matlab中怎样将字母倒叙,如何用matlab将文档里的数按行倒序输出
  5. Qt-ros插件:创建工程,编译实现操控小乌龟(二)
  6. C程序在Ubuntu下创建运行
  7. mysql 5.5 barracuda_MySQL Antelope和Barracuda的区别分析
  8. 数学之美 系列九 -- 如何确定网页和查询的相关性
  9. 模型学习 - VAE(变分自编码)专题
  10. mysql分页limit运算,MySQL的limit分页查询及性能问题